from enum import Enum
from ..exceptions import *
from ..network import NET_PROT
from .servo import EthercatServo
from ingenialink.constants import *
from .._ingenialink import lib, ffi
from ingenialink.utils._utils import cstr
from ingenialink.ipb.network import IPBNetwork
from ingenialink.network import EEPROM_FILE_FORMAT
import os
import ingenialogger
logger = ingenialogger.get_logger(__name__)
# Human-readable messages for firmware-update failures, keyed by the
# corresponding constants exposed by the C library (``lib``).
# ``EthercatNetwork.load_firmware`` looks up the negative return code of
# ``lib.il_net_update_firmware`` in this table and falls back to a generic
# "Error code: N" message when the code is not listed here.
FIRMWARE_UPDATE_ERROR = {
lib.UP_STATEMACHINE_ERROR: "Slave could not enter the expected state",
lib.UP_NOT_IN_BOOT_ERROR: "Slave is not in Boot Mode",
lib.UP_EEPROM_PDI_ERROR: "EEPROM PDI Error",
lib.UP_EEPROM_FILE_ERROR: "File was not read properly",
lib.UP_NOT_FOUND_ERROR: "No slaves were found",
lib.UP_NO_SOCKET: "No socket connection was found. Execute as Root",
lib.UP_FORCE_BOOT_ERROR: "Could not force Boot mode",
lib.SOEM_EC_ERR_TYPE_SDO_ERROR: "EtherCAT Error. SDO error",
lib.SOEM_EC_ERR_TYPE_EMERGENCY: "EtherCAT Error. Emergency error",
lib.SOEM_EC_ERR_TYPE_PACKET_ERROR: "EtherCAT Error. Packet error",
lib.SOEM_EC_ERR_TYPE_SDOINFO_ERROR: "EtherCAT Error. SDO Info error",
lib.SOEM_EC_ERR_TYPE_FOE_ERROR: "EtherCAT Error. FOE error",
lib.SOEM_EC_ERR_TYPE_FOE_BUF2SMALL: "EtherCAT Error. Buffer too small error",
lib.SOEM_EC_ERR_TYPE_FOE_PACKETNUMBER: "EtherCAT Error. FOE Packet number error",
lib.SOEM_EC_ERR_TYPE_SOE_ERROR: "EtherCAT Error. SOE error",
lib.SOEM_EC_ERR_TYPE_MBX_ERROR: "EtherCAT Error. MBX error",
lib.SOEM_EC_ERR_TYPE_FOE_FILE_NOTFOUND: "EtherCAT Error. FOE File not found error",
lib.SOEM_EC_ERR_TYPE_EOE_INVALID_RX_DATA: "EtherCAT Error. Invalid RX Data error",
}
class EthercatNetwork(IPBNetwork):
    """Network for all EtherCAT communications.

    Args:
        interface_name (str): Interface name to be targeted.

    """

    def __init__(self, interface_name):
        super().__init__()
        self.interface_name = interface_name
        """str: Interface name used in the network settings."""
        self.servos = []
        """list: List of the connected servos in the network."""

    @staticmethod
    def _cstr_or_null(value):
        """Convert ``value`` with ``cstr``, or return ``ffi.NULL`` if it is falsy.

        Args:
            value (str): Text to hand over to the C library. ``None`` and the
                empty string are both mapped to ``ffi.NULL``.

        Returns:
            The result of ``cstr(value)`` or ``ffi.NULL``.

        """
        return cstr(value) if value else ffi.NULL

    @staticmethod
    def _validate_eeprom_format(file_format):
        """Ensure ``file_format`` is a member of ``EEPROM_FILE_FORMAT``.

        Args:
            file_format (EEPROM_FILE_FORMAT): Value to be checked.

        Raises:
            ILError: If ``file_format`` is not a valid EEPROM file format.

        """
        try:
            is_valid = file_format in EEPROM_FILE_FORMAT
        except TypeError:
            # Assuming EEPROM_FILE_FORMAT is an Enum: before Python 3.12 a
            # membership test with a non-member operand raises TypeError
            # instead of returning False. Report it as the documented ILError.
            is_valid = False
        if not is_valid:
            raise ILError("Invalid file format")

    def _run_eeprom_tool(self, eeprom_file, slave, mode, error_msg):
        """Invoke the C library EEPROM tool and check its return code.

        A fresh ``il_net_t **`` is allocated and stored in
        ``self._cffi_network`` before the call, replacing any previous value.

        Args:
            eeprom_file (str): Path to the EEPROM file.
            slave (int): Target slave number.
            mode (int): Raw ``EEPROM_TOOL_MODE`` value forwarded to the library.
            error_msg (str): Message of the ``ILError`` raised on failure.

        Raises:
            ILError: If the library returns a negative code.

        """
        self._cffi_network = ffi.new("il_net_t **")
        _interface_name = self._cstr_or_null(self.interface_name)
        _eeprom_file = self._cstr_or_null(eeprom_file)
        r = lib.il_net_eeprom_tool(
            self._cffi_network, _interface_name, slave, mode, _eeprom_file
        )
        if r < 0:
            raise ILError(error_msg)

    def load_firmware(self, fw_file, target=1, boot_in_app=None):
        """Loads a given firmware file to a target.

        .. warning::
            Choose the ``boot_in_app`` flag accordingly to your
            servo specifications otherwise the servo could enter
            a blocking state.

        .. warning::
            It is needed to disconnect the drive
            (:func:`disconnect_from_slave`) after loading the firmware
            since the `Servo` object's data will become obsolete.

        Args:
            fw_file (str): Path to the firmware file.
            target (int): Targeted node ID to be loaded.
            boot_in_app (bool): If ``None`` (default) it is deduced from the
                extension of ``fw_file``: .sfu -> True, .lfu -> False.

        Raises:
            FileNotFoundError: If ``fw_file`` does not point to a file.
            ValueError: If ``boot_in_app`` is not given and the firmware
                file has the wrong extension.
            ILFirmwareLoadError: The firmware load process fails
                with an error message.

        """
        if not os.path.isfile(fw_file):
            raise FileNotFoundError("Could not find {}.".format(fw_file))

        if boot_in_app is None:
            if not fw_file.endswith((FILE_EXT_SFU, FILE_EXT_LFU)):
                raise ValueError(
                    f"Firmware file should have extension {FILE_EXT_SFU} or {FILE_EXT_LFU}"
                )
            boot_in_app = fw_file.endswith(FILE_EXT_SFU)

        self._cffi_network = ffi.new("il_net_t **")
        _interface_name = self._cstr_or_null(self.interface_name)
        # fw_file is known to be an existing file here, so it is never empty.
        _fw_file = cstr(fw_file)
        r = lib.il_net_update_firmware(
            self._cffi_network, _interface_name, target, _fw_file, boot_in_app
        )
        if r < 0:
            # Prefer the specific message; fall back to the raw error code.
            error_msg = FIRMWARE_UPDATE_ERROR.get(
                r, "Error updating firmware. Error code: {}".format(r)
            )
            raise ILFirmwareLoadError(error_msg)

    def _read_eeprom(self, eeprom_file, slave, file_format):
        """Reads the EEPROM.

        Args:
            eeprom_file (str): Path to the EEPROM file.
            slave (int): Target slave number to be connected
            file_format (EEPROM_FILE_FORMAT): EEPROM tool mode.

        Raises:
            ILError: In case the operation does not succeed.

        """
        self._validate_eeprom_format(file_format)
        if file_format == EEPROM_FILE_FORMAT.BINARY:
            mode = EEPROM_TOOL_MODE.MODE_READBIN.value
        else:
            mode = EEPROM_TOOL_MODE.MODE_READINTEL.value
        self._run_eeprom_tool(eeprom_file, slave, mode, "Failed reading EEPROM file.")

    def _write_eeprom(self, eeprom_file, slave, file_format):
        """Loads an EEPROM file to use as configuration.

        Args:
            eeprom_file (str): Path to the EEPROM file.
            slave (int): Target slave number to be connected
            file_format (EEPROM_FILE_FORMAT): EEPROM tool mode.

        Raises:
            ILError: In case the operation does not succeed.

        """
        self._validate_eeprom_format(file_format)
        if file_format == EEPROM_FILE_FORMAT.BINARY:
            mode = EEPROM_TOOL_MODE.MODE_WRITEBIN.value
        else:
            mode = EEPROM_TOOL_MODE.MODE_WRITEINTEL.value
        self._run_eeprom_tool(eeprom_file, slave, mode, "Failed writing EEPROM file.")

    def _write_eeprom_alias(self, eeprom_file, slave):
        """Writes the configuration station alias.

        Args:
            eeprom_file (str): Path to the EEPROM file.
            slave (int): Target slave number to be connected

        Raises:
            ILError: In case the operation does not succeed.

        """
        self._run_eeprom_tool(
            eeprom_file,
            slave,
            EEPROM_TOOL_MODE.MODE_WRITEALIAS.value,
            "Failed writing EEPROM alias.",
        )

    def scan_slaves(self):
        """Scan all the slaves connected in the network.

        Returns:
            list: Consecutive slave numbers starting at 1, one per slave
            reported by the library (empty if none is reported).

        """
        _interface_name = self._cstr_or_null(self.interface_name)
        number_slaves = lib.il_net_num_slaves_get(_interface_name)
        return list(range(1, number_slaves + 1))

    def connect_to_slave(
        self,
        target=1,
        dictionary="",
        use_eoe_comms=1,
        reconnection_retries=DEFAULT_MESSAGE_RETRIES,
        reconnection_timeout=DEFAULT_MESSAGE_TIMEOUT,
        servo_status_listener=False,
        net_status_listener=False,
    ):
        """Connect a slave through an EtherCAT connection.

        Args:
            target (int): Number of the target slave.
            dictionary (str): Path to the dictionary to be loaded.
            use_eoe_comms (int): Specify which architecture is the target based on.
            reconnection_retries (int): Number of reconnection retried before declaring
                a connected or disconnected stated.
            reconnection_timeout (int): Time in ms of the reconnection timeout.
            servo_status_listener (bool): Toggle the listener of the servo for
                its status, errors, faults, etc.
            net_status_listener (bool): Toggle the listener of the network
                status, connection and disconnection.

        Returns:
            EthercatServo: Instance of the connected servo.

        Raises:
            ILError: If the library reports that no servo could be connected.

        """
        _interface_name = self._cstr_or_null(self.interface_name)
        _dictionary = self._cstr_or_null(dictionary)

        # Out-parameters filled in by the C library.
        _servo = ffi.new("il_servo_t **")
        self._cffi_network = ffi.new("il_net_t **")

        # NOTE(review): the literals 3 and 1061 are forwarded untouched to the
        # C library; their meaning is not visible from this file (presumably a
        # protocol identifier and a port) - confirm against
        # il_servo_connect_ecat before changing them.
        r = lib.il_servo_connect_ecat(
            3, _interface_name, self._cffi_network, _servo, _dictionary, 1061, target, use_eoe_comms
        )
        if r <= 0:
            self._cffi_network = None
            raise ILError("Could not find any servos connected.")

        net_ = ffi.cast("il_net_t *", self._cffi_network[0])
        servo_ = ffi.cast("il_servo_t *", _servo[0])
        servo = EthercatServo(servo_, net_, target, dictionary, servo_status_listener)

        # From here on the network handle is the dereferenced pointer.
        self._cffi_network = net_
        self.servos.append(servo)

        if net_status_listener:
            self.start_status_listener()

        self.set_reconnection_retries(reconnection_retries)
        self.set_recv_timeout(reconnection_timeout)
        return servo

    def disconnect_from_slave(self, servo):
        """Disconnects the slave from the network.

        Args:
            servo (EthercatServo): Instance of the servo connected.

        Raises:
            ILError: If stopping the network master returns a negative code.
                The servo and network handles are destroyed before raising.

        """
        # TODO: This stops all connections no only the target servo.
        if servo in self.servos:
            self.servos.remove(servo)
        self.stop_status_listener()
        lib.il_servo_destroy(servo._cffi_servo)
        r = lib.il_net_master_stop(self._cffi_network)
        lib.il_net_destroy(self._cffi_network)
        self._cffi_network = None
        if r < 0:
            raise ILError("Error disconnecting the drive. Return code: {}".format(r))

    @property
    def protocol(self):
        """NET_PROT: Obtain network protocol."""
        return NET_PROT.ECAT