Source code for ingenialink.eoe.network

import ipaddress
import socket
import time
from enum import Enum
from typing import List, Dict, Optional

import ingenialogger

from ingenialink import constants
from ingenialink.ethernet.network import EthernetNetwork
from ingenialink.ethernet.servo import EthernetServo
from ingenialink.exceptions import ILTimeoutError, ILIOError, ILError

logger = ingenialogger.get_logger(__name__)


[docs]class EoECommand(Enum): INIT = 0 DEINIT = 1 SCAN = 2 CONFIG = 3 ERASE_CONFIG = 4 EOE_START = 5 EOE_STOP = 6 GET_STATUS = 7
[docs]class EoENetwork(EthernetNetwork): """Network for EoE (Ethernet over EtherCAT) communication. Args: ifname: Network interface name. connection_timeout: Time in seconds of the connection timeout to the EoE service. """ EOE_MSG_CMD_SIZE = 2 EOE_MSG_NODE_SIZE = 2 EOE_MSG_IP_SIZE = 4 EOE_MSG_DATA_SIZE = 53 EOE_MSG_FRAME_SIZE = EOE_MSG_CMD_SIZE + EOE_MSG_DATA_SIZE NULL_TERMINATOR = b"\x00" STATUS_EOE_BIT = 0b10 STATUS_INIT_BIT = 0b1 WAIT_EOE_TIMEOUT = 1 ECAT_SERVICE_NETWORK = ipaddress.ip_network("192.168.3.0/24") def __init__( self, ifname: str, connection_timeout: float = constants.DEFAULT_ETH_CONNECTION_TIMEOUT ) -> None: super().__init__() self.ifname = ifname self._eoe_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._eoe_socket.settimeout(connection_timeout) self._connect_to_eoe_service() status = self._get_status_eoe_service() if status & self.STATUS_EOE_BIT: self._stop_eoe_service() self._erase_config_eoe_service() if status & self.STATUS_INIT_BIT: self._deinitialize_eoe_service() self._eoe_service_init = False self._eoe_service_started = False self._configured_slaves: Dict[str, int] = {}
[docs] def connect_to_slave( # type: ignore [override] self, slave_id: int, ip_address: str, dictionary: str, port: int = 1061, connection_timeout: float = constants.DEFAULT_ETH_CONNECTION_TIMEOUT, servo_status_listener: bool = False, net_status_listener: bool = False, ) -> EthernetServo: """Connects to a slave through the given network settings. Args: slave_id: EtherCAT slave ID. ip_address: IP address to be assigned to the slave. dictionary: Path to the target dictionary file. port: Port to connect to the slave. connection_timeout: Time in seconds of the connection timeout. servo_status_listener: Toggle the listener of the servo for its status, errors, faults, etc. net_status_listener: Toggle the listener of the network status, connection and disconnection. Raises: ValueError: ip_address must be a subnetwork of 192.168.3.0/24 ILError: If the EoE service is not running. ILError: If the EoE service cannot be started on the network interface. Returns: EthernetServo: Instance of the servo connected. """ if ipaddress.ip_address(ip_address) not in self.ECAT_SERVICE_NETWORK: raise ValueError("ip_address must be a subnetwork of 192.168.3.0/24") if not self._eoe_service_init: self._initialize_eoe_service() if self._eoe_service_started: self._stop_eoe_service() self._erase_config_eoe_service() self.__reconfigure_drives() try: self._configure_slave(slave_id, ip_address) finally: self._start_eoe_service() self.__wait_eoe_starts() self._configured_slaves[ip_address] = slave_id return super().connect_to_slave( ip_address, dictionary, port, connection_timeout, servo_status_listener, net_status_listener, )
def __wait_eoe_starts(self) -> None: """Wait until the EoE service starts the EoE or the timeout was reached""" status = self._get_status_eoe_service() time_start = time.time() while not status & self.STATUS_EOE_BIT and self.WAIT_EOE_TIMEOUT > time.time() - time_start: time.sleep(0.1) status = self._get_status_eoe_service() if not status & self.STATUS_EOE_BIT: logger.warning("Service did not starts the EoE") def __reconfigure_drives(self) -> None: """Reconfigure all the slaves saved in the network""" for ip_addr, slave_id in self._configured_slaves.items(): try: self._configure_slave(slave_id, ip_addr) except ILError as e: logger.error(e)
[docs] def disconnect_from_slave(self, servo: EthernetServo) -> None: # type: ignore [override] del self._configured_slaves[servo.ip_address] super().disconnect_from_slave(servo) if len(self.servos) == 0: self._stop_eoe_service() self._erase_config_eoe_service() try: self._deinitialize_eoe_service() except ILError as e: logger.error(e)
def __del__(self) -> None: self._eoe_socket.shutdown(socket.SHUT_RDWR) self._eoe_socket.close()
[docs] def scan_slaves(self) -> List[int]: """Scan slaves connected to the network adapter. Returns: List containing the ids of the connected slaves. Raises: ILError: If the EoE service fails to perform a scan. """ deinit_later = False if not self._eoe_service_init: deinit_later = True self._initialize_eoe_service() result = self._scan_eoe_service() if deinit_later: self._deinitialize_eoe_service() return result
def _scan_eoe_service(self) -> List[int]: """Make the scan request to the EoE service Returns: List containing the ids of the connected slaves. Raises: ILError: If the EoE service fails to perform a scan. """ msg = self._build_eoe_command_msg(EoECommand.SCAN.value) try: r = self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError( "Failed to perform a network scan. Please verify the EoE service is running." ) from e return list(range(1, r + 1)) @staticmethod def _build_eoe_command_msg(cmd: int, data: Optional[bytes] = None) -> bytes: """ Build a message with the following format. +----------+----------+ | cmd | data | +==========+==========+ | 2 Byte | 53 Bytes | +----------+----------+ Args: cmd: Indicates which operation to perform. data: Contains the necessary data to perform the desired command. Returns: The message to send. """ if data is None: data = bytes() cmd_field = cmd.to_bytes(EoENetwork.EOE_MSG_CMD_SIZE, "little") data_field = data + EoENetwork.NULL_TERMINATOR * (EoENetwork.EOE_MSG_DATA_SIZE - len(data)) return cmd_field + data_field def _send_command(self, msg: bytes) -> int: """ Send command to EoE service. Args: msg: Message to send. Returns: Response from the EoE service. Raises: ILTimeoutError: Timeout while receiving a response from the EoE service. ILIOError: Error while sending/receiving message. """ try: self._eoe_socket.send(msg) except socket.error as e: raise ILIOError("Error sending message.") from e try: response = self._eoe_socket.recv(1024) except socket.timeout as e: raise ILTimeoutError("Timeout while receiving response.") from e except socket.error as e: raise ILIOError("Error receiving response.") from e return int.from_bytes(response, byteorder="little", signed=True) def _connect_to_eoe_service(self) -> None: """Connect to the EoE service.""" self._eoe_socket.connect(("127.0.0.1", 8888)) def _initialize_eoe_service(self) -> None: """Initialize the virtual network interface and the packet forwarder. Raises: ILError: If the EoE service is not running. ILError: If the EoE service cannot be started on the network interface. """ self._eoe_service_init = True data = self.ifname msg = self._build_eoe_command_msg(EoECommand.INIT.value, data=data.encode("utf-8")) try: r = self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError( "Failed to initialize the EoE service. Please verify it's running." ) from e if r < 0: raise ILError(f"Failed to initialize the EoE service using interface {self.ifname}.") def _deinitialize_eoe_service(self) -> None: """Deinitialize the virtual network interface and the packet forwarder. Raises: ILError: If the EoE service is not running. ILError: If the EoE service cannot be stopped on the network interface. """ data = self.ifname msg = self._build_eoe_command_msg(EoECommand.DEINIT.value, data=data.encode("utf-8")) try: self._send_command(msg) self._eoe_service_init = False except (ILIOError, ILTimeoutError) as e: raise ILError("Failed to deinitialize the EoE service.") from e def _configure_slave( self, slave_id: int, ip_address: str, net_mask: str = "255.255.255.0" ) -> None: """ Configure an EtherCAT slave with a given IP. Args: slave_id: EtherCAT slave ID. ip_address: IP address to be set to the slave. Raises: ILError: If the EoE service fails to configure a slave. """ slave_bytes = slave_id.to_bytes(self.EOE_MSG_NODE_SIZE, "little") ip_int = int(ipaddress.IPv4Address(ip_address)) ip_bytes = ip_int.to_bytes(self.EOE_MSG_IP_SIZE, "little") net_mask_int = int(ipaddress.IPv4Address(net_mask)) net_mask_bytes = net_mask_int.to_bytes(self.EOE_MSG_IP_SIZE, "little") data = slave_bytes + ip_bytes + net_mask_bytes msg = self._build_eoe_command_msg(EoECommand.CONFIG.value, data) try: self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError(f"Failed to configure slave {slave_id} with IP {ip_address}.") from e def _start_eoe_service(self) -> None: """Starts the EoE service Raises: ILError: If the EoE service fails to start. """ self._eoe_service_started = True msg = self._build_eoe_command_msg(EoECommand.EOE_START.value) try: self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError("Failed to start the EoE service.") from e def _stop_eoe_service(self) -> None: """Stops the EoE service Raises: ILError: If the EoE service fails to stop. """ self._eoe_service_started = False msg = self._build_eoe_command_msg(EoECommand.EOE_STOP.value) try: self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError("Failed to stop the EoE service.") from e def _erase_config_eoe_service(self) -> None: """Stops the EoE service Raises: ILError: If the EoE service fails to stop. """ msg = self._build_eoe_command_msg(EoECommand.ERASE_CONFIG.value) try: self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError("Failed to stop the EoE service.") from e def _get_status_eoe_service(self) -> int: """Get the EoE service status. +-----------+------+------+ |ECAT status| init | eoe | +===========+======+======+ | 1Byte | 1bit | 1bit | +-----------+------+------+ Returns: Status response Raises: ILError: If get status request fails. """ msg = self._build_eoe_command_msg(EoECommand.GET_STATUS.value) try: r = self._send_command(msg) except (ILIOError, ILTimeoutError) as e: raise ILError("Failed to get service status.") from e return r def load_firmware_moco(self) -> None: # type: ignore [override] raise NotImplementedError def load_firmware(self) -> None: # type: ignore [override] raise NotImplementedError