Source code for ingenialink.canopen.network

import contextlib
import os
import re
import tempfile
from collections import defaultdict
from enum import Enum
from threading import Thread
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import canopen
import ingenialogger
from can import CanError
from can.interfaces.ixxat.exceptions import VCIError
from canopen import Network as NetworkLib

from ingenialink.canopen.register import CanopenRegister
from ingenialink.canopen.servo import (
    CANOPEN_SDO_RESPONSE_TIMEOUT,
    REG_ACCESS,
    REG_DTYPE,
    CanopenServo,
)
from ingenialink.exceptions import ILError, ILFirmwareLoadError, ILObjectNotExist
from ingenialink.network import NET_DEV_EVT, NET_PROT, NET_STATE, Network
from ingenialink.utils.mcb import MCB

logger = ingenialogger.get_logger(__name__)

PROG_STAT_1 = CanopenRegister(
    idx=0x1F51,
    subidx=0x01,
    cyclic="CONFIG",
    dtype=REG_DTYPE.U8,
    access=REG_ACCESS.RW,
    identifier="CIA302_BL_PROGRAM_CONTROL_1",
    subnode=0,
)
PROG_DL_1 = CanopenRegister(
    idx=0x1F50,
    subidx=0x01,
    cyclic="CONFIG",
    dtype=REG_DTYPE.DOMAIN,
    access=REG_ACCESS.RW,
    identifier="CIA302_BL_PROGRAM_DATA",
    subnode=0,
)
FORCE_BOOT = CanopenRegister(
    idx=0x5EDE,
    subidx=0x00,
    cyclic="CONFIG",
    dtype=REG_DTYPE.U32,
    access=REG_ACCESS.WO,
    identifier="DRV_BOOT_COCO_FORCE",
    subnode=0,
)

CIA301_DRV_ID_DEVICE_TYPE = CanopenRegister(
    idx=0x1000,
    subidx=0x00,
    cyclic="CONFIG",
    dtype=REG_DTYPE.U32,
    access=REG_ACCESS.RO,
    identifier="",
    subnode=0,
)

CANOPEN_BOTT_NODE_GUARDING_PERIOD = 5
CANOPEN_SEND_FW_SDO_RESPONSE_TIMEOUT = 10  # Seconds
BOOTLOADER_MSG_SIZE = 256  # Size in Bytes
RECONNECTION_TIMEOUT = 180  # Seconds
POLLING_MAX_TRIES = 5  # Seconds

PROG_CTRL_STATE_STOP = 0x00
PROG_CTRL_STATE_START = 0x01
PROG_CTRL_STATE_CLEAR = 0x03
PROG_CTRL_STATE_FLASH = 0x80

APPLICATION_LOADED_STATE = 402

CAN_CHANNELS: Dict[str, Union[Tuple[int, int], Tuple[str, str]]] = {
    "kvaser": (0, 1),
    "pcan": ("PCAN_USBBUS1", "PCAN_USBBUS2"),
    "ixxat": (0, 1),
    "virtual": (0, 1),
}


