# Source code for ingenialink.monitor
from enum import Enum
from ._ingenialink import ffi, lib
from ingenialink.utils._utils import raise_null, raise_err, to_ms
from ingenialink.register import _get_reg_id
class MONITOR_TRIGGER(Enum):
    """Trigger types accepted by the monitor.

    Each member's value is the matching ``IL_MONITOR_TRIGGER_*`` constant
    exposed by the native library binding.
    """

    #: Immediate.
    IMMEDIATE = lib.IL_MONITOR_TRIGGER_IMMEDIATE

    #: Motion start.
    MOTION = lib.IL_MONITOR_TRIGGER_MOTION

    #: Positive.
    POS = lib.IL_MONITOR_TRIGGER_POS

    #: Negative.
    NEG = lib.IL_MONITOR_TRIGGER_NEG

    #: Exit window.
    WINDOW = lib.IL_MONITOR_TRIGGER_WINDOW

    #: Digital input.
    DIN = lib.IL_MONITOR_TRIGGER_DIN
class Monitor:
    """Monitor.

    Wraps a native monitor handle created for a servo and exposes its
    configuration, acquisition control and acquired data.

    Args:
        servo (Servo): Servo instance.

    Raises:
        ILCreationError: If the monitor could not be created.
    """

    def __init__(self, servo):
        monitor = lib.il_monitor_create(servo._cffi_servo)
        raise_null(monitor)

        # Attach the native destructor so the handle is released together
        # with this wrapper.
        self._monitor = ffi.gc(monitor, lib.il_monitor_destroy)
        # Pointer-to-pointer handed to il_monitor_data_get() in ``data``.
        self._acq = ffi.new('il_monitor_acq_t **')

    @staticmethod
    def _to_us(t_s):
        """Convert a period in seconds to the nearest whole microsecond.

        Rounding (instead of truncating) keeps a floating point product
        that lands just below a whole number from losing a microsecond.

        Args:
            t_s (int, float): Period (s).

        Returns:
            int: Period (us).
        """
        return int(round(t_s * 1e6))

    def start(self):
        """Start the monitor.

        Any error code returned by the library is passed to ``raise_err``.
        """
        r = lib.il_monitor_start(self._monitor)
        raise_err(r)

    def wait(self, timeout):
        """Wait until the current acquisition finishes.

        Args:
            timeout (int, float): Timeout (s).
        """
        r = lib.il_monitor_wait(self._monitor, to_ms(timeout))
        raise_err(r)

    @property
    def data(self):
        """Obtain the current acquisition data.

        Returns:
            tuple: Current acquisition time (list) and data for all
            channels (list with one entry per channel; the entry is None
            when the channel has no data buffer).
        """
        lib.il_monitor_data_get(self._monitor, self._acq)
        acq = ffi.cast('il_monitor_acq_t *', self._acq[0])

        # Read the sample count once so the time base and every channel
        # are sliced to the same length.
        cnt = acq.cnt
        t = list(acq.t[0:cnt])
        d = [
            list(acq.d[ch][0:cnt]) if acq.d[ch] != ffi.NULL else None
            for ch in range(lib.IL_MONITOR_CH_NUM)
        ]

        return t, d

    def configure(self, t_s, delay_samples=0, max_samples=0):
        """Configure the monitor parameters.

        Args:
            t_s (int, float): Sampling period (s, resolution: 1e-4 s).
            delay_samples (int, optional): Delay samples.
            max_samples (int, optional): Maximum acquisition samples.
        """
        # The library receives the period as an integer scaled by 1e6.
        r = lib.il_monitor_configure(self._monitor, self._to_us(t_s),
                                     delay_samples, max_samples)
        raise_err(r)

    def ch_configure(self, ch, reg):
        """Configure a channel mapping.

        Args:
            ch (int): Channel.
            reg (str, Register): Register to be mapped to the channel.
        """
        _reg, _id = _get_reg_id(reg)
        r = lib.il_monitor_ch_configure(self._monitor, ch, _reg, _id)
        raise_err(r)

    def ch_disable(self, ch):
        """Disable a channel.

        Args:
            ch (int): Channel identifier.
        """
        r = lib.il_monitor_ch_disable(self._monitor, ch)
        raise_err(r)

    def ch_disable_all(self):
        """Disable all channels."""
        r = lib.il_monitor_ch_disable_all(self._monitor)
        raise_err(r)

    def trigger_configure(self, mode, delay_samples=0, source=None,
                          th_pos=0., th_neg=0., din_msk=0):
        """Configure the trigger.

        Args:
            mode (MONITOR_TRIGGER): Trigger mode.
            delay_samples (int, optional): Delay samples.
            source (str, Register, optional): Source register, required for
                MONITOR_TRIGGER.POS, MONITOR_TRIGGER.NEG and
                MONITOR_TRIGGER.WINDOW.
            th_pos (int, float, optional): Positive threshold, used for
                MONITOR_TRIGGER.POS, MONITOR_TRIGGER.WINDOW
            th_neg (int, float, optional): Negative threshold, used for
                MONITOR_TRIGGER.NEG, MONITOR_TRIGGER.WINDOW
            din_msk (int, optional): Digital input mask, used for
                MONITOR_TRIGGER.DIN

        Raises:
            TypeError: If mode is not a MONITOR_TRIGGER member.
        """
        if not isinstance(mode, MONITOR_TRIGGER):
            raise TypeError('Invalid trigger mode')

        # Only the threshold-based modes resolve a source register; the
        # remaining modes pass NULL for both register and identifier.
        _source_required = (MONITOR_TRIGGER.POS, MONITOR_TRIGGER.NEG,
                            MONITOR_TRIGGER.WINDOW)
        if mode in _source_required:
            _reg, _id = _get_reg_id(source)
        else:
            _reg, _id = ffi.NULL, ffi.NULL

        r = lib.il_monitor_trigger_configure(
            self._monitor, mode.value, delay_samples, _reg, _id, th_pos,
            th_neg, din_msk)
        raise_err(r)