add node and ibeacon config messages
This commit is contained in:
parent
0ca93acbf6
commit
602035a624
2 changed files with 32 additions and 0 deletions
|
@ -8,6 +8,7 @@ from dataclasses import dataclass
|
||||||
from dataclasses import fields as dataclass_fields
|
from dataclasses import fields as dataclass_fields
|
||||||
from enum import IntEnum, Enum
|
from enum import IntEnum, Enum
|
||||||
from typing import Any, Sequence, Self, Annotated, Literal, Union, Type, TypeVar, ClassVar
|
from typing import Any, Sequence, Self, Annotated, Literal, Union, Type, TypeVar, ClassVar
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
from annotated_types import SLOTS, BaseMetadata, Ge
|
from annotated_types import SLOTS, BaseMetadata, Ge
|
||||||
from pydantic.fields import Field, FieldInfo
|
from pydantic.fields import Field, FieldInfo
|
||||||
|
@ -278,6 +279,8 @@ class CFormat(ABC):
|
||||||
field_format = SimpleFormat(int_type)
|
field_format = SimpleFormat(int_type)
|
||||||
elif type_hint.base is bool:
|
elif type_hint.base is bool:
|
||||||
field_format = BoolFormat()
|
field_format = BoolFormat()
|
||||||
|
elif type_hint.base is UUID:
|
||||||
|
field_format = UUIDFormat()
|
||||||
elif type_hint.base in (str, bytes):
|
elif type_hint.base in (str, bytes):
|
||||||
as_hex = any(getattr(m, 'as_hex', False) for m in type_hint.metadata)
|
as_hex = any(getattr(m, 'as_hex', False) for m in type_hint.metadata)
|
||||||
max_length, var_len_name = type_hint.get_len_metadata()
|
max_length, var_len_name = type_hint.get_len_metadata()
|
||||||
|
@ -513,6 +516,17 @@ class FixedBytesFormat(SimpleFormat):
|
||||||
return data[:self.num], data[self.num:]
|
return data[:self.num], data[self.num:]
|
||||||
|
|
||||||
|
|
||||||
|
class UUIDFormat(SimpleFormat):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("16B")
|
||||||
|
|
||||||
|
def encode(self, value: str | UUID):
|
||||||
|
return super().encode(UUID(value).bytes)
|
||||||
|
|
||||||
|
def decode(self, data: bytes) -> tuple[UUID, bytes]:
|
||||||
|
return UUID(bytes=data[:16]), data[16:]
|
||||||
|
|
||||||
|
|
||||||
class FixedHexFormat(SimpleFormat):
|
class FixedHexFormat(SimpleFormat):
|
||||||
def __init__(self, num, sep=''):
|
def __init__(self, num, sep=''):
|
||||||
self.num = num
|
self.num = num
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from enum import unique
|
from enum import unique
|
||||||
from typing import Annotated, Union
|
from typing import Annotated, Union
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
import channels
|
import channels
|
||||||
from annotated_types import Ge, Le, Lt, MaxLen
|
from annotated_types import Ge, Le, Lt, MaxLen
|
||||||
|
@ -45,6 +46,8 @@ class MeshMessageType(CEnum):
|
||||||
CONFIG_FIRMWARE = "CONFIG_FIRMWARE", 0x13
|
CONFIG_FIRMWARE = "CONFIG_FIRMWARE", 0x13
|
||||||
CONFIG_UPLINK = "CONFIG_UPLINK", 0x14
|
CONFIG_UPLINK = "CONFIG_UPLINK", 0x14
|
||||||
CONFIG_POSITION = "CONFIG_POSITION", 0x15
|
CONFIG_POSITION = "CONFIG_POSITION", 0x15
|
||||||
|
CONFIG_NODE = "CONFIG_NODE", 0x16
|
||||||
|
CONFIG_IBEACON = "CONFIG_IBEACON", 0x17
|
||||||
|
|
||||||
OTA_STATUS = "OTA_STATUS", 0x20
|
OTA_STATUS = "OTA_STATUS", 0x20
|
||||||
OTA_REQUEST_STATUS = "OTA_REQUEST_STATUS", 0x21
|
OTA_REQUEST_STATUS = "OTA_REQUEST_STATUS", 0x21
|
||||||
|
@ -163,6 +166,19 @@ class ConfigPositionMessage(discriminator_value(msg_type=MeshMessageType.CONFIG_
|
||||||
z_pos: Annotated[int, Ge(-2**15), Lt(2**15)]
|
z_pos: Annotated[int, Ge(-2**15), Lt(2**15)]
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigNodeMessage(discriminator_value(msg_type=MeshMessageType.CONFIG_NODE), BaseModel):
|
||||||
|
""" set/respond node config """
|
||||||
|
number: Annotated[NonNegativeInt, Lt(2**16)]
|
||||||
|
name: Annotated[str, MaxLen(32)]
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigIBeaconMessage(discriminator_value(msg_type=MeshMessageType.CONFIG_IBEACON), BaseModel):
|
||||||
|
""" set/respond ibeacon config """
|
||||||
|
uuid: UUID
|
||||||
|
major: Annotated[NonNegativeInt, Lt(2**32)]
|
||||||
|
minor: Annotated[NonNegativeInt, Lt(2**32)]
|
||||||
|
|
||||||
|
|
||||||
class ConfigUplinkMessage(discriminator_value(msg_type=MeshMessageType.CONFIG_UPLINK), BaseModel):
|
class ConfigUplinkMessage(discriminator_value(msg_type=MeshMessageType.CONFIG_UPLINK), BaseModel):
|
||||||
""" set/respond uplink config """
|
""" set/respond uplink config """
|
||||||
enabled: bool
|
enabled: bool
|
||||||
|
@ -303,6 +319,8 @@ MeshMessageContent = Annotated[
|
||||||
ConfigFirmwareMessage,
|
ConfigFirmwareMessage,
|
||||||
ConfigPositionMessage,
|
ConfigPositionMessage,
|
||||||
ConfigUplinkMessage,
|
ConfigUplinkMessage,
|
||||||
|
ConfigNodeMessage,
|
||||||
|
ConfigIBeaconMessage,
|
||||||
OTAStatusMessage,
|
OTAStatusMessage,
|
||||||
OTARequestStatusMessage,
|
OTARequestStatusMessage,
|
||||||
OTAStartMessage,
|
OTAStartMessage,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue