from .._ingenialink import ffi, lib
from ingenialink.utils._utils import raise_null, raise_err, to_ms
from ingenialink.register_deprecated import _get_reg_id
from ingenialink.ethercat.servo import EthercatServo
[docs]class IPBPoller:
"""IPB poller.
Args:
servo (EthercatServo): Servo.
num_channels (int): Number of channels.
Raises:
ILCreationError: If the poller could not be created.
"""
def __init__(self, servo, num_channels):
self.__servo = servo
self.__num_channels = num_channels
poller = lib.il_poller_create(servo._cffi_servo, num_channels)
raise_null(poller)
self._poller = ffi.gc(poller, lib.il_poller_destroy)
self._n_ch = num_channels
self._acq = ffi.new("il_poller_acq_t **")
[docs] def start(self):
"""Start poller."""
r = lib.il_poller_start(self._poller)
raise_err(r)
[docs] def stop(self):
"""Stop poller."""
lib.il_poller_stop(self._poller)
[docs] def ch_disable(self, ch):
"""Disable a channel.
Args:
ch (int): Channel to be disabled.
"""
r = lib.il_poller_ch_disable(self._poller, ch)
raise_err(r)
[docs] def ch_disable_all(self):
"""Disable all channels."""
r = lib.il_poller_ch_disable_all(self._poller)
raise_err(r)
@property
def data(self):
"""Obtains processed data.
Returns:
tuple (list, list, bool): Time vector, array of data vectors and a
flag indicating if data was lost.
"""
lib.il_poller_data_get(self._poller, self._acq)
acq = ffi.cast("il_poller_acq_t *", self._acq[0])
t = list(acq.t[0 : acq.cnt])
d = []
for ch in range(self._n_ch):
if acq.d[ch] != ffi.NULL:
d.append(list(acq.d[ch][0 : acq.cnt]))
else:
d.append(None)
return t, d, bool(acq.lost)
@property
def servo(self):
"""Servo: Servo instance to be used."""
return self.__servo
@servo.setter
def servo(self, value):
self.__servo = value
@property
def num_channels(self):
"""int: Number of channels in the poller."""
return self.__num_channels
@num_channels.setter
def num_channels(self, value):
self.__num_channels = value