Locator now can deal with ibeacons in theory

This commit is contained in:
Laura Klünder 2024-03-31 19:36:02 +02:00
parent 3be707f52b
commit 76b8858923
4 changed files with 65 additions and 43 deletions

View file

@ -23,5 +23,5 @@ class MeshRangingView(TemplateView):
return {
"ranging_form": RangingForm(self.request.GET or None),
"node_names": get_node_names(),
"nodes_xyz": Locator.load().get_all_xyz(),
"nodes_xyz": Locator.load().get_all_nodes_xyz(),
}

View file

@ -4,6 +4,7 @@ from django.conf import settings
from django.core.exceptions import ValidationError
from ninja import Field as APIField
from ninja import Router as APIRouter
from pydantic_extra_types.mac_address import MacAddress
from c3nav.api.auth import auth_responses
from c3nav.api.schema import BaseSchema
@ -11,7 +12,7 @@ from c3nav.mapdata.models.access import AccessPermission
from c3nav.mapdata.schemas.models import CustomLocationSchema
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.routing.locator import Locator
from c3nav.routing.schemas import BSSIDSchema, LocateRequestWifiPeerSchema, LocateRequestIBeaconPeerSchema
from c3nav.routing.schemas import LocateRequestWifiPeerSchema, LocateRequestIBeaconPeerSchema
positioning_api_router = APIRouter(tags=["positioning"])
@ -98,7 +99,7 @@ def locate_test(request):
BeaconsXYZ = dict[
BSSIDSchema,
MacAddress,
Annotated[
tuple[
Annotated[int, APIField(title="X (in cm)")],
@ -107,12 +108,4 @@ BeaconsXYZ = dict[
],
APIField(title="global XYZ coordinates")
]
]
@positioning_api_router.get('/beacons-xyz/', summary="get beacon coordinates",
description="get xyz coordinates for all known positioning beacons",
response={200: BeaconsXYZ, **auth_responses})
def beacons_xyz():
# todo: update with more details? todo permission?
return Locator.load().get_all_xyz()
]

View file

@ -3,14 +3,19 @@ import pickle
from dataclasses import dataclass, field
from functools import cached_property, reduce
from pprint import pprint
from typing import Annotated
from typing import Optional, Self, Sequence, TypeAlias
from uuid import UUID
import numpy as np
from annotated_types import Lt
from django.conf import settings
from pydantic.types import NonNegativeInt
from pydantic_extra_types.mac_address import MacAddress
from c3nav.mapdata.models import MapUpdate, Space
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.mesh.utils import get_nodes_and_ranging_beacons
from c3nav.routing.router import Router
from c3nav.routing.schemas import LocateRequestWifiPeerSchema
@ -19,27 +24,30 @@ try:
except ImportError:
from threading import local as LocalContext
BSSID: TypeAlias = str
LocatorPeerIdentifier: TypeAlias = MacAddress | tuple[UUID, Annotated[NonNegativeInt, Lt(2 ** 16)], Annotated[NonNegativeInt, Lt(2 ** 16)]]
@dataclass
class LocatorPeer:
bssid: BSSID
identifier: LocatorPeerIdentifier
frequencies: set[int] = field(default_factory=set)
xyz: Optional[tuple[int, int, int]] = None
@dataclass
class ScanDataValue:
rssi: int
rssi: Optional[int] = None
ibeacon_range: Optional[float] = None
distance: Optional[float] = None
@classmethod
def average(cls, items: Sequence[Self]):
rssi = [item.rssi for item in items]
rssi = [item.rssi for item in items if item.rssi]
ibeacon_range = [item.ibeacon_range for item in items if item.ibeacon_range is not None]
distance = [item.distance for item in items if item.distance is not None]
return cls(
rssi=(sum(rssi)//len(rssi)),
rssi=(sum(rssi)//len(rssi)) if rssi else None,
ibeacon_range=(sum(ibeacon_range) // len(ibeacon_range)) if ibeacon_range else None,
distance=(sum(distance)/len(distance)) if distance else None,
)
@ -57,7 +65,7 @@ class LocatorPoint:
@dataclass
class Locator:
peers: list[LocatorPeer] = field(default_factory=list)
peer_lookup: dict[BSSID, int] = field(default_factory=dict)
peer_lookup: dict[LocatorPeerIdentifier, int] = field(default_factory=dict)
xyz: np.array = field(default_factory=(lambda: np.empty((0,))))
spaces: dict[int, "LocatorSpace"] = field(default_factory=dict)
@ -69,13 +77,20 @@ class Locator:
return locator
def _rebuild(self, router):
for beacon in RangingBeacon.objects.all():
peer_id = self.get_peer_id(beacon.wifi_bssid, create=True)
self.peers[peer_id].xyz = (
int(beacon.geometry.x * 100),
int(beacon.geometry.y * 100),
int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100),
)
calculated = get_nodes_and_ranging_beacons()
for beacon in calculated.beacons.values():
identifiers = []
if beacon.wifi_bssid:
identifiers.append(beacon.wifi_bssid)
if beacon.ibeacon_uuid and beacon.ibeacon_major is not None and beacon.ibeacon_minor is not None:
identifiers.append((beacon.ibeacon_uuid, beacon.ibeacon_major, beacon.ibeacon_minor))
for identifier in identifiers:
peer_id = self.get_peer_id(identifier, create=True)
self.peers[peer_id].xyz = (
int(beacon.geometry.x * 100),
int(beacon.geometry.y * 100),
int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100),
)
self.xyz = np.array(tuple(peer.xyz for peer in self.peers))
for space in Space.objects.prefetch_related('beacon_measurements'):
@ -85,7 +100,7 @@ class Locator:
LocatorPoint(
x=measurement.geometry.x,
y=measurement.geometry.y,
values=self.convert_scans(measurement.data["wifi"], create_peers=True),
values=self.convert_scans(measurement.data, create_peers=True),
)
for measurement in space.beacon_measurements.all()
)
@ -93,16 +108,16 @@ class Locator:
if new_space.points:
self.spaces[space.pk] = new_space
def get_peer_id(self, bssid: BSSID, create=False) -> Optional[int]:
peer_id = self.peer_lookup.get(bssid, None)
def get_peer_id(self, identifier: LocatorPeerIdentifier, create=False) -> Optional[int]:
peer_id = self.peer_lookup.get(identifier, None)
if peer_id is None and create:
peer = LocatorPeer(bssid=bssid)
peer = LocatorPeer(identifier=identifier)
peer_id = len(self.peers)
self.peer_lookup[bssid] = peer_id
self.peer_lookup[identifier] = peer_id
self.peers.append(peer)
return peer_id
def convert_scan(self, scan_data, create_peers=False) -> ScanData:
def convert_wifi_scan(self, scan_data, create_peers=False) -> ScanData:
result = {}
for scan_value in scan_data:
if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS:
@ -112,10 +127,25 @@ class Locator:
result[peer_id] = ScanDataValue(rssi=scan_value["rssi"], distance=scan_value.get("distance", None))
return result
def convert_ibeacon_scan(self, scan_data, create_peers=False) -> ScanData:
result = {}
for scan_value in scan_data:
peer_id = self.get_peer_id(
(scan_value['uuid'], scan_value['major'], scan_value['minor']),
create=create_peers
)
if peer_id is not None:
result[peer_id] = ScanDataValue(ibeacon_range=scan_value["distance"])
return result
def convert_scans(self, scans_data, create_peers=False) -> ScanData:
converted = []
for scan in scans_data:
converted.append(self.convert_scan(scan, create_peers=create_peers))
for scan in scans_data["wifi"]:
converted.append(self.convert_wifi_scan(scan, create_peers=create_peers))
for scan in scans_data["ibeacon"]:
converted.append(self.convert_ibeacon_scan(scan, create_peers=create_peers))
peer_ids = reduce(operator.or_, (frozenset(values.keys()) for values in converted), frozenset())
return {
peer_id: ScanDataValue.average(
@ -149,15 +179,16 @@ class Locator:
def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestWifiPeerSchema]) -> ScanData:
return self.convert_scan(raw_scan_data, create_peers=False)
def get_xyz(self, address: BSSID) -> tuple[int, int, int] | None:
i = self.get_peer_id(address)
def get_xyz(self, identifier: LocatorPeerIdentifier) -> tuple[int, int, int] | None:
i = self.get_peer_id(identifier)
if i is None:
return None
return self.peers[i].xyz
def get_all_xyz(self) -> dict[BSSID, float]:
def get_all_nodes_xyz(self) -> dict[LocatorPeerIdentifier, tuple[float, float, float]]:
return {
peer: peer.xyz for peer in self.peers[:len(self.xyz)]
peer.identifier: peer.xyz for peer in self.peers[:len(self.xyz)]
if isinstance(peer.identifier, MacAddress)
}
def locate(self, raw_scan_data: list[LocateRequestWifiPeerSchema], permissions=None):

View file

@ -4,16 +4,14 @@ from uuid import UUID
from annotated_types import Lt
from pydantic import Field as APIField
from pydantic import NegativeInt, PositiveInt
from pydantic.types import NonNegativeInt, PositiveFloat, NonNegativeFloat
from pydantic.types import NonNegativeInt, NonNegativeFloat
from pydantic_extra_types.mac_address import MacAddress
from c3nav.api.schema import BaseSchema
from c3nav.api.utils import NonEmptyStr
BSSIDSchema = Annotated[str, APIField(pattern=r"^[a-z0-9]{2}(:[a-z0-9]{2}){5}$", title="BSSID")]
class LocateRequestWifiPeerSchema(BaseSchema):
bssid: BSSIDSchema = APIField(
bssid: MacAddress = APIField(
title="BSSID",
description="BSSID of the peer",
example="c3:42:13:37:ac:ab",