[docs]class CAN_DEVICE(Enum): """CAN Device.""" KVASER = "kvaser" PCAN = "pcan" IXXAT = "ixxat" VIRTUAL = "virtual"
[docs]class CAN_BAUDRATE(Enum): """Baudrates.""" Baudrate_1M = 1000000 """1 Mbit/s""" Baudrate_500K = 500000 """500 Kbit/s""" Baudrate_250K = 250000 """250 Kbit/s""" Baudrate_125K = 125000 """125 Kbit/s""" Baudrate_100K = 100000 """100 Kbit/s""" Baudrate_50K = 50000 """50 Kbit/s"""
CAN_BIT_TIMMING = { CAN_BAUDRATE.Baudrate_1M: 0, CAN_BAUDRATE.Baudrate_500K: 2, CAN_BAUDRATE.Baudrate_250K: 3, CAN_BAUDRATE.Baudrate_125K: 4, CAN_BAUDRATE.Baudrate_100K: 5, CAN_BAUDRATE.Baudrate_50K: 6, }
[docs]class NetStatusListener(Thread): """Network status listener thread to check if the drive is alive. Args: network: Network instance of the CANopen communication. """ def __init__(self, network: "CanopenNetwork"): super(NetStatusListener, self).__init__() self.__network = network self.__stop = False
[docs] def run(self) -> None: timestamps = {} if self.__network._connection is None: return while not self.__stop: for node_id, node in list(self.__network._connection.nodes.items()): sleep(1.5) current_timestamp = node.nmt.timestamp if node_id not in timestamps: timestamps[node_id] = current_timestamp continue is_alive = current_timestamp != timestamps[node_id] servo_state = self.__network._get_servo_state(node_id) if is_alive: if servo_state != NET_STATE.CONNECTED: self.__network._notify_status(node_id, NET_DEV_EVT.ADDED) self.__network._set_servo_state(node_id, NET_STATE.CONNECTED) timestamps[node_id] = node.nmt.timestamp elif servo_state == NET_STATE.DISCONNECTED: self.__network._reset_connection() else: self.__network._notify_status(node_id, NET_DEV_EVT.REMOVED) self.__network._set_servo_state(node_id, NET_STATE.DISCONNECTED)
def stop(self) -> None: self.__stop = True
[docs]class CanopenNetwork(Network): """Network of the CANopen communication. Args: device: Targeted device to connect. channel: Targeted channel number of the transceiver. baudrate: Baudrate to communicate through. """ def __init__( self, device: CAN_DEVICE, channel: int = 0, baudrate: CAN_BAUDRATE = CAN_BAUDRATE.Baudrate_1M, ): super(CanopenNetwork, self).__init__() self.servos: List[CanopenServo] = [] self.__device = device.value self.__channel: Union[int, str] = CAN_CHANNELS[self.__device][channel] self.__baudrate = baudrate.value self._connection: Optional[NetworkLib] = None self.__net_state = NET_STATE.DISCONNECTED self.__servos_state: Dict[int, NET_STATE] = {} self.__listener_net_status: Optional[NetStatusListener] = None self.__observers_net_state: Dict[int, List[Callable[[NET_DEV_EVT], Any]]] = defaultdict( list ) self.__observers_fw_load_status_msg: List[Callable[[str], Any]] = [] self.__observers_fw_load_progress: List[Callable[[int], Any]] = [] self.__observers_fw_load_errors_enabled: List[Callable[[bool], Any]] = [] self.__fw_load_status_msg = "" self.__fw_load_progress = 0 self.__fw_load_errors_enabled = True
[docs] def scan_slaves(self) -> List[int]: """Scans for nodes in the network. Returns: Containing all the detected node IDs. """ is_connection_created = False if self._connection is None: is_connection_created = True try: self._setup_connection() except ILError: self._teardown_connection() return [] if self._connection is None: return [] self._connection.scanner.reset() try: self._connection.scanner.search() except Exception as e: logger.error("Error searching for nodes. Exception: {}".format(e)) logger.info("Resetting bus") if ( self._connection is not None and self._connection.bus is not None and hasattr(self._connection.bus, "reset") ): self._connection.bus.reset() sleep(0.05) nodes = self._connection.scanner.nodes if is_connection_created: self._teardown_connection() return nodes # type: ignore [no-any-return]
[docs] def connect_to_slave( # type: ignore [override] self, target: int, dictionary: str, servo_status_listener: bool = False, net_status_listener: bool = False, ) -> CanopenServo: """Connects to a drive through a given target node ID. Args: target: Targeted node ID to be connected. dictionary: Path to the dictionary file. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. net_status_listener: Toggle the listener of the network status, connection and disconnection. """ nodes = self.scan_slaves() if len(nodes) < 1: raise ILError("Could not find any nodes in the network") self._setup_connection() if self._connection is None: raise ILError("Connection has not been established") if target in nodes: try: node = self._connection.add_node(target) node.nmt.start_node_guarding(1) servo = CanopenServo( target, node, dictionary, servo_status_listener=servo_status_listener ) self.servos.append(servo) self._set_servo_state(target, NET_STATE.CONNECTED) if net_status_listener: self.start_status_listener() return servo except Exception as e: logger.error("Failed connecting to node %i. Exception: %s", target, e) raise ILError( "Failed connecting to node {}. " "Please check the connection settings and verify " "the transceiver is properly connected.".format(target) ) else: logger.error("Node id not found") raise ILError("Node id {} not found in the network.".format(target))
[docs] def disconnect_from_slave(self, servo: CanopenServo) -> None: # type: ignore [override] """Disconnects the slave from the network. Args: servo: Instance of the servo connected. """ self.stop_status_listener() servo.stop_status_listener() self.servos.remove(servo) if not self.servos: self._teardown_connection()
def _setup_connection(self) -> None: """Creates a network interface object establishing an empty connection with all the network attributes already specified.""" if self._connection is None: self._connection = canopen.Network() try: self._connection.connect( bustype=self.__device, channel=self.__channel, bitrate=self.__baudrate ) except CanError as e: logger.error("Transceiver not found in network. Exception: %s", e) raise ILError( "Error connecting to the transceiver. " "Please verify the transceiver " "is properly connected." ) except OSError as e: logger.error("Transceiver drivers not properly installed. Exception: %s", e) if hasattr(e, "winerror") and e.winerror == 126: e.strerror = "Driver module not found. Drivers might not be properly installed." raise ILError(e) except Exception as e: logger.error("Failed trying to connect. Exception: %s", e) raise ILError("Failed trying to connect. {}".format(e)) else: logger.info("Connection already established") def _teardown_connection(self) -> None: """Tears down the already established connection and deletes the network interface""" if self._connection is None: logger.warning("Can not disconnect. The connection is not established yet.") return self._connection.disconnect() self._connection = None logger.info("Tear down connection.") def _reset_connection(self) -> None: """Resets the established CANopen network. Raises: ILError: If the connection was not established yet. """ if self._connection is None: raise ILError("Can not reset connection. The connection is not established yet.") try: self._connection.disconnect() except BaseException as e: logger.error("Disconnection failed. Exception: %", e) try: for node in self._connection.scanner.nodes: self._connection.nodes[node].nmt.stop_node_guarding() if self._connection.bus: self._connection.bus.flush_tx_buffer() logger.info("Bus flushed") except Exception as e: logger.error("Could not stop guarding. Exception: %", e) try: self._connection.connect( bustype=self.__device, channel=self.__channel, bitrate=self.__baudrate ) for servo in self.servos: node = self._connection.add_node(servo.target) node.nmt.start_node_guarding(1) except BaseException as e: logger.error("Connection failed. Exception: %s", e)
[docs] def load_firmware( # type: ignore [override] self, target: int, fw_file: str, callback_status_msg: Optional[Callable[[str], None]] = None, callback_progress: Optional[Callable[[int], None]] = None, callback_errors_enabled: Optional[Callable[[bool], None]] = None, ) -> None: """Loads a given firmware file to a target. .. warning :: It is needed to disconnect the drive(:func:`disconnect_from_slave`) after loading the firmware since the `Servo` object's data will become obsolete. Args: target: Targeted node ID to be loaded. fw_file: Path to the firmware file. callback_status_msg: Subscribed callback function for the status message when loading a firmware. callback_progress: Subscribed callback function for the live progress when loading a firmware. callback_errors_enabled: Subscribed callback function for knowing when to toggle the error detection when loading firmware. Raises: FileNotFoundError: Firmware file does not exist. ILFirmwareLoadError: The firmware load process fails with an error message. """ servo = self.__load_fw_checks(target, fw_file, callback_status_msg) start_network_listener_after_reconnect = self.is_listener_started() start_servo_listener_after_reconnect = servo.is_listener_started() self.stop_status_listener() servo.stop_status_listener() initial_status = servo.read(PROG_STAT_1, subnode=0) _, file_extension = os.path.splitext(fw_file) if file_extension == ".sfu": fw_file = self.__optimize_firmware_file(fw_file, callback_status_msg) self.__force_boot(servo, callback_status_msg) self.__program_control_to_flash(int(initial_status), servo, callback_status_msg) try: self.__send_fw_file(fw_file, servo, callback_status_msg, callback_progress) self.__program_control_to_stop(servo, callback_status_msg) self.__program_control_to_start(servo, callback_status_msg) finally: if file_extension == ".sfu": os.remove(fw_file) logger.info("Bootloader finished successfully!") if callback_status_msg: callback_status_msg("Bootloader finished successfully!") if start_network_listener_after_reconnect: self.start_status_listener() if start_servo_listener_after_reconnect: servo.start_status_listener()
def __load_fw_checks( self, target: int, fw_file: str, callback_status_msg: Optional[Callable[[str], None]] = None ) -> CanopenServo: """Checks prior to firmware upload and return the target CanopenServo instance Args: target: Targeted node ID to be loaded. fw_file: Path to the firmware file. callback_status_msg: Subscribed callback function for the status message Returns: Target servo Raises: FileNotFoundError: Firmware file does not exist. ILFirmwareLoadError: The drive is not connected. ILFirmwareLoadError: Firmware and bootloader versions are not compatible. """ if not os.path.isfile(fw_file): raise FileNotFoundError(f"Could not find {fw_file}.") servo = None for connected_servo in self.servos: if connected_servo.target == target: servo = connected_servo if not servo: raise ILFirmwareLoadError(f"Node {target} is not connected.") logger.info("Checking compatibility") if callback_status_msg: callback_status_msg("Checking compatibility") try: servo.read(PROG_STAT_1, subnode=0) except ILError as e: raise ILFirmwareLoadError( "Firmware and bootloader versions are not compatible. Use FTP Bootloader instead." ) from e return servo def __force_boot( self, servo: CanopenServo, callback_status_msg: Optional[Callable[[str], None]] ) -> None: """Force boot, if drive is already in boot, do nothing Args: servo: target drive callback_status_msg: Subscribed callback function for the status message Raises: ILError: If the connection was not established yet. """ if self._connection is None: raise ILError("Can not force boot. The connection is not established yet.") device_type = int(servo.read(CIA301_DRV_ID_DEVICE_TYPE, subnode=0)) device_type = device_type & 0xFFFF if device_type == APPLICATION_LOADED_STATE: if callback_status_msg: callback_status_msg("Entering Bootmode") logger.info("Entering Bootmode") # Drive profile # Enter in NMT pre-operational state. self._connection.nmt.send_command(PROG_CTRL_STATE_FLASH) # The drive will unlock the clear program command password = 0x70636675 servo.write(FORCE_BOOT, password, subnode=0) def __program_control_to_flash( self, initial_status: int, servo: CanopenServo, callback_status_msg: Optional[Callable[[str], None]], ) -> None: """Change program control status to flash. Args: initial_status: program control initial status servo: target drive callback_status_msg: Subscribed callback function for the status message Raises: ILFirmwareLoadError: Program control status does not change """ if callback_status_msg: callback_status_msg("Setting up drive") target_status_list = [ PROG_CTRL_STATE_START, PROG_CTRL_STATE_STOP, PROG_CTRL_STATE_CLEAR, PROG_CTRL_STATE_FLASH, ] status_index = target_status_list.index(initial_status) + 1 logger.info("Clearing program...") for target_status in target_status_list[status_index:]: servo.write(PROG_STAT_1, target_status, subnode=0) if not self.__wait_for_register_value(servo, 0, PROG_STAT_1, target_status): raise ILFirmwareLoadError(f"Error setting program control to 0x{target_status:X}") @staticmethod def __wait_for_register_value( servo: CanopenServo, subnode: int, register: CanopenRegister, expected_value: Union[int, float, str], ) -> bool: """Waits for the register to reach a value. Args: servo: Instance of the servo to be used. subnode: Target subnode. register: Register to be read. expected_value: Expected value for the given register. Returns: True if values is reached, else False """ logger.debug(f"Waiting for register {register} to return <{expected_value}>") num_tries = 0 value = None while num_tries < POLLING_MAX_TRIES: with contextlib.suppress(ILError): value = servo.read(register, subnode=subnode) if value == expected_value: logger.debug(f"Success. Read value {value}. Num tries {num_tries}") return True num_tries += 1 logger.debug(f"Trying again {num_tries}. value {value}.") sleep(1) return False @staticmethod def __program_control_to_stop( servo: CanopenServo, callback_status_msg: Optional[Callable[[str], None]] ) -> None: """Change program control status to stop. Args: servo: target drive callback_status_msg: Subscribed callback function for the status message Raises: ILFirmwareLoadError: Drive does not respond """ if callback_status_msg: callback_status_msg("Flashing firmware") logger.info("Flashing firmware") with contextlib.suppress(ILError): servo.write(PROG_STAT_1, PROG_CTRL_STATE_STOP, subnode=0) try: servo.node.nmt.start_node_guarding(CANOPEN_BOTT_NODE_GUARDING_PERIOD) except VCIError as e: # This error is a specific error for ixxat transceivers raise ILFirmwareLoadError("An error occurred when starting the node guarding.") from e try: servo.node.nmt.wait_for_heartbeat(timeout=RECONNECTION_TIMEOUT) except canopen.nmt.NmtError as e: raise ILFirmwareLoadError("Could not recover drive") from e finally: servo.node.nmt.stop_node_guarding() @staticmethod def __program_control_to_start( servo: CanopenServo, callback_status_msg: Optional[Callable[[str], None]] ) -> None: """Change program control status to start. Args: servo: target drive callback_status_msg: Subscribed callback function for the status message Raises: ILFirmwareLoadError: Drive does not respond """ if callback_status_msg: callback_status_msg("Starting program") logger.info("Starting program") with contextlib.suppress(ILError): servo.write(PROG_STAT_1, PROG_CTRL_STATE_START, subnode=0) try: servo.node.nmt.wait_for_bootup(timeout=RECONNECTION_TIMEOUT) except canopen.nmt.NmtError as e: raise ILFirmwareLoadError("Could not recover drive") from e @staticmethod def __optimize_firmware_file( sfu_file: str, callback_status_msg: Optional[Callable[[str], None]] ) -> str: """Convert SFU file to LFU to optimize the firmware loading. Args: sfu_file: target SFU file callback_status_msg: Subscribed callback function for the status message Returns: LFU file path """ mcb = MCB() with open(sfu_file, "r") as temp_file: total_file_lines = sum(1 for _ in temp_file) # Convert the sfu file to lfu logger.info("Converting sfu to lfu...") logger.info("Optimizing file") if callback_status_msg: callback_status_msg("Optimizing file") lfu_file_d, lfu_path = tempfile.mkstemp(suffix=".lfu", text=True) os.close(lfu_file_d) with open(lfu_path, "wb") as lfu_file: with open(sfu_file, "r") as coco_in: bin_node = "" current_progress = 0 node = 10 for copy_process, line in enumerate(coco_in): if re.match(r"74 67 [0-4][0-4] 00 00 00 00 00 00 00", line) is not None: bin_node = line[6:8] newline = f"{bin_node} {line}" words = newline.split() # Get command and address subnode = int(words[0], 16) cmd = int(words[2] + words[1], 16) data = b"" num = 3 while num in range(3, len(words)): # load data MCB data += bytes([int(words[num], 16)]) num += 1 # send message mcb.add_cmd(node, subnode, cmd, data, lfu_file) new_progress = int(copy_process * 100 / total_file_lines) if new_progress != current_progress: current_progress = new_progress logger.info(f"Optimizing firmware file in progress: {current_progress}%") logger.info("Converted to lfu") return lfu_path @staticmethod def __send_fw_file( fw_file: str, servo: CanopenServo, callback_status_msg: Optional[Callable[[str], None]], callback_progress: Optional[Callable[[int], None]], ) -> None: """Send firmware file to drive with SDOs Args: fw_file: target firmware file servo: target drive callback_status_msg: Subscribed callback function for the status message callback_progress: Subscribed callback function for the live progress. Raises: ILFirmwareLoadError: Drive does not respond """ total_file_size = os.path.getsize(fw_file) / BOOTLOADER_MSG_SIZE servo._change_sdo_timeout(CANOPEN_SEND_FW_SDO_RESPONSE_TIMEOUT) logger.info("Downloading firmware") if callback_status_msg: callback_status_msg("Downloading firmware") counter = 0 progress = 0 with open(fw_file, "rb") as image: byte = image.read(BOOTLOADER_MSG_SIZE) while byte: try: servo.write(PROG_DL_1, byte, subnode=0) except ILError as e: raise ILFirmwareLoadError( "An error occurred while downloading. Check firmware file is correct." ) from e counter += 1 new_progress = int(counter * 100 / total_file_size) if progress != new_progress: progress = new_progress logger.info(f"Download firmware in progress: {progress}%") if callback_progress: callback_progress(progress) byte = image.read(BOOTLOADER_MSG_SIZE) logger.info("Download Finished!") servo._change_sdo_timeout(CANOPEN_SDO_RESPONSE_TIMEOUT) def __set_fw_load_status_msg(self, new_value: str) -> None: """Updates the fw_load_status_msg value and triggers all the callbacks associated. Args: new_value: New value for the variable. """ self.__fw_load_status_msg = new_value for callback in self.__observers_fw_load_status_msg: callback(new_value) def __set_fw_load_progress(self, new_value: int) -> None: """Updates the fw_load_progress value and triggers all the callbacks associated. Args: new_value: New value for the variable. """ self.__fw_load_progress = new_value for callback in self.__observers_fw_load_progress: callback(new_value) def __set_fw_load_errors_enabled(self, new_value: bool) -> None: """Updates the fw_load_errors_enabled value and triggers all the callbacks associated. Args: new_value: New value for the variable. """ self.__fw_load_errors_enabled = new_value for callback in self.__observers_fw_load_errors_enabled: callback(new_value)
[docs] def change_baudrate( self, target_node: int, new_target_baudrate: CAN_BAUDRATE, vendor_id: int, product_code: int, rev_number: int, serial_number: int, ) -> None: """Changes the node ID of a given target node ID. .. note:: The servo must be disconnected after this operation in order to make the changes visible and update all the internal data. It is also needed a power cycle of the servo otherwise the changes will not be applied. Args: target_node: Node ID of the targeted device. new_target_baudrate: New baudrate for the targeted device. vendor_id: Vendor ID of the targeted device. product_code: Product code of the targeted device. rev_number: Revision number of the targeted device. serial_number: Serial number of the targeted device. """ if self._connection is None: raise ILObjectNotExist("CAN connection was not established.") try: logger.debug("Switching LSS into CONFIGURATION state...") r = self._connection.lss.send_switch_state_selective( vendor_id, product_code, rev_number, serial_number, ) if r >= 0: self._connection.lss.configure_bit_timing(CAN_BIT_TIMMING[new_target_baudrate]) sleep(0.1) self._lss_store_configuration() else: raise ILError("Error switching lss to selective state. Error code: {}".format(r)) finally: self._lss_reset_connection_nodes(target_node) logger.info("Baudrate changed to {}".format(new_target_baudrate))
[docs] def change_node_id( self, target_node: int, new_target_node: int, vendor_id: int, product_code: int, rev_number: int, serial_number: int, ) -> None: """Changes the node ID of a given target node ID. .. note:: The servo must be disconnected after this operation in order to make the changes visible and update all the internal data. Args: target_node: Node ID of the targeted device. new_target_node: New node ID for the targeted device. vendor_id: Vendor ID of the targeted device. product_code: Product code of the targeted device. rev_number: Revision number of the targeted device. serial_number: Serial number of the targeted device. """ if self._connection is None: raise ILObjectNotExist("CAN connection was not established.") try: logger.debug("Switching LSS into CONFIGURATION state...") r = self._connection.lss.send_switch_state_selective( vendor_id, product_code, rev_number, serial_number, ) if r >= 0: self._connection.lss.configure_node_id(new_target_node) sleep(0.1) self._lss_store_configuration() else: raise ILError("Error switching lss to selective state. Error code: {}".format(r)) finally: self._lss_reset_connection_nodes(target_node) logger.info("Node ID changed to {}".format(new_target_node))
def _lss_store_configuration(self) -> None: """Stores the current configuration of the LSS Raises: ILError: If the connection was not established yet. """ if self._connection is None: raise ILError("Can not store configuration. The connection is not established yet.") self._connection.lss.store_configuration() sleep(0.1) logger.info("Stored new configuration") self._connection.lss.send_switch_state_global(self._connection.lss.WAITING_STATE) def _lss_reset_connection_nodes(self, target_node: int) -> None: """Resets the connection and starts node guarding for the connection nodes. Args: target_node: Node ID of the targeted device. Raises: ILError: If the connection was not established yet. """ if self._connection is None: raise ILError("Can not reset connection. The connection is not established yet.") self._connection.nodes[target_node].nmt.send_command(0x82) logger.debug("Wait until node is reset") sleep(0.5) for servo in self.servos: logger.info("Node connected: %i", servo.target) node = self._connection.add_node(servo.target) # Reset all nodes to default state self._connection.lss.send_switch_state_global(self._connection.lss.WAITING_STATE) self._connection.nodes[target_node].nmt.start_node_guarding(1)
[docs] def subscribe_to_status(self, node_id: int, callback: Callable[[NET_DEV_EVT], Any]) -> None: # type: ignore [override] """Subscribe to network state changes. Args: node_id: Drive's node ID. callback: Callback function. """ if callback in self.__observers_net_state[node_id]: logger.info("Callback already subscribed.") return self.__observers_net_state[node_id].append(callback)
[docs] def unsubscribe_from_status(self, node_id: int, callback: Callable[[NET_DEV_EVT], Any]) -> None: # type: ignore [override] """Unsubscribe from network state changes. Args: node_id: Drive's node ID. callback: Callback function. """ if callback not in self.__observers_net_state[node_id]: logger.info("Callback not subscribed.") return self.__observers_net_state[node_id].remove(callback)
def _notify_status(self, node_id: int, status: NET_DEV_EVT) -> None: """Notify subscribers of a network state change.""" for callback in self.__observers_net_state[node_id]: callback(status) def is_listener_started(self) -> bool: return self.__listener_net_status is not None
[docs] def start_status_listener(self) -> None: # type: ignore [override] """Start monitoring network events (CONNECTION/DISCONNECTION).""" if self.__listener_net_status is None: listener = NetStatusListener(self) listener.start() self.__listener_net_status = listener
[docs] def stop_status_listener(self) -> None: # type: ignore [override] """Stops the NetStatusListener from listening to the drive.""" if self._connection is None: return try: for node_id, node_obj in self._connection.nodes.items(): node_obj.nmt.stop_node_guarding() except Exception as e: logger.error("Could not stop node guarding. Exception: %s", str(e)) if self.__listener_net_status is not None: self.__listener_net_status.stop() self.__listener_net_status.join() self.__listener_net_status = None
@property def device(self) -> str: """Current device of the network.""" return self.__device @property def channel(self) -> Union[int, str]: """Current channel of the network.""" return self.__channel @property def baudrate(self) -> int: """Current baudrate of the network.""" return self.__baudrate @property def network(self) -> canopen.Network: """Returns the instance of the CANopen Network.""" return self._connection @property def protocol(self) -> NET_PROT: """Obtain network protocol.""" return NET_PROT.CAN def _get_servo_state(self, node_id: int) -> NET_STATE: return self.__servos_state[node_id] def _set_servo_state(self, node_id: int, state: NET_STATE) -> None: self.__servos_state[node_id] = state