from __future__ import annotations
import contextlib
import pathlib
import threading
from typing import Iterator
from dynamixel_sdk import COMM_SUCCESS, PacketHandler, PortHandler
# Defaults used by DynamixelCommunicator.open() when the caller does not
# pass the corresponding argument.
DEFAULT_BAUDRATE = 1_000_000  # default `baudrate` argument
DEFAULT_DEVICE_NAME = pathlib.Path("/dev/ttyUSB_dynamixel")  # default `serial_port`
DEFAULT_PROTOCOL_VERSION = 2.0  # default `protocol_version` given to PacketHandler
def get_baudrate_control_value(baudrate: int) -> int:
    """Return the Dynamixel baud-rate setting value for ``baudrate``.

    Args:
        baudrate: Baud rate to translate; only 57600 and 1000000 are accepted.

    Returns:
        1 for 57600, 3 for 1000000.

    Raises:
        ValueError: If ``baudrate`` is neither 57600 nor 1000000.
    """
    # Guard clauses: each supported rate returns immediately, anything else
    # falls through to the error.
    if baudrate == 57600:
        return 1
    if baudrate == 1000000:
        return 3
    raise ValueError("Either 57600 or 1000000 is supported")
class DynamixelCommunicator:
    """Read from and write to Dynamixel devices through the Dynamixel SDK.

    Every read/write transaction is performed while holding an internal
    lock, so only one transaction at a time goes through this instance.
    """

    def __init__(
        self,
        port_handler: PortHandler,
        packet_handler: PacketHandler,
    ) -> None:
        """Create a communicator around already-constructed SDK handlers.

        Args:
            port_handler: Handler for the Dynamixel communication port.
            packet_handler: Handler for Dynamixel packet communication.
        """
        self._port_handler = port_handler
        self._packet_handler = packet_handler
        # Held for the duration of each TxRx call in read() and write().
        self._lock = threading.Lock()

    @classmethod
    @contextlib.contextmanager
    def open(
        cls,
        serial_port: pathlib.Path = DEFAULT_DEVICE_NAME,
        baudrate: int = DEFAULT_BAUDRATE,
        protocol_version: float = DEFAULT_PROTOCOL_VERSION,
    ) -> Iterator[DynamixelCommunicator]:
        """Open the port and yield a communicator; close the port on exit.

        Args:
            serial_port: Path of the serial device to open.
            baudrate: Baud rate passed to ``PortHandler.setBaudRate``.
            protocol_version: Protocol version passed to ``PacketHandler``.

        Yields:
            A communicator bound to the opened port.

        Raises:
            RuntimeError: If ``setBaudRate`` or ``openPort`` reports failure.
        """
        port_handler = PortHandler(str(serial_port))
        try:
            if not port_handler.setBaudRate(baudrate):
                raise RuntimeError("Failed to change the baudrate")
            print("Succeeded to change the baudrate")

            if not port_handler.openPort():
                raise RuntimeError("Failed to open port")
            print("Succeeded to open the port")

            packet_handler = PacketHandler(protocol_version)
            yield cls(
                port_handler=port_handler,
                packet_handler=packet_handler,
            )
        finally:
            # NOTE(review): this also runs when setup failed before the port
            # was opened -- confirm the SDK's closePort() tolerates that,
            # otherwise its error would mask the RuntimeError raised above.
            port_handler.closePort()

    def _raise_on_failure(self, result: int, err: int) -> None:
        """Raise RuntimeError if a transaction reported a failure.

        Args:
            result: Communication result code returned by the SDK call.
            err: Error value returned alongside ``result`` by the SDK call.

        Raises:
            RuntimeError: If ``result`` is not ``COMM_SUCCESS`` (message from
                ``getTxRxResult``) or, otherwise, if ``err`` is non-zero
                (message from ``getRxPacketError``).
        """
        if result != COMM_SUCCESS:
            raise RuntimeError(self._packet_handler.getTxRxResult(result))
        if err != 0:
            raise RuntimeError(self._packet_handler.getRxPacketError(err))

    def read(self, device_id: int, address: int, length: int) -> int:
        """Read a value from a Dynamixel.

        Args:
            device_id: ID of the Dynamixel.
            address: Data address.
            length: Data length; must be 1, 2 or 4.

        Returns:
            The value that was read.

        Raises:
            ValueError: If ``length`` is not 1, 2 or 4.
            RuntimeError: If the transaction reports a communication failure
                or a non-zero error value.
        """
        value: int
        result: int
        err: int
        with self._lock:
            if length == 1:
                value, result, err = self._packet_handler.read1ByteTxRx(
                    self._port_handler, device_id, address
                )
            elif length == 2:
                value, result, err = self._packet_handler.read2ByteTxRx(
                    self._port_handler, device_id, address
                )
            elif length == 4:
                value, result, err = self._packet_handler.read4ByteTxRx(
                    self._port_handler, device_id, address
                )
            else:
                raise ValueError(f"invalid length: {length}")
        # Checked after releasing the lock; only the TxRx call needs it.
        self._raise_on_failure(result, err)
        return value

    def write(self, device_id: int, address: int, length: int, value: int) -> None:
        """Write a value to a Dynamixel.

        Args:
            device_id: ID of the Dynamixel.
            address: Data address.
            length: Data length; must be 1, 2 or 4.
            value: Value to write.

        Raises:
            ValueError: If ``length`` is not 1, 2 or 4.
            RuntimeError: If the transaction reports a communication failure
                or a non-zero error value.
        """
        result: int
        err: int
        with self._lock:
            if length == 1:
                result, err = self._packet_handler.write1ByteTxRx(
                    self._port_handler, device_id, address, value
                )
            elif length == 2:
                result, err = self._packet_handler.write2ByteTxRx(
                    self._port_handler, device_id, address, value
                )
            elif length == 4:
                result, err = self._packet_handler.write4ByteTxRx(
                    self._port_handler, device_id, address, value
                )
            else:
                raise ValueError(f"invalid length: {length}")
        # Checked after releasing the lock; only the TxRx call needs it.
        self._raise_on_failure(result, err)