Source code for ingenialink.canopen.servo

import canopen
from canopen.emcy import EmcyConsumer

from ingenialink.constants import CAN_MAX_WRITE_SIZE
from ingenialink.exceptions import ILAccessError, ILIOError
from ingenialink.utils._utils import convert_bytes_to_dtype, convert_dtype_to_bytes
from ingenialink.servo import Servo
from ingenialink.canopen.dictionary import CanopenDictionary
from ingenialink.canopen.register import CanopenRegister
from ingenialink.enums.register import REG_DTYPE, REG_ACCESS

import ingenialogger

# Module-level logger, named after this module.
logger = ingenialogger.get_logger(__name__)

# SDO response timeout that CanopenServo.store_parameters puts back on the
# node's SDO client after the store operation.
# NOTE(review): presumably expressed in seconds -- confirm against the
# canopen SDO client documentation.
CANOPEN_SDO_RESPONSE_TIMEOUT = 0.3


class CanopenServo(Servo):
    """CANopen Servo instance.

    Args:
        target (int): Node ID to be connected.
        node (canopen.RemoteNode): Remote Node of the drive.
        dictionary_path (str): Path to the dictionary.
        eds (str): Path to the eds file.
        servo_status_listener (bool): Toggle the listener of the servo for
            its status, errors, faults, etc.

    """

    DICTIONARY_CLASS = CanopenDictionary
    MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE

    STATUS_WORD_REGISTERS = "CIA402_DRV_STATE_STATUS"
    RESTORE_COCO_ALL = "CIA301_COMMS_RESTORE_ALL"
    STORE_COCO_ALL = "CIA301_COMMS_STORE_ALL"

    MONITORING_DATA = CanopenRegister(
        identifier="",
        units="",
        subnode=0,
        idx=0x58B2,
        subidx=0x00,
        cyclic="CONFIG",
        dtype=REG_DTYPE.U16,
        access=REG_ACCESS.RO,
    )
    DIST_DATA = CanopenRegister(
        identifier="",
        units="",
        subnode=0,
        idx=0x58B4,
        subidx=0x00,
        cyclic="CONFIG",
        dtype=REG_DTYPE.U16,
        access=REG_ACCESS.RW,
    )

    def __init__(self, target, node, dictionary_path=None, eds=None, servo_status_listener=False):
        # These attributes are assigned before the base initializer runs,
        # as in the original ordering.
        # NOTE(review): presumably the base initializer may already talk to
        # the drive through this object -- confirm before reordering.
        self.eds = eds
        self.__node = node
        self.__emcy_consumer = EmcyConsumer()
        super().__init__(target, dictionary_path, servo_status_listener)

    def read(self, reg, subnode=1):
        """Read a register and strip NUL characters from string values.

        The read itself is delegated to the base class.

        Args:
            reg: Register to read, passed unchanged to the base class.
            subnode (int): Subnode passed to the base class. ``1`` by default.

        Returns:
            The value returned by the base class. If it is a ``str``, every
            ``"\\x00"`` character is removed from it.

        """
        value = super().read(reg, subnode=subnode)
        if isinstance(value, str):
            value = value.replace("\x00", "")
        return value

    def store_parameters(self, subnode=None, sdo_timeout=3):
        """Store all the current parameters of the target subnode.

        The SDO response timeout is set to ``sdo_timeout`` while the store
        operation runs and is set back to ``CANOPEN_SDO_RESPONSE_TIMEOUT``
        afterwards, even if the store operation raises.

        Args:
            subnode (int): Subnode of the axis. `None` by default which stores
                all the parameters.
            sdo_timeout (int): Timeout value for each SDO response.

        Raises:
            ILError: Invalid subnode.
            ILObjectNotExist: Failed to write to the registers.

        """
        self._change_sdo_timeout(sdo_timeout)
        try:
            super().store_parameters(subnode)
        finally:
            # Always put the regular timeout back; otherwise a failed store
            # would leave every later SDO transfer on the long timeout.
            self._change_sdo_timeout(CANOPEN_SDO_RESPONSE_TIMEOUT)

    def _write_raw(self, reg, data):
        """Write raw data to a register through an SDO download.

        Args:
            reg: Register object; its ``idx`` and ``subidx`` address the
                object to write and its ``identifier`` is used in messages.
            data: Payload handed unchanged to ``node.sdo.download``.

        Raises:
            ILIOError: If the SDO download raises any exception. The original
                exception is chained as the cause.

        """
        # Acquire outside the ``try`` so that ``finally`` only releases a
        # lock that was actually taken.
        self._lock.acquire()
        try:
            self.__node.sdo.download(reg.idx, reg.subidx, data)
        except Exception as e:
            logger.error("Failed writing %s. Exception: %s", str(reg.identifier), e)
            error_raised = f"Error writing {reg.identifier}"
            raise ILIOError(error_raised) from e
        finally:
            self._lock.release()

    def _read_raw(self, reg):
        """Read raw data from a register through an SDO upload.

        Args:
            reg: Register object; its ``idx`` and ``subidx`` address the
                object to read and its ``identifier`` is used in messages.

        Returns:
            The value returned by ``node.sdo.upload``.

        Raises:
            ILIOError: If the SDO upload raises any exception. The original
                exception is chained as the cause.

        """
        # Acquire outside the ``try`` so that ``finally`` only releases a
        # lock that was actually taken.
        self._lock.acquire()
        try:
            value = self.__node.sdo.upload(reg.idx, reg.subidx)
        except Exception as e:
            logger.error("Failed reading %s. Exception: %s", str(reg.identifier), e)
            error_raised = f"Error reading {reg.identifier}"
            raise ILIOError(error_raised) from e
        finally:
            self._lock.release()
        return value

    def emcy_subscribe(self, cb):
        """Subscribe to emergency messages.

        Args:
            cb: Callback

        Returns:
            int: Assigned slot. It is the position of the callback in the
            emergency consumer's callback collection right after adding it.

        """
        self.__emcy_consumer.add_callback(cb)
        return len(self.__emcy_consumer.callbacks) - 1

    def emcy_unsubscribe(self, slot):
        """Unsubscribe from emergency messages.

        Args:
            slot (int): Assigned slot when subscribed.

        Note:
            Slots are positions in the consumer's callback collection.
            NOTE(review): if that collection is a list, removing a slot shifts
            the slots of callbacks subscribed after it -- confirm and
            unsubscribe in reverse order of subscription if so.

        """
        del self.__emcy_consumer.callbacks[slot]

    def _change_sdo_timeout(self, value):
        """Changes the SDO timeout of the node."""
        self.__node.sdo.RESPONSE_TIMEOUT = value

    @staticmethod
    def __monitoring_disturbance_map_can_address(address, subnode):
        """Map CAN register address to IPB register address."""
        # Each subnode owns a 0x800-wide window starting at 0x2000; removing
        # the window base leaves the offset used as the mapped address.
        return address - (0x2000 + (0x800 * (subnode - 1)))

    def _monitoring_disturbance_data_to_map_register(self, subnode, address, dtype, size):
        """Arrange necessary data to map a monitoring/disturbance register.

        The CAN address is first converted to its IPB equivalent and the
        result is then delegated to the base class implementation.

        Args:
            subnode (int): Subnode to be targeted.
            address (int): Register address to map.
            dtype (int): Register data type.
            size (int): Size of data in bytes.

        Returns:
            Whatever the base class implementation returns for the converted
            address.

        """
        ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode)
        return super()._monitoring_disturbance_data_to_map_register(
            subnode, ipb_address, dtype, size
        )

    @property
    def node(self):
        """canopen.RemoteNode: Remote node of the servo."""
        return self.__node