How to Flash New Firmware on Multiple Sensors Programmatically
This article describes how to update the firmware on multiple Ouster sensors with a single script, autoflash.py.
Instructions
Download/Copy Script (autoflash.py)
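Install any third-party Python packages the script imports. Of the modules imported in the script below, only requests is not part of the Python standard library (for example: pip install requests).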
Add permission to execute the script:
$ sudo chmod 755 /path/to/autoflash.py
Run the script from the folder it is located in:
$ python3 autoflash.py /path/to/ousteros-image.img [host [host …]]
where [host [host …]] is a space-separated list of the sensor hostnames or IP addresses whose firmware you would like to update, for example:
192.0.2.0 192.0.2.1 192.0.2.3
Example:
$ python3 autoflash.py /path/to/ousteros-image.img 192.0.2.0 192.0.2.1 192.0.2.3
Download autoflash.py:
autoflash.py (RAW):
#!/usr/bin/env python3
import argparse
import json
import os
import queue
import requests
import sys
import threading
import time
def post_firmware_image(host, image):
    """Upload a firmware image file to a sensor over HTTP.

    Opens the file at path ``image`` in binary mode and sends it as the
    body of a POST to ``http://<host>/api/v1/system/firmware`` with an
    ``application/octet-stream`` content type, printing a progress line
    before and after the request.

    Returns the ``ok`` attribute of the response object. Exceptions raised
    by ``open`` or ``requests.post`` are not caught here.
    """
    endpoint = 'http://{}/api/v1/system/firmware'.format(host)
    binary_headers = {'Content-Type': 'application/octet-stream'}
    with open(image, 'rb') as fh:
        print('{}: flashing {}'.format(host, fh.name))
        # Pass the open file object so the body is read from disk by requests.
        response = requests.post(url=endpoint, data=fh, headers=binary_headers)
        print('{}: result {}'.format(host, response))
        return response.ok
def get_firmware_version(host, timeout=10):
    """Ask a sensor which firmware version it currently reports.

    Sends a GET to ``http://<host>/api/v1/system/firmware`` and reads the
    ``fw`` field from the JSON reply. The field is looked up inside the
    nested ``response`` object first and at the top level second, so both
    reply layouts are handled.

    Args:
        host: Hostname or IP address of the sensor.
        timeout: Value passed to ``requests.get`` as ``timeout`` (seconds).
            Without a timeout a request to a sensor that stops answering
            (for example while it reboots) can block indefinitely.

    Returns:
        The value of ``fw``, or None when the reply carries no such key.

    Raises:
        Whatever ``requests.get`` or ``Response.json`` raise (connection
        errors, timeouts, a body that is not JSON). Not caught here.
    """
    reply = requests.get(url='http://{}/api/v1/system/firmware'.format(host),
                         timeout=timeout)
    payload = reply.json()

    # 'fw' key moved to 'response', handle both: prefer the nested value but
    # do not let a 'response' object without 'fw' hide a top-level 'fw'.
    nested = payload.get('response')
    if isinstance(nested, dict) and nested.get('fw') is not None:
        return nested['fw']
    return payload.get('fw', None)
def parse_firmware_image_version(image):
    """Derive the version label of an image from its path.

    The label is the file name of ``image`` (directories removed) without
    its last extension.
    """
    file_name = os.path.basename(image)
    stem, _suffix = os.path.splitext(file_name)
    return stem
def upgrade_firmware(host, image, force=False, reboot_wait=30,
                     poll_attempts=30, poll_interval=1):
    """Flash ``image`` onto one sensor and verify the version it reports.

    The expected version is derived from the image file name via
    ``parse_firmware_image_version``. The sensor is queried first; when it
    already reports that version and ``force`` is false, nothing is flashed.
    Otherwise the image is uploaded, the function waits for the sensor to
    reboot, and then polls it for the version it now reports.

    Args:
        host: Hostname or IP address of the sensor.
        image: Path to the firmware image file.
        force: Flash even when the reported version already matches.
        reboot_wait: Seconds to sleep after the upload before polling.
        poll_attempts: Maximum number of version queries after the wait.
        poll_interval: Seconds to sleep after a failed query.

    Returns:
        True when the sensor ends up reporting the image's version (or
        already did and ``force`` is false); False otherwise.

    Raises:
        Exceptions from ``post_firmware_image`` are deliberately not caught
        and propagate to the caller.
    """
    image_version = parse_firmware_image_version(image)

    try:
        remote_version = get_firmware_version(host)
    except Exception:
        # Best effort: any failure to query the sensor means "skip this host".
        print('{}: Problem connecting, skipping.'.format(host))
        return False

    if not force and remote_version == image_version:
        print('Remote version matches new image version "{}"'.format(remote_version))
        return True

    # Expect request library to raise exception if failed, don't catch here
    # let propagate up.
    if not post_firmware_image(host, image):
        # The sensor answered with an error status, so there is nothing to
        # wait for or verify.
        print('{}: Firmware upload was rejected.'.format(host))
        return False

    # Forget the pre-flash version: if the sensor never comes back, the stale
    # value must not be compared against the image version (with force=True it
    # could already be equal and would wrongly report success).
    remote_version = None

    # Wait for system to reboot
    time.sleep(reboot_wait)

    for _ in range(poll_attempts):
        try:
            remote_version = get_firmware_version(host)
            break  # Got an answer; no need to keep querying.
        except Exception:
            # Expect connection errors while rebooting
            time.sleep(poll_interval)

    print('{}: Firmware "{}"'.format(host, remote_version))
    return remote_version == image_version
def upgrade_thread(qin, qout, force=False):
    """Worker loop that upgrades hosts taken from ``qin``.

    Each item on ``qin`` is a dict with ``'host'`` and ``'image_name'`` keys.
    For every item the worker calls ``upgrade_firmware`` and puts
    ``{'host': ..., 'ok': ...}`` on ``qout``. A ``None`` item tells the worker
    to exit.

    Args:
        qin: Queue of work items (or ``None`` sentinels).
        qout: Queue that receives one result dict per work item.
        force: Passed through to ``upgrade_firmware``.
    """
    while True:
        job = qin.get()
        if job is None:
            break

        ok = False
        try:
            # Attempt to upgrade firmware
            ok = upgrade_firmware(job['host'], job['image_name'], force)
        except Exception as err:
            # An exception escaping here would end the thread before
            # task_done() runs and leave any qin.join() waiting forever,
            # so report the host as failed instead.
            print('{}: Upgrade failed: {}'.format(job.get('host'), err))
        finally:
            # Output the worker result
            qout.put({'host': job.get('host'), 'ok': ok})
            # Mark this upgrade as done
            qin.task_done()
#
# Function to flash several hosts in parallel and report status to caller
#
def autoflash(hosts, image, force = False, thread_max = 5):
    """Upgrade every host in ``hosts`` with ``image`` using worker threads.

    One work item per host is queued, then ``min(len(hosts), thread_max)``
    threads running ``upgrade_thread`` are started to consume them.

    Returns a list of the result dicts the workers put on the output queue,
    in the order they were queued there.
    """
    jobs = queue.Queue()
    outcomes = queue.Queue()
    for target_host in hosts:
        jobs.put({'host': target_host, 'image_name': image})

    workers = []
    for index in range(min(len(hosts), thread_max)):
        worker = threading.Thread(name='UpgradeThread-{}'.format(index),
                                  target=upgrade_thread,
                                  args=(jobs, outcomes, force))
        worker.start()
        workers.append(worker)

    # Block until every queued host has been marked done by a worker.
    jobs.join()

    # One None sentinel per worker makes each worker loop exit.
    for _ in workers:
        jobs.put(None)

    # Wait for the worker threads to finish.
    for worker in workers:
        worker.join()

    # Drain the output queue into a plain list.
    results = []
    while not outcomes.empty():
        results.append(outcomes.get())
    return results
if __name__ == '__main__':
    # Command-line entry point: parse the image path, the host list and the
    # optional force flag, flash all hosts, print the per-host results and
    # exit with status 0 only when every host reported ok.
    cli = argparse.ArgumentParser()
    cli.add_argument('image', help='Image file to flash')
    cli.add_argument('hosts', nargs='*', help='Hosts to flash')
    cli.add_argument('-f', '--force', action='store_true', help="Force update")
    options = cli.parse_args()

    outcomes = autoflash(options.hosts, options.image, options.force)
    print('Results: {}'.format(outcomes))

    every_host_ok = all(entry['ok'] for entry in outcomes)
    print('Result Ok: {}'.format(every_host_ok))

    if every_host_ok:
        sys.exit(0)
    sys.exit(1)