from ._ingenialink import ffi, lib
from ._utils import raise_null, raise_err, to_ms
from .registers import _get_reg_id
[docs]class Poller(object):
""" Register poller.
Args:
servo (Servo): Servo.
n_ch (int): Number of channels.
Raises:
ILCreationError: If the poller could not be created.
"""
def __init__(self, servo, n_ch):
poller = lib.il_poller_create(servo._servo, n_ch)
raise_null(poller)
self._poller = ffi.gc(poller, lib.il_poller_destroy)
self._n_ch = n_ch
self._acq = ffi.new('il_poller_acq_t **')
[docs] def start(self):
"""
Start poller.
"""
r = lib.il_poller_start(self._poller)
raise_err(r)
[docs] def stop(self):
"""
Stop poller.
"""
lib.il_poller_stop(self._poller)
@property
def data(self):
"""
Obtains processed data.
Returns:
tuple (list, list, bool): Time vector, array of data vectors and a
flag indicating if data was lost.
"""
lib.il_poller_data_get(self._poller, self._acq)
acq = ffi.cast('il_poller_acq_t *', self._acq[0])
t = list(acq.t[0:acq.cnt])
d = []
for ch in range(self._n_ch):
if acq.d[ch] != ffi.NULL:
d.append(list(acq.d[ch][0:acq.cnt]))
else:
d.append(None)
return t, d, bool(acq.lost)
[docs] def ch_disable(self, ch):
"""
Disable a channel.
Args:
ch (int): Channel to be disabled.
"""
r = lib.il_poller_ch_disable(self._poller, ch)
raise_err(r)
[docs] def ch_disable_all(self):
"""
Disable all channels.
"""
r = lib.il_poller_ch_disable_all(self._poller)
raise_err(r)