diff --git a/arduino_alvik/__init__.py b/arduino_alvik/__init__.py index dbdfc6a..00a67aa 100644 --- a/arduino_alvik/__init__.py +++ b/arduino_alvik/__init__.py @@ -4,8 +4,8 @@ __author__ = "Lucio Rossi , Giovanni Bruno " __license__ = "MPL 2.0" -__version__ = "1.1.2" +__version__ = "1.1.3" __maintainer__ = "Lucio Rossi , Giovanni Bruno " -__required_firmware_version__ = "1.1.0" +__required_firmware_version__ = "1.1.1" from .arduino_alvik import * diff --git a/arduino_alvik/arduino_alvik.py b/arduino_alvik/arduino_alvik.py index e80674f..0c11b77 100644 --- a/arduino_alvik/arduino_alvik.py +++ b/arduino_alvik/arduino_alvik.py @@ -1,4 +1,3 @@ -import sys import struct from machine import I2C import _thread @@ -16,12 +15,72 @@ from .__init__ import __required_firmware_version__ +def writes_uart(method): + def wrapper(*args, **kwargs): + with ArduinoAlvik._write_lock: + return method(*args, **kwargs) + + return wrapper + + +def reads_uart(method): + def wrapper(*args, **kwargs): + with ArduinoAlvik._read_lock: + return method(*args, **kwargs) + + return wrapper + + +class _AlvikRLock: + def __init__(self): + """Alvik re-entrant Lock implementation""" + self._lock = _thread.allocate_lock() + self._owner = None + self._count = 0 + + def acquire(self): + tid = _thread.get_ident() + + if self._owner == tid: + self._count += 1 + return True + + self._lock.acquire() + self._owner = tid + self._count = 1 + return True + + def release(self): + tid = _thread.get_ident() + + if self._owner != tid: + raise RuntimeError("Cannot release an unowned lock") + + self._count -= 1 + if self._count == 0: + self._owner = None + self._lock.release() + + def locked(self): + return self._lock.locked() + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.release() + + class ArduinoAlvik: _update_thread_running = False _update_thread_id = None _events_thread_running = False _events_thread_id = None + _write_lock = _AlvikRLock() + _read_lock = _AlvikRLock() + def __new__(cls): if not hasattr(cls, '_instance'): cls._instance = super(ArduinoAlvik, cls).__new__(cls) @@ -107,16 +166,20 @@ def _print_battery_status(percentage: float, is_charging) -> None: :param is_charging: True if the battery is charging :return: """ - sys.stdout.write(bytes('\r'.encode('utf-8'))) + print("\033[2K\033[1G", end='\r') if percentage > 97: marks_str = ' \U0001F50B' else: marks_str = ' \U0001FAAB' - charging_str = f' \U0001F50C' if is_charging else ' \U000026A0WARNING: battery is discharging!' - word = marks_str + f" {percentage}%" + charging_str + " \t" - sys.stdout.write(bytes((word.encode('utf-8')))) + if is_charging: + charging_str = ' \U0001F50C ' + else: + charging_str = ' \U000026A0 WARNING: battery is discharging!' + word = marks_str + f" {percentage}% {charging_str} \t" + print(word, end='') - def _lenghty_op(self, iterations=10000000) -> int: + @staticmethod + def _lengthy_op(self, iterations=10000000) -> int: result = 0 for i in range(1, iterations): result += i * i @@ -131,7 +194,7 @@ def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None: self.i2c.set_single_thread(True) if blocking: - self._lenghty_op(50000) + self._lengthy_op(50000) else: sleep_ms(500) led_val = 0 @@ -154,7 +217,7 @@ def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None: self._battery_perc = abs(soc_perc) self._print_battery_status(round(soc_perc), self._battery_is_charging) if blocking: - self._lenghty_op(10000) + self._lengthy_op(10000) else: sleep_ms(delay_) if soc_perc > 97: @@ -166,13 +229,16 @@ def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None: led_val = (led_val + 1) % 2 self.i2c.set_single_thread(False) if self.is_on(): - print("********** Alvik is on **********") - except KeyboardInterrupt: + print("\n********** Alvik is on **********") + except KeyboardInterrupt as e: self.stop() - sys.exit() + raise e + except OSError as e: + print(f'\nUnable to read SOC: {e}') + raise e except Exception as e: - pass - print(f'Unable to read SOC: {e}') + print(f'\nUnhandled exception: {e} {type(e)}') + raise e finally: LEDR.value(1) LEDG.value(1) @@ -194,18 +260,14 @@ def _snake_robot(duration: int = 1000): frame = '' for i in range(0, cycles): - sys.stdout.write(bytes('\r'.encode('utf-8'))) + print("\033[2K\033[1G", end='\r') pre = ' ' * i between = ' ' * (i % 2 + 1) post = ' ' * 5 frame = pre + snake + between + robot + post - sys.stdout.write(bytes(frame.encode('utf-8'))) + print(frame, end='') sleep_ms(200) - sys.stdout.write(bytes('\r'.encode('utf-8'))) - clear_frame = ' ' * len(frame) - sys.stdout.write(bytes(clear_frame.encode('utf-8'))) - def begin(self) -> int: """ Begins all Alvik operations @@ -225,15 +287,15 @@ def begin(self) -> int: self._snake_robot(1000) self._wait_for_ack() if not self._wait_for_fw_check(): - print('\n********** PLEASE UPDATE ALVIK FIRMWARE (required: '+'.'.join(map(str,self._required_fw_version))+')! Check documentation **********\n') - sys.exit(-2) + self.stop() + raise Exception('\n********** PLEASE UPDATE ALVIK FIRMWARE (required: '+'.'.join(map(str,self._required_fw_version))+')! Check documentation **********\n') self._snake_robot(2000) self.set_illuminator(True) self.set_behaviour(1) self.set_behaviour(2) self._set_color_reference() if self._has_events_registered(): - print('Starting events thread') + print('\n********** Starting events thread **********\n') self._start_events_thread() self.set_servo_positions(90, 90) return 0 @@ -280,6 +342,7 @@ def _wait_for_fw_check(self, timeout=5) -> bool: return False @staticmethod + @reads_uart def _flush_uart(): """ Empties the UART buffer @@ -314,6 +377,7 @@ def _wait_for_target(self, idle_time): # print(self._last_ack) sleep_ms(100) + @writes_uart def is_target_reached(self) -> bool: """ Returns True if robot has sent an M or R acknowledgment. @@ -331,6 +395,7 @@ def is_target_reached(self) -> bool: return True return False + @writes_uart def set_behaviour(self, behaviour: int): """ Sets the behaviour of Alvik @@ -340,6 +405,7 @@ def set_behaviour(self, behaviour: int): self._packeter.packetC1B(ord('B'), behaviour & 0xFF) uart.write(self._packeter.msg[0:self._packeter.msg_size]) + @writes_uart def rotate(self, angle: float, unit: str = 'deg', blocking: bool = True): """ Rotates the robot by given angle @@ -356,6 +422,7 @@ def rotate(self, angle: float, unit: str = 'deg', blocking: bool = True): if blocking: self._wait_for_target(idle_time=(angle / MOTOR_CONTROL_DEG_S)) + @writes_uart def move(self, distance: float, unit: str = 'cm', blocking: bool = True): """ Moves the robot by given distance @@ -409,6 +476,7 @@ def get_wheels_speed(self, unit: str = 'rpm') -> (float | None, float | None): """ return self.left_wheel.get_speed(unit), self.right_wheel.get_speed(unit) + @writes_uart def set_wheels_speed(self, left_speed: float, right_speed: float, unit: str = 'rpm'): """ Sets left/right motor speed @@ -428,12 +496,13 @@ def set_wheels_speed(self, left_speed: float, right_speed: float, unit: str = 'r self._packeter.packetC2F(ord('J'), left_speed, right_speed) uart.write(self._packeter.msg[0:self._packeter.msg_size]) + @writes_uart def set_wheels_position(self, left_angle: float, right_angle: float, unit: str = 'deg', blocking: bool = True): """ Sets left/right motor angle :param left_angle: :param right_angle: - :param unit: the speed unit of measurement (default: 'rpm') + :param unit: the speed unit of measurement (default: 'deg') :param blocking: :return: """ @@ -491,6 +560,7 @@ def get_line_sensors(self) -> (int | None, int | None, int | None): return self._left_line, self._center_line, self._right_line + @writes_uart def drive(self, linear_velocity: float, angular_velocity: float, linear_unit: str = 'cm/s', angular_unit: str = 'deg/s'): """ @@ -531,6 +601,7 @@ def get_drive_speed(self, linear_unit: str = 'cm/s', angular_unit: str = 'deg/s' return convert_speed(self._linear_velocity, 'mm/s', linear_unit), angular_velocity + @writes_uart def reset_pose(self, x: float, y: float, theta: float, distance_unit: str = 'cm', angle_unit: str = 'deg'): """ Resets the robot pose @@ -560,6 +631,7 @@ def get_pose(self, distance_unit: str = 'cm', angle_unit: str = 'deg') \ convert_distance(self._y, 'mm', distance_unit), convert_angle(self._theta, 'deg', angle_unit)) + @writes_uart def set_servo_positions(self, a_position: int, b_position: int): """ Sets A/B servomotor angle @@ -587,10 +659,16 @@ def get_ack(self) -> str: """ return self._last_ack - # def send_ack(self): - # self._packeter.packetC1B(ord('X'), ACK_) - # uart.write(self._packeter.msg[0:self._packeter.msg_size]) + @writes_uart + def send_ack(self, ack: str = 'K'): + """ + Sends an ack message on UART + :return: + """ + self._packeter.packetC1B(ord('X'), ord(ack)) + uart.write(self._packeter.msg[0:self._packeter.msg_size]) + @writes_uart def _set_leds(self, led_state: int): """ Sets the LEDs state @@ -652,6 +730,7 @@ def _update(self, delay_=1): self._read_message() sleep_ms(delay_) + @reads_uart def _read_message(self) -> None: """ Read a message from the uC @@ -829,6 +908,13 @@ def get_shake(self) -> bool: """ return bool(self._move_bits & 0b00000001) + def get_lifted(self) -> bool: + """ + Returns true if Alvik is lifted + :return: + """ + return bool(self._move_bits & 0b00000010) + def get_tilt(self) -> str: """ Returns the tilt string eg: "X", "-Z" etc @@ -1260,6 +1346,24 @@ def on_shake(self, callback: callable, args: tuple = ()) -> None: """ self._move_events.register_callback('on_shake', callback, args) + def on_lift(self, callback: callable, args: tuple = ()) -> None: + """ + Register callback when Alvik is lifted + :param callback: + :param args: + :return: + """ + self._move_events.register_callback('on_lift', callback, args) + + def on_drop(self, callback: callable, args: tuple = ()) -> None: + """ + Register callback when Alvik is dropped + :param callback: + :param args: + :return: + """ + self._move_events.register_callback('on_drop', callback, args) + def on_x_tilt(self, callback: callable, args: tuple = ()) -> None: """ Register callback when Alvik is tilted on X-axis @@ -1522,7 +1626,6 @@ def writeto_mem(self, addr, memaddr, buf, addrsize=8) -> None: return i2c.writeto_mem(addr, memaddr, buf, addrsize=addrsize) - class _ArduinoAlvikServo: def __init__(self, packeter: ucPack, label: str, servo_id: int, position: list[int | None]): @@ -1530,7 +1633,8 @@ def __init__(self, packeter: ucPack, label: str, servo_id: int, position: list[i self._label = label self._id = servo_id self._position = position - + + @writes_uart def set_position(self, position): """ Sets the position of the servo @@ -1559,7 +1663,8 @@ def __init__(self, packeter: ucPack, label: int, wheel_diameter_mm: float = WHEE self._speed = None self._position = None self._alvik = alvik - + + @writes_uart def reset(self, initial_position: float = 0.0, unit: str = 'deg'): """ Resets the wheel reference position @@ -1571,6 +1676,7 @@ def reset(self, initial_position: float = 0.0, unit: str = 'deg'): self._packeter.packetC2B1F(ord('W'), self._label & 0xFF, ord('Z'), initial_position) uart.write(self._packeter.msg[0:self._packeter.msg_size]) + @writes_uart def set_pid_gains(self, kp: float = MOTOR_KP_DEFAULT, ki: float = MOTOR_KI_DEFAULT, kd: float = MOTOR_KD_DEFAULT): """ Set PID gains for Alvik wheels @@ -1590,6 +1696,7 @@ def stop(self): """ self.set_speed(0) + @writes_uart def set_speed(self, velocity: float, unit: str = 'rpm'): """ Sets the motor speed @@ -1626,6 +1733,7 @@ def get_position(self, unit: str = 'deg') -> float | None: """ return convert_angle(self._position, 'deg', unit) + @writes_uart def set_position(self, position: float, unit: str = 'deg', blocking: bool = True): """ Sets left/right motor speed @@ -1656,6 +1764,7 @@ def __init__(self, packeter: ucPack, label: str, led_state: list[int | None], rg self._rgb_mask = rgb_mask self._led_state = led_state + @writes_uart def set_color(self, red: bool, green: bool, blue: bool): """ Sets the LED's r,g,b state @@ -1966,7 +2075,7 @@ class _ArduinoAlvikMoveEvents(_ArduinoAlvikEvents): Event class to handle move events """ - available_events = ['on_shake', 'on_x_tilt', 'on_y_tilt', 'on_z_tilt', + available_events = ['on_shake', 'on_lift', 'on_drop', 'on_x_tilt', 'on_y_tilt', 'on_z_tilt', 'on_nx_tilt', 'on_ny_tilt', 'on_nz_tilt'] NZ_TILT = 0x80 @@ -1993,6 +2102,26 @@ def _is_shaken(current_state, new_state) -> bool: """ return not bool(current_state & 0b00000001) and bool(new_state & 0b00000001) + @staticmethod + def _is_lifted(current_state, new_state) -> bool: + """ + True if Alvik was lifted + :param current_state: + :param new_state: + :return: + """ + return not bool(current_state & 0b00000010) and bool(new_state & 0b00000010) + + @staticmethod + def _is_dropped(current_state, new_state) -> bool: + """ + True if Alvik was dropped + :param current_state: + :param new_state: + :return: + """ + return bool(current_state & 0b00000010) and not bool(new_state & 0b00000010) + @staticmethod def _is_x_tilted(current_state, new_state) -> bool: """ @@ -2066,6 +2195,12 @@ def update_state(self, state: int | None): if self._is_shaken(self._current_state, state): self.execute_callback('on_shake') + if self._is_lifted(self._current_state, state): + self.execute_callback('on_lift') + + if self._is_dropped(self._current_state, state): + self.execute_callback('on_drop') + if self._is_x_tilted(self._current_state, state): self.execute_callback('on_x_tilt') diff --git a/arduino_alvik/stm32_flash.py b/arduino_alvik/stm32_flash.py index 5ec1724..83b3ff2 100644 --- a/arduino_alvik/stm32_flash.py +++ b/arduino_alvik/stm32_flash.py @@ -1,5 +1,4 @@ import os -import sys from time import sleep_ms from machine import UART, Pin @@ -322,8 +321,9 @@ def STM32_writeMEM(file_path: str, toggle: "Generator" = None): print(f"STM32 ERROR FLASHING PAGE: {writeAddress}") return - sys.stdout.write('\r') - sys.stdout.write(f"{int((i/file_pages)*100)}%") + percentage = int((i / file_pages) * 100) + print("\033[2K\033[1G", end='\r') + print(f"Flashing STM32: {percentage:>3}%", end='') i = i + 1 _incrementAddress(writeAddress) if toggle is not None: diff --git a/examples/communication/modulino.py b/examples/communication/modulino.py index c8606f9..e5950d7 100644 --- a/examples/communication/modulino.py +++ b/examples/communication/modulino.py @@ -14,8 +14,7 @@ pixels = ModulinoPixels(alvik.i2c) if not pixels.connected: - print("🤷 No pixel modulino found") - sys.exit(-2) + raise Exception("🤷 No pixel modulino found") while True: try: diff --git a/examples/events/hot_wheels.py b/examples/events/hot_wheels.py new file mode 100644 index 0000000..7ce9894 --- /dev/null +++ b/examples/events/hot_wheels.py @@ -0,0 +1,44 @@ +from arduino_alvik import ArduinoAlvik +from time import sleep_ms + + +def stop_when_up(alvik): + print("lift") + alvik.set_wheels_speed(0, 0) + + +def run_when_down(alvik): + print("drop") + alvik.set_wheels_speed(20, 20) + + +alvik = ArduinoAlvik() +alvik.on_lift(stop_when_up, (alvik,)) +alvik.on_drop(run_when_down, (alvik,)) +alvik.begin() +color_val = 0 + + +def blinking_leds(val): + alvik.left_led.set_color(val & 0x01, val & 0x02, val & 0x04) + alvik.right_led.set_color(val & 0x02, val & 0x04, val & 0x01) + + +while not alvik.get_touch_ok(): + sleep_ms(100) + +alvik.set_wheels_speed(20, 20) + +while not alvik.get_touch_cancel(): + + try: + blinking_leds(color_val) + color_val = (color_val + 1) % 7 + sleep_ms(500) + + except KeyboardInterrupt as e: + print('over') + alvik.stop() + break + +alvik.stop() diff --git a/examples/events/motion_events.py b/examples/events/motion_events.py index b0f8146..f8ea953 100644 --- a/examples/events/motion_events.py +++ b/examples/events/motion_events.py @@ -31,6 +31,8 @@ def simple_print(custom_text: str = '') -> None: alvik = ArduinoAlvik() alvik.on_shake(toggle_left_led, ("ALVIK WAS SHAKEN... YOU MAKE ME SHIVER :)", toggle_value(), )) +alvik.on_lift(simple_print, ("ALVIK WAS LIFTED",)) +alvik.on_drop(simple_print, ("ALVIK WAS DROPPED",)) alvik.on_x_tilt(simple_print, ("TILTED ON X",)) alvik.on_nx_tilt(simple_print, ("TILTED ON -X",)) alvik.on_y_tilt(simple_print, ("TILTED ON Y",)) diff --git a/examples/reload_modules.py b/examples/reload_modules.py new file mode 100644 index 0000000..bd5108a --- /dev/null +++ b/examples/reload_modules.py @@ -0,0 +1,14 @@ +import sys + +def reload_modules(): + to_be_reloaded = [] + + for m in sys.modules: + to_be_reloaded.append(m) + del sys.modules[m] + + for m in to_be_reloaded: + exec(f'import {m}') + + +reload_modules() \ No newline at end of file diff --git a/examples/update_firmware.py b/examples/update_firmware.py new file mode 100644 index 0000000..96845fe --- /dev/null +++ b/examples/update_firmware.py @@ -0,0 +1,8 @@ +from arduino_alvik import update_firmware + +# this is a patch to fix possible running threads on Alvik +from arduino_alvik import ArduinoAlvik +alvik = ArduinoAlvik() +alvik.stop() + +update_firmware('/firmware.bin') \ No newline at end of file diff --git a/package.json b/package.json index 134ac1b..f946578 100644 --- a/package.json +++ b/package.json @@ -13,5 +13,5 @@ ["github:arduino/ucPack-mpy", "0.1.7"], ["github:arduino/arduino-runtime-mpy", "0.4.0"] ], - "version": "1.1.2" + "version": "1.1.3" }