| import json |
| import sys |
| from time import localtime, sleep, ticks_diff, ticks_ms |
|
|
| import uos |
| from data_logging import ( |
| get_local_timestamp, |
| get_onboard_temperature, |
| write_payload_backup, |
| ) |
| from machine import PWM, Pin |
| from ufastrsa.genprime import genrsa |
| from ufastrsa.rsa import RSA |
| from uio import StringIO |
|
|
|
|
| def beep(buzzer, power=0.005): |
| buzzer.freq(300) |
| buzzer.duty_u16(round(65535 * power)) |
| sleep(0.15) |
| buzzer.duty_u16(0) |
|
|
|
|
| def get_traceback(err): |
| try: |
| with StringIO() as f: |
| sys.print_exception(err, f) |
| return f.getvalue() |
| except Exception as err2: |
| print(err2) |
| return f"Failed to extract file and line number due to {err2}.\nOriginal error: {err}" |
|
|
|
|
| def merge_two_dicts(x, y): |
| z = x.copy() |
| z.update(y) |
| return z |
|
|
|
|
| def path_exists(path): |
| |
| |
| parent = "" |
| name = path |
|
|
| |
| index = path.rstrip("/").rfind("/") |
| if index >= 0: |
| index += 1 |
| parent = path[: index - 1] |
| name = path[index:] |
|
|
| |
| |
| return any((name == x[0]) for x in uos.ilistdir(parent)) |
|
|
|
|
| def encrypt_id(my_id, verbose=False): |
| rsa_path = "rsa.json" |
| |
| try: |
| with open(rsa_path, "r") as f: |
| cipher_data = json.load(f) |
| cipher = RSA( |
| cipher_data["bits"], |
| n=cipher_data["n"], |
| e=cipher_data["e"], |
| d=cipher_data["d"], |
| ) |
| except (KeyError, OSError) as e: |
| print(e) |
| print("Generating new RSA parameters...") |
| bits = 256 |
| bits, n, e, d = genrsa(bits, e=65537) |
| cipher = RSA(bits, n=n, e=e, d=d) |
| with open("rsa.json", "w") as f: |
| json.dump(dict(bits=bits, n=n, e=e, d=d), f) |
|
|
| if verbose: |
| with open(rsa_path, "r") as f: |
| cipher_data = json.load(f) |
| print("RSA parameters (keep private):") |
| print(cipher_data) |
|
|
| my_id = int.from_bytes(cipher.pkcs_encrypt(my_id), "big") |
| return my_id |
|
|
|
|
| def decrypt_id(my_id): |
| rsa_path = "rsa.json" |
| if path_exists(rsa_path): |
| with open(rsa_path, "r") as f: |
| cipher_data = json.load(f) |
| cipher = RSA( |
| cipher_data["bits"], |
| n=cipher_data["n"], |
| e=cipher_data["e"], |
| d=cipher_data["d"], |
| ) |
| else: |
| bits = 256 |
| bits, n, e, d = genrsa(bits, e=65537) |
| cipher = RSA(bits, n=n, e=e, d=d) |
| with open("rsa.json", "w") as f: |
| json.dump(dict(bits=bits, n=n, e=e, d=d), f) |
|
|
| my_id = int.from_bytes(cipher.pkcs_decrypt(my_id), "big") |
| return my_id |
|
|
|
|
| def get_onboard_led(): |
| try: |
| onboard_led = Pin("LED", Pin.OUT) |
| except Exception as e: |
| print(e) |
| onboard_led = Pin(25, Pin.OUT) |
| return onboard_led |
|
|
|
|
| class Experiment(object): |
| def __init__( |
| self, |
| run_experiment_fn, |
| devices, |
| reset_experiment_fn=None, |
| validate_inputs_fn=None, |
| emergency_shutdown_fn=None, |
| buzzer=None, |
| sdcard_ready=False, |
| ) -> None: |
| self.validate_inputs_fn = validate_inputs_fn |
| self.run_experiment_fn = run_experiment_fn |
| self.reset_experiment_fn = reset_experiment_fn |
| self.devices = devices |
| self.emergency_shutdown_fn = emergency_shutdown_fn |
| self.buzzer = buzzer |
| self.sdcard_ready = sdcard_ready |
|
|
| if self.reset_experiment_fn is None: |
|
|
| def do_nothing(*args, **kwargs): |
| pass |
|
|
| self.reset_experiment_fn = do_nothing |
|
|
| if self.emergency_shutdown_fn is None: |
| self.emergency_shutdown_fn = self.reset_experiment_fn |
|
|
| if self.validate_inputs_fn is None: |
|
|
| def no_input_validation(*args, **kwargs): |
| return True |
|
|
| self.validate_inputs_fn = no_input_validation |
|
|
| if self.buzzer is None: |
| self.buzzer = PWM(Pin(18)) |
|
|
| def try_experiment(self, msg): |
| payload_data = {} |
| |
| |
|
|
| print(msg) |
|
|
| |
| |
| try: |
| parameters = json.loads(msg) |
| payload_data["_input_message"] = parameters |
|
|
| |
| self.validate_inputs_fn(parameters) |
|
|
| beep(self.buzzer) |
| sensor_data = self.run_experiment_fn(parameters, self.devices) |
| payload_data = merge_two_dicts(payload_data, sensor_data) |
|
|
| except Exception as err: |
| print(err) |
| if "_input_message" not in payload_data.keys(): |
| payload_data["_input_message"] = msg |
| payload_data["error"] = get_traceback(err) |
|
|
| try: |
| payload_data["onboard_temperature_K"] = get_onboard_temperature(unit="K") |
| payload_data["sd_card_ready"] = self.sdcard_ready |
| stamp, time_str = get_local_timestamp(return_str=True) |
| payload_data["utc_timestamp"] = stamp |
| payload_data["utc_time_str"] = time_str |
| except OverflowError as e: |
| print(get_traceback(e)) |
| except Exception as e: |
| print(get_traceback(e)) |
|
|
| try: |
| parameters = json.loads(msg) |
| self.reset_experiment_fn(parameters, devices=self.devices) |
| except Exception as e: |
| try: |
| self.emergency_shutdown_fn(devices=self.devices) |
| payload_data["reset_error"] = get_traceback(e) |
| except Exception as e: |
| payload_data["emergency_error"] = get_traceback(e) |
|
|
| return payload_data |
|
|
| def write_to_sd_card(self, payload_data, fpath="/sd/experiments.txt"): |
| try: |
| write_payload_backup(payload_data, fpath=fpath) |
| except Exception as e: |
| w = f"Failed to write to SD card: {get_traceback(e)}" |
| print(w) |
| payload_data["warning"] = w |
|
|
| return payload_data |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| def heartbeat(client, first, ping_interval_ms=15000): |
| global lastping |
| if first: |
| client.ping() |
| lastping = ticks_ms() |
| if ticks_diff(ticks_ms(), lastping) >= ping_interval_ms: |
| client.ping() |
| lastping = ticks_ms() |
| return |
|
|
|
|
| def sign_of_life(led, first, blink_interval_ms=5000): |
| global last_blink |
| if first: |
| led.on() |
| last_blink = ticks_ms() |
| time_since = ticks_diff(ticks_ms(), last_blink) |
| if led.value() == 0 and time_since >= blink_interval_ms: |
| led.toggle() |
| last_blink = ticks_ms() |
| elif led.value() == 1 and time_since >= 500: |
| led.toggle() |
| last_blink = ticks_ms() |
|
|
|
|
| class DummyMotor: |
| def __init__(self): |
| pass |
|
|
|
|
| class DummySensor: |
| def __init__(self): |
| pass |
|
|