import ipaddress
import socket
from typing import Optional
from ingenialink.exceptions import ILError, ILTimeoutError, ILIOError, ILWrongRegisterError
from ingenialink.constants import PASSWORD_STORE_RESTORE_TCP_IP
from ingenialink.ethernet.register import EthernetRegister
from ingenialink.constants import MCB_CMD_READ, MCB_CMD_WRITE, ETH_MAX_WRITE_SIZE, ETH_BUF_SIZE
from ingenialink.enums.register import REG_DTYPE, REG_ACCESS
from ingenialink.servo import Servo
from ingenialink.utils.mcb import MCB
from ingenialink.utils._utils import convert_ip_to_int
from ingenialink.ethernet.dictionary import EthernetDictionary
import ingenialogger
logger = ingenialogger.get_logger(__name__)
[docs]class EthernetServo(Servo):
"""Servo object for all the Ethernet slave functionalities.
Args:
socket: Socket.
dictionary_path: Path to the dictionary.
servo_status_listener: Toggle the listener of the servo for
its status, errors, faults, etc.
"""
DICTIONARY_CLASS = EthernetDictionary
MAX_WRITE_SIZE = ETH_MAX_WRITE_SIZE
COMMS_ETH_IP = "COMMS_ETH_IP"
COMMS_ETH_NET_MASK = "COMMS_ETH_NET_MASK"
COMMS_ETH_NET_GATEWAY = "COMMS_ETH_GW"
MONITORING_DATA = EthernetRegister(
identifier="",
units="",
subnode=0,
address=0x00B2,
cyclic="CONFIG",
dtype=REG_DTYPE.U16,
access=REG_ACCESS.RO,
)
DIST_DATA = EthernetRegister(
identifier="",
units="",
subnode=0,
address=0x00B4,
cyclic="CONFIG",
dtype=REG_DTYPE.U16,
access=REG_ACCESS.WO,
)
def __init__(
self, socket: socket.socket, dictionary_path: str, servo_status_listener: bool = False
) -> None:
self.socket = socket
self.ip_address, self.port = self.socket.getpeername()
super(EthernetServo, self).__init__(self.ip_address, dictionary_path, servo_status_listener)
[docs] def store_tcp_ip_parameters(self) -> None:
"""Stores the TCP/IP values. Affects IP address,
subnet mask and gateway"""
self.write(reg=self.STORE_COCO_ALL, data=PASSWORD_STORE_RESTORE_TCP_IP, subnode=0)
logger.info("Store TCP/IP successfully done.")
[docs] def restore_tcp_ip_parameters(self) -> None:
"""Restores the TCP/IP values back to default. Affects
IP address, subnet mask and gateway"""
self.write(reg=self.RESTORE_COCO_ALL, data=PASSWORD_STORE_RESTORE_TCP_IP, subnode=0)
logger.info("Restore TCP/IP successfully done.")
[docs] def change_tcp_ip_parameters(self, ip_address: str, subnet_mask: str, gateway: str) -> None:
"""Stores the TCP/IP values. Affects IP address,
network mask and gateway
.. note::
The drive needs a power cycle after this
in order for the changes to be properly applied.
Args:
ip_address: IP Address to be changed.
subnet_mask: Subnet mask to be changed.
gateway: Gateway to be changed.
Raises:
ValueError: If the drive or gateway IP is not a
valid IP address.
ValueError: If the drive IP and gateway IP are not
on the same network.
NetmaskValueError: If the subnet_mask is not a valid
netmask.
"""
drive_ip = ipaddress.ip_address(ip_address)
gateway_ip = ipaddress.ip_address(gateway)
net = ipaddress.IPv4Network(f"{drive_ip}/{subnet_mask}", strict=False)
if gateway_ip not in net:
raise ValueError(
f"Drive IP {ip_address} and Gateway IP {gateway} are not on the same network."
)
int_ip_address = convert_ip_to_int(ip_address)
int_subnet_mask = convert_ip_to_int(subnet_mask)
int_gateway = convert_ip_to_int(gateway)
self.write(self.COMMS_ETH_IP, int_ip_address, subnode=0)
self.write(self.COMMS_ETH_NET_MASK, int_subnet_mask, subnode=0)
self.write(self.COMMS_ETH_NET_GATEWAY, int_gateway, subnode=0)
try:
self.store_tcp_ip_parameters()
except ILError:
self.store_parameters()
def _write_raw(self, reg: EthernetRegister, data: bytes) -> None: # type: ignore [override]
self._send_mcb_frame(MCB_CMD_WRITE, reg.address, reg.subnode, data)
def _read_raw(self, reg: EthernetRegister) -> bytes: # type: ignore [override]
return self._send_mcb_frame(MCB_CMD_READ, reg.address, reg.subnode)
def _send_mcb_frame(
self, cmd: int, reg: int, subnode: int, data: Optional[bytes] = None
) -> bytes:
"""Send an MCB frame to the drive.
Args:
cmd: Read/write command.
reg: Register address to be read/written.
subnode: Target axis of the drive.
data: Data to be written to the register.
Returns:
The response frame.
"""
frame = MCB.build_mcb_frame(cmd, subnode, reg, data)
self._lock.acquire()
try:
try:
self.socket.sendall(frame)
except socket.error as e:
raise ILIOError("Error sending data.") from e
try:
return self.__receive_mcb_frame(reg)
except ILWrongRegisterError as e:
logger.error(e)
return self.__receive_mcb_frame(reg)
finally:
self._lock.release()
def __receive_mcb_frame(self, reg: int) -> bytes:
"""Receive frame from socket and return MCB data
Args:
reg: expected address
Returns:
MCB message data in bytes
Raises:
ILTimeoutError: socket timeout
ILIOError: socket error
"""
try:
response = self.socket.recv(ETH_BUF_SIZE)
except socket.timeout as e:
raise ILTimeoutError("Timeout while receiving data.") from e
except socket.error as e:
raise ILIOError("Error receiving data.") from e
return MCB.read_mcb_data(reg, response)