Source code for ingenialink.ethercat.servo

from typing import TYPE_CHECKING

import ingenialogger

try:
    import pysoem  # type: ignore
except ImportError as ex:
    pysoem = None
    pysoem_import_error = ex

if TYPE_CHECKING:
    from pysoem import CdefSlave

from ingenialink.exceptions import ILIOError
from ingenialink.servo import Servo
from ingenialink.ethercat.dictionary import EthercatDictionary
from ingenialink.ethercat.register import EthercatRegister
from ingenialink.register import REG_DTYPE, REG_ACCESS
from ingenialink.constants import CAN_MAX_WRITE_SIZE, CANOPEN_ADDRESS_OFFSET, MAP_ADDRESS_OFFSET

logger = ingenialogger.get_logger(__name__)


[docs]class EthercatServo(Servo): """Ethercat Servo instance. Args: slave: Slave to be connected. slave_id: Slave ID. dictionary_path: Path to the dictionary. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. Raises: ImportError: WinPcap is not installed """ DICTIONARY_CLASS = EthercatDictionary MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE WRONG_WORKING_COUNTER = -1 MONITORING_DATA = EthercatRegister( identifier="MONITORING_DATA", units="", subnode=0, idx=0x58B2, subidx=0x01, cyclic="CONFIG", dtype=REG_DTYPE.U16, access=REG_ACCESS.RO, ) DIST_DATA = EthercatRegister( identifier="DISTURBANCE_DATA", units="", subnode=0, idx=0x58B4, subidx=0x01, cyclic="CONFIG", dtype=REG_DTYPE.U16, access=REG_ACCESS.WO, ) def __init__( self, slave: "CdefSlave", slave_id: int, dictionary_path: str, servo_status_listener: bool = False, ): if not pysoem: raise pysoem_import_error self.__slave = slave self.slave_id = slave_id super(EthercatServo, self).__init__(slave_id, dictionary_path, servo_status_listener) def _read_raw( # type: ignore [override] self, reg: EthercatRegister, buffer_size: int = 0, complete_access: bool = False ) -> bytes: self._lock.acquire() try: value: bytes = self.__slave.sdo_read(reg.idx, reg.subidx, buffer_size, complete_access) self._check_working_counter() except (pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError, ILIOError) as e: raise ILIOError(f"Error reading {reg.identifier}. Reason: {e}") from e finally: self._lock.release() return value def _write_raw(self, reg: EthercatRegister, data: bytes, complete_access: bool = False) -> None: # type: ignore [override] self._lock.acquire() try: self.__slave.sdo_write(reg.idx, reg.subidx, data, complete_access) self._check_working_counter() except (pysoem.SdoError, pysoem.MailboxError, pysoem.PacketError, ILIOError) as e: raise ILIOError(f"Error writing {reg.identifier}. Reason: {e}") from e finally: self._lock.release() def _monitoring_read_data(self) -> bytes: # type: ignore [override] """Read monitoring data frame.""" return self._read_raw(self.MONITORING_DATA, buffer_size=1024, complete_access=True) def _disturbance_write_data(self, data: bytearray) -> None: # type: ignore [override] """Write disturbance data.""" return self._write_raw(self.DIST_DATA, bytes(data), complete_access=True) @staticmethod def __monitoring_disturbance_map_can_address(address: int, subnode: int) -> int: """Map CAN register address to IPB register address. Args: subnode: Subnode to be targeted. address: Register address to map. """ mapped_address: int = address - ( CANOPEN_ADDRESS_OFFSET + (MAP_ADDRESS_OFFSET * (subnode - 1)) ) return mapped_address def _monitoring_disturbance_data_to_map_register( self, subnode: int, address: int, dtype: int, size: int ) -> int: """Arrange necessary data to map a monitoring/disturbance register. Args: subnode: Subnode to be targeted. address: Register address to map. dtype: Register data type. size: Size of data in bytes. """ ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode) mapped_address: int = super()._monitoring_disturbance_data_to_map_register( subnode, ipb_address, dtype, size ) return mapped_address def _check_working_counter(self) -> None: """Check if the slave responds with a correct working counter. Raises: ILIOError: If the received working counter is incorrect. """ if self.__slave.mbx_receive() == self.WRONG_WORKING_COUNTER: raise ILIOError("Wrong working counter") @property def slave(self) -> "CdefSlave": """Ethercat slave""" return self.__slave