handle incoming messages from mesh
This commit is contained in:
parent
b815e46a7b
commit
67969951ed
5 changed files with 327 additions and 132 deletions
|
@ -1,133 +1,40 @@
|
|||
import struct
|
||||
from dataclasses import dataclass, field, fields
|
||||
from dataclasses import asdict, dataclass, field, fields, is_dataclass
|
||||
from enum import IntEnum, unique
|
||||
from typing import TypeVar
|
||||
|
||||
from c3nav.mesh.dataformats import (BoolFormat, FixedStrFormat, HexFormat, LedConfig, LedConfigFormat,
|
||||
MacAddressesListFormat, MacAddressFormat, SimpleFormat, VarStrFormat)
|
||||
|
||||
ROOT_ADDRESS = '00:00:00:00:00:00'
|
||||
PARENT_ADDRESS = '00:00:00:ff:ff:ff'
|
||||
NO_LAYER = 0xFF
|
||||
MAC_FMT = '%02x:%02x:%02x:%02x:%02x:%02x'
|
||||
|
||||
|
||||
class SimpleFormat:
|
||||
def __init__(self, fmt):
|
||||
self.fmt = fmt
|
||||
self.size = struct.calcsize(fmt)
|
||||
@unique
|
||||
class MessageType(IntEnum):
|
||||
ECHO_REQUEST = 0x01
|
||||
ECHO_RESPONSE = 0x02
|
||||
|
||||
def encode(self, value):
|
||||
return struct.pack(self.fmt, value)
|
||||
MESH_SIGNIN = 0x03
|
||||
MESH_LAYER_ANNOUNCE = 0x04
|
||||
MESH_ADD_DESTINATIONS = 0x05
|
||||
MESH_REMOVE_DESTINATIONS = 0x06
|
||||
|
||||
def decode(self, data: bytes):
|
||||
value = struct.unpack(self.fmt, data[:self.size])
|
||||
if len(value) == 1:
|
||||
value = value[0]
|
||||
return value, data[self.size:]
|
||||
CONFIG_DUMP = 0x10
|
||||
CONFIG_FIRMWARE = 0x11
|
||||
CONFIG_POSITION = 0x12
|
||||
CONFIG_LED = 0x13
|
||||
CONFIG_UPLINK = 0x14
|
||||
|
||||
|
||||
class FixedStrFormat:
|
||||
def __init__(self, num):
|
||||
self.num = num
|
||||
|
||||
def encode(self, value):
|
||||
return struct.pack('%ss' % self.num, value)
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return struct.unpack('%ss' % self.num, data[:self.num])[0].rstrip(bytes((0, ))).decode(), data[self.num:]
|
||||
|
||||
|
||||
class BoolFormat:
|
||||
def encode(self, value):
|
||||
return struct.pack('B', (int(value), ))
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return bool(struct.unpack('B', data[:1])[0]), data[1:]
|
||||
|
||||
|
||||
class HexFormat:
|
||||
def __init__(self, num, sep=''):
|
||||
self.num = num
|
||||
self.sep = sep
|
||||
|
||||
def encode(self, value):
|
||||
return struct.pack('%ss' % self.num, bytes.fromhex(value))
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return (
|
||||
struct.unpack('%ss' % self.num, data[:self.num])[0].hex(*([self.sep] if self.sep else [])),
|
||||
data[self.num:]
|
||||
)
|
||||
|
||||
|
||||
class VarStrFormat:
|
||||
def encode(self, value: str) -> bytes:
|
||||
return bytes((len(value)+1, )) + value.encode() + bytes((0, ))
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return data[1:data[0]].decode(), data[data[0]+1:]
|
||||
|
||||
|
||||
class MacAddressFormat:
|
||||
def encode(self, value: str) -> bytes:
|
||||
return bytes(int(value[i*3:i*3+2], 16) for i in range(6))
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return (MAC_FMT % tuple(data[:6])), data[6:]
|
||||
|
||||
|
||||
class MacAddressesListFormat:
|
||||
def encode(self, value: list[str]) -> bytes:
|
||||
return bytes((len(value), )) + sum(
|
||||
(bytes((int(mac[i*3:i*3+2], 16) for i in range(6))) for mac in value),
|
||||
b''
|
||||
)
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return [MAC_FMT % tuple(data[1+6*i:1+6+6*i]) for i in range(data[0])], data[1+data[0]*6:]
|
||||
|
||||
|
||||
class LedConfig:
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class SerialLedConfig(LedConfig):
|
||||
type = 1
|
||||
gpio: int
|
||||
rmt: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class MultipinLedConfig(LedConfig):
|
||||
type = 2
|
||||
gpio_red: int
|
||||
gpio_green: int
|
||||
gpio_blue: int
|
||||
|
||||
|
||||
class LedConfigFormat:
|
||||
def encode(self, value) -> bytes:
|
||||
if value is None:
|
||||
return struct.pack('BBBB', (0, 0, 0, 0))
|
||||
if isinstance(value, SerialLedConfig):
|
||||
return struct.pack('BBBB', (value.type, value.gpio, value.rmt, 0))
|
||||
if isinstance(value, MultipinLedConfig):
|
||||
return struct.pack('BBBB', (value.type, value.gpio_red, value.gpio_green, value.gpio_blue))
|
||||
raise ValueError
|
||||
|
||||
def decode(self, data: bytes):
|
||||
type_, *bytes_ = struct.unpack('BBBB', data)
|
||||
if type_ == 0:
|
||||
value = None
|
||||
elif type_ == 1:
|
||||
value = SerialLedConfig(gpio=bytes_[0], rmt=bytes_[1])
|
||||
elif type_ == 2:
|
||||
value = MultipinLedConfig(gpio_red=bytes_[0], gpio_green=bytes_[1], gpio_blue=bytes_[2])
|
||||
else:
|
||||
raise ValueError
|
||||
return value, data[4:]
|
||||
M = TypeVar('M', bound='Message')
|
||||
|
||||
|
||||
@dataclass
|
||||
class Message:
|
||||
dst: str = field(metadata={'format': MacAddressFormat()})
|
||||
src: str = field(metadata={'format': MacAddressFormat()})
|
||||
msg_id: int = field(metadata={'format': SimpleFormat('B')}, init=False, repr=True)
|
||||
msg_id: int = field(metadata={'format': SimpleFormat('B')}, init=False, repr=False)
|
||||
msg_types = {}
|
||||
|
||||
# noinspection PyMethodOverriding
|
||||
|
@ -135,6 +42,8 @@ class Message:
|
|||
super().__init_subclass__(**kwargs)
|
||||
if msg_id:
|
||||
cls.msg_id = msg_id
|
||||
if msg_id in Message.msg_types:
|
||||
raise TypeError('duplicate use of msg_id %d' % msg_id)
|
||||
Message.msg_types[msg_id] = cls
|
||||
|
||||
def encode(self):
|
||||
|
@ -144,8 +53,8 @@ class Message:
|
|||
return data
|
||||
|
||||
@classmethod
|
||||
def decode(cls, data: bytes):
|
||||
print('decode', data.hex(' '))
|
||||
def decode(cls, data: bytes) -> M:
|
||||
# print('decode', data.hex(' '))
|
||||
klass = cls.msg_types[data[12]]
|
||||
values = {}
|
||||
for field_ in fields(klass):
|
||||
|
@ -153,44 +62,56 @@ class Message:
|
|||
values.pop('msg_id')
|
||||
return klass(**values)
|
||||
|
||||
def tojson(self):
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def fromjson(cls, data) -> M:
|
||||
kwargs = data.copy()
|
||||
klass = cls.msg_types[kwargs.pop('msg_id')]
|
||||
for field_ in fields(klass):
|
||||
if is_dataclass(field_.type):
|
||||
kwargs[field_.name] = field_.type.fromjson(kwargs[field_.name])
|
||||
return klass(**kwargs)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EchoRequestMessage(Message, msg_id=0x01):
|
||||
class EchoRequestMessage(Message, msg_id=MessageType.ECHO_REQUEST):
|
||||
content: str = field(default='', metadata={'format': VarStrFormat()})
|
||||
|
||||
|
||||
@dataclass
|
||||
class EchoResponseMessage(Message, msg_id=0x02):
|
||||
class EchoResponseMessage(Message, msg_id=MessageType.ECHO_RESPONSE):
|
||||
content: str = field(default='', metadata={'format': VarStrFormat()})
|
||||
|
||||
|
||||
@dataclass
|
||||
class MeshSigninMessage(Message, msg_id=0x03):
|
||||
class MeshSigninMessage(Message, msg_id=MessageType.MESH_SIGNIN):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class MeshLayerAnnounceMessage(Message, msg_id=0x04):
|
||||
class MeshLayerAnnounceMessage(Message, msg_id=MessageType.MESH_LAYER_ANNOUNCE):
|
||||
layer: int = field(metadata={'format': SimpleFormat('B')})
|
||||
|
||||
|
||||
@dataclass
|
||||
class MeshAddDestinationsMessage(Message, msg_id=0x05):
|
||||
class MeshAddDestinationsMessage(Message, msg_id=MessageType.MESH_ADD_DESTINATIONS):
|
||||
mac_addresses: list[str] = field(default_factory=list, metadata={'format': MacAddressesListFormat()})
|
||||
|
||||
|
||||
@dataclass
|
||||
class MeshRemoveDestinationsMessage(Message, msg_id=0x06):
|
||||
class MeshRemoveDestinationsMessage(Message, msg_id=MessageType.MESH_REMOVE_DESTINATIONS):
|
||||
mac_addresses: list[str] = field(default_factory=list, metadata={'format': MacAddressesListFormat()})
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfigDumpMessage(Message, msg_id=0x10):
|
||||
class ConfigDumpMessage(Message, msg_id=MessageType.CONFIG_DUMP):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfigFirmwareMessage(Message, msg_id=0x11):
|
||||
class ConfigFirmwareMessage(Message, msg_id=MessageType.CONFIG_FIRMWARE):
|
||||
magic_word: int = field(metadata={'format': SimpleFormat('I')}, repr=False)
|
||||
secure_version: int = field(metadata={'format': SimpleFormat('I')})
|
||||
reserv1: list[int] = field(metadata={'format': SimpleFormat('2I')}, repr=False)
|
||||
|
@ -204,19 +125,19 @@ class ConfigFirmwareMessage(Message, msg_id=0x11):
|
|||
|
||||
|
||||
@dataclass
|
||||
class ConfigPositionMessage(Message, msg_id=0x12):
|
||||
class ConfigPositionMessage(Message, msg_id=MessageType.CONFIG_POSITION):
|
||||
x_pos: int = field(metadata={'format': SimpleFormat('I')})
|
||||
y_pos: int = field(metadata={'format': SimpleFormat('I')})
|
||||
z_pos: int = field(metadata={'format': SimpleFormat('H')})
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfigLedMessage(Message, msg_id=0x13):
|
||||
class ConfigLedMessage(Message, msg_id=MessageType.CONFIG_LED):
|
||||
led_config: LedConfig = field(metadata={'format': LedConfigFormat()})
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfigUplinkMessage(Message, msg_id=0x14):
|
||||
class ConfigUplinkMessage(Message, msg_id=MessageType.CONFIG_UPLINK):
|
||||
enabled: bool = field(metadata={'format': BoolFormat()})
|
||||
ssid: str = field(metadata={'format': FixedStrFormat(32)})
|
||||
password: str = field(metadata={'format': FixedStrFormat(64)})
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue