from ingenialink.utils._utils import raise_err
from ingenialink.utils import constants
from datetime import datetime
from threading import Timer, RLock
import ingenialogger
logger = ingenialogger.get_logger(__name__)
[docs]class PollerTimer:
"""Custom timer for the Poller.
Args:
time (int): Timeout to use for the timer.
cb (function): Callback.
"""
def __init__(self, time, cb):
self.cb = cb
self.time = time
self.thread = Timer(self.time, self.handle_function)
[docs] def handle_function(self):
"""Handle method that creates the timer for the poller"""
self.cb()
self.thread = Timer(self.time, self.handle_function)
self.thread.start()
[docs] def start(self):
"""Starts the poller timer"""
self.thread.start()
[docs] def cancel(self):
"""Stops the poller timer"""
self.thread.cancel()
if self.thread.is_alive():
self.thread.join()
[docs]class Poller:
"""Register poller for CANOpen/Ethernet communications.
Args:
servo (CanopenServo, EthernetServo): Servo.
num_channels (int): Number of channels.
"""
def __init__(self, servo, num_channels):
self.__servo = servo
self.__num_channels = num_channels
self.__sz = 0
self.__refresh_time = 0
self.__time_start = 0.0
self.__samples_count = 0
self.__samples_lost = False
self.__timer = None
self.__running = False
self.__mappings = []
self.__mappings_enabled = []
self.__lock = RLock()
self._reset_acq()
[docs] def start(self):
"""Start the poller."""
if self.__running:
logger.warning("Poller already running")
raise_err(constants.IL_EALREADY)
# Activate timer
self.__timer = PollerTimer(self.__refresh_time, self._acquire_callback_poller_data)
self.__timer.start()
self.__time_start = datetime.now()
self.__running = True
return 0
[docs] def stop(self):
"""Stop poller."""
if self.__running:
self.__timer.cancel()
self.__running = False
[docs] def ch_disable(self, channel):
"""Disable a channel.
Args:
channel (int): Channel to be disabled.
Raises:
ILStateError: The poller is already running.
ILValueError: Channel out of range.
Returns:
int: Status code.
"""
if self.__running:
logger.warning("Poller is running")
raise_err(constants.IL_ESTATE)
if channel > self.num_channels:
logger.error("Channel out of range")
raise_err(constants.IL_EINVAL)
# Set channel required as disabled
self.__mappings_enabled[channel] = False
return 0
[docs] def ch_disable_all(self):
"""Disable all channels.
Returns:
int: Status code.
"""
for channel in range(self.num_channels):
r = self.ch_disable(channel)
if r < 0:
raise_err(r)
return 0
def _reset_acq(self):
"""Resets the acquired channels."""
self.__acq = {"t": [], "d": []}
def _acquire_callback_poller_data(self):
"""Acquire callback for poller data."""
time_diff = datetime.now()
delta = time_diff - self.__time_start
# Obtain current time
t = delta.total_seconds()
self.__lock.acquire()
# Acquire all configured channels
if self.__samples_count >= self.__sz:
self.__samples_lost = True
else:
self.__acq["t"][self.__samples_count] = t
# Acquire enabled channels, comprehension list indexes obtained
enabled_channel_indexes = [
channel_idx
for channel_idx, is_enabled in enumerate(self.__mappings_enabled)
if is_enabled
]
for channel in enabled_channel_indexes:
for register_identifier, subnode in self.__mappings[channel].items():
self.__acq["d"][channel][self.__samples_count] = self.servo.read(
register_identifier, subnode
)
# Increment samples count
self.__samples_count += 1
self.__lock.release()
@property
def data(self):
"""tuple (list, list, bool): Time vector, array of data vectors and a
flag indicating if data was lost."""
t = list(self.__acq["t"][0 : self.__samples_count])
d = []
# Acquire enabled channels, comprehension list indexes obtained
enabled_channel_indexes = [
channel_idx
for channel_idx, is_enabled in enumerate(self.__mappings_enabled)
if is_enabled
]
for channel in range(self.num_channels):
if self.__mappings_enabled[channel]:
d.append(list(self.__acq["d"][channel][0 : self.__samples_count]))
else:
d.append(list(None))
self.__lock.acquire()
self.__samples_count = 0
self.__samples_lost = False
self.__lock.release()
return t, d, self.__samples_lost
@property
def servo(self):
"""Servo: Servo instance to be used."""
return self.__servo
@servo.setter
def servo(self, value):
self.__servo = value
@property
def num_channels(self):
"""int: Number of channels in the poller."""
return self.__num_channels
@num_channels.setter
def num_channels(self, value):
self.__num_channels = value