How to Flash New Firmware on Multiple Sensors Programmatically

How to Flash New Firmware on Multiple Sensors Programmatically

This article describes how to update the firmware on multiple Ouster sensors in parallel using a Python script.

Instructions

  1. Download/Copy Script (autoflash.py)

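  2. Install the third-party Python packages the script imports. Every import except requests is part of the Python standard library, so this is normally just: pip install requests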

  3. Add permission to execute the script:

    • $ sudo chmod 755 /path/to/autoflash.py

  4. Run the script from the folder it is located in:

    • $ python3 autoflash.py /path/to/ousteros-image.img [host [host …]]

      • where [host [host …]] is a list of hosts you would like to update the firmware on

        • 192.0.2.0 192.0.2.1 192.0.2.3

      • Example:

        • $ python3 autoflash.py /path/to/ousteros-image.img 192.0.2.0 192.0.2.1 192.0.2.3

Download autoflash.py:

autoflash.py (RAW):

#!/usr/bin/env python3
"""Flash a firmware image onto several sensors in parallel.

Usage:
    python3 autoflash.py [-f] /path/to/ousteros-image.img [host [host ...]]

The version an image is expected to report after flashing is taken from the
image file name without its extension (see parse_firmware_image_version).
The process exits with status 0 only if every host ends up reporting that
version.
"""
import argparse
import json
import os
import queue
import sys
import threading
import time

import requests

# Seconds to wait for a TCP connection / a reply to a version query.
CONNECT_TIMEOUT_S = 10
QUERY_TIMEOUT_S = 10
# Fixed pause after the upload before the sensor is polled again.
REBOOT_WAIT_S = 30
# Number of version queries (and pause between failed ones) after the reboot.
POLL_ATTEMPTS = 30
POLL_INTERVAL_S = 1

# Errors a version query may legitimately raise: transport problems, or a
# reply that is not valid JSON (ValueError covers older requests releases).
_QUERY_ERRORS = (requests.exceptions.RequestException, ValueError)


def post_firmware_image(host, image):
    """Upload the firmware file ``image`` to ``host``.

    Returns True when the sensor answered with a non-error HTTP status.
    Raises OSError if the image cannot be opened and
    requests.exceptions.RequestException on transport failures.
    """
    with open(image, 'rb') as image_file:
        print('{}: flashing {}'.format(host, image_file.name))
        # Only the connect phase is bounded: how long the sensor takes to
        # answer after receiving the image is not known here, so the read
        # timeout is deliberately left unlimited.
        res = requests.post(
            url='http://{}/api/v1/system/firmware'.format(host),
            data=image_file,
            headers={'Content-Type': 'application/octet-stream'},
            timeout=(CONNECT_TIMEOUT_S, None))
    print('{}: result {}'.format(host, res))
    return res.ok


def get_firmware_version(host):
    """Return the firmware version string reported by ``host``, or None.

    Raises requests.exceptions.RequestException on transport failures and
    ValueError when the reply is not JSON.
    """
    r = requests.get(url='http://{}/api/v1/system/firmware'.format(host),
                     timeout=(CONNECT_TIMEOUT_S, QUERY_TIMEOUT_S))
    d = r.json()
    if not isinstance(d, dict):
        return None
    fw = d.get('fw', None)
    # 'fw' key moved to 'response', handle both
    response = d.get('response')
    if isinstance(response, dict):
        fw = response.get('fw', None)
    return fw


def parse_firmware_image_version(image):
    """Return the image file name without directory and extension."""
    root, _ext = os.path.splitext(os.path.basename(image))
    return root


def upgrade_firmware(host, image, force=False):
    """Flash ``image`` onto ``host`` and verify the reported version.

    Returns True if the sensor already runs the image version (and ``force``
    is not set) or reports it after flashing; False if the sensor cannot be
    reached, rejects the upload, or reports a different version afterwards.
    Exceptions raised while uploading propagate to the caller.
    """
    image_version = parse_firmware_image_version(image)

    try:
        remote_version = get_firmware_version(host)
    except _QUERY_ERRORS:
        print('{}: Problem connecting, skipping.'.format(host))
        return False

    if not force and remote_version == image_version:
        print('Remote version matches new image version "{}"'.format(remote_version))
        return True

    # Transport errors are not caught here; they propagate to the caller.
    if not post_firmware_image(host, image):
        # The sensor rejected the upload, so there is nothing to wait for.
        print('{}: upload rejected, skipping verification.'.format(host))
        return False

    # Wait for system to reboot
    time.sleep(REBOOT_WAIT_S)

    # Forget the pre-flash version so that a sensor which never comes back
    # cannot be mistaken for a successful upgrade.
    remote_version = None
    for _ in range(POLL_ATTEMPTS):
        try:
            remote_version = get_firmware_version(host)
        except _QUERY_ERRORS:
            # Expect connection errors while rebooting
            time.sleep(POLL_INTERVAL_S)
        else:
            break

    print('{}: Firmware "{}"'.format(host, remote_version))
    return remote_version == image_version


def upgrade_thread(qin, qout, force=False):
    """Worker loop: upgrade hosts taken from ``qin`` until None is received.

    Each job is a dict with 'host' and 'image_name'. For every job one
    ``{'host': ..., 'ok': bool}`` dict is put on ``qout`` and the job is
    marked done, even when the upgrade raised, so that ``qin.join()`` in the
    coordinating thread can never block on a crashed worker.
    """
    while True:
        d = qin.get()
        if d is None:
            break

        ok = False
        try:
            # Attempt to upgrade firmware
            ok = upgrade_firmware(d['host'], d['image_name'], force)
        except Exception as err:  # thread boundary: report, don't die
            print('{}: upgrade failed: {}'.format(d['host'], err))
        finally:
            # Output the worker result
            qout.put({'host': d['host'], 'ok': ok})
            # Mark this upgrade as done
            qin.task_done()


#
# Function to flash several hosts in parallel and report status to caller
#
def autoflash(hosts, image, force=False, thread_max=5):
    """Flash ``image`` onto every host using up to ``thread_max`` threads.

    Returns a list of ``{'host': str, 'ok': bool}`` dicts, one per host, in
    the order the upgrades finished.
    """
    hosts = list(hosts)
    qin = queue.Queue()
    qout = queue.Queue()

    for host in hosts:
        qin.put({'host': host, 'image_name': image})

    # At least one worker whenever there is work, otherwise join() below
    # would wait forever for jobs nobody processes.
    thread_cnt = max(1, min(len(hosts), thread_max)) if hosts else 0
    threads = []
    for i in range(thread_cnt):
        t = threading.Thread(name="UpgradeThread-" + str(i),
                             target=upgrade_thread,
                             args=(qin, qout, force))
        t.start()
        threads.append(t)

    # Wait for all remote devices to be updated
    qin.join()

    # Break out of worker thread while loop
    for _ in threads:
        qin.put(None)

    # Wait for threads to cleanly exit
    for t in threads:
        t.join()

    # All producers have exited, so draining until empty is race-free.
    results = []
    while not qout.empty():
        results.append(qout.get())

    return results


def main():
    """Parse the command line, flash all hosts and exit 0 on full success."""
    parser = argparse.ArgumentParser()
    parser.add_argument('image', help='Image file to flash')
    parser.add_argument('hosts', help='Hosts to flash', nargs='*')
    parser.add_argument('-f', '--force', help="Force update", action='store_true')
    args = parser.parse_args()

    # Fail once up front instead of once per host inside the workers.
    if not os.path.isfile(args.image):
        parser.error('image file not found: {}'.format(args.image))

    results = autoflash(args.hosts, args.image, args.force)
    print('Results: {}'.format(results))

    all_ok = all(r['ok'] for r in results)
    print('Result Ok: {}'.format(all_ok))
    sys.exit(0 if all_ok else 1)


if __name__ == '__main__':
    main()

 

Related articles