# Source code for akari_client.grpc.joints_controller

from typing import List

import grpc
from akari_proto import joints_controller_pb2
from akari_proto.grpc.error import deserialize_error
from akari_proto.joints_controller_pb2_grpc import JointsControllerServiceStub
from google.protobuf.empty_pb2 import Empty

from ..joint_controller import PositionLimit, RevoluteJointController
from ._error import serializer


class _GrpcJointsController:
    """Holder of the gRPC stub used to talk to the joints controller service.

    One instance wraps a single ``JointsControllerServiceStub`` built on the
    channel given at construction time and exposes it through :attr:`stub`.
    """

    def __init__(self, channel: grpc.Channel) -> None:
        """Build the service stub on top of ``channel``.

        Args:
            channel: gRPC channel handed to ``JointsControllerServiceStub``.
        """
        self._stub = JointsControllerServiceStub(channel)

    @property
    def stub(self) -> JointsControllerServiceStub:
        """The ``JointsControllerServiceStub`` created in ``__init__``."""
        return self._stub

    @deserialize_error(serializer)
    def get_joint_names(self) -> List[str]:
        """Call the ``GetJointNames`` RPC and return its joint names.

        Returns:
            A new ``list`` copied from the response's ``joint_names`` field.
        """
        # NOTE(review): the decorator presumably converts serialized server
        # errors back into exceptions -- confirm in akari_proto.grpc.error.
        request = Empty()
        response: joints_controller_pb2.GetJointNamesResponse
        response = self._stub.GetJointNames(request)
        # Copy the repeated field into a plain list before handing it out.
        return [*response.joint_names]


class GrpcJointController(RevoluteJointController):
    """``RevoluteJointController`` that drives one named joint over gRPC.

    Every operation is forwarded to the stub exposed by the shared
    ``_GrpcJointsController`` client; the joint is identified on the wire by
    a ``JointSpecifier`` built from :attr:`joint_name`.

    NOTE(review): the RPC methods are wrapped with
    ``deserialize_error(serializer)``, which presumably turns serialized
    server-side errors back into exceptions -- confirm in
    ``akari_proto.grpc.error``.
    """

    def __init__(self, joint_name: str, client: _GrpcJointsController) -> None:
        """Remember the joint name and the client whose stub is used for RPCs.

        Args:
            joint_name: Name sent as ``JointSpecifier.joint_name``.
            client: Shared wrapper providing the service stub.
        """
        self._joint_name = joint_name
        self._client = client

    @deserialize_error(serializer)
    def get_position_limit(self) -> PositionLimit:
        """Call ``GetPositionLimit`` and return the result as a ``PositionLimit``.

        Returns:
            ``PositionLimit`` built from the response's ``min`` and ``max``.
        """
        res: joints_controller_pb2.GetPositionLimitResponse = (
            self._client.stub.GetPositionLimit(self.joint_specifier)
        )
        limit = PositionLimit(min=res.min, max=res.max)
        return limit

    @property
    def joint_name(self) -> str:
        """Name of the joint this controller addresses."""
        return self._joint_name

    @property
    def joint_specifier(self) -> joints_controller_pb2.JointSpecifier:
        """A fresh ``JointSpecifier`` message carrying :attr:`joint_name`."""
        return joints_controller_pb2.JointSpecifier(
            joint_name=self._joint_name,
        )

    @deserialize_error(serializer)
    def get_servo_enabled(self) -> bool:
        """Call ``GetServoEnabled`` and return the response's ``enabled`` flag."""
        res: joints_controller_pb2.GetServoEnabledResponse = (
            self._client.stub.GetServoEnabled(self.joint_specifier)
        )
        return res.enabled

    @deserialize_error(serializer)
    def set_servo_enabled(self, enabled: bool) -> None:
        """Send ``SetServoEnabled`` for this joint.

        Args:
            enabled: Value placed in the request's ``enabled`` field.
        """
        request = joints_controller_pb2.SetServoEnabledRequest(
            target_joint=self.joint_specifier,
            enabled=enabled,
        )
        self._client.stub.SetServoEnabled(request)

    @deserialize_error(serializer)
    def set_profile_acceleration(self, rad_per_sec2: float) -> None:
        """Send ``SetProfileAcceleration`` for this joint.

        Args:
            rad_per_sec2: Value placed in the request's ``rad_per_sec2``
                field (rad/s^2, going by the field name).
        """
        request = joints_controller_pb2.SetProfileAccelerationRequest(
            target_joint=self.joint_specifier,
            rad_per_sec2=rad_per_sec2,
        )
        self._client.stub.SetProfileAcceleration(request)

    @deserialize_error(serializer)
    def get_profile_acceleration(self) -> float:
        """Call ``GetProfileAcceleration`` and return ``rad_per_sec2``."""
        res: joints_controller_pb2.GetProfileAccelerationResponse = (
            self._client.stub.GetProfileAcceleration(self.joint_specifier)
        )
        return res.rad_per_sec2

    @deserialize_error(serializer)
    def set_profile_velocity(self, rad_per_sec: float) -> None:
        """Send ``SetProfileVelocity`` for this joint.

        Args:
            rad_per_sec: Value placed in the request's ``rad_per_sec``
                field (rad/s, going by the field name).
        """
        request = joints_controller_pb2.SetProfileVelocityRequest(
            target_joint=self.joint_specifier,
            rad_per_sec=rad_per_sec,
        )
        self._client.stub.SetProfileVelocity(request)

    @deserialize_error(serializer)
    def get_profile_velocity(self) -> float:
        """Call ``GetProfileVelocity`` and return ``rad_per_sec``."""
        res: joints_controller_pb2.GetProfileVelocityResponse = (
            self._client.stub.GetProfileVelocity(self.joint_specifier)
        )
        return res.rad_per_sec

    @deserialize_error(serializer)
    def set_goal_position(self, rad: float) -> None:
        """Send ``SetGoalPosition`` for this joint.

        Args:
            rad: Value placed in the request's ``rad`` field (radians, going
                by the field name).
        """
        request = joints_controller_pb2.SetGoalPositionRequest(
            target_joint=self.joint_specifier,
            rad=rad,
        )
        self._client.stub.SetGoalPosition(request)

    @deserialize_error(serializer)
    def get_present_position(self) -> float:
        """Call ``GetPresentPosition`` and return the response's ``rad``."""
        res: joints_controller_pb2.GetPresentPositionResponse = (
            self._client.stub.GetPresentPosition(self.joint_specifier)
        )
        return res.rad

    @deserialize_error(serializer)
    def get_moving_state(self) -> bool:
        """Call ``GetMovingState`` and return the response's ``moving`` flag."""
        res: joints_controller_pb2.GetMovingStateResponse = (
            self._client.stub.GetMovingState(self.joint_specifier)
        )
        return res.moving