import ftplib
import time
from collections import defaultdict
from .servo import EthernetServo
from ingenialink.utils.udp import UDP
from ingenialink.utils._utils import raise_err
from ..network import NET_PROT
from ingenialink.network import Network, NET_STATE, NET_DEV_EVT
from ingenialink.exceptions import ILFirmwareLoadError
from ingenialink.constants import DEFAULT_ETH_CONNECTION_TIMEOUT
from ftplib import FTP
from time import sleep
import os
import socket
import ingenialogger
from threading import Thread
from ping3 import ping
logger = ingenialogger.get_logger(__name__)
FTP_SESSION_OK_CODE = "220"
FTP_LOGIN_OK_CODE = "230"
FTP_FILE_TRANSFER_OK_CODE = "226"
FTP_CLOSE_OK_CODE = "221"
CMD_CHANGE_CPU = 0x67E4
MAX_NUM_UNSUCCESSFUL_PINGS = 3
[docs]class NetStatusListener(Thread):
"""Network status listener thread to check if the drive is alive.
Args:
network (EthernetNetwork): Network instance of the Ethernet communication.
"""
def __init__(self, network):
super(NetStatusListener, self).__init__()
self.__network = network
self.__stop = False
self.__max_unsuccessful_pings = MAX_NUM_UNSUCCESSFUL_PINGS
[docs] def run(self):
while not self.__stop:
for servo in self.__network.servos:
unsuccessful_pings = 0
servo_ip = servo.ip_address
servo_state = self.__network._get_servo_state(servo_ip)
while unsuccessful_pings < self.__max_unsuccessful_pings:
response = ping(servo_ip, timeout=1)
if not isinstance(response, float):
unsuccessful_pings += 1
else:
break
ping_response = unsuccessful_pings != self.__max_unsuccessful_pings
if servo_state == NET_STATE.CONNECTED and not ping_response:
self.__network._notify_status(servo_ip, NET_DEV_EVT.REMOVED)
self.__network._set_servo_state(servo_ip, NET_STATE.DISCONNECTED)
if servo_state == NET_STATE.DISCONNECTED and ping_response:
self.__network._notify_status(servo_ip, NET_DEV_EVT.ADDED)
self.__network._set_servo_state(servo_ip, NET_STATE.CONNECTED)
time.sleep(0.25)
def stop(self):
self.__stop = True
[docs]class EthernetNetwork(Network):
"""Network for all Ethernet communications."""
def __init__(self):
super(EthernetNetwork, self).__init__()
self.__servos_state = {}
self.__listener_net_status = None
self.__observers_net_state = defaultdict(list)
[docs] @staticmethod
def load_firmware(fw_file, target="192.168.2.22", ftp_user="", ftp_pwd=""):
"""Loads a given firmware file to the target slave.
.. warning ::
It is needed to disconnect the drive(:func:`disconnect_from_slave`)
after loading the firmware since the `Servo` object's data will
become obsolete.
Args:
fw_file (str): Path to the firmware file to be loaded.
target (str): IP of the target slave.
ftp_user (str): FTP user to connect with.
ftp_pwd (str): FTP password for the given user.
Raises:
ILError: If the loading firmware process fails.
"""
if not os.path.isfile(fw_file):
raise FileNotFoundError(f"Could not find {fw_file}.")
try:
file = open(fw_file, "rb")
ftp_output = None
ftp = FTP()
# Start a FTP session. Drive must be in BOOT mode.
logger.info("Starting FTP session...")
ftp_output = ftp.connect(target)
logger.info(ftp_output)
if FTP_SESSION_OK_CODE not in ftp_output:
raise_err("Unable to open FTP session")
# Login into FTP session.
logger.info("Logging into FTP session...")
ftp_output = ftp.login(ftp_user, ftp_pwd)
logger.info(ftp_output)
if FTP_LOGIN_OK_CODE not in ftp_output:
raise_err("Unable to login the FTP session")
# Load file through FTP.
logger.info("Uploading firmware file...")
ftp.set_pasv(False)
ftp_output = ftp.storbinary(f"STOR {os.path.basename(file.name)}", file)
logger.info(ftp_output)
if FTP_FILE_TRANSFER_OK_CODE not in ftp_output:
raise_err("Unable to load the FW file through FTP")
# Close FTP session.
logger.info("Closing FTP session...")
ftp.close()
# Close the temporal file
file.close()
except Exception as e:
logger.error(e)
raise ILFirmwareLoadError("Error during bootloader process.")
[docs] @staticmethod
def load_firmware_moco(node, subnode, ip, port, moco_file):
"""Update MOCO firmware through UDP protocol.
Args:
node: Network node.
subnode: Drive subnode.
ip: Drive address IP.
port: Drive port.
moco_file: Path to the firmware file.
Returns:
int: Result code.
Raises:
ILFirmwareLoadError: The firmware load process fails
with an error message.
"""
r = 0
upd = UDP(port, ip)
if not moco_file or not os.path.isfile(moco_file):
raise ILFirmwareLoadError("File not found")
moco_in = open(moco_file, "r")
logger.info("Loading firmware...")
try:
for line in moco_in:
words = line.split()
# Get command and address
cmd = int(words[1] + words[0], 16)
data = b""
data_start_byte = 2
while data_start_byte in range(data_start_byte, len(words)):
# Load UDP data
data += bytes([int(words[data_start_byte], 16)])
data_start_byte += 1
# Send message
upd.raw_cmd(node, subnode, cmd, data)
if cmd == CMD_CHANGE_CPU:
sleep(1)
logger.info("Bootload process succeeded")
except ftplib.error_temp as e:
logger.error(e)
raise ILFirmwareLoadError("Firewall might be blocking the access.")
except Exception as e:
logger.error(e)
raise ILFirmwareLoadError("Error during bootloader process.")
def scan_slaves(self):
raise NotImplementedError
[docs] def connect_to_slave(
self,
target,
dictionary=None,
port=1061,
connection_timeout=DEFAULT_ETH_CONNECTION_TIMEOUT,
servo_status_listener=False,
net_status_listener=False,
):
"""Connects to a slave through the given network settings.
Args:
target (str): IP of the target slave.
dictionary (str): Path to the target dictionary file.
port (int): Port to connect to the slave.
connection_timeout (float): Time in seconds of the connection timeout.
servo_status_listener (bool): Toggle the listener of the servo for
its status, errors, faults, etc.
net_status_listener (bool): Toggle the listener of the network
status, connection and disconnection.
Returns:
EthernetServo: Instance of the servo connected.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(connection_timeout)
sock.connect((target, port))
self._set_servo_state(target, NET_STATE.CONNECTED)
servo = EthernetServo(sock, dictionary, servo_status_listener)
self.servos.append(servo)
if net_status_listener:
self.start_status_listener()
else:
self.stop_status_listener()
return servo
[docs] def disconnect_from_slave(self, servo):
"""Disconnects the slave from the network.
Args:
servo (EthernetServo): Instance of the servo connected.
"""
self.servos.remove(servo)
servo.stop_status_listener()
self.close_socket(servo.socket)
self._set_servo_state(servo.ip_address, NET_STATE.DISCONNECTED)
if len(self.servos) == 0:
self.stop_status_listener()
[docs] @staticmethod
def close_socket(sock):
"""Closes the established network socket."""
sock.shutdown(socket.SHUT_RDWR)
sock.close()
[docs] def start_status_listener(self):
"""Start monitoring network events (CONNECTION/DISCONNECTION)."""
if self.__listener_net_status is None:
listener = NetStatusListener(self)
listener.start()
self.__listener_net_status = listener
[docs] def stop_status_listener(self):
"""Stops the NetStatusListener from listening to the drive."""
if self.__listener_net_status is not None:
self.__listener_net_status.stop()
self.__listener_net_status.join()
self.__listener_net_status = None
def _notify_status(self, ip, status):
"""Notify subscribers of a network state change."""
for callback in self.__observers_net_state[ip]:
callback(status)
[docs] def subscribe_to_status(self, ip, callback):
"""Subscribe to network state changes.
Args:
ip (str): IP of the drive to subscribe.
callback (function): Callback function.
"""
if callback in self.__observers_net_state[ip]:
logger.info("Callback already subscribed.")
return
self.__observers_net_state[ip].append(callback)
[docs] def unsubscribe_from_status(self, ip, callback):
"""Unsubscribe from network state changes.
Args:
ip (str): IP of the drive to unsubscribe.
callback (function): Callback function.
"""
if callback not in self.__observers_net_state[ip]:
logger.info("Callback not subscribed.")
return
self.__observers_net_state[ip].remove(callback)
def _get_servo_state(self, ip):
return self.__servos_state[ip]
def _set_servo_state(self, ip, state):
self.__servos_state[ip] = state
@property
def protocol(self):
"""NET_PROT: Obtain network protocol."""
return NET_PROT.ETH