How to Flash New Firmware on Multiple Sensors Programmatically

This article describes the process and script to update firmware on multiple Ouster Sensors.

Instructions

  1. Download/Copy Script (autoflash.py)

  2. Install any dependent python packages listed in the imports.

  3. Add permission to execute the script:

    • $ sudo chmod 755 /path/to/autoflash.py

  4. Run the script from the folder it is located in:

    • $ python3 autoflash.py /path/to/ousteros-image.img [host [host …]]

      • where [host [host …]] is a list of hosts you would like to update the firmware on

        • 192.0.2.0 192.0.2.1 192.0.2.3

      • Example:

        • $ python3 autoflash.py /path/to/ousteros-image.img 192.0.2.0 192.0.2.1 192.0.2.3

Download autoflash.py:

autoflash.py (RAW):

#!/usr/bin/env python3 import argparse import json import os import queue import requests import sys import threading import time def post_firmware_image(host, image): with open(image, 'rb') as image_file: print('{}: flashing {}'.format(host, image_file.name)) res = requests.post(url='http://{}/api/v1/system/firmware'.format(host), data=image_file, headers={'Content-Type': 'application/octet-stream'}) print('{}: result {}'.format(host, res)) return res.ok def get_firmware_version(host): r = requests.get(url='http://{}/api/v1/system/firmware'.format(host)) d = r.json() fw = d.get('fw', None) # 'fw' key moved to 'response', handle both if 'response' in d: fw = d['response'].get('fw', None) return fw def parse_firmware_image_version(image): (root, ext) = os.path.splitext(os.path.basename(image)) return root def upgrade_firmware(host, image, force=False): image_version = parse_firmware_image_version(image) remote_version = None try: remote_version = get_firmware_version(host) except: print('{}: Problem connecting, skipping.'.format(host)) return False if not force and remote_version == image_version: print('Remote version matches new image version "{}"'.format(remote_version)) return True # Expect request library to raise exception if failed, don't catch here # let propagate up. ok = post_firmware_image(host, image) # Wait for system to reboot time.sleep(30) for i in range(0, 30): try: remote_version = get_firmware_version(host) except: # Expect connection errors while rebooting time.sleep(1) print('{}: Firmware "{}"'.format(host, remote_version)) ok = True if remote_version == image_version else False return ok def upgrade_thread(qin, qout, force=False): while True: d = qin.get() if d == None: break # Attempt to upgrade firmware ok = upgrade_firmware(d['host'], d['image_name'], force) # Output the worker result out = {'host': d['host'], 'ok': ok} qout.put(out) # Mark this upgrade as done qin.task_done() # # Function to flash several hosts in parallel and report status to caller # def autoflash(hosts, image, force = False, thread_max = 5): qin = queue.Queue() qout = queue.Queue() for host in hosts: qin.put({'host': host, 'image_name': image}) thread_cnt = min(len(hosts), thread_max) threads = [] for i in range(thread_cnt): t = threading.Thread(name="UpgradeThread-"+str(i), target=upgrade_thread, args=(qin,qout, force)) t.start() threads.append(t) # Wait for all remote devices to be updated qin.join() # Break out of worker thread while loop [qin.put(None) for t in threads] # Wait for threads to cleanly exit [t.join() for t in threads] results = [] while not qout.empty(): d = qout.get() results += [d] return results if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('image', help='Image file to flash') parser.add_argument('hosts', help='Hosts to flash', nargs='*') parser.add_argument('-f', '--force', help="Force update", action='store_true') args = parser.parse_args() results = autoflash(args.hosts, args.image, args.force) print('Results: {}'.format(results)) all_ok = all(r['ok'] for r in results) print('Result Ok: {}'.format(all_ok)) sys.exit(0 if all_ok else 1)