from enum import Enum
from time import sleep, time
from threading import Thread
from .._ingenialink import lib
from .register import CanopenRegister
from ingenialink.utils.mcb import MCB
from ingenialink.utils._utils import *
from ..exceptions import ILFirmwareLoadError, ILObjectNotExist, ILError
from can.interfaces.pcan.pcan import PcanError
from ..network import NET_PROT, NET_STATE, NET_DEV_EVT, Network
from can.interfaces.ixxat.exceptions import VCIDeviceNotFoundError
from .servo import CanopenServo, REG_ACCESS, REG_DTYPE, CANOPEN_SDO_RESPONSE_TIMEOUT, \
STATUS_WORD_REGISTERS
import re
import os
import canopen
import tempfile
import ingenialogger
logger = ingenialogger.get_logger(__name__)
PROG_STAT_1 = CanopenRegister(
identifier='', units='', subnode=0, idx=0x1F51, subidx=0x01, cyclic='CONFIG',
dtype=REG_DTYPE.U8, access=REG_ACCESS.RW
)
PROG_DL_1 = CanopenRegister(
identifier='', units='', subnode=0, idx=0x1F50, subidx=0x01, cyclic='CONFIG',
dtype=REG_DTYPE.DOMAIN, access=REG_ACCESS.RW
)
FORCE_BOOT = CanopenRegister(
identifier='', units='', subnode=0, idx=0x5EDE, subidx=0x00, cyclic='CONFIG',
dtype=REG_DTYPE.U32, access=REG_ACCESS.WO
)
CIA301_DRV_ID_DEVICE_TYPE = CanopenRegister(
identifier='', units='', subnode=0, idx=0x1000, subidx=0x00, cyclic='CONFIG',
dtype=REG_DTYPE.U32, access=REG_ACCESS.RO
)
BOOTLOADER_MSG_SIZE = 256 # Size in Bytes
RECONNECTION_TIMEOUT = 180 # Seconds
POLLING_MAX_TRIES = 5 # Seconds
PROG_CTRL_STATE_STOP = 0x00
PROG_CTRL_STATE_START = 0x01
PROG_CTRL_STATE_CLEAR = 0x03
PROG_CTRL_STATE_FLASH = 0x80
APPLICATION_LOADED_STATE = 402
CAN_CHANNELS = {
'kvaser': (0, 1),
'pcan': ('PCAN_USBBUS1', 'PCAN_USBBUS2'),
'ixxat': (0, 1)
}
[docs]class CAN_DEVICE(Enum):
"""CAN Device."""
KVASER = 'kvaser'
PCAN = 'pcan'
IXXAT = 'ixxat'
[docs]class CAN_BAUDRATE(Enum):
"""Baudrates."""
Baudrate_1M = 1000000
"""1 Mbit/s"""
Baudrate_500K = 500000
"""500 Kbit/s"""
Baudrate_250K = 250000
"""250 Kbit/s"""
Baudrate_125K = 125000
"""125 Kbit/s"""
Baudrate_100K = 100000
"""100 Kbit/s"""
Baudrate_50K = 50000
"""50 Kbit/s"""
CAN_BIT_TIMMING = {
CAN_BAUDRATE.Baudrate_1M: 0,
CAN_BAUDRATE.Baudrate_500K: 2,
CAN_BAUDRATE.Baudrate_250K: 3,
CAN_BAUDRATE.Baudrate_125K: 4,
CAN_BAUDRATE.Baudrate_100K: 5,
CAN_BAUDRATE.Baudrate_50K: 6
}
[docs]class NetStatusListener(Thread):
"""Network status listener thread to check if the drive is alive.
Args:
network (CanopenNetwork): Network instance of the CANopen communication.
node (canopen.RemoteNode): Identifier for the targeted node ID.
"""
def __init__(self, network, node):
super(NetStatusListener, self).__init__()
self.__network = network
self.node = node
self.__timestamp = self.node.nmt.timestamp
self.__state = NET_STATE.CONNECTED
self.__stop = False
[docs] def run(self):
while not self.__stop:
if self.__timestamp == self.node.nmt.timestamp:
if self.__state != NET_STATE.DISCONNECTED:
self.__network.status = NET_STATE.DISCONNECTED
self.__state = NET_STATE.DISCONNECTED
self.__network._notify_status(NET_DEV_EVT.REMOVED)
else:
self.__network._reset_connection()
else:
if self.__state != NET_STATE.CONNECTED:
self.__network.status = NET_STATE.CONNECTED
self.__state = NET_STATE.CONNECTED
self.__network._notify_status(NET_DEV_EVT.ADDED)
self.__timestamp = self.node.nmt.timestamp
sleep(1.5)
def stop(self):
self.__stop = True
[docs]class CanopenNetwork(Network):
"""Network of the CANopen communication.
Args:
device (CAN_DEVICE): Targeted device to connect.
channel (int): Targeted channel number of the transceiver.
baudrate (CAN_BAUDRATE): Baudrate to communicate through.
"""
def __init__(self, device, channel=0, baudrate=CAN_BAUDRATE.Baudrate_1M):
super(CanopenNetwork, self).__init__()
self.__device = device.value
self.__channel = CAN_CHANNELS[self.__device][channel]
self.__baudrate = baudrate.value
self._connection = None
self.__listeners_net_status = []
self.__net_state = NET_STATE.DISCONNECTED
self.__observers_net_state = []
self.__observers_fw_load_status_msg = []
self.__observers_fw_load_progress = []
self.__observers_fw_load_errors_enabled = []
self.__fw_load_status_msg = ''
self.__fw_load_progress = 0
self.__fw_load_errors_enabled = True
[docs] def scan_slaves(self):
"""Scans for nodes in the network.
Returns:
list: Containing all the detected node IDs.
"""
is_connection_created = False
if self._connection is None:
is_connection_created = True
self._setup_connection()
self._connection.scanner.reset()
try:
self._connection.scanner.search()
except Exception as e:
logger.error("Error searching for nodes. Exception: {}".format(e))
logger.info("Resetting bus")
if self._connection is not None and self._connection.bus is not None:
self._connection.bus.reset()
sleep(0.05)
nodes = self._connection.scanner.nodes
if is_connection_created:
self._teardown_connection()
return nodes
[docs] def connect_to_slave(self, target, dictionary=None, eds=None,
servo_status_listener=False, net_status_listener=False):
"""Connects to a drive through a given target node ID.
Args:
target (int): Targeted node ID to be connected.
dictionary (str): Path to the dictionary file.
eds (str): Path to the EDS file.
servo_status_listener (bool): Toggle the listener of the servo for
its status, errors, faults, etc.
net_status_listener (bool): Toggle the listener of the network
status, connection and disconnection.
"""
nodes = self.scan_slaves()
if len(nodes) < 1:
raise_err(lib.IL_EFAIL, 'Could not find any nodes in the network')
self._setup_connection()
if target in nodes:
try:
node = self._connection.add_node(target, eds)
node.nmt.start_node_guarding(1)
servo = CanopenServo(target, node, dictionary, eds,
servo_status_listener=servo_status_listener)
if net_status_listener:
self.start_status_listener(servo)
self.servos.append(servo)
return servo
except Exception as e:
logger.error("Failed connecting to node %i. Exception: %s",
target, e)
raise_err(lib.IL_EFAIL,
'Failed connecting to node {}. '
'Please check the connection settings and verify '
'the transceiver is properly connected.'.format(target))
else:
logger.error('Node id not found')
raise_err(lib.IL_EFAIL,
'Node id {} not found in the network.'.format(target))
[docs] def disconnect_from_slave(self, servo):
"""Disconnects the slave from the network.
Args:
servo (CanopenServo): Instance of the servo connected.
"""
self.stop_status_listener(servo)
servo.stop_status_listener()
self.servos.remove(servo)
if not self.servos:
self._teardown_connection()
def _setup_connection(self):
"""Creates a network interface object establishing an empty connection
with all the network attributes already specified."""
if self._connection is None:
self._connection = canopen.Network()
try:
self._connection.connect(bustype=self.__device,
channel=self.__channel,
bitrate=self.__baudrate)
except (PcanError, VCIDeviceNotFoundError) as e:
logger.error('Transceiver not found in network. Exception: %s', e)
raise_err(lib.IL_EFAIL, 'Error connecting to the transceiver. '
'Please verify the transceiver '
'is properly connected.')
except OSError as e:
logger.error('Transceiver drivers not properly installed. Exception: %s',
e)
if hasattr(e, 'winerror') and e.winerror == 126:
e.strerror = 'Driver module not found.' \
' Drivers might not be properly installed.'
raise_err(lib.IL_EFAIL, e)
except Exception as e:
logger.error('Failed trying to connect. Exception: %s', e)
raise_err(lib.IL_EFAIL, 'Failed trying to connect. {}'.format(e))
else:
logger.info('Connection already established')
def _teardown_connection(self):
"""Tears down the already established connection
and deletes the network interface"""
self._connection.disconnect()
self._connection = None
logger.info('Tear down connection.')
def _reset_connection(self):
"""Resets the established CANopen network."""
try:
self._connection.disconnect()
except BaseException as e:
logger.error("Disconnection failed. Exception: %", e)
try:
for node in self._connection.scanner.nodes:
self._connection.nodes[node].nmt.stop_node_guarding()
if self._connection.bus:
self._connection.bus.flush_tx_buffer()
logger.info("Bus flushed")
except Exception as e:
logger.error("Could not stop guarding. Exception: %", e)
try:
self._connection.connect(bustype=self.__device,
channel=self.__channel,
bitrate=self.__baudrate)
for servo in self.servos:
node = self._connection.add_node(servo.target, servo.eds)
node.nmt.start_node_guarding(1)
except BaseException as e:
logger.error("Connection failed. Exception: %s", e)
[docs] def load_firmware(self, target, fw_file, callback_status_msg=None,
callback_progress=None, callback_errors_enabled=None):
"""Loads a given firmware file to a target.
.. warning ::
It is needed to disconnect the drive(:func:`disconnect_from_slave`)
after loading the firmware since the `Servo` object's data will
become obsolete.
Args:
target (int): Targeted node ID to be loaded.
fw_file (str): Path to the firmware file.
callback_status_msg (object): Subscribed callback function for the status
message when loading a firmware.
callback_progress (object): Subscribed callback function for the live
progress when loading a firmware.
callback_errors_enabled (object): Subscribed callback function for
knowing when to toggle the error detection when loading firmware.
Raises:
ILFirmwareLoadError: The firmware load process fails with an error message.
"""
servo = None
lfu_path = None
error_detected_msg = ''
servo_connected = False
error_connecting = False
progress = 0
if not os.path.isfile(fw_file):
raise FileNotFoundError('Could not find {}.'.format(fw_file))
# Subscribe to firmware loader process
if callback_status_msg is not None:
self.__observers_fw_load_status_msg.append(callback_status_msg)
if callback_progress is not None:
self.__observers_fw_load_progress.append(callback_progress)
if callback_errors_enabled is not None:
self.__observers_fw_load_errors_enabled.append(callback_errors_enabled)
self.__set_fw_load_status_msg('')
self.__set_fw_load_progress(progress)
self.__set_fw_load_errors_enabled(True)
# Check if target is already connected
for servo in self.servos:
if servo.target == target:
servo_connected = True
break
else:
servo = None
try:
if not servo_connected:
self.__set_fw_load_status_msg('Connecting to drive')
logger.info('Connecting to drive')
progress = 5
self.__set_fw_load_progress(progress)
try:
servo = self.connect_to_slave(target, servo_status_listener=False)
progress = 7
self.__set_fw_load_progress(progress)
except Exception as e:
logger.error('Error connecting to drive through CAN: %s', e)
error_connecting = True
if servo is not None and not error_connecting:
# Check if bootloader is supported
logger.info('Checking compatibility')
self.__set_fw_load_status_msg('Checking compatibility')
prog_stat_1 = None
try:
prog_stat_1 = servo.read(PROG_STAT_1, subnode=0)
is_bootloader_supported = True
except Exception as e:
is_bootloader_supported = False
progress = 10
current_progress = progress
self.__set_fw_load_progress(progress)
if is_bootloader_supported:
# Check if file is .lfu
file_name, file_extension = os.path.splitext(fw_file)
if file_extension != '' and file_extension == '.sfu':
fd, lfu_path = tempfile.mkstemp(suffix=".lfu")
logger.debug('>> FD: {}. \n>> '
'LFU PATH: {}.'.format(fd, lfu_path))
try:
# Convert the sfu file to lfu
logger.info('Converting sfu to lfu...')
self.__set_fw_load_status_msg('Optimizing file')
logger.info('Optimizing file')
total_file_lines = count_file_lines(fw_file)
outfile = open(lfu_path, 'wb')
coco_in = open(fw_file, "r")
mcb = MCB()
copy_process = 0
bin_node = ''
for line in coco_in:
if re.match("74 67 [0-4][0-4] 00 00 00 00 00 00 00",
line) is not None:
bin_node = line[6:8]
newline = '{} {}'.format(bin_node, line)
words = newline.split()
# Get command and address
subnode = int(words[0], 16)
cmd = int(words[2] + words[1], 16)
data = b''
num = 3
while num in range(3, len(words)):
# load data MCB
data = data + bytes([int(words[num], 16)])
num = num + 1
# send message
node = 10
mcb.add_cmd(node, subnode, cmd, data, outfile)
new_progress = int((copy_process * (25 - progress)) /
total_file_lines) + progress
if new_progress != current_progress:
current_progress = new_progress
self.__set_fw_load_progress(current_progress)
copy_process += 1
progress = current_progress
outfile.close()
coco_in.close()
logger.info('Converted to lfu')
except Exception as e:
logger.error('Exception converting to lfu. {}'.format(e))
else:
lfu_path = fw_file
total_file_size = os.path.getsize(lfu_path) / BOOTLOADER_MSG_SIZE
# Check if Boot mode or App loaded
try:
device_type = servo.read(CIA301_DRV_ID_DEVICE_TYPE, subnode=0)
device_type = device_type & 0xFFFF
except Exception as e:
device_type = 0
if device_type == APPLICATION_LOADED_STATE:
# Drive profile
self.__set_fw_load_status_msg('Entering Bootmode')
logger.info('Entering Bootmode')
# Enter in NMT pre-operational state.
self._connection.nmt.send_command(PROG_CTRL_STATE_FLASH)
# The drive will unlock the clear program command
password = 0x70636675
try:
servo.write(FORCE_BOOT, password, subnode=0)
except Exception as e:
pass
if prog_stat_1 == PROG_CTRL_STATE_START \
or prog_stat_1 == PROG_CTRL_STATE_FLASH:
# Write 0 to 0x1F51 to stop the app
try:
servo.write(PROG_STAT_1, PROG_CTRL_STATE_STOP, subnode=0)
except Exception as e:
pass
self.__set_fw_load_status_msg('Setting up drive')
logger.info('Connected')
logger.info('Clearing program...')
prog_stat_1 = None
try:
prog_stat_1 = servo.read(PROG_STAT_1, subnode=0)
r = 0
except Exception as e:
r = -1
if r >= 0 and prog_stat_1 == PROG_CTRL_STATE_STOP:
try:
servo.write(PROG_STAT_1, PROG_CTRL_STATE_CLEAR, subnode=0)
except Exception as e:
pass
r = wait_for_register_value(servo, 0,
PROG_STAT_1, PROG_CTRL_STATE_CLEAR)
if r >= 0:
try:
servo.write(PROG_STAT_1, PROG_CTRL_STATE_FLASH, subnode=0)
except Exception as e:
pass
r = wait_for_register_value(servo, 0,
PROG_STAT_1, PROG_CTRL_STATE_FLASH)
if r < 0:
error_detected_msg = 'Error entering flashing mode'
logger.info(error_detected_msg)
else:
error_detected_msg = 'Error entering boot mode'
logger.error(error_detected_msg)
if error_detected_msg == '':
logger.info('Downloading program...')
image = open(lfu_path, "rb")
counter = 0
# Read image content in BOOTLOADER_MSG_SIZE
try:
error_downloading = False
servo._change_sdo_timeout(10)
self.__set_fw_load_status_msg('Downloading firmware')
logger.info('Downloading firmware')
byte = image.read(1)
bytes_data = bytearray()
counter_blocks = 1
while not error_downloading:
bytes_data.extend(byte)
if counter_blocks == BOOTLOADER_MSG_SIZE:
counter_blocks = 0
try:
servo.write(PROG_DL_1, bytes_data, subnode=0)
r = 0
except Exception as e:
r = -1
if r < 0:
error_downloading = True
error_detected_msg = 'An error occurred ' \
'while downloading.'
counter += 1
new_progress = int((counter * (100 - progress)) /
total_file_size) + progress
if new_progress != current_progress:
current_progress = new_progress
self.__set_fw_load_progress(current_progress)
bytes_data = bytearray()
byte = image.read(1)
if not byte:
break
counter_blocks += 1
if not error_downloading:
self.__set_fw_load_status_msg('Flashing firmware')
logger.info("Download Finished!")
logger.info("Flashing firmware")
try:
servo.write(PROG_DL_1, bytes_data, subnode=0)
except Exception as e:
pass
servo._change_sdo_timeout(CANOPEN_SDO_RESPONSE_TIMEOUT)
except Exception as e:
error_detected_msg = 'An error occurred while downloading.'
logger.error('Failed to download fw, reset might be needed. '
'Exception {}.'.format(e))
try:
image.close()
logger.debug('Temp file deleted')
except Exception as e:
logger.warning('Could not remove temp file. '
'Exception: {}'.format(e))
if error_detected_msg == '':
logger.info('Disable errors')
self.__set_fw_load_errors_enabled(False)
logger.info("Flashing...")
try:
servo.write(PROG_STAT_1, PROG_CTRL_STATE_STOP, subnode=0)
except Exception as e:
pass
logger.info('Waiting for the drive to respond...')
sleep(5)
initial_time = time()
time_diff = time() - initial_time
bool_timeout = False
while self.status != NET_STATE.CONNECTED \
and time_diff < RECONNECTION_TIMEOUT:
time_diff = time() - initial_time
sleep(0.5)
if not time_diff < RECONNECTION_TIMEOUT:
bool_timeout = True
logger.debug("Time waited for reconnection: ",
time_diff, bool_timeout)
logger.debug("Net state after reconnection: ",
self.status)
sleep(5)
self.__set_fw_load_status_msg('Starting program')
logger.info("Starting program")
try:
servo.write(PROG_STAT_1, PROG_CTRL_STATE_START, subnode=0)
except Exception as e:
pass
if not bool_timeout:
logger.info('Bootloader finished successfully!')
self.__set_fw_load_status_msg('Bootloader '
'finished successfully!')
# Wait for the drive to reset
initial_time = time()
timeout = 25
stop = False
logger.info('Waiting for the drive to be available.')
while (time() - initial_time) < timeout and not stop:
try:
servo.read(STATUS_WORD_REGISTERS[1])
stop = True
except ILError:
pass
else:
error_detected_msg = 'Could not recover drive'
logger.error(error_detected_msg)
else:
# Bootloader not supported
error_detected_msg = 'Firmware and bootloader versions are ' \
'not compatible. Use FTP Bootloader instead.'
else:
error_detected_msg = 'Failed to connect to the drive'
logger.error("Error detected could not specify the drive.")
if self._connection is not None and not servo_connected:
self._teardown_connection()
logger.error('CANopen connection disconnected')
except Exception as e:
logger.error('Failed to load firmware. Exception: {}'.format(e))
error_detected_msg = 'Failed to load firmware'
if servo is not None and not servo_connected:
self.disconnect_from_slave(servo)
elif self._connection is not None and not servo_connected:
self._teardown_connection()
logger.error('CANopen connection disconnected')
try:
if servo is not None and not servo_connected:
self.disconnect_from_slave(servo)
elif self._connection is not None and not servo_connected:
self._teardown_connection()
logger.error('CANopen connection disconnected')
except Exception as e:
logger.error('Could not disconnect the drive. Exception {}.'.format(e))
try:
if lfu_path != fw_file:
os.remove(lfu_path)
except Exception as e:
logger.warning('Could not remove {}. Exception: {}'.format(lfu_path, e))
self.__set_fw_load_errors_enabled(True)
# Unsubscribe to firmware loader process
if callback_status_msg is not None:
self.__observers_fw_load_status_msg.remove(callback_status_msg)
if callback_progress is not None:
self.__observers_fw_load_progress.remove(callback_progress)
if callback_errors_enabled is not None:
self.__observers_fw_load_errors_enabled.remove(callback_errors_enabled)
if error_detected_msg != '':
raise ILFirmwareLoadError(error_detected_msg)
def __set_fw_load_status_msg(self, new_value):
"""Updates the fw_load_status_msg value and triggers
all the callbacks associated.
Args:
new_value: New value for the variable.
"""
self.__fw_load_status_msg = new_value
for callback in self.__observers_fw_load_status_msg:
callback(new_value)
def __set_fw_load_progress(self, new_value):
"""Updates the fw_load_progress value and triggers
all the callbacks associated.
Args:
new_value: New value for the variable.
"""
self.__fw_load_progress = new_value
for callback in self.__observers_fw_load_progress:
callback(new_value)
def __set_fw_load_errors_enabled(self, new_value):
"""Updates the fw_load_errors_enabled value and triggers
all the callbacks associated.
Args:
new_value: New value for the variable.
"""
self.__fw_load_errors_enabled = new_value
for callback in self.__observers_fw_load_errors_enabled:
callback(new_value)
[docs] def change_baudrate(self, target_node, new_target_baudrate, vendor_id,
product_code, rev_number, serial_number):
"""Changes the node ID of a given target node ID.
.. note::
The servo must be disconnected after this operation in order
to make the changes visible and update all the internal data.
It is also needed a power cycle of the servo otherwise the
changes will not be applied.
Args:
target_node (int): Node ID of the targeted device.
new_target_baudrate (CAN_BAUDRATE): New baudrate for the targeted device.
vendor_id (int): Vendor ID of the targeted device.
product_code (int): Product code of the targeted device.
rev_number (int): Revision number of the targeted device.
serial_number (int): Serial number of the targeted device.
Returns:
bool: Indicates if the operation was successful.
"""
if self._connection is None:
raise ILObjectNotExist('CAN connection was not established.')
try:
logger.debug("Switching LSS into CONFIGURATION state...")
r = self._connection.lss.send_switch_state_selective(
vendor_id,
product_code,
rev_number,
serial_number,
)
if r >= 0:
self._connection.lss.configure_bit_timing(
CAN_BIT_TIMMING[new_target_baudrate]
)
sleep(0.1)
self._lss_store_configuration()
else:
raise ILError('Error switching lss to selective state. '
'Error code: {}'.format(r))
finally:
self._lss_reset_connection_nodes(target_node)
logger.info('Baudrate changed to {}'.format(new_target_baudrate))
[docs] def change_node_id(self, target_node, new_target_node, vendor_id,
product_code, rev_number, serial_number):
"""Changes the node ID of a given target node ID.
.. note::
The servo must be disconnected after this operation in order
to make the changes visible and update all the internal data.
Args:
target_node (int): Node ID of the targeted device.
new_target_node (int): New node ID for the targeted device.
vendor_id (int): Vendor ID of the targeted device.
product_code (int): Product code of the targeted device.
rev_number (int): Revision number of the targeted device.
serial_number (int): Serial number of the targeted device.
Returns:
bool: Indicates if the operation was successful.
"""
if self._connection is None:
raise ILObjectNotExist('CAN connection was not established.')
try:
logger.debug("Switching LSS into CONFIGURATION state...")
r = self._connection.lss.send_switch_state_selective(
vendor_id,
product_code,
rev_number,
serial_number,
)
if r >= 0:
self._connection.lss.configure_node_id(new_target_node)
sleep(0.1)
self._lss_store_configuration()
else:
raise ILError('Error switching lss to selective state. '
'Error code: {}'.format(r))
finally:
self._lss_reset_connection_nodes(target_node)
logger.info('Node ID changed to {}'.format(new_target_node))
def _lss_store_configuration(self):
"""Stores the current configuration of the LSS"""
self._connection.lss.store_configuration()
sleep(0.1)
logger.info('Stored new configuration')
self._connection.lss.send_switch_state_global(
self._connection.lss.WAITING_STATE
)
def _lss_reset_connection_nodes(self, target_node):
"""Resets the connection and starts node guarding for the connection nodes.
Args:
target_node (int): Node ID of the targeted device.
"""
self._connection.nodes[target_node].nmt.send_command(0x82)
logger.debug("Wait until node is reset")
sleep(0.5)
for servo in self.servos:
logger.info('Node connected: %i', servo.target)
node = self._connection.add_node(servo.target, servo.eds)
# Reset all nodes to default state
self._connection.lss.send_switch_state_global(
self._connection.lss.WAITING_STATE
)
self._connection.nodes[target_node].nmt.start_node_guarding(1)
[docs] def subscribe_to_status(self, callback):
"""Subscribe to network state changes.
Args:
callback (function): Callback function.
"""
if callback in self.__observers_net_state:
logger.info('Callback already subscribed.')
return
self.__observers_net_state.append(callback)
[docs] def unsubscribe_from_status(self, callback):
"""Unsubscribe from network state changes.
Args:
callback (function): Callback function.
"""
if callback not in self.__observers_net_state:
logger.info('Callback not subscribed.')
return
self.__observers_net_state.remove(callback)
def _notify_status(self, status):
for callback in self.__observers_net_state:
callback(status)
[docs] def start_status_listener(self, servo):
"""Start monitoring network events (CONNECTION/DISCONNECTION)."""
for listener in self.__listeners_net_status:
if listener.node.id == servo.node.id:
logger.info(
f'Listener on node {servo.node.id} is already started.'
)
return
listener = NetStatusListener(
self, servo.node
)
listener.start()
self.__listeners_net_status.append(listener)
[docs] def stop_status_listener(self, servo):
"""Stops the NetStatusListener from listening to the drive."""
try:
for node_id, node_obj in self._connection.nodes.items():
if node_id == servo.node.id:
node_obj.nmt.stop_node_guarding()
except Exception as e:
logger.error('Could not stop node guarding. Exception: %s', str(e))
for listener in self.__listeners_net_status:
if listener.node == servo.node.id and listener.is_alive:
listener.stop()
listener.join()
@property
def device(self):
"""CAN_DEVICE: Current device of the network."""
return self.__device
@property
def channel(self):
"""int: Current device of the network."""
return self.__channel
@property
def baudrate(self):
"""int: Current baudrate of the network."""
return self.__baudrate
@property
def network(self):
"""canopen.Network: Returns the instance of the CANopen Network."""
return self._connection
@property
def protocol(self):
"""NET_PROT: Obtain network protocol."""
return NET_PROT.CAN
@property
def status(self):
"""NET_STATE: Network state."""
return self.__net_state
@status.setter
def status(self, new_state):
self.__net_state = new_state