Source code for ingenialink.servo

import os
import time
import threading
import xml.etree.ElementTree as ET
from xml.dom import minidom
from abc import abstractmethod

from ingenialink.exceptions import (
    ILIOError,
    ILRegisterNotFoundError,
    ILError,
    ILStateError,
    ILAccessError,
)
from ingenialink.register import Register
from ingenialink.utils._utils import (
    get_drive_identification,
    cleanup_register,
    raise_err,
    convert_bytes_to_dtype,
    convert_dtype_to_bytes,
)
from ingenialink.constants import (
    PASSWORD_RESTORE_ALL,
    PASSWORD_STORE_ALL,
    DEFAULT_PDS_TIMEOUT,
    MONITORING_BUFFER_SIZE,
    DEFAULT_DRIVE_NAME,
)
from ingenialink.utils import constants
from ingenialink.enums.register import REG_DTYPE, REG_ADDRESS_TYPE, REG_ACCESS
from ingenialink.enums.servo import SERVO_STATE

import ingenialogger

logger = ingenialogger.get_logger(__name__)

OPERATION_TIME_OUT = -3


[docs]class ServoStatusListener(threading.Thread): """Reads the status word to check if the drive is alive. Args: servo (Servo): Servo instance of the drive. """ def __init__(self, servo): super(ServoStatusListener, self).__init__() self.__servo = servo self.__stop = False
[docs] def run(self): """Checks if the drive is alive by reading the status word register""" previous_states = self.__servo.status while not self.__stop: for subnode in range(1, self.__servo.subnodes): try: current_state = self.__servo.get_state(subnode) if previous_states[subnode] != current_state: previous_states[subnode] = current_state self.__servo._notify_state(current_state, subnode) except ILIOError as e: logger.error("Error getting drive status. Exception : %s", e) time.sleep(1.5)
[docs] def stop(self): """Stops the loop that reads the status word register""" self.__stop = True
[docs]class Servo: """Declaration of a general Servo object. Args: target (str, int): Target ID of the servo. dictionary_path (str): Path to the dictionary file. servo_status_listener (bool): Toggle the listener of the servo for its status, errors, faults, etc. Raises: ILCreationError: If the servo cannot be created. """ DICTIONARY_CLASS = None MAX_WRITE_SIZE = None STATUS_WORD_REGISTERS = "DRV_STATE_STATUS" RESTORE_COCO_ALL = "DRV_RESTORE_COCO_ALL" RESTORE_MOCO_ALL_REGISTERS = "DRV_RESTORE_MOCO_ALL" STORE_COCO_ALL = "DRV_STORE_COCO_ALL" STORE_MOCO_ALL_REGISTERS = "DRV_STORE_MOCO_ALL" CONTROL_WORD_REGISTERS = "DRV_STATE_CONTROL" SERIAL_NUMBER_REGISTERS = ["DRV_ID_SERIAL_NUMBER_COCO", "DRV_ID_SERIAL_NUMBER"] SOFTWARE_VERSION_REGISTERS = ["DRV_APP_COCO_VERSION", "DRV_ID_SOFTWARE_VERSION"] PRODUCT_ID_REGISTERS = ["DRV_ID_PRODUCT_CODE_COCO", "DRV_ID_PRODUCT_CODE"] REVISION_NUMBER_REGISTERS = ["DRV_ID_REVISION_NUMBER_COCO", "DRV_ID_REVISION_NUMBER"] MONITORING_DIST_ENABLE = "MON_DIST_ENABLE" MONITORING_REMOVE_DATA = "MON_REMOVE_DATA" MONITORING_NUMBER_MAPPED_REGISTERS = "MON_CFG_TOTAL_MAP" MONITORING_BYTES_PER_BLOCK = "MON_CFG_BYTES_PER_BLOCK" MONITORING_ACTUAL_NUMBER_BYTES = "MON_CFG_BYTES_VALUE" MONITORING_DATA = None MONITORING_DISTURBANCE_VERSION = "MON_DIST_VERSION" DISTURBANCE_ENABLE = "DIST_ENABLE" DISTURBANCE_REMOVE_DATA = "DIST_REMOVE_DATA" DISTURBANCE_NUMBER_MAPPED_REGISTERS = "DIST_CFG_MAP_REGS" DIST_NUMBER_SAMPLES = "DIST_CFG_SAMPLES" DIST_DATA = None def __init__(self, target, dictionary_path=None, servo_status_listener=False): self.target = target if dictionary_path is not None: self._dictionary = self.DICTIONARY_CLASS(dictionary_path) else: self._dictionary = None self._info = None self.name = DEFAULT_DRIVE_NAME prod_name = "" if self.dictionary.part_number is None else self.dictionary.part_number self.full_name = f"{prod_name} {self.name} ({self.target})" """str: Obtains the servo full name.""" self.units_torque = None """SERVO_UNITS_TORQUE: Torque units.""" self.units_pos = None """SERVO_UNITS_POS: Position units.""" self.units_vel = None """SERVO_UNITS_VEL: Velocity units.""" self.units_acc = None """SERVO_UNITS_ACC: Acceleration units.""" self._lock = threading.RLock() self.__observers_servo_state = [] self.__listener_servo_status = None self.__monitoring = {} self.__disturbance = {"data": bytearray()} if servo_status_listener: self.start_status_listener() else: self.stop_status_listener()
[docs] def start_status_listener(self): """Start listening for servo status events (SERVO_STATE).""" if self.__listener_servo_status is not None: return self.__listener_servo_status = ServoStatusListener(self) self.__listener_servo_status.start()
[docs] def stop_status_listener(self): """Stop listening for servo status events (SERVO_STATE).""" if self.__listener_servo_status is None: return if self.__listener_servo_status.is_alive(): self.__listener_servo_status.stop() self.__listener_servo_status.join() self.__listener_servo_status = None
[docs] def load_configuration(self, config_file, subnode=None): """Write current dictionary storage to the servo drive. Args: config_file (str): Path to the dictionary. subnode (int): Subnode of the axis. Raises: FileNotFoundError: If the configuration file cannot be found. ValueError: If a configuration file from a subnode different from 0 is attempted to be loaded to subnode 0. ValueError: If an invalid subnode is provided. """ if subnode is not None and (not isinstance(subnode, int) or subnode < 0): raise ValueError("Invalid subnode") _, registers = self._read_configuration_file(config_file) dest_subnodes = [int(element.attrib["subnode"]) for element in registers] if subnode == 0 and subnode not in dest_subnodes: raise ValueError(f"Cannot load {config_file} to subnode {subnode}") cast_data = {"float": float, "str": str} for element in registers: try: if "storage" in element.attrib and element.attrib["access"] == "rw": if subnode is None: element_subnode = int(element.attrib["subnode"]) else: element_subnode = subnode reg_dtype = element.attrib["dtype"] reg_data = element.attrib["storage"] self.write( element.attrib["id"], cast_data.get(reg_dtype, int)(reg_data), subnode=element_subnode, ) except ILError as e: logger.error( "Exception during load_configuration, register %s: %s", str(element.attrib["id"]), e, )
[docs] def save_configuration(self, config_file, subnode=None): """Read all dictionary registers content and put it to the dictionary storage. Args: config_file (str): Destination path for the configuration file. subnode (int): Subnode of the axis. """ if subnode is not None and (not isinstance(subnode, int) or subnode < 0): raise ILError("Invalid subnode") prod_code, rev_number = get_drive_identification(self, subnode) tree = ET.Element("IngeniaDictionary") header = ET.SubElement(tree, "Header") version = ET.SubElement(header, "Version") version.text = "2" default_language = ET.SubElement(header, "DefaultLanguage") default_language.text = "en_US" body = ET.SubElement(tree, "Body") device = ET.SubElement(body, "Device") registers = ET.SubElement(device, "Registers") device.set("Interface", self.dictionary.interface) if self.dictionary.part_number is not None: device.set("PartNumber", self.dictionary.part_number) device.set("ProductCode", str(prod_code)) device.set("RevisionNumber", str(rev_number)) device.set("firmwareVersion", self.dictionary.firmware_version) access_ops = {value: key for key, value in self.dictionary.access_xdf_options.items()} dtype_ops = {value: key for key, value in self.dictionary.dtype_xdf_options.items()} if subnode is None: subnodes = range(self.dictionary.subnodes) else: subnodes = [subnode] for subnode in subnodes: registers_dict = self.dictionary.registers(subnode=subnode) for reg_id, register in registers_dict.items(): if (register.address_type == REG_ADDRESS_TYPE.NVM_NONE) or ( register.access != REG_ACCESS.RW ): continue register_xml = ET.SubElement(registers, "Register") register_xml.set("access", access_ops[register.access]) register_xml.set("dtype", dtype_ops[register.dtype]) register_xml.set("id", reg_id) self.__update_register_dict(register_xml, subnode) register_xml.set("subnode", str(subnode)) dom = minidom.parseString(ET.tostring(tree, encoding="utf-8")) with open(config_file, "wb") as f: f.write(dom.toprettyxml(indent="\t").encode())
@staticmethod def _read_configuration_file(config_file): """Read a configuration file. Returns the device metadata and the registers list. Args: config_file (str): Path to the dictionary. Returns: device: list: Register list. Raises: FileNotFoundError: If the configuration file cannot be found. """ if not os.path.isfile(config_file): raise FileNotFoundError(f"Could not find {config_file}.") with open(config_file, "r", encoding="utf-8") as xml_file: tree = ET.parse(xml_file) root = tree.getroot() device = root.find("Body/Device") axis = tree.findall("*/Device/Axes/Axis") if axis: # Multiaxis registers = root.findall("./Body/Device/Axes/Axis/Registers/Register") else: # Single axis registers = root.findall("./Body/Device/Registers/Register") return device, registers
[docs] def restore_parameters(self, subnode=None): """Restore all the current parameters of all the slave to default. .. note:: The drive needs a power cycle after this in order for the changes to be properly applied. Args: subnode (int): Subnode of the axis. `None` by default which restores all the parameters. Raises: ILError: Invalid subnode. ILObjectNotExist: Failed to write to the registers. """ if subnode is None: # Restore all self.write(reg=self.RESTORE_COCO_ALL, data=PASSWORD_RESTORE_ALL, subnode=0) logger.info("Restore all successfully done.") elif subnode == 0: # Restore subnode 0 raise ILError("The current firmware version does not have this feature implemented.") elif subnode > 0: # Restore axis self.write( reg=self.RESTORE_MOCO_ALL_REGISTERS, data=PASSWORD_RESTORE_ALL, subnode=subnode ) logger.info(f"Restore subnode {subnode} successfully done.") else: raise ILError("Invalid subnode {subnode}.") time.sleep(1.5)
[docs] def store_parameters(self, subnode=None): """Store all the current parameters of the target subnode. Args: subnode (int): Subnode of the axis. `None` by default which stores all the parameters. Raises: ILError: Invalid subnode. ILObjectNotExist: Failed to write to the registers. """ r = 0 try: if subnode is None: # Store all try: self.write(reg=self.STORE_COCO_ALL, data=PASSWORD_STORE_ALL, subnode=0) logger.info("Store all successfully done.") except ILError as e: logger.warning(f"Store all COCO failed. Reason: {e}. Trying MOCO...") r = -1 if r < 0: for dict_subnode in range(1, self.dictionary.subnodes): self.write( reg=self.STORE_MOCO_ALL_REGISTERS, data=PASSWORD_STORE_ALL, subnode=dict_subnode, ) logger.info(f"Store axis {dict_subnode} successfully done.") elif subnode == 0: # Store subnode 0 raise ILError( "The current firmware version does not have this feature implemented." ) elif subnode > 0: # Store axis self.write( reg=self.STORE_MOCO_ALL_REGISTERS, data=PASSWORD_STORE_ALL, subnode=subnode ) logger.info(f"Store axis {subnode} successfully done.") else: raise ILError("Invalid subnode.") finally: time.sleep(1.5)
[docs] def enable(self, subnode=1, timeout=DEFAULT_PDS_TIMEOUT): """Enable PDS. Args: subnode (int): Subnode of the drive. timeout (int): Timeout in milliseconds. Raises: ILTimeoutError: The servo could not be enabled due to timeout. ILError: Failed to enable PDS. """ r = 0 # Try fault reset if faulty if self.get_state(subnode) in [ SERVO_STATE.FAULT, SERVO_STATE.FAULTR, ]: self.fault_reset(subnode=subnode) while self.get_state(subnode) != SERVO_STATE.ENABLED: # Read the current state state = self.get_state(subnode) # Check state and command action to reach enabled cmd = constants.IL_MC_PDS_CMD_EO if state == SERVO_STATE.FAULT: raise ILStateError(None) elif state == SERVO_STATE.NRDY: cmd = constants.IL_MC_PDS_CMD_DV elif state == SERVO_STATE.DISABLED: cmd = constants.IL_MC_PDS_CMD_SD elif state == SERVO_STATE.RDY: cmd = constants.IL_MC_PDS_CMD_SOEO self.write(self.CONTROL_WORD_REGISTERS, cmd, subnode=subnode) # Wait for state change r = self.state_wait_change(state, timeout, subnode=subnode) if r < 0: raise_err(r) raise_err(r)
[docs] def disable(self, subnode=1, timeout=DEFAULT_PDS_TIMEOUT): """Disable PDS. Args: subnode (int): Subnode of the drive. timeout (int): Timeout in milliseconds. Raises: ILTimeoutError: The servo could not be disabled due to timeout. ILError: Failed to disable PDS. """ r = 0 while self.get_state(subnode) != SERVO_STATE.DISABLED: state = self.get_state(subnode) if state in [ SERVO_STATE.FAULT, SERVO_STATE.FAULTR, ]: # Try fault reset if faulty self.fault_reset(subnode=subnode) elif state != SERVO_STATE.DISABLED: # Check state and command action to reach disabled self.write(self.CONTROL_WORD_REGISTERS, constants.IL_MC_PDS_CMD_DV, subnode=subnode) # Wait until state changes r = self.state_wait_change(state, timeout, subnode=subnode) if r < 0: raise_err(r) raise_err(r)
[docs] def fault_reset(self, subnode=1, timeout=DEFAULT_PDS_TIMEOUT): """Executes a fault reset on the drive. Args: subnode (int): Subnode of the drive. timeout (int): Timeout in milliseconds. Raises: ILTimeoutError: If fault reset spend too much time. ILError: Failed to fault reset. """ r = 0 state = self.get_state(subnode=subnode) if state in [ SERVO_STATE.FAULT, SERVO_STATE.FAULTR, ]: # Check if faulty, if so try to reset (0->1) self.write(self.CONTROL_WORD_REGISTERS, 0, subnode=subnode) self.write(self.CONTROL_WORD_REGISTERS, constants.IL_MC_CW_FR, subnode=subnode) # Wait until status word changes r = self.state_wait_change(state, timeout, subnode=subnode) raise_err(r)
[docs] def status_word_wait_change(self, status_word, timeout, subnode=1): """Waits for a status word change. Args: status_word (int): Status word to wait for. timeout (int): Maximum value to wait for the change. subnode (int): Subnode of the drive. Returns: int: Error code. """ r = 0 start_time = int(round(time.time() * 1000)) actual_status_word = self.read(self.STATUS_WORD_REGISTERS, subnode=subnode) while actual_status_word == status_word: current_time = int(round(time.time() * 1000)) time_diff = current_time - start_time if time_diff > timeout: return OPERATION_TIME_OUT actual_status_word = self.read(self.STATUS_WORD_REGISTERS, subnode=subnode) return r
[docs] def state_wait_change(self, state, timeout, subnode=1): """Waits for a state change. Args: state (SERVO_STATE): Servo state to wait for. timeout (int): Maximum value to wait for the change. subnode (int): Subnode of the drive. Returns: int: Error code. """ r = 0 start_time = int(round(time.time() * 1000)) actual_state = self.get_state(subnode) while actual_state == state: current_time = int(round(time.time() * 1000)) time_diff = current_time - start_time if time_diff > timeout: return OPERATION_TIME_OUT actual_state = self.get_state(subnode) return r
[docs] def get_state(self, subnode=1): """SERVO_STATE: Current drive state.""" status_word = self.read(self.STATUS_WORD_REGISTERS, subnode=subnode) state = self.status_word_decode(status_word) return state
[docs] @staticmethod def status_word_decode(status_word): """Decodes the status word to a known value. Args: status_word (int): Read value for the status word. Returns: SERVO_STATE: Status word value. """ if (status_word & constants.IL_MC_PDS_STA_NRTSO_MSK) == constants.IL_MC_PDS_STA_NRTSO: state = SERVO_STATE.NRDY elif (status_word & constants.IL_MC_PDS_STA_SOD_MSK) == constants.IL_MC_PDS_STA_SOD: state = SERVO_STATE.DISABLED elif (status_word & constants.IL_MC_PDS_STA_RTSO_MSK) == constants.IL_MC_PDS_STA_RTSO: state = SERVO_STATE.RDY elif (status_word & constants.IL_MC_PDS_STA_SO_MSK) == constants.IL_MC_PDS_STA_SO: state = SERVO_STATE.ON elif (status_word & constants.IL_MC_PDS_STA_OE_MSK) == constants.IL_MC_PDS_STA_OE: state = SERVO_STATE.ENABLED elif (status_word & constants.IL_MC_PDS_STA_QSA_MSK) == constants.IL_MC_PDS_STA_QSA: state = SERVO_STATE.QSTOP elif (status_word & constants.IL_MC_PDS_STA_FRA_MSK) == constants.IL_MC_PDS_STA_FRA: state = SERVO_STATE.FAULTR elif (status_word & constants.IL_MC_PDS_STA_F_MSK) == constants.IL_MC_PDS_STA_F: state = SERVO_STATE.FAULT else: state = SERVO_STATE.NRDY return state
[docs] def monitoring_enable(self): """Enable monitoring process.""" self.write(self.MONITORING_DIST_ENABLE, data=1, subnode=0)
[docs] def monitoring_disable(self): """Disable monitoring process.""" self.write(self.MONITORING_DIST_ENABLE, data=0, subnode=0)
[docs] def monitoring_remove_data(self): """Remove monitoring data.""" self.write(self.MONITORING_REMOVE_DATA, data=1, subnode=0)
[docs] def monitoring_set_mapped_register(self, channel, address, subnode, dtype, size): """Set monitoring mapped register. Args: channel (int): Identity channel number. address (int): Register address to map. subnode (int): Subnode to be targeted. dtype (int): Register data type. size (int): Size of data in bytes. """ self.__monitoring[channel] = {"size": size, "dtype": REG_DTYPE(dtype), "processed_data": []} data = self._monitoring_disturbance_data_to_map_register(subnode, address, dtype, size) self.write(self.__monitoring_map_register(), data=data, subnode=0) self.__monitoring_update_num_mapped_registers()
[docs] def monitoring_get_num_mapped_registers(self): """Obtain the number of monitoring mapped registers. Returns: int: Actual number of mapped registers. """ return self.read(self.MONITORING_NUMBER_MAPPED_REGISTERS, 0)
[docs] def monitoring_get_bytes_per_block(self): """Obtain Bytes x Block configured. Returns: int: Actual number of Bytes x Block configured. """ return self.read(self.MONITORING_BYTES_PER_BLOCK, subnode=0)
[docs] def monitoring_remove_all_mapped_registers(self): """Remove all monitoring mapped registers.""" self.write(self.MONITORING_NUMBER_MAPPED_REGISTERS, data=0, subnode=0) self.__monitoring = {}
[docs] def monitoring_actual_number_bytes(self): """Get the number of monitoring bytes left to be read.""" return self.read(self.MONITORING_ACTUAL_NUMBER_BYTES, subnode=0)
[docs] def monitoring_read_data(self): """Obtain processed monitoring data. Returns: array: Actual processed monitoring data. """ num_available_bytes = self.monitoring_actual_number_bytes() monitoring_data = [] while num_available_bytes > 0: if num_available_bytes < MONITORING_BUFFER_SIZE: limit = num_available_bytes else: limit = MONITORING_BUFFER_SIZE tmp_data = self._monitoring_read_data()[:limit] monitoring_data.append(tmp_data) num_available_bytes = self.monitoring_actual_number_bytes() self.__monitoring_process_data(monitoring_data)
[docs] def monitoring_channel_data(self, channel, dtype=None): """Obtain processed monitoring data of a channel. Args: channel (int): Identity channel number. dtype (REG_DTYPE): Data type of the register to map. Note: The dtype argument is not necessary for this function, it was added to maintain compatibility with IPB's implementation of monitoring. Returns: List: Monitoring data. """ return self.__monitoring[channel]["processed_data"]
[docs] def disturbance_enable(self): """Enable disturbance process.""" self.write(self.DISTURBANCE_ENABLE, data=1, subnode=0)
[docs] def disturbance_disable(self): """Disable disturbance process.""" self.write(self.DISTURBANCE_ENABLE, data=0, subnode=0)
[docs] def disturbance_remove_data(self): """Remove disturbance data.""" self.write(self.DISTURBANCE_REMOVE_DATA, data=1, subnode=0) self.disturbance_data = bytearray()
[docs] def disturbance_set_mapped_register(self, channel, address, subnode, dtype, size): """Set monitoring mapped register. Args: channel (int): Identity channel number. address (int): Register address to map. subnode (int): Subnode to be targeted. dtype (int): Register data type. size (int): Size of data in bytes. """ self.__disturbance[channel] = {"size": size, "dtype": REG_DTYPE(dtype).name} data = self._monitoring_disturbance_data_to_map_register(subnode, address, dtype, size) self.write(self.__disturbance_map_register(), data=data, subnode=0) self.__disturbance_update_num_mapped_registers()
[docs] def disturbance_get_num_mapped_registers(self): """Obtain the number of disturbance mapped registers. Returns: int: Actual number of mapped registers. """ return self.read(self.DISTURBANCE_NUMBER_MAPPED_REGISTERS, 0)
[docs] def disturbance_remove_all_mapped_registers(self): """Remove all disturbance mapped registers.""" self.write(self.DISTURBANCE_NUMBER_MAPPED_REGISTERS, data=0, subnode=0) self.__disturbance = {"data": bytearray()}
[docs] def subscribe_to_status(self, callback): """Subscribe to state changes. Args: callback (function): Callback function. Returns: int: Assigned slot. """ if callback in self.__observers_servo_state: logger.info("Callback already subscribed.") return self.__observers_servo_state.append(callback)
[docs] def unsubscribe_from_status(self, callback): """Unsubscribe from state changes. Args: callback (function): Callback function. """ if callback not in self.__observers_servo_state: logger.info("Callback not subscribed.") return self.__observers_servo_state.remove(callback)
[docs] def is_alive(self): """Checks if the servo responds to a reading a register. Returns: bool: Return code with the result of the read. """ _is_alive = True try: self.read(self.STATUS_WORD_REGISTERS) except ILError as e: _is_alive = False logger.error(e) return _is_alive
[docs] def reload_errors(self, dictionary): """Force to reload all dictionary errors. Args: dictionary (str): Dictionary. """ pass
def _get_reg(self, reg, subnode=1): """Validates a register. Args: reg (Register): Targeted register to validate. subnode (int): Subnode for the register. Returns: Register: Instance of the desired register from the dictionary. Raises: ValueError: If the dictionary is not loaded. ILWrongRegisterError: If the register has invalid format. """ if isinstance(reg, Register): return reg elif isinstance(reg, str): _dict = self.dictionary if not _dict: raise ValueError("No dictionary loaded") if reg not in _dict.registers(subnode): raise ILRegisterNotFoundError(f"Register {reg} not found.") return _dict.registers(subnode)[reg] else: raise TypeError("Invalid register") def __update_register_dict(self, register, subnode): """Updates the register from a dictionary with the storage parameters. Args: register (Element): Register element to be updated. subnode (int): Target subnode. Returns: """ try: storage = self.read(register.attrib["id"], subnode=subnode) register.set("storage", str(storage)) # Update register object reg = self.dictionary.registers(subnode)[register.attrib["id"]] reg.storage = storage reg.storage_valid = 1 except BaseException as e: logger.error( "Exception during save_configuration, register %s: %s", str(register.attrib["id"]), e, ) def _notify_state(self, state, subnode): """Notify the state to the observers. Args: state (SERVO_STATE): Current servo state. subnode (int): Subnode of the drive. """ for callback in self.__observers_servo_state: callback(state, None, subnode) def __read_coco_moco_register(self, register_coco, register_moco): """Reads the COCO register and if it does not exist, reads the MOCO register Args: register_coco (str): COCO Register ID to be read. register_moco (str): MOCO Register ID to be read. Returns: (int, str): Read value of the register. """ try: return self.read(register_coco, subnode=0) except ILError: logger.warning( f"Error reading register {register_coco.identifier} from COCO. Trying MOCO" ) try: return self.read(register_moco, subnode=1) except ILError: raise ILError(f"Error reading register {register_moco.identifier} from MOCO.") def __monitoring_map_register(self): """Get the first available Monitoring Mapped Register slot. Returns: str: Monitoring Mapped Register ID. """ if self.monitoring_number_mapped_registers < 10: register_id = f"MON_CFG_REG{self.monitoring_number_mapped_registers}_MAP" else: register_id = f"MON_CFG_REFG{self.monitoring_number_mapped_registers}_MAP" return register_id @staticmethod def _monitoring_disturbance_data_to_map_register(subnode, address, dtype, size): """Arrange necessary data to map a monitoring/disturbance register. Args: subnode (int): Subnode to be targeted. address (int): Register address to map. dtype (int): Register data type. size (int): Size of data in bytes. """ data_h = address | subnode << 12 data_l = dtype << 8 | size return (data_h << 16) | data_l def __monitoring_update_num_mapped_registers(self): """Update the number of mapped monitoring registers.""" self.write( self.MONITORING_NUMBER_MAPPED_REGISTERS, data=self.monitoring_number_mapped_registers + 1, subnode=0, ) def __monitoring_process_data(self, monitoring_data): """Arrange monitoring data.""" data_bytes = bytearray() for i in range(len(monitoring_data)): data_bytes += monitoring_data[i] bytes_per_block = self.monitoring_get_bytes_per_block() number_of_blocks = len(data_bytes) // bytes_per_block number_of_channels = self.monitoring_get_num_mapped_registers() for channel in range(number_of_channels): self.__monitoring[channel]["processed_data"] = [] for block in range(number_of_blocks): block_data = data_bytes[ block * bytes_per_block : block * bytes_per_block + bytes_per_block ] for channel in range(number_of_channels): channel_data_size = self.__monitoring[channel]["size"] val = convert_bytes_to_dtype( block_data[:channel_data_size], self.__monitoring[channel]["dtype"] ) self.__monitoring[channel]["processed_data"].append(val) block_data = block_data[channel_data_size:] def __disturbance_map_register(self): """Get the first available Disturbance Mapped Register slot. Returns: str: Disturbance Mapped Register ID. """ return f"DIST_CFG_REG{self.disturbance_number_mapped_registers}_MAP" def __disturbance_update_num_mapped_registers(self): """Update the number of mapped disturbance registers.""" self.write( self.DISTURBANCE_NUMBER_MAPPED_REGISTERS, data=self.disturbance_number_mapped_registers + 1, subnode=0, ) def _disturbance_create_data_chunks(self, channels, dtypes, data_arr, max_size): """Divide disturbance data into chunks. Args: channels (int or list of int): Channel identifier. dtypes (int or list of int): Data type. data_arr (list or list of list): Data array. max_size (int): Max chunk size in bytes. """ if not isinstance(channels, list): channels = [channels] if not isinstance(dtypes, list): dtypes = [dtypes] if not isinstance(data_arr[0], list): data_arr = [data_arr] num_samples = len(data_arr[0]) self.write(self.DIST_NUMBER_SAMPLES, num_samples, subnode=0) data = bytearray() for sample_idx in range(num_samples): for channel in range(len(data_arr)): val = convert_dtype_to_bytes(data_arr[channel][sample_idx], dtypes[channel]) data += val chunks = [data[i : i + max_size] for i in range(0, len(data), max_size)] return data, chunks
[docs] def write(self, reg, data, subnode=1): """Writes a data to a target register. Args: reg (Register, str): Target register to be written. data (int, str, float): Data to be written. subnode (int): Target axis of the drive. Raises: ILAccessError: Wrong access to the register. ILIOError: Error reading the register. """ _reg = self._get_reg(reg, subnode) if _reg.access == REG_ACCESS.RO: raise ILAccessError("Register is Read-only") value = convert_dtype_to_bytes(data, _reg.dtype) self._write_raw(_reg, value)
[docs] def read(self, reg, subnode=1): """Read a register value from servo. Args: reg (str, Register): Register. subnode (int): Target axis of the drive. Returns: int, float or str: Value stored in the register. Raises: ILAccessError: Wrong access to the register. ILIOError: Error writing the register. """ _reg = self._get_reg(reg, subnode) access = _reg.access if access == REG_ACCESS.WO: raise ILAccessError("Register is Write-only") raw_read = self._read_raw(_reg) value = convert_bytes_to_dtype(raw_read, _reg.dtype) return value
[docs] def replace_dictionary(self, dictionary): """Deletes and creates a new instance of the dictionary. Args: dictionary (str): Path to the dictionary. """ self._dictionary = self.DICTIONARY_CLASS(dictionary)
[docs] def disturbance_write_data(self, channels, dtypes, data_arr): """Write disturbance data. Args: channels (int or list of int): Channel identifier. dtypes (int or list of int): Data type. data_arr (list or list of list): Data array. """ data, chunks = self._disturbance_create_data_chunks( channels, dtypes, data_arr, self.MAX_WRITE_SIZE ) for chunk in chunks: self._write_raw(self.DIST_DATA, data=chunk) self.disturbance_data = data
def _monitoring_read_data(self): """Read monitoring data frame.""" return self._read_raw(self.MONITORING_DATA) @abstractmethod def _write_raw(self, reg, data): """Write raw bytes to a target register. Args: reg (Register): Target register to be written. data (bytearray): Data to be written. subnode (int): Target axis of the drive. Raises: ILIOError: Error writing the register. """ raise NotImplementedError @abstractmethod def _read_raw(self, reg): """Read raw bytes from a target register. Args: reg (Register): Register. Returns: bytearray: Raw bytes reading from servo. Raises: ILIOError: Error reading the register. """ raise NotImplementedError @property def dictionary(self): """Returns dictionary object""" return self._dictionary @property def full_name(self): """str: Drive full name.""" return self.__full_name @full_name.setter def full_name(self, new_name): self.__full_name = new_name @property def status(self): """dict: Servo status.""" status = {subnode: self.get_state(subnode) for subnode in range(1, self.subnodes)} return status @property def subnodes(self): """int: Number of subnodes.""" return self.dictionary.subnodes @property def errors(self): """dict: Errors.""" return self.dictionary.errors.errors @property def info(self): """dict: Servo information.""" serial_number = self.__read_coco_moco_register( self.SERIAL_NUMBER_REGISTERS[0], self.SERIAL_NUMBER_REGISTERS[1] ) sw_version = self.__read_coco_moco_register( self.SOFTWARE_VERSION_REGISTERS[0], self.SOFTWARE_VERSION_REGISTERS[1] ) product_code = self.__read_coco_moco_register( self.PRODUCT_ID_REGISTERS[0], self.PRODUCT_ID_REGISTERS[1] ) revision_number = self.__read_coco_moco_register( self.REVISION_NUMBER_REGISTERS[0], self.REVISION_NUMBER_REGISTERS[1] ) hw_variant = "A" return { "name": self.name, "serial_number": serial_number, "firmware_version": sw_version, "product_code": product_code, "revision_number": revision_number, "hw_variant": hw_variant, } @property def monitoring_number_mapped_registers(self): """Get the number of mapped monitoring registers.""" return self.read(self.MONITORING_NUMBER_MAPPED_REGISTERS, subnode=0) @property def monitoring_data_size(self): """Obtain monitoring data size. Returns: int: Current monitoring data size in bytes. """ number_of_samples = self.read("MON_CFG_WINDOW_SAMP", subnode=0) return self.monitoring_get_bytes_per_block() * number_of_samples @property def disturbance_data(self): """Obtain disturbance data. Returns: array: Current disturbance data. """ return self.__disturbance["data"] @disturbance_data.setter def disturbance_data(self, value): """Set disturbance data. Args: value (array): Array with the disturbance to send. """ self.__disturbance["data"] = value @property def disturbance_data_size(self): """Obtain disturbance data size. Returns: int: Current disturbance data size. """ return len(self.__disturbance["data"]) @property def disturbance_number_mapped_registers(self): """Get the number of mapped disturbance registers.""" return self.read(self.DISTURBANCE_NUMBER_MAPPED_REGISTERS, subnode=0)