update message types and refactor configuration messages

This commit is contained in:
Laura Klünder 2023-10-19 22:12:46 +02:00
parent e8a69eb02e
commit 330a32adf8
4 changed files with 230 additions and 66 deletions

View file

@ -39,6 +39,15 @@ class BaseFormat(ABC):
pre, post = self.get_c_parts() pre, post = self.get_c_parts()
return "%s %s%s;" % (pre, name, post) return "%s %s%s;" % (pre, name, post)
def set_field_type(self, field_type):
self.field_type = field_type
def get_c_definitions(self) -> dict[str, str]:
return {}
def get_typedef_name(self):
return '%s_t' % normalize_name(self.field_type.__name__)
class SimpleFormat(BaseFormat): class SimpleFormat(BaseFormat):
def __init__(self, fmt): def __init__(self, fmt):
@ -76,6 +85,44 @@ class SimpleFormat(BaseFormat):
return self.c_type, ("" if self.num == 1 else ("[%d]" % self.num)) return self.c_type, ("" if self.num == 1 else ("[%d]" % self.num))
class EnumFormat(SimpleFormat):
def __init__(self, as_hex=False):
super().__init__("B")
self.as_hex = as_hex
def set_field_type(self, field_type):
super().set_field_type(field_type)
self.c_struct_name = normalize_name(field_type.__name__)+'_t'
def decode(self, data: bytes) -> tuple[Any, bytes]:
value, out_data = super().decode(data)
return self.field_type(value), out_data
def get_c_parts(self):
return self.c_struct_name, ""
def get_c_definitions(self) -> dict[str, str]:
prefix = normalize_name(self.field_type.__name__).upper()
options = []
last_value = None
for item in self.field_type:
if last_value is not None and item.value != last_value+1:
options.append('')
last_value = item.value
options.append("%(prefix)s_%(name)s = %(value)s," % {
"prefix": prefix,
"name": normalize_name(item.name).upper(),
"value": ("0x%02x" if self.as_hex else "%d") % item.value
})
return {
self.c_struct_name: "enum {\n%(options)s\n};\ntypedef uint8_t %(name)s;" % {
"options": indent_c("\n".join(options)),
"name": self.c_struct_name,
}
}
class BoolFormat(SimpleFormat): class BoolFormat(SimpleFormat):
def __init__(self): def __init__(self):
super().__init__('B') super().__init__('B')
@ -202,6 +249,9 @@ class StructType:
metadata = dict(attr.metadata) metadata = dict(attr.metadata)
if "defining_class" not in metadata: if "defining_class" not in metadata:
metadata["defining_class"] = cls metadata["defining_class"] = cls
if "format" in metadata:
metadata["format"].set_field_type(cls.__annotations__[attr_name])
attr.metadata = metadata attr.metadata = metadata
for key, values in cls._union_options.items(): for key, values in cls._union_options.items():
@ -389,7 +439,14 @@ class StructType:
if "format" in field_.metadata: if "format" in field_.metadata:
if not field_.metadata.get("union_discriminator") or field_.metadata.get("defining_class") == cls: if not field_.metadata.get("union_discriminator") or field_.metadata.get("defining_class") == cls:
items.append(( items.append((
field_.metadata["format"].get_c_code(name), (
("%(typedef_name)s %(name)s;" % {
"typedef_name": field_.metadata["format"].get_typedef_name(),
"name": name,
})
if field_.metadata.get("as_definition")
else field_.metadata["format"].get_c_code(name)
),
field_.metadata.get("doc", None), field_.metadata.get("doc", None),
)), )),
elif issubclass(field_.type, StructType): elif issubclass(field_.type, StructType):
@ -398,9 +455,16 @@ class StructType:
items.extend(embedded_items) items.extend(embedded_items)
else: else:
items.append(( items.append((
field_.type.get_c_code(name, typedef=False), (
("%(typedef_name)s %(name)s;" % {
"typedef_name": field_.type.get_typedef_name(),
"name": name,
})
if field_.metadata.get("as_definition")
else field_.type.get_c_code(name, typedef=False)
),
field_.metadata.get("doc", None), field_.metadata.get("doc", None),
)) )),
else: else:
raise TypeError('field %s.%s has no format and is no StructType' % raise TypeError('field %s.%s has no format and is no StructType' %
(cls.__name__, field_.name)) (cls.__name__, field_.name))
@ -420,14 +484,41 @@ class StructType:
default=0, default=0,
) )
@classmethod
def get_c_definitions(cls) -> dict[str, str]:
definitions = {}
for field_ in fields(cls):
if "format" in field_.metadata:
definitions.update(field_.metadata["format"].get_c_definitions())
if field_.metadata.get("as_definition"):
typedef_name = field_.metadata["format"].get_typedef_name()
definitions[typedef_name] = 'typedef %(code)s %(name)s;' % {
"code": ''.join(field_.metadata["format"].get_c_parts()),
"name": typedef_name,
}
elif issubclass(field_.type, StructType):
definitions.update(field_.type.get_c_definitions())
if field_.metadata.get("as_definition"):
typedef_name = field_.type.get_typedef_name()
definitions[typedef_name] = field_.type.get_c_code(name=typedef_name, typedef=True)
else:
raise TypeError('field %s.%s has no format and is no StructType' %
(cls.__name__, field_.name))
if cls.union_type_field:
for key, option in cls._union_options[cls.union_type_field].items():
definitions.update(option.get_c_definitions())
return definitions
@classmethod @classmethod
def get_c_union_code(cls, ignore_fields=None): def get_c_union_code(cls, ignore_fields=None):
union_items = [] union_items = []
for key, option in cls._union_options[cls.union_type_field].items(): for key, option in cls._union_options[cls.union_type_field].items():
base_name = normalize_name(getattr(key, 'name', option.__name__)) base_name = normalize_name(getattr(key, 'name', option.__name__))
union_items.append( item_c_code = option.get_c_code(
option.get_c_code(base_name, ignore_fields=ignore_fields, typedef=False, in_union=True) base_name, ignore_fields=ignore_fields, typedef=False, in_union=True, no_empty=True
) )
if item_c_code:
union_items.append(item_c_code)
size = cls.get_c_union_size() size = cls.get_c_union_size()
union_items.append( union_items.append(
"uint8_t bytes[%0d]; " % size "uint8_t bytes[%0d]; " % size
@ -495,8 +586,8 @@ class StructType:
return base_name return base_name
@classmethod @classmethod
def get_struct_name(cls, base_name): def get_typedef_name(cls):
return "%s_t" % base_name return "%s_t" % normalize_name(cls.__name__)
@classmethod @classmethod
def get_min_size(cls, no_inherited_fields=False) -> int: def get_min_size(cls, no_inherited_fields=False) -> int:
@ -514,9 +605,16 @@ class StructType:
def normalize_name(name): def normalize_name(name):
if '_' in name: if '_' in name:
return name.lower() name = name.lower()
return re.sub( else:
r"([a-z])([A-Z])", name = re.sub(
r"\1_\2", r"([a-zA-Z])([A-Z][a-z])",
name r"\1_\2",
).lower() name
).lower()
name = name.replace('config', 'cfg')
name = name.replace('position', 'pos')
name = name.replace('mesh_', '')
name = name.replace('firmware', 'fw')
name = name.replace('hardware', 'hw')
return name

View file

@ -1,7 +1,8 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import IntEnum, unique from enum import IntEnum, unique
from c3nav.mesh.baseformats import FixedHexFormat, FixedStrFormat, SimpleFormat, StructType, VarArrayFormat from c3nav.mesh.baseformats import (BoolFormat, EnumFormat, FixedHexFormat, FixedStrFormat, SimpleFormat, StructType,
VarArrayFormat)
class MacAddressFormat(FixedHexFormat): class MacAddressFormat(FixedHexFormat):
@ -16,19 +17,29 @@ class MacAddressesListFormat(VarArrayFormat):
@unique @unique
class LedType(IntEnum): class LedType(IntEnum):
NONE = 0
SERIAL = 1 SERIAL = 1
MULTIPIN = 2 MULTIPIN = 2
@unique
class SerialLedType(IntEnum):
WS2812 = 1
SK6812 = 2
@dataclass @dataclass
class LedConfig(StructType, union_type_field="led_type"): class LedConfig(StructType, union_type_field="led_type"):
led_type: LedType = field(metadata={"format": SimpleFormat('B')}) """
configuration for an optional connected status LED
"""
led_type: LedType = field(metadata={"format": EnumFormat(), "c_name": "type"})
@dataclass @dataclass
class SerialLedConfig(LedConfig, led_type=LedType.SERIAL): class SerialLedConfig(LedConfig, led_type=LedType.SERIAL):
serial_led_type: SerialLedType = field(metadata={"format": EnumFormat(), "c_name": "type"})
gpio: int = field(metadata={"format": SimpleFormat('B')}) gpio: int = field(metadata={"format": SimpleFormat('B')})
rmt: int = field(metadata={"format": SimpleFormat('B')})
@dataclass @dataclass
@ -38,6 +49,72 @@ class MultipinLedConfig(LedConfig, led_type=LedType.MULTIPIN):
gpio_blue: int = field(metadata={"format": SimpleFormat('B')}) gpio_blue: int = field(metadata={"format": SimpleFormat('B')})
@dataclass
class UWBConfig(StructType):
"""
configuration for the connection to the UWB module
"""
enable: bool = field(metadata={"format": BoolFormat()})
gpio_miso: int = field(metadata={"format": SimpleFormat('B')})
gpio_mosi: int = field(metadata={"format": SimpleFormat('B')})
gpio_clk: int = field(metadata={"format": SimpleFormat('B')})
gpio_cs: int = field(metadata={"format": SimpleFormat('B')})
gpio_irq: int = field(metadata={"format": SimpleFormat('B')})
gpio_rst: int = field(metadata={"format": SimpleFormat('B')})
gpio_wakeup: int = field(metadata={"format": SimpleFormat('B')})
gpio_exton: int = field(metadata={"format": SimpleFormat('B')})
@unique
class BoardType(IntEnum):
CUSTOM = 0x00
# devboards
ESP32_C3_DEVKIT_M_1 = 0x01
ESP32_C3_32S = 2
# custom boards
C3NAV_UWB_BOARD = 0x10
C3NAV_LOCATION_PCB_REV_0_1 = 0x11
C3NAV_LOCATION_PCB_REV_0_2 = 0x12
@dataclass
class BoardConfig(StructType, union_type_field="board"):
board: BoardType = field(metadata={"format": EnumFormat(as_hex=True)})
@dataclass
class CustomBoardConfig(StructType, board=BoardType.CUSTOM):
uwb: UWBConfig = field(metadata={"as_definition": True})
led: LedConfig = field(metadata={"as_definition": True})
@dataclass
class DevkitMBoardConfig(StructType, board=BoardType.ESP32_C3_DEVKIT_M_1):
uwb: UWBConfig = field(metadata={"as_definition": True})
@dataclass
class Esp32SBoardConfig(StructType, board=BoardType.ESP32_C3_32S):
uwb: UWBConfig = field(metadata={"as_definition": True})
@dataclass
class UwbBoardConfig(StructType, board=BoardType.C3NAV_UWB_BOARD):
pass
@dataclass
class LocationPCBRev0Dot1BoardConfig(StructType, board=BoardType.C3NAV_LOCATION_PCB_REV_0_1):
pass
@dataclass
class LocationPCBRev0Dot2BoardConfig(StructType, board=BoardType.C3NAV_LOCATION_PCB_REV_0_2):
pass
@dataclass @dataclass
class RangeItemType(StructType): class RangeItemType(StructType):
address: str = field(metadata={"format": MacAddressFormat()}) address: str = field(metadata={"format": MacAddressFormat()})

View file

@ -10,18 +10,12 @@ from c3nav.mesh.utils import indent_c
class Command(BaseCommand): class Command(BaseCommand):
help = 'export mesh message structs for c code' help = 'export mesh message structs for c code'
def shorten_name(self, name):
name = name.replace('config', 'cfg')
name = name.replace('position', 'pos')
name = name.replace('mesh_', '')
name = name.replace('firmware', 'fw')
return name
def handle(self, *args, **options): def handle(self, *args, **options):
done_struct_names = set() done_struct_names = set()
nodata = set() nodata = set()
struct_lines = {} struct_lines = {}
struct_sizes = [] struct_sizes = []
done_definitions = set()
ignore_names = set(field_.name for field_ in fields(MeshMessage)) ignore_names = set(field_.name for field_ in fields(MeshMessage))
for msg_id, msg_type in MeshMessage.get_types().items(): for msg_id, msg_type in MeshMessage.get_types().items():
@ -31,11 +25,17 @@ class Command(BaseCommand):
done_struct_names.add(msg_type.c_struct_name) done_struct_names.add(msg_type.c_struct_name)
msg_type = MeshMessage.c_structs[msg_type.c_struct_name] msg_type = MeshMessage.c_structs[msg_type.c_struct_name]
base_name = (msg_type.c_struct_name or self.shorten_name(normalize_name( base_name = (msg_type.c_struct_name or normalize_name(
getattr(msg_id, 'name', msg_type.__name__) getattr(msg_id, 'name', msg_type.__name__)
))) ))
name = "mesh_msg_%s_t" % base_name name = "mesh_msg_%s_t" % base_name
for definition_name, definition in msg_type.get_c_definitions().items():
if definition_name not in done_definitions:
done_definitions.add(definition_name)
print(definition)
print()
code = msg_type.get_c_code(name, ignore_fields=ignore_names, no_empty=True) code = msg_type.get_c_code(name, ignore_fields=ignore_names, no_empty=True)
if code: if code:
size = msg_type.get_min_size(no_inherited_fields=True) size = msg_type.get_min_size(no_inherited_fields=True)
@ -58,14 +58,16 @@ class Command(BaseCommand):
% max(struct_sizes) % max(struct_sizes)
) )
print()
max_msg_type = max(MeshMessage.get_types().keys()) max_msg_type = max(MeshMessage.get_types().keys())
macro_data = [] macro_data = []
for i in range(((max_msg_type//16)+1)*16): for i in range(((max_msg_type//16)+1)*16):
msg_type = MeshMessage.get_types().get(i, None) msg_type = MeshMessage.get_types().get(i, None)
if msg_type: if msg_type:
name = (msg_type.c_struct_name or self.shorten_name(normalize_name( name = (msg_type.c_struct_name or normalize_name(
getattr(msg_type.msg_id, 'name', msg_type.__name__) getattr(msg_type.msg_id, 'name', msg_type.__name__)
))) ))
macro_data.append(( macro_data.append((
msg_type.get_c_enum_name()+',', msg_type.get_c_enum_name()+',',
("nodata" if msg_type in nodata else name)+',', ("nodata" if msg_type in nodata else name)+',',

View file

@ -1,4 +1,3 @@
import re
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import IntEnum, unique from enum import IntEnum, unique
from typing import TypeVar from typing import TypeVar
@ -6,8 +5,9 @@ from typing import TypeVar
import channels import channels
from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync
from c3nav.mesh.baseformats import BoolFormat, FixedStrFormat, SimpleFormat, StructType, VarArrayFormat, VarStrFormat from c3nav.mesh.baseformats import (BoolFormat, FixedStrFormat, SimpleFormat, StructType, VarArrayFormat, VarStrFormat,
from c3nav.mesh.dataformats import (FirmwareAppDescription, LedConfig, MacAddressesListFormat, MacAddressFormat, normalize_name)
from c3nav.mesh.dataformats import (BoardConfig, FirmwareAppDescription, MacAddressesListFormat, MacAddressFormat,
RangeItemType) RangeItemType)
from c3nav.mesh.utils import get_mesh_comm_group from c3nav.mesh.utils import get_mesh_comm_group
@ -36,10 +36,11 @@ class MeshMessageType(IntEnum):
MESH_ROUTING_FAILED = 0x0a MESH_ROUTING_FAILED = 0x0a
CONFIG_DUMP = 0x10 CONFIG_DUMP = 0x10
CONFIG_FIRMWARE = 0x11 CONFIG_HARDWARE = 0x11
CONFIG_POSITION = 0x12 CONFIG_BOARD = 0x12
CONFIG_LED = 0x13 CONFIG_FIRMWARE = 0x13
CONFIG_UPLINK = 0x14 CONFIG_UPLINK = 0x14
CONFIG_POSITION = 0x15
LOCATE_REQUEST_RANGE = 0x20 LOCATE_REQUEST_RANGE = 0x20
LOCATE_RANGE_RESULTS = 0x21 LOCATE_RANGE_RESULTS = 0x21
@ -91,17 +92,9 @@ class MeshMessage(StructType, union_type_field="msg_id"):
def get_variable_name(cls, base_name): def get_variable_name(cls, base_name):
return cls.c_struct_name or base_name return cls.c_struct_name or base_name
@classmethod
def get_struct_name(cls, base_name):
return "mesh_msg_%s_t" % base_name
@classmethod @classmethod
def get_c_enum_name(cls): def get_c_enum_name(cls):
return re.sub( return normalize_name(cls.__name__.removeprefix('Mesh').removesuffix('Message')).upper()
r"([a-z])([A-Z])",
r"\1_\2",
cls.__name__.removeprefix('Mesh').removesuffix('Message')
).upper().replace('CONFIG', 'CFG').replace('FIRMWARE', 'FW').replace('POSITION', 'POS')
@dataclass @dataclass
@ -212,35 +205,35 @@ class ConfigDumpMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_DUMP):
@dataclass @dataclass
class ConfigFirmwareMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_FIRMWARE): class ConfigHardwareMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_HARDWARE):
""" respond firmware info """ """ respond hardware/chip info """
chip: int = field(metadata={ chip: int = field(metadata={
"format": SimpleFormat('H'), "format": SimpleFormat('H'),
"c_name": "chip_id", "c_name": "chip_id",
}) })
revision_major: int = field(metadata={"format": SimpleFormat('B')}) revision_major: int = field(metadata={"format": SimpleFormat('B')})
revision_minor: int = field(metadata={"format": SimpleFormat('B')}) revision_minor: int = field(metadata={"format": SimpleFormat('B')})
app_desc: FirmwareAppDescription = field(metadata={'json_embed': True})
@classmethod
def upgrade_json(cls, data):
if 'revision' in data:
data['revision_major'], data['revision_minor'] = data.pop('revision')
return data
def get_chip_display(self): def get_chip_display(self):
return ChipType(self.chip).name.replace('_', '-') return ChipType(self.chip).name.replace('_', '-')
@classmethod
def get_ignore_c_fields(self):
return {
"magic_word", "secure_version", "reserv1", "version", "project_name",
"compile_time", "compile_date", "idf_version", "app_elf_sha256", "reserv2"
}
@classmethod @unique
def get_additional_c_fields(self): class BoardType(IntEnum):
return ("esp_app_desc_t app_desc;", ) SERIAL = 1
MULTIPIN = 2
@dataclass
class ConfigBoardMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_BOARD):
""" set/respond board config """
board_config: BoardConfig = field(metadata={"c_embed": True})
@dataclass
class ConfigFirmwareMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_FIRMWARE):
""" respond firmware info """
app_desc: FirmwareAppDescription = field(metadata={'json_embed': True})
@dataclass @dataclass
@ -251,12 +244,6 @@ class ConfigPositionMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_POSITION)
z_pos: int = field(metadata={"format": SimpleFormat('h')}) z_pos: int = field(metadata={"format": SimpleFormat('h')})
@dataclass
class ConfigLedMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_LED):
""" set/respond led config """
led_config: LedConfig = field(metadata={"c_embed": True})
@dataclass @dataclass
class ConfigUplinkMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_UPLINK): class ConfigUplinkMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_UPLINK):
""" set/respond uplink config """ """ set/respond uplink config """