Source code for ingenialink.ipb.register

from enum import Enum

from .._ingenialink import ffi, lib
from ingenialink.utils._utils import *
from ..register import Register, REG_DTYPE, REG_ACCESS, REG_PHY, dtypes_ranges

import collections


[docs]def get_enums(enums, enums_count): """Obtain enumerations list of the register. Returns: array: Enumerations of the register. """ aux_enums = [] for i in range(enums_count): aux_dict = { 'label': pstr(enums[i].label), 'value': enums[i].value } aux_enums.append(aux_dict) return aux_enums
[docs]def get_range(reg_range, dtype): """Obtains register range. Returns: tuple: Register range (min, max), None if undefined. """ if dtype == REG_DTYPE.S8: return reg_range.min.s8, reg_range.max.s8 elif dtype == REG_DTYPE.U8: return reg_range.min.u8, reg_range.max.u8 if dtype == REG_DTYPE.S16: return reg_range.min.s16, reg_range.max.s16 elif dtype == REG_DTYPE.U16: return reg_range.min.u16, reg_range.max.u16 if dtype == REG_DTYPE.S32: return reg_range.min.s32, reg_range.max.s32 elif dtype == REG_DTYPE.U32: return reg_range.min.u32, reg_range.max.u32 if dtype == REG_DTYPE.S64: return reg_range.min.s64, reg_range.max.s64 elif dtype == REG_DTYPE.U64: return reg_range.min.u64, reg_range.max.u64 elif dtype == REG_DTYPE.FLOAT: return reg_range.min.flt, reg_range.max.flt return None
[docs]def get_storage(storage, storage_valid, dtype): """Obtain register storage. Returns: int: Register storage. """ if not storage_valid: return None if dtype == REG_DTYPE.S8: return storage.s8 elif dtype == REG_DTYPE.U8: return storage.u8 if dtype == REG_DTYPE.S16: return storage.s16 elif dtype == REG_DTYPE.U16: return storage.u16 if dtype == REG_DTYPE.S32: return storage.s32 elif dtype == REG_DTYPE.U32: return storage.u32 if dtype == REG_DTYPE.S64: return storage.s64 elif dtype == REG_DTYPE.U64: return storage.u64 elif dtype == REG_DTYPE.FLOAT: return storage.flt else: return None
[docs]def ipb_register_from_cffi(cffi_register): """Creates an IPBRegister instance from a CFFI register instance. Args: cffi_register (CData): CFFI instance of the register. Returns: IPBRegister: Instance of the newly created register. """ units = None cyclic = None labels = None cat_id = None scat_id = None identifier = pstr(cffi_register.identifier) dtype = REG_DTYPE(cffi_register.dtype) access = REG_ACCESS(cffi_register.access) phy = REG_PHY(cffi_register.phy) address = cffi_register.address subnode = cffi_register.subnode storage = get_storage(cffi_register.storage, cffi_register.storage_valid, dtype) reg_range = get_range(cffi_register.range, dtype) enums_count = cffi_register.enums_count enums = get_enums(cffi_register.enums, enums_count) internal_use = cffi_register.internal_use if cffi_register.units != ffi.NULL: units = pstr(cffi_register.units) if cffi_register.cyclic != ffi.NULL: cyclic = pstr(cffi_register.cyclic) if cffi_register.cat_id != ffi.NULL: cat_id = pstr(cffi_register.cat_id) if cffi_register.scat_id != ffi.NULL: scat_id = pstr(cffi_register.scat_id) if cffi_register.labels != ffi.NULL: labels = LabelsDictionary._from_labels(cffi_register.labels) return IPBRegister(identifier, units, cyclic, dtype, access, address, phy, subnode, storage, reg_range, labels, enums, enums_count, cat_id, scat_id, internal_use, cffi_register)
[docs]class IPBRegister(Register): """IPB Register. Args: identifier (str): Identifier. units (str): Units. cyclic (str): Cyclic typed register. dtype (REG_DTYPE): Data type. access (REG_ACCESS): Access type. address (int): Address. phy (REG_PHY, optional): Physical units. subnode (int): Subnode storage (any, optional): Storage. reg_range (tuple, optional): Range (min, max). labels (dict, optional): Register labels. enums (dict, optional): Enumeration values. cat_id (str, optional): Category ID. scat_id (str, optional): Sub-category ID. internal_use (int, optional): Internal use. Raises: TypeError: If any of the parameters has invalid type. """ def __init__(self, identifier, units, cyclic, dtype, access, address, phy=REG_PHY.NONE, subnode=1, storage=None, reg_range=None, labels=None, enums=None, enums_count=0, cat_id=None, scat_id=None, internal_use=0, c_reg=None): if labels is None: labels = {} if enums is None: enums = [] super(IPBRegister, self).__init__( identifier, units, cyclic, dtype, access, phy, subnode, storage, reg_range, labels, enums, enums_count, cat_id, scat_id, internal_use) if not isinstance(dtype, REG_DTYPE): raise TypeError('Invalid data type') if not isinstance(access, REG_ACCESS): raise TypeError('Invalid access type') if not isinstance(phy, REG_PHY): raise TypeError('Invalid physical units type') if not cat_id and scat_id: raise ValueError('Sub-category requires a parent category') self._address = address self._labels = LabelsDictionary(labels) self.__update_range(dtype) if c_reg is None: self._reg = self.__create_c_reg() else: self._reg = c_reg def __repr__(self): """Obtain register object. Returns: str: Register information. """ if self.cat_id: cat_info = self.cat_id if self.scat_id: cat_info += ' + ' + self.scat_id else: cat_info = 'Uncategorized' if self.storage and self.storage_valid: storage_info = self.storage else: storage_info = 'No storage' return '<Register: {}, {}, {}, {}, 0x{:08x}, {}{}, {}, {}, [], {},' \ 'ST: {}, [{}], {}>'.format( self.identifier, self.units, self.subnode, self.cyclic, self.address, self.dtype, ' ∊ ' + str(self.range) if self.range else '', self.access, self.phy, self.enums, self.enums_count, storage_info, cat_info, self.internal_use) def __create_c_reg(self): _reg = ffi.new('il_reg_t *') _reg.identifier = ffi.new("char[]", cstr(self.identifier)) _reg.units = ffi.new("char[]", cstr(self.units)) _reg.address = self.address _reg.subnode = self.subnode _reg.cyclic = ffi.new("char[]", cstr(self.cyclic)) _reg.dtype = self.dtype.value _reg.access = self.access.value _reg.phy = self.phy.value _reg.internal_use = self.internal_use _reg.storage_valid = 0 if not self.storage else 1 dtype_attr = { REG_DTYPE.U8: "u8", REG_DTYPE.S8: "s8", REG_DTYPE.U16: "u16", REG_DTYPE.S16: "s16", REG_DTYPE.U32: "u32", REG_DTYPE.S32: "s32", REG_DTYPE.U64: "u64", REG_DTYPE.S64: "s64", REG_DTYPE.FLOAT: "flt" } if self.dtype in REG_DTYPE and self.dtype in dtype_attr: attr_name = dtype_attr[self.dtype] if self.storage: if self.dtype == REG_DTYPE.FLOAT: setattr(_reg.storage, attr_name, float(self.storage)) else: setattr(_reg.storage, attr_name, int(self.storage)) setattr(_reg.range.min, attr_name, self.range[0]) setattr(_reg.range.max, attr_name, self.range[1]) else: _reg.storage_valid = 0 _reg.labels = self._labels._labels _reg.enums_count = self.enums_count _reg.cat_id = ffi.NULL if not self.cat_id else cstr(self.cat_id) _reg.scat_id = ffi.NULL if not self.scat_id else cstr(self.scat_id) return _reg def __update_range(self, dtype): if dtype not in dtypes_ranges: return aux_range = ( self.range[0] if self.range[0] is not None else dtypes_ranges[dtype]["min"], self.range[1] if self.range[1] is not None else dtypes_ranges[dtype]["max"], ) self._range = aux_range @property def address(self): """int: Obtain register address.""" return self._address
[docs]class LabelsDictionary(collections.MutableMapping): """Labels dictionary. Args: labels (dict, optional): Labels. Raises: ILCreationError: If the dictionary could not be created. """ def __init__(self, labels=None): if labels is None: labels = {} _labels = lib.il_dict_labels_create() raise_null(_labels) self._labels = ffi.gc(_labels, lib.il_dict_labels_destroy) for lang, content in labels.items(): lib.il_dict_labels_set(self._labels, cstr(lang), cstr(content)) self._load_langs() @classmethod def _from_labels(cls, _labels): """Create a new class instance from an existing labels dictionary.""" inst = cls.__new__(cls) inst._labels = _labels inst._load_langs() return inst def _load_langs(self): """Load languages from dictionary (cache).""" langs = lib.il_dict_labels_langs_get(self._labels) self._langs = [] i = 0 lang = langs[0] while lang != ffi.NULL: self._langs.append(pstr(lang)) i += 1 lang = langs[i] lib.il_dict_labels_langs_destroy(langs) def __getitem__(self, lang): content_p = ffi.new('char **') r = lib.il_dict_labels_get(self._labels, cstr(lang), content_p) raise_err(r) return pstr(content_p[0]) def __setitem__(self, lang, content): lib.il_dict_labels_set(self._labels, cstr(lang), cstr(content)) self._langs.append(lang) def __delitem__(self, lang): lib.il_dict_labels_del(self._labels, cstr(lang)) self._langs.remove(lang) def __len__(self): return len(self._langs) def __iter__(self): return iter(self._langs)