akari_client.serial.dynamixel_communicator のソースコード

from __future__ import annotations

import contextlib
import pathlib
import threading
from typing import Iterator

from dynamixel_sdk import COMM_SUCCESS, PacketHandler, PortHandler

DEFAULT_BAUDRATE = 1000000
DEFAULT_DEVICE_NAME = pathlib.Path("/dev/ttyUSB_dynamixel")
DEFAULT_PROTOCOL_VERSION = 2.0


def get_baudrate_control_value(baudrate: int) -> int:
    """Dynamixelのボーレート設定値を取得する"""
    if baudrate == 1000000:
        return 3
    elif baudrate == 57600:
        return 1
    else:
        raise ValueError("Either 57600 or 1000000 is supported")


[ドキュメント]class DynamixelCommunicator: def __init__( self, port_handler: PortHandler, packet_handler: PacketHandler, ) -> None: """dynamixelと通信を行うクラス Args: port_handler: dynamixel通信ポートのハンドラ packet_handler: dynamixelパケット通信のハンドラ """ self._port_handler = port_handler self._packet_handler = packet_handler self._lock = threading.Lock()
[ドキュメント] @classmethod @contextlib.contextmanager def open( cls, serial_port: pathlib.Path = DEFAULT_DEVICE_NAME, baudrate: int = DEFAULT_BAUDRATE, protocol_version: float = DEFAULT_PROTOCOL_VERSION, ) -> Iterator[DynamixelCommunicator]: """dynamixelと通信を行うクラスを初期化する""" port_handler = PortHandler(str(serial_port)) try: if port_handler.setBaudRate(baudrate): print("Succeeded to change the baudrate") else: raise RuntimeError("Failed to change the baudrate") if port_handler.openPort(): print("Succeeded to open the port") else: raise RuntimeError("Failed to open port") packet_handler = PacketHandler(protocol_version) yield cls( port_handler=port_handler, packet_handler=packet_handler, ) finally: port_handler.closePort()
[ドキュメント] def read(self, device_id: int, address: int, length: int) -> int: """dynamixelからデータを読み込む。 Args: device_id: DynamixelのID address: データアドレス length: データ長 Returns: データ値 """ value: int result: int err: int with self._lock: if length == 1: value, result, err = self._packet_handler.read1ByteTxRx( self._port_handler, device_id, address ) elif length == 2: value, result, err = self._packet_handler.read2ByteTxRx( self._port_handler, device_id, address ) elif length == 4: value, result, err = self._packet_handler.read4ByteTxRx( self._port_handler, device_id, address ) else: raise ValueError(f"invalid length: {length}") if result != COMM_SUCCESS: raise RuntimeError(self._packet_handler.getTxRxResult(result)) elif err != 0: raise RuntimeError(self._packet_handler.getRxPacketError(err)) return value
[ドキュメント] def write(self, device_id: int, address: int, length: int, value: int) -> None: """dynamixelにデータを書き込む。 Args: device_id: DynamixelのID address: データアドレス length: データ長 value: データ値 """ result: int err: int with self._lock: if length == 1: result, err = self._packet_handler.write1ByteTxRx( self._port_handler, device_id, address, value ) elif length == 2: result, err = self._packet_handler.write2ByteTxRx( self._port_handler, device_id, address, value ) elif length == 4: result, err = self._packet_handler.write4ByteTxRx( self._port_handler, device_id, address, value ) else: raise ValueError(f"invalid length: {length}") if result != COMM_SUCCESS: raise RuntimeError(self._packet_handler.getTxRxResult(result)) elif err != 0: raise RuntimeError(self._packet_handler.getRxPacketError(err))