add mesh message firmware

This commit is contained in:
Laura Klünder 2022-04-06 22:56:08 +02:00
parent 333b96f24f
commit 5e24ad4f2a
2 changed files with 60 additions and 4 deletions

View file

@ -12,14 +12,22 @@ class MeshConsumer(AsyncWebsocketConsumer):
print('disconnected!')
pass
async def send_msg(self, msg):
print('Sending message:', msg)
await self.send(bytes_data=msg.encode())
async def receive(self, text_data=None, bytes_data=None):
if bytes_data is None:
return
msg = messages.Message.decode(bytes_data)
print('Received message:', msg)
if isinstance(msg, messages.MeshSigninMessage):
await self.send(bytes_data=messages.MeshLayerAnnounceMessage(
await self.send_msg(messages.MeshLayerAnnounceMessage(
src='00:00:00:00:00:00',
dst=msg.src,
layer=messages.NO_LAYER
).encode())
))
await self.send_msg(messages.ConfigDumpMessage(
src='00:00:00:00:00:00',
dst=msg.src,
))

View file

@ -14,7 +14,36 @@ class SimpleFormat:
return struct.pack(self.fmt, value)
def decode(self, data: bytes):
return struct.unpack(self.fmt, data[:self.size]), data[self.size:]
value = struct.unpack(self.fmt, data[:self.size])
if len(value) == 1:
value = value[0]
return value, data[self.size:]
class FixedStrFormat:
def __init__(self, num):
self.num = num
def encode(self, value):
return struct.pack('%ss' % self.num, value)
def decode(self, data: bytes):
return struct.unpack('%ss' % self.num, data[:self.num])[0].rstrip(bytes((0, ))).decode(), data[self.num:]
class HexFormat:
def __init__(self, num, sep=''):
self.num = num
self.sep = sep
def encode(self, value):
return struct.pack('%ss' % self.num, bytes.fromhex(value))
def decode(self, data: bytes):
return (
struct.unpack('%ss' % self.num, data[:self.num])[0].hex(*([self.sep] if self.sep else [])),
data[self.num:]
)
class VarStrFormat:
@ -66,7 +95,7 @@ class Message:
@classmethod
def decode(cls, data: bytes):
print('decode', ' '.join(('%02x' % i) for i in data))
print('decode', data.hex(' '))
klass = cls.msg_types[data[12]]
values = {}
for field_ in fields(klass):
@ -103,3 +132,22 @@ class MeshAddDestinationsMessage(Message, msg_id=0x05):
@dataclass
class MeshRemoveDestinationsMessage(Message, msg_id=0x06):
mac_addresses: list[str] = field(default_factory=list, metadata={'format': MacAddressesListFormat()})
@dataclass
class ConfigDumpMessage(Message, msg_id=0x10):
pass
@dataclass
class ConfigFirmwareMessage(Message, msg_id=0x11):
magic_word: int = field(metadata={'format': SimpleFormat('I')}, repr=False)
secure_version: int = field(metadata={'format': SimpleFormat('I')})
reserv1: int = field(metadata={'format': SimpleFormat('2I')}, repr=False)
version: str = field(metadata={'format': FixedStrFormat(32)})
project_name: str = field(metadata={'format': FixedStrFormat(32)})
compile_time: str = field(metadata={'format': FixedStrFormat(16)})
compile_date: str = field(metadata={'format': FixedStrFormat(16)})
idf_version: str = field(metadata={'format': FixedStrFormat(32)})
app_elf_sha256: str = field(metadata={'format': HexFormat(32)})
reserv2: int = field(metadata={'format': SimpleFormat('20I')}, repr=False)