akari_client.serial.m5stack_communicator のソースコード

from __future__ import annotations

import json
import pathlib
import threading
import time
from typing import Any, Dict, Optional, cast

import serial

from ..m5stack_client import M5ComDict

BAUDRATE = 500000
DEVICE_NAME = pathlib.Path("/dev/ttyUSB_M5Stack")
TIMEOUT = 0.2
RESPONSE_TIMEOUT = 1.0


[ドキュメント]class M5SerialCommunicator: def __init__( self, baudrate: int = BAUDRATE, port: pathlib.Path = DEVICE_NAME, timeout: float = TIMEOUT, ) -> None: self._reference_time = time.time() self._condition = threading.Condition() self._latest_msg: Optional[M5ComDict] = None self._thread: Optional[threading.Thread] = None self._send_lock = threading.Lock() self._exit = False self._serial = serial.Serial( baudrate=baudrate, port=str(port), timeout=timeout, ) def __enter__(self) -> M5SerialCommunicator: return self def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None: self.stop() def start(self) -> None: if self._thread is not None: return self._exit = False self._thread = threading.Thread(target=self._read, daemon=True) self._thread.start() def stop(self) -> None: thread = self._thread if thread is None: return self._thread = None self._exit = True if thread.is_alive(): thread.join() def _read_line(self) -> None: res: Dict[str, Any] = {} data_str = self._serial.readline().decode("utf-8") received = json.loads(data_str) res["din0"] = bool(received["io"]["di0"]) res["din1"] = bool(received["io"]["di1"]) res["ain0"] = int(received["io"]["ai0"]) res["dout0"] = bool(received["io"]["do0"]) res["dout1"] = bool(received["io"]["do1"]) res["pwmout0"] = int(received["io"]["po0"]) res["general0"] = float(received["io"]["gn0"]) res["general1"] = float(received["io"]["gn1"]) res["button_a"] = bool(received["btn"]["a"]) res["button_b"] = bool(received["btn"]["b"]) res["button_c"] = bool(received["btn"]["c"]) res["temperature"] = float(received["tmp"]) res["pressure"] = float(received["pre"]) res["brightness"] = int(received["bri"]) res["is_response"] = bool(received["co"]) res["time"] = self.current_time with self._condition: self._latest_msg = cast(M5ComDict, res) self._condition.notify_all() def _read(self) -> None: self._serial.reset_input_buffer() while not self._exit: if self._serial.in_waiting > 0: try: self._read_line() except Exception as e: print(f"An error occured in read thread: {e}") else: time.sleep(0.1) @property def reference_time(self) -> float: return self._reference_time @property def current_time(self) -> float: return time.time() - self.reference_time def send(self, data: bytes) -> None: with self._send_lock: self._serial.write(data) def send_data(self, data: Dict[str, Any], sync: bool = True) -> None: json_data = json.dumps(data, ensure_ascii=False) self.send(bytes(json_data, "UTF-8")) if sync: self._wait_response() def _wait_response(self) -> None: called_time = self.current_time def _predicate() -> bool: elapsed = self.current_time - called_time return ( self._latest_msg is not None and self._latest_msg["is_response"] ) or (elapsed > RESPONSE_TIMEOUT) with self._condition: while True: self._condition.wait() if _predicate(): break def get(self) -> M5ComDict: now = self.current_time def _predicate() -> bool: return self._latest_msg is not None and self._latest_msg["time"] > now with self._condition: self._condition.wait_for(_predicate) msg = self._latest_msg assert msg is not None return msg