Source code for ingenialink.ethernet.servo

from .._ingenialink import lib, ffi
from ingenialink.utils._utils import *
from ingenialink.exceptions import ILError
from ingenialink.network import NET_TRANS_PROT
from ingenialink.constants import PASSWORD_STORE_RESTORE_TCP_IP
from ingenialink.ipb.register import IPBRegister, REG_DTYPE, REG_ACCESS
from ingenialink.ipb.servo import IPBServo, STORE_COCO_ALL, RESTORE_COCO_ALL

import ingenialogger
# Module-level logger, named after this module, used by the servo methods
# below to report the outcome of store/restore operations.
logger = ingenialogger.get_logger(__name__)


# Registers holding the Ethernet network settings of the drive. All three
# are declared on subnode 0 as read/write unsigned 32-bit values and differ
# only in their address.

# Register written with the IP address (in integer form).
COMMS_ETH_IP = IPBRegister(
    address=0x00A1,
    subnode=0,
    dtype=REG_DTYPE.U32,
    access=REG_ACCESS.RW,
    cyclic='CONFIG',
    identifier='',
    units='',
)

# Register written with the subnet mask (in integer form).
COMMS_ETH_NET_MASK = IPBRegister(
    address=0x00A2,
    subnode=0,
    dtype=REG_DTYPE.U32,
    access=REG_ACCESS.RW,
    cyclic='CONFIG',
    identifier='',
    units='',
)

# Register written with the gateway address (in integer form).
COMMS_ETH_NET_GATEWAY = IPBRegister(
    address=0x00A3,
    subnode=0,
    dtype=REG_DTYPE.U32,
    access=REG_ACCESS.RW,
    cyclic='CONFIG',
    identifier='',
    units='',
)


class EthernetServo(IPBServo):
    """Servo reachable through an Ethernet network.

    Extends :class:`IPBServo` with the connection details (port and
    transmission protocol) and with helpers to change, store and restore
    the TCP/IP configuration of the drive.

    Args:
        cffi_servo (CData): CData instance of the servo.
        cffi_net (CData): CData instance of the network.
        target (str): Target ID for the slave.
        port (int): Port for the communication.
        communication_protocol (NET_TRANS_PROT): Transmission protocol.
        dictionary_path (str): Path to the dictionary.
        servo_status_listener (bool): If True the status listener of the
            servo is started, otherwise it is stopped.

    """

    def __init__(self, cffi_servo, cffi_net, target, port,
                 communication_protocol, dictionary_path=None,
                 servo_status_listener=True):
        # Wrap the raw handle so that ``il_servo_fake_destroy`` is invoked
        # as its finalizer once the wrapper is garbage collected.
        managed_servo = ffi.gc(cffi_servo, lib.il_servo_fake_destroy)
        super(EthernetServo, self).__init__(
            managed_servo, cffi_net, target, dictionary_path)

        self.port = port
        """int: Port number used for connections to the servo."""
        self.communication_protocol = communication_protocol
        """NET_TRANS_PROT: Protocol used to connect to the servo."""

        # A dictionary without part number contributes an empty product
        # name to the human readable identifier.
        part_number = self.dictionary.part_number
        if part_number is None:
            part_number = ''
        self.full_name = '{} {} ({})'.format(
            part_number, self.name, self.target)

        # Pick the listener action matching the requested state and run it.
        listener_action = (self.start_status_listener
                           if servo_status_listener
                           else self.stop_status_listener)
        listener_action()

    def store_tcp_ip_parameters(self):
        """Stores the TCP/IP values.

        Writes the TCP/IP store/restore password to the ``STORE_COCO_ALL``
        register on subnode 0. Affects IP address, subnet mask and gateway.

        """
        self.write(
            reg=STORE_COCO_ALL,
            data=PASSWORD_STORE_RESTORE_TCP_IP,
            subnode=0
        )
        logger.info('Store TCP/IP successfully done.')

    def restore_tcp_ip_parameters(self):
        """Restores the TCP/IP values back to default.

        Writes the TCP/IP store/restore password to the
        ``RESTORE_COCO_ALL`` register on subnode 0. Affects IP address,
        subnet mask and gateway.

        """
        self.write(
            reg=RESTORE_COCO_ALL,
            data=PASSWORD_STORE_RESTORE_TCP_IP,
            subnode=0
        )
        logger.info('Restore TCP/IP successfully done.')

    def change_tcp_ip_parameters(self, ip_address, subnet_mask, gateway):
        """Changes the TCP/IP values and stores them.

        Writes the new IP address, network mask and gateway to the drive
        and then stores them. If the TCP/IP specific store raises
        :class:`ILError`, the generic ``store_parameters`` is used instead.

        .. note::
            The drive needs a power cycle after this
            in order for the changes to be properly applied.

        Args:
            ip_address (str): IP Address to be changed.
            subnet_mask (str): Subnet mask to be changed.
            gateway (str): Gateway to be changed.

        """
        # Every value is converted before the first write, so a failing
        # conversion leaves the drive untouched.
        requested = (ip_address, subnet_mask, gateway)
        encoded = [convert_ip_to_int(value) for value in requested]

        registers = (COMMS_ETH_IP, COMMS_ETH_NET_MASK, COMMS_ETH_NET_GATEWAY)
        for register, value in zip(registers, encoded):
            self.write(register, value)

        try:
            self.store_tcp_ip_parameters()
        except ILError:
            # Fall back to the generic store when the TCP/IP one fails.
            self.store_parameters()