from typing import Optional, Union, Callable, Any
import ingenialogger
import canopen
from canopen.emcy import EmcyConsumer
from ingenialink.constants import CAN_MAX_WRITE_SIZE
from ingenialink.exceptions import ILIOError
from ingenialink.servo import Servo
from ingenialink.canopen.dictionary import CanopenDictionary
from ingenialink.canopen.register import CanopenRegister
from ingenialink.enums.register import REG_DTYPE, REG_ACCESS
from ingenialink.register import Register
# Module-level logger obtained from ingenialogger for this module's name.
logger = ingenialogger.get_logger(__name__)
# SDO response timeout that CanopenServo.store_parameters() puts back on the
# node after its temporary override.
# NOTE(review): presumably expressed in seconds, as python-canopen's
# RESPONSE_TIMEOUT is -- confirm against the library documentation.
CANOPEN_SDO_RESPONSE_TIMEOUT = 0.3
class CanopenServo(Servo):
    """CANopen Servo instance.

    Args:
        target: Node ID to be connected.
        node: Remote Node of the drive.
        dictionary_path: Path to the dictionary.
        servo_status_listener: Toggle the listener of the servo for
            its status, errors, faults, etc.

    """

    DICTIONARY_CLASS = CanopenDictionary
    MAX_WRITE_SIZE = CAN_MAX_WRITE_SIZE

    STATUS_WORD_REGISTERS = "CIA402_DRV_STATE_STATUS"
    RESTORE_COCO_ALL = "CIA301_COMMS_RESTORE_ALL"
    STORE_COCO_ALL = "CIA301_COMMS_STORE_ALL"

    MONITORING_DATA = CanopenRegister(
        idx=0x58B2,
        subidx=0x00,
        cyclic="CONFIG",
        dtype=REG_DTYPE.U16,
        access=REG_ACCESS.RO,
        subnode=0,
    )
    DIST_DATA = CanopenRegister(
        idx=0x58B4,
        subidx=0x00,
        cyclic="CONFIG",
        dtype=REG_DTYPE.U16,
        access=REG_ACCESS.RW,
        subnode=0,
    )

    def __init__(
        self,
        target: int,
        node: canopen.RemoteNode,
        dictionary_path: str,
        servo_status_listener: bool = False,
    ) -> None:
        """Store the remote node and EMCY consumer, then initialize the base servo.

        Args:
            target: Node ID to be connected.
            node: Remote Node of the drive.
            dictionary_path: Path to the dictionary.
            servo_status_listener: Toggle the listener of the servo for
                its status, errors, faults, etc.

        """
        # Both attributes are assigned before the base initializer runs.
        # NOTE(review): presumably because the base initializer may already
        # communicate through this node -- confirm against Servo.__init__.
        self.__node = node
        self.__emcy_consumer = EmcyConsumer()
        super().__init__(target, dictionary_path, servo_status_listener)

    def read(self, reg: Union[str, Register], subnode: int = 1) -> Union[int, float, str]:
        """Read a register, removing NUL characters from string values.

        Args:
            reg: Register object or register identifier, forwarded to the
                base-class ``read``.
            subnode: Subnode forwarded to the base-class ``read``.

        Returns:
            The value returned by the base-class ``read``. When that value is
            a string, every NUL character is removed from it.

        """
        value = super().read(reg, subnode=subnode)
        if isinstance(value, str):
            value = value.replace("\x00", "")
        return value

    def store_parameters(self, subnode: Optional[int] = None, sdo_timeout: int = 3) -> None:
        """Store all the current parameters of the target subnode.

        The SDO response timeout of the node is set to ``sdo_timeout`` while
        the parameters are stored and is reset to
        ``CANOPEN_SDO_RESPONSE_TIMEOUT`` afterwards, whether or not the
        store operation succeeded.

        Args:
            subnode: Subnode of the axis. `None` by default which stores all the parameters.
            sdo_timeout: Timeout value for each SDO response.

        Raises:
            ILError: Invalid subnode.
            ILObjectNotExist: Failed to write to the registers.

        """
        self._change_sdo_timeout(sdo_timeout)
        try:
            super().store_parameters(subnode)
        finally:
            # Always put the default timeout back; otherwise a failed store
            # would leave every later SDO transfer on the temporary timeout.
            self._change_sdo_timeout(CANOPEN_SDO_RESPONSE_TIMEOUT)

    def _write_raw(self, reg: CanopenRegister, data: bytes) -> None:  # type: ignore [override]
        """Write raw bytes to a register through an SDO download.

        Args:
            reg: Register whose ``idx`` and ``subidx`` address the object.
            data: Raw bytes to be written.

        Raises:
            ILIOError: If the SDO download raises any exception.

        """
        # The lock is taken outside the ``try`` so that a failure to acquire
        # it can never lead to releasing a lock that is not held.
        with self._lock:
            try:
                self.__node.sdo.download(reg.idx, reg.subidx, data)
            except Exception as e:
                logger.error("Failed writing %s. Exception: %s", str(reg.identifier), e)
                error_raised = f"Error writing {reg.identifier}"
                raise ILIOError(error_raised) from e

    def _read_raw(self, reg: CanopenRegister) -> bytes:  # type: ignore [override]
        """Read raw bytes from a register through an SDO upload.

        Args:
            reg: Register whose ``idx`` and ``subidx`` address the object.

        Returns:
            The uploaded data, or empty bytes when the upload result is not a
            ``bytes`` instance.

        Raises:
            ILIOError: If the SDO upload raises any exception.

        """
        with self._lock:
            try:
                value = self.__node.sdo.upload(reg.idx, reg.subidx)
            except Exception as e:
                logger.error("Failed reading %s. Exception: %s", str(reg.identifier), e)
                error_raised = f"Error reading {reg.identifier}"
                # Chain the cause explicitly, matching _write_raw.
                raise ILIOError(error_raised) from e
        if not isinstance(value, bytes):
            return bytes()
        return value

    def emcy_subscribe(self, cb: Callable[..., Any]) -> int:
        """Subscribe to emergency messages.

        Args:
            cb: Callback

        Returns:
            Assigned slot.

        """
        self.__emcy_consumer.add_callback(cb)
        # The slot is the position of the newly added callback.
        return len(self.__emcy_consumer.callbacks) - 1

    def emcy_unsubscribe(self, slot: int) -> None:
        """Unsubscribe from emergency messages.

        Args:
            slot: Assigned slot when subscribed.

        """
        # NOTE(review): assuming ``callbacks`` is a list, deleting an entry
        # shifts the position of every callback subscribed after it, so slots
        # handed out earlier may no longer match -- confirm with callers.
        del self.__emcy_consumer.callbacks[slot]

    def _change_sdo_timeout(self, value: float) -> None:
        """Changes the SDO timeout of the node.

        Args:
            value: New value assigned to the ``RESPONSE_TIMEOUT`` attribute of
                the node's SDO client.

        """
        self.__node.sdo.RESPONSE_TIMEOUT = value

    @staticmethod
    def __monitoring_disturbance_map_can_address(address: int, subnode: int) -> int:
        """Map CAN register address to IPB register address.

        Args:
            address: CAN register address.
            subnode: Subnode the register belongs to.

        Returns:
            ``address`` minus an offset of 0x2000 plus 0x800 for each subnode
            beyond the first one.

        """
        return address - (0x2000 + (0x800 * (subnode - 1)))

    def _monitoring_disturbance_data_to_map_register(
        self, subnode: int, address: int, dtype: int, size: int
    ) -> int:
        """Arrange necessary data to map a monitoring/disturbance register.

        The CAN address is first converted to its IPB equivalent and the
        result is then handed to the base-class implementation.

        Args:
            subnode: Subnode to be targeted.
            address: Register address to map.
            dtype: Register data type.
            size: Size of data in bytes.

        Returns:
            The value produced by the base-class implementation for the
            converted address.

        """
        ipb_address = self.__monitoring_disturbance_map_can_address(address, subnode)
        return super()._monitoring_disturbance_data_to_map_register(
            subnode, ipb_address, dtype, size
        )

    @property
    def node(self) -> canopen.RemoteNode:
        """Remote node of the servo."""
        return self.__node