beacon_type support

This commit is contained in:
Laura Klünder 2024-12-28 14:59:09 +01:00
parent a2b19dad4e
commit ca6252583c
4 changed files with 213 additions and 22 deletions

View file

@ -1,9 +1,9 @@
import operator
import pickle
from dataclasses import dataclass, field
from enum import StrEnum
from functools import cached_property, reduce
from pprint import pprint
from typing import Annotated
from typing import Annotated, NamedTuple, Union
from typing import Optional, Self, Sequence, TypeAlias
from uuid import UUID
@ -24,13 +24,25 @@ try:
except ImportError:
from threading import local as LocalContext
LocatorPeerIdentifier: TypeAlias = MacAddress | str | tuple[
UUID, Annotated[NonNegativeInt, Lt(2 ** 16)], Annotated[NonNegativeInt, Lt(2 ** 16)]]
class PeerType(StrEnum):
WIFI = "wifi"
DECT = "dect"
IBEACON = "ibeacon"
class TypedIdentifier(NamedTuple):
peer_type: PeerType
identifier: Union[
MacAddress,
str,
tuple[UUID, Annotated[NonNegativeInt, Lt(2 ** 16)], Annotated[NonNegativeInt, Lt(2 ** 16)]]
]
@dataclass
class LocatorPeer:
identifier: LocatorPeerIdentifier
identifier: TypedIdentifier
frequencies: set[int] = field(default_factory=set)
xyz: Optional[tuple[int, int, int]] = None
space_id: Optional[int] = None
@ -67,7 +79,7 @@ class LocatorPoint:
@dataclass
class Locator:
peers: list[LocatorPeer] = field(default_factory=list)
peer_lookup: dict[LocatorPeerIdentifier, int] = field(default_factory=dict)
peer_lookup: dict[TypedIdentifier, int] = field(default_factory=dict)
xyz: np.array = field(default_factory=(lambda: np.empty((0,))))
spaces: dict[int, "LocatorSpace"] = field(default_factory=dict)
@ -83,9 +95,11 @@ class Locator:
for beacon in calculated.beacons.values():
identifiers = []
for bssid in beacon.wifi_bssids:
identifiers.append(bssid)
identifiers.append(TypedIdentifier(PeerType.WIFI, bssid))
if beacon.ibeacon_uuid and beacon.ibeacon_major is not None and beacon.ibeacon_minor is not None:
identifiers.append((beacon.ibeacon_uuid, beacon.ibeacon_major, beacon.ibeacon_minor))
identifiers.append(
TypedIdentifier(PeerType.IBEACON, (beacon.ibeacon_uuid, beacon.ibeacon_major, beacon.ibeacon_minor))
)
for identifier in identifiers:
peer_id = self.get_peer_id(identifier, create=True)
self.peers[peer_id].xyz = (
@ -111,7 +125,7 @@ class Locator:
if new_space.points:
self.spaces[space.pk] = new_space
def get_peer_id(self, identifier: LocatorPeerIdentifier, create=False) -> Optional[int]:
def get_peer_id(self, identifier: TypedIdentifier, create=False) -> Optional[int]:
peer_id = self.peer_lookup.get(identifier, None)
if peer_id is None and create:
peer = LocatorPeer(identifier=identifier)
@ -126,9 +140,9 @@ class Locator:
if settings.WIFI_SSIDS and scan_value.ssid not in settings.WIFI_SSIDS:
continue
peer_ids = {
self.get_peer_id(scan_value.bssid, create=create_peers),
self.get_peer_id(scan_value.ap_name, create=create_peers),
} - {None, ""}
self.get_peer_id(TypedIdentifier(PeerType.WIFI, scan_value.bssid), create=create_peers),
self.get_peer_id(TypedIdentifier(PeerType.WIFI, scan_value.ap_name), create=create_peers),
} - {None, ""}
for peer_id in peer_ids:
result[peer_id] = ScanDataValue(rssi=scan_value.rssi, distance=scan_value.distance)
return result
@ -137,7 +151,7 @@ class Locator:
result = {}
for scan_value in scan_data:
peer_id = self.get_peer_id(
(scan_value.uuid, scan_value.major, scan_value.minor),
TypedIdentifier(PeerType.IBEACON, (scan_value.uuid, scan_value.major, scan_value.minor)),
create=create_peers
)
if peer_id is not None:
@ -185,13 +199,13 @@ class Locator:
def convert_raw_scan_data(self, raw_scan_data: list[LocateWifiPeerSchema]) -> ScanData:
return self.convert_wifi_scan(raw_scan_data, create_peers=False)
def get_xyz(self, identifier: LocatorPeerIdentifier) -> tuple[int, int, int] | None:
def get_xyz(self, identifier: TypedIdentifier) -> tuple[int, int, int] | None:
i = self.get_peer_id(identifier)
if i is None:
return None
return self.peers[i].xyz
def get_all_nodes_xyz(self) -> dict[LocatorPeerIdentifier, tuple[float, float, float]]:
def get_all_nodes_xyz(self) -> dict[TypedIdentifier, tuple[float, float, float]]:
return {
peer.identifier: peer.xyz for peer in self.peers[:len(self.xyz)]
if isinstance(peer.identifier, MacAddress)