from enum import Enum
from abc import ABC, abstractmethod
from typing import List, Any, Callable, Union
import ingenialogger
from ingenialink.servo import Servo
logger = ingenialogger.get_logger(__name__)
[docs]class NET_PROT(Enum):
"""Network Protocol."""
EUSB = 0
MCB = 1
ETH = 2
ECAT = 3
CAN = 5
[docs]class NET_STATE(Enum):
"""Network State."""
CONNECTED = 0
DISCONNECTED = 1
FAULTY = 2
[docs]class NET_DEV_EVT(Enum):
"""Device Event."""
ADDED = 0
REMOVED = 1
[docs]class NET_TRANS_PROT(Enum):
"""Transmission protocol."""
TCP = 1
UDP = 2
[docs]class Network(ABC):
"""Declaration of a general Network object."""
def __init__(self) -> None:
self.servos: List[Any] = []
"""List of the connected servos in the network."""
@abstractmethod
def scan_slaves(self) -> List[int]:
raise NotImplementedError
@abstractmethod
def connect_to_slave(self, *args: Any, **kwargs: Any) -> Servo:
raise NotImplementedError
@abstractmethod
def disconnect_from_slave(self, servo: Servo) -> None:
raise NotImplementedError
@abstractmethod
def load_firmware(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError
@abstractmethod
def subscribe_to_status(
self, target: Union[int, str], callback: Callable[[NET_DEV_EVT], Any]
) -> None:
raise NotImplementedError
@abstractmethod
def unsubscribe_from_status(
self, target: Union[int, str], callback: Callable[[NET_DEV_EVT], Any]
) -> None:
raise NotImplementedError
@abstractmethod
def start_status_listener(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError
@abstractmethod
def stop_status_listener(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError
@property
def protocol(self) -> NET_PROT:
raise NotImplementedError