Source code for ingenialink.canopen.servo

from typing import Optional, Union, Callable, Any

import ingenialogger
import canopen
from canopen.emcy import EmcyConsumer

from ingenialink.constants import CAN_MAX_WRITE_SIZE
from ingenialink.exceptions import ILIOError
from ingenialink.servo import Servo
from ingenialink.canopen.dictionary import CanopenDictionary
from ingenialink.canopen.register import CanopenRegister
from ingenialink.enums.register import REG_DTYPE, REG_ACCESS
from ingenialink.register import Register


logger = ingenialogger.get_logger(__name__)

CANOPEN_SDO_RESPONSE_TIMEOUT = 0.3


[docs]class CanopenServo(Servo): """CANopen Servo instance. Args: target: Node ID to be connected. node: Remote Node of the drive. dictionary_path: Path to the dictionary. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. """ DICTIONARY_CLASS = CanopenDictionary MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE STATUS_WORD_REGISTERS = "CIA402_DRV_STATE_STATUS" RESTORE_COCO_ALL = "CIA301_COMMS_RESTORE_ALL" STORE_COCO_ALL = "CIA301_COMMS_STORE_ALL" MONITORING_DATA = CanopenRegister( idx=0x58B2, subidx=0x00, cyclic="CONFIG", dtype=REG_DTYPE.U16, access=REG_ACCESS.RO, subnode=0, ) DIST_DATA = CanopenRegister( idx=0x58B4, subidx=0x00, cyclic="CONFIG", dtype=REG_DTYPE.U16, access=REG_ACCESS.RW, subnode=0, ) def __init__( self, target: int, node: canopen.RemoteNode, dictionary_path: str, servo_status_listener: bool = False, ) -> None: self.__node = node self.__emcy_consumer = EmcyConsumer() super(CanopenServo, self).__init__(target, dictionary_path, servo_status_listener)
[docs] def read(self, reg: Union[str, Register], subnode: int = 1) -> Union[int, float, str]: value = super().read(reg, subnode=subnode) if isinstance(value, str): value = value.replace("\x00", "") return value
[docs] def store_parameters(self, subnode: Optional[int] = None, sdo_timeout: int = 3) -> None: """Store all the current parameters of the target subnode. Args: subnode: Subnode of the axis. `None` by default which stores all the parameters. sdo_timeout: Timeout value for each SDO response. Raises: ILError: Invalid subnode. ILObjectNotExist: Failed to write to the registers. """ self._change_sdo_timeout(sdo_timeout) super().store_parameters(subnode) self._change_sdo_timeout(CANOPEN_SDO_RESPONSE_TIMEOUT)
def _write_raw(self, reg: CanopenRegister, data: bytes) -> None: # type: ignore [override] try: self._lock.acquire() self.__node.sdo.download(reg.idx, reg.subidx, data) except Exception as e: logger.error("Failed writing %s. Exception: %s", str(reg.identifier), e) error_raised = f"Error writing {reg.identifier}" raise ILIOError(error_raised) from e finally: self._lock.release() def _read_raw(self, reg: CanopenRegister) -> bytes: # type: ignore [override] try: self._lock.acquire() value = self.__node.sdo.upload(reg.idx, reg.subidx) except Exception as e: logger.error("Failed reading %s. Exception: %s", str(reg.identifier), e) error_raised = f"Error reading {reg.identifier}" raise ILIOError(error_raised) finally: self._lock.release() if not isinstance(value, bytes): return bytes() return value
[docs] def emcy_subscribe(self, cb: Callable[..., Any]) -> int: """Subscribe to emergency messages. Args: cb: Callback Returns: Assigned slot. """ self.__emcy_consumer.add_callback(cb) return len(self.__emcy_consumer.callbacks) - 1
[docs] def emcy_unsubscribe(self, slot: int) -> None: """Unsubscribe from emergency messages. Args: slot: Assigned slot when subscribed. """ del self.__emcy_consumer.callbacks[slot]
def _change_sdo_timeout(self, value: float) -> None: """Changes the SDO timeout of the node.""" self.__node.sdo.RESPONSE_TIMEOUT = value @staticmethod def __monitoring_disturbance_map_can_address(address: int, subnode: int) -> int: """Map CAN register address to IPB register address.""" return address - (0x2000 + (0x800 * (subnode - 1))) def _monitoring_disturbance_data_to_map_register( self, subnode: int, address: int, dtype: int, size: int ) -> int: """Arrange necessary data to map a monitoring/disturbance register. Args: subnode: Subnode to be targeted. address: Register address to map. dtype: Register data type. size: Size of data in bytes. """ ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode) return super()._monitoring_disturbance_data_to_map_register( subnode, ipb_address, dtype, size ) @property def node(self) -> canopen.RemoteNode: """Remote node of the servo.""" return self.__node