From 330a32adf8dc46c0258a891f00083648332df2a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laura=20Kl=C3=BCnder?= Date: Thu, 19 Oct 2023 22:12:46 +0200 Subject: [PATCH] update message types and refactor configuration messages --- src/c3nav/mesh/baseformats.py | 124 ++++++++++++++++-- src/c3nav/mesh/dataformats.py | 83 +++++++++++- .../{mesh_msg_c.py => generate_c_types.py} | 24 ++-- src/c3nav/mesh/messages.py | 65 ++++----- 4 files changed, 230 insertions(+), 66 deletions(-) rename src/c3nav/mesh/management/commands/{mesh_msg_c.py => generate_c_types.py} (87%) diff --git a/src/c3nav/mesh/baseformats.py b/src/c3nav/mesh/baseformats.py index 149e1290..3ebbc180 100644 --- a/src/c3nav/mesh/baseformats.py +++ b/src/c3nav/mesh/baseformats.py @@ -39,6 +39,15 @@ class BaseFormat(ABC): pre, post = self.get_c_parts() return "%s %s%s;" % (pre, name, post) + def set_field_type(self, field_type): + self.field_type = field_type + + def get_c_definitions(self) -> dict[str, str]: + return {} + + def get_typedef_name(self): + return '%s_t' % normalize_name(self.field_type.__name__) + class SimpleFormat(BaseFormat): def __init__(self, fmt): @@ -76,6 +85,44 @@ class SimpleFormat(BaseFormat): return self.c_type, ("" if self.num == 1 else ("[%d]" % self.num)) +class EnumFormat(SimpleFormat): + def __init__(self, as_hex=False): + super().__init__("B") + self.as_hex = as_hex + + def set_field_type(self, field_type): + super().set_field_type(field_type) + self.c_struct_name = normalize_name(field_type.__name__)+'_t' + + def decode(self, data: bytes) -> tuple[Any, bytes]: + value, out_data = super().decode(data) + return self.field_type(value), out_data + + def get_c_parts(self): + return self.c_struct_name, "" + + def get_c_definitions(self) -> dict[str, str]: + prefix = normalize_name(self.field_type.__name__).upper() + options = [] + last_value = None + for item in self.field_type: + if last_value is not None and item.value != last_value+1: + options.append('') + last_value = item.value + options.append("%(prefix)s_%(name)s = %(value)s," % { + "prefix": prefix, + "name": normalize_name(item.name).upper(), + "value": ("0x%02x" if self.as_hex else "%d") % item.value + }) + + return { + self.c_struct_name: "enum {\n%(options)s\n};\ntypedef uint8_t %(name)s;" % { + "options": indent_c("\n".join(options)), + "name": self.c_struct_name, + } + } + + class BoolFormat(SimpleFormat): def __init__(self): super().__init__('B') @@ -202,6 +249,9 @@ class StructType: metadata = dict(attr.metadata) if "defining_class" not in metadata: metadata["defining_class"] = cls + if "format" in metadata: + metadata["format"].set_field_type(cls.__annotations__[attr_name]) + attr.metadata = metadata for key, values in cls._union_options.items(): @@ -389,7 +439,14 @@ class StructType: if "format" in field_.metadata: if not field_.metadata.get("union_discriminator") or field_.metadata.get("defining_class") == cls: items.append(( - field_.metadata["format"].get_c_code(name), + ( + ("%(typedef_name)s %(name)s;" % { + "typedef_name": field_.metadata["format"].get_typedef_name(), + "name": name, + }) + if field_.metadata.get("as_definition") + else field_.metadata["format"].get_c_code(name) + ), field_.metadata.get("doc", None), )), elif issubclass(field_.type, StructType): @@ -398,9 +455,16 @@ class StructType: items.extend(embedded_items) else: items.append(( - field_.type.get_c_code(name, typedef=False), + ( + ("%(typedef_name)s %(name)s;" % { + "typedef_name": field_.type.get_typedef_name(), + "name": name, + }) + if field_.metadata.get("as_definition") + else field_.type.get_c_code(name, typedef=False) + ), field_.metadata.get("doc", None), - )) + )), else: raise TypeError('field %s.%s has no format and is no StructType' % (cls.__name__, field_.name)) @@ -420,14 +484,41 @@ class StructType: default=0, ) + @classmethod + def get_c_definitions(cls) -> dict[str, str]: + definitions = {} + for field_ in fields(cls): + if "format" in field_.metadata: + definitions.update(field_.metadata["format"].get_c_definitions()) + if field_.metadata.get("as_definition"): + typedef_name = field_.metadata["format"].get_typedef_name() + definitions[typedef_name] = 'typedef %(code)s %(name)s;' % { + "code": ''.join(field_.metadata["format"].get_c_parts()), + "name": typedef_name, + } + elif issubclass(field_.type, StructType): + definitions.update(field_.type.get_c_definitions()) + if field_.metadata.get("as_definition"): + typedef_name = field_.type.get_typedef_name() + definitions[typedef_name] = field_.type.get_c_code(name=typedef_name, typedef=True) + else: + raise TypeError('field %s.%s has no format and is no StructType' % + (cls.__name__, field_.name)) + if cls.union_type_field: + for key, option in cls._union_options[cls.union_type_field].items(): + definitions.update(option.get_c_definitions()) + return definitions + @classmethod def get_c_union_code(cls, ignore_fields=None): union_items = [] for key, option in cls._union_options[cls.union_type_field].items(): base_name = normalize_name(getattr(key, 'name', option.__name__)) - union_items.append( - option.get_c_code(base_name, ignore_fields=ignore_fields, typedef=False, in_union=True) + item_c_code = option.get_c_code( + base_name, ignore_fields=ignore_fields, typedef=False, in_union=True, no_empty=True ) + if item_c_code: + union_items.append(item_c_code) size = cls.get_c_union_size() union_items.append( "uint8_t bytes[%0d]; " % size @@ -495,8 +586,8 @@ class StructType: return base_name @classmethod - def get_struct_name(cls, base_name): - return "%s_t" % base_name + def get_typedef_name(cls): + return "%s_t" % normalize_name(cls.__name__) @classmethod def get_min_size(cls, no_inherited_fields=False) -> int: @@ -514,9 +605,16 @@ class StructType: def normalize_name(name): if '_' in name: - return name.lower() - return re.sub( - r"([a-z])([A-Z])", - r"\1_\2", - name - ).lower() + name = name.lower() + else: + name = re.sub( + r"([a-zA-Z])([A-Z][a-z])", + r"\1_\2", + name + ).lower() + name = name.replace('config', 'cfg') + name = name.replace('position', 'pos') + name = name.replace('mesh_', '') + name = name.replace('firmware', 'fw') + name = name.replace('hardware', 'hw') + return name diff --git a/src/c3nav/mesh/dataformats.py b/src/c3nav/mesh/dataformats.py index 00e0dd5f..b22c1ffd 100644 --- a/src/c3nav/mesh/dataformats.py +++ b/src/c3nav/mesh/dataformats.py @@ -1,7 +1,8 @@ from dataclasses import dataclass, field from enum import IntEnum, unique -from c3nav.mesh.baseformats import FixedHexFormat, FixedStrFormat, SimpleFormat, StructType, VarArrayFormat +from c3nav.mesh.baseformats import (BoolFormat, EnumFormat, FixedHexFormat, FixedStrFormat, SimpleFormat, StructType, + VarArrayFormat) class MacAddressFormat(FixedHexFormat): @@ -16,19 +17,29 @@ class MacAddressesListFormat(VarArrayFormat): @unique class LedType(IntEnum): + NONE = 0 SERIAL = 1 MULTIPIN = 2 +@unique +class SerialLedType(IntEnum): + WS2812 = 1 + SK6812 = 2 + + @dataclass class LedConfig(StructType, union_type_field="led_type"): - led_type: LedType = field(metadata={"format": SimpleFormat('B')}) + """ + configuration for an optional connected status LED + """ + led_type: LedType = field(metadata={"format": EnumFormat(), "c_name": "type"}) @dataclass class SerialLedConfig(LedConfig, led_type=LedType.SERIAL): + serial_led_type: SerialLedType = field(metadata={"format": EnumFormat(), "c_name": "type"}) gpio: int = field(metadata={"format": SimpleFormat('B')}) - rmt: int = field(metadata={"format": SimpleFormat('B')}) @dataclass @@ -38,6 +49,72 @@ class MultipinLedConfig(LedConfig, led_type=LedType.MULTIPIN): gpio_blue: int = field(metadata={"format": SimpleFormat('B')}) +@dataclass +class UWBConfig(StructType): + """ + configuration for the connection to the UWB module + """ + enable: bool = field(metadata={"format": BoolFormat()}) + gpio_miso: int = field(metadata={"format": SimpleFormat('B')}) + gpio_mosi: int = field(metadata={"format": SimpleFormat('B')}) + gpio_clk: int = field(metadata={"format": SimpleFormat('B')}) + gpio_cs: int = field(metadata={"format": SimpleFormat('B')}) + gpio_irq: int = field(metadata={"format": SimpleFormat('B')}) + gpio_rst: int = field(metadata={"format": SimpleFormat('B')}) + gpio_wakeup: int = field(metadata={"format": SimpleFormat('B')}) + gpio_exton: int = field(metadata={"format": SimpleFormat('B')}) + + +@unique +class BoardType(IntEnum): + CUSTOM = 0x00 + + # devboards + ESP32_C3_DEVKIT_M_1 = 0x01 + ESP32_C3_32S = 2 + + # custom boards + C3NAV_UWB_BOARD = 0x10 + C3NAV_LOCATION_PCB_REV_0_1 = 0x11 + C3NAV_LOCATION_PCB_REV_0_2 = 0x12 + + +@dataclass +class BoardConfig(StructType, union_type_field="board"): + board: BoardType = field(metadata={"format": EnumFormat(as_hex=True)}) + + +@dataclass +class CustomBoardConfig(StructType, board=BoardType.CUSTOM): + uwb: UWBConfig = field(metadata={"as_definition": True}) + led: LedConfig = field(metadata={"as_definition": True}) + + +@dataclass +class DevkitMBoardConfig(StructType, board=BoardType.ESP32_C3_DEVKIT_M_1): + uwb: UWBConfig = field(metadata={"as_definition": True}) + + +@dataclass +class Esp32SBoardConfig(StructType, board=BoardType.ESP32_C3_32S): + uwb: UWBConfig = field(metadata={"as_definition": True}) + + +@dataclass +class UwbBoardConfig(StructType, board=BoardType.C3NAV_UWB_BOARD): + pass + + +@dataclass +class LocationPCBRev0Dot1BoardConfig(StructType, board=BoardType.C3NAV_LOCATION_PCB_REV_0_1): + pass + + +@dataclass +class LocationPCBRev0Dot2BoardConfig(StructType, board=BoardType.C3NAV_LOCATION_PCB_REV_0_2): + pass + + @dataclass class RangeItemType(StructType): address: str = field(metadata={"format": MacAddressFormat()}) diff --git a/src/c3nav/mesh/management/commands/mesh_msg_c.py b/src/c3nav/mesh/management/commands/generate_c_types.py similarity index 87% rename from src/c3nav/mesh/management/commands/mesh_msg_c.py rename to src/c3nav/mesh/management/commands/generate_c_types.py index 3107d82f..4fd9f73d 100644 --- a/src/c3nav/mesh/management/commands/mesh_msg_c.py +++ b/src/c3nav/mesh/management/commands/generate_c_types.py @@ -10,18 +10,12 @@ from c3nav.mesh.utils import indent_c class Command(BaseCommand): help = 'export mesh message structs for c code' - def shorten_name(self, name): - name = name.replace('config', 'cfg') - name = name.replace('position', 'pos') - name = name.replace('mesh_', '') - name = name.replace('firmware', 'fw') - return name - def handle(self, *args, **options): done_struct_names = set() nodata = set() struct_lines = {} struct_sizes = [] + done_definitions = set() ignore_names = set(field_.name for field_ in fields(MeshMessage)) for msg_id, msg_type in MeshMessage.get_types().items(): @@ -31,11 +25,17 @@ class Command(BaseCommand): done_struct_names.add(msg_type.c_struct_name) msg_type = MeshMessage.c_structs[msg_type.c_struct_name] - base_name = (msg_type.c_struct_name or self.shorten_name(normalize_name( + base_name = (msg_type.c_struct_name or normalize_name( getattr(msg_id, 'name', msg_type.__name__) - ))) + )) name = "mesh_msg_%s_t" % base_name + for definition_name, definition in msg_type.get_c_definitions().items(): + if definition_name not in done_definitions: + done_definitions.add(definition_name) + print(definition) + print() + code = msg_type.get_c_code(name, ignore_fields=ignore_names, no_empty=True) if code: size = msg_type.get_min_size(no_inherited_fields=True) @@ -58,14 +58,16 @@ class Command(BaseCommand): % max(struct_sizes) ) + print() + max_msg_type = max(MeshMessage.get_types().keys()) macro_data = [] for i in range(((max_msg_type//16)+1)*16): msg_type = MeshMessage.get_types().get(i, None) if msg_type: - name = (msg_type.c_struct_name or self.shorten_name(normalize_name( + name = (msg_type.c_struct_name or normalize_name( getattr(msg_type.msg_id, 'name', msg_type.__name__) - ))) + )) macro_data.append(( msg_type.get_c_enum_name()+',', ("nodata" if msg_type in nodata else name)+',', diff --git a/src/c3nav/mesh/messages.py b/src/c3nav/mesh/messages.py index 53e7511e..3ce85780 100644 --- a/src/c3nav/mesh/messages.py +++ b/src/c3nav/mesh/messages.py @@ -1,4 +1,3 @@ -import re from dataclasses import dataclass, field from enum import IntEnum, unique from typing import TypeVar @@ -6,8 +5,9 @@ from typing import TypeVar import channels from asgiref.sync import async_to_sync -from c3nav.mesh.baseformats import BoolFormat, FixedStrFormat, SimpleFormat, StructType, VarArrayFormat, VarStrFormat -from c3nav.mesh.dataformats import (FirmwareAppDescription, LedConfig, MacAddressesListFormat, MacAddressFormat, +from c3nav.mesh.baseformats import (BoolFormat, FixedStrFormat, SimpleFormat, StructType, VarArrayFormat, VarStrFormat, + normalize_name) +from c3nav.mesh.dataformats import (BoardConfig, FirmwareAppDescription, MacAddressesListFormat, MacAddressFormat, RangeItemType) from c3nav.mesh.utils import get_mesh_comm_group @@ -36,10 +36,11 @@ class MeshMessageType(IntEnum): MESH_ROUTING_FAILED = 0x0a CONFIG_DUMP = 0x10 - CONFIG_FIRMWARE = 0x11 - CONFIG_POSITION = 0x12 - CONFIG_LED = 0x13 + CONFIG_HARDWARE = 0x11 + CONFIG_BOARD = 0x12 + CONFIG_FIRMWARE = 0x13 CONFIG_UPLINK = 0x14 + CONFIG_POSITION = 0x15 LOCATE_REQUEST_RANGE = 0x20 LOCATE_RANGE_RESULTS = 0x21 @@ -91,17 +92,9 @@ class MeshMessage(StructType, union_type_field="msg_id"): def get_variable_name(cls, base_name): return cls.c_struct_name or base_name - @classmethod - def get_struct_name(cls, base_name): - return "mesh_msg_%s_t" % base_name - @classmethod def get_c_enum_name(cls): - return re.sub( - r"([a-z])([A-Z])", - r"\1_\2", - cls.__name__.removeprefix('Mesh').removesuffix('Message') - ).upper().replace('CONFIG', 'CFG').replace('FIRMWARE', 'FW').replace('POSITION', 'POS') + return normalize_name(cls.__name__.removeprefix('Mesh').removesuffix('Message')).upper() @dataclass @@ -212,35 +205,35 @@ class ConfigDumpMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_DUMP): @dataclass -class ConfigFirmwareMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_FIRMWARE): - """ respond firmware info """ +class ConfigHardwareMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_HARDWARE): + """ respond hardware/chip info """ chip: int = field(metadata={ "format": SimpleFormat('H'), "c_name": "chip_id", }) revision_major: int = field(metadata={"format": SimpleFormat('B')}) revision_minor: int = field(metadata={"format": SimpleFormat('B')}) - app_desc: FirmwareAppDescription = field(metadata={'json_embed': True}) - - @classmethod - def upgrade_json(cls, data): - if 'revision' in data: - data['revision_major'], data['revision_minor'] = data.pop('revision') - return data def get_chip_display(self): return ChipType(self.chip).name.replace('_', '-') - @classmethod - def get_ignore_c_fields(self): - return { - "magic_word", "secure_version", "reserv1", "version", "project_name", - "compile_time", "compile_date", "idf_version", "app_elf_sha256", "reserv2" - } - @classmethod - def get_additional_c_fields(self): - return ("esp_app_desc_t app_desc;", ) +@unique +class BoardType(IntEnum): + SERIAL = 1 + MULTIPIN = 2 + + +@dataclass +class ConfigBoardMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_BOARD): + """ set/respond board config """ + board_config: BoardConfig = field(metadata={"c_embed": True}) + + +@dataclass +class ConfigFirmwareMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_FIRMWARE): + """ respond firmware info """ + app_desc: FirmwareAppDescription = field(metadata={'json_embed': True}) @dataclass @@ -251,12 +244,6 @@ class ConfigPositionMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_POSITION) z_pos: int = field(metadata={"format": SimpleFormat('h')}) -@dataclass -class ConfigLedMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_LED): - """ set/respond led config """ - led_config: LedConfig = field(metadata={"c_embed": True}) - - @dataclass class ConfigUplinkMessage(MeshMessage, msg_id=MeshMessageType.CONFIG_UPLINK): """ set/respond uplink config """