team-3/src/c3nav/routing/locator.py
Laura Klünder 2063b715e7 fix fixxfix
2024-12-27 02:15:10 +01:00

428 lines
16 KiB
Python

import operator
import pickle
from dataclasses import dataclass, field
from functools import cached_property, reduce
from pprint import pprint
from typing import Annotated
from typing import Optional, Self, Sequence, TypeAlias
from uuid import UUID
import numpy as np
from annotated_types import Lt
from django.conf import settings
from pydantic.types import NonNegativeInt
from pydantic_extra_types.mac_address import MacAddress
from c3nav.mapdata.models import MapUpdate, Space
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.mesh.utils import get_nodes_and_ranging_beacons
from c3nav.routing.router import Router
from c3nav.routing.schemas import LocateWifiPeerSchema, BeaconMeasurementDataSchema, LocateIBeaconPeerSchema
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
# Identifier of a peer the locator can work with: either a WiFi BSSID (MAC address)
# or an iBeacon (uuid, major, minor) triple whose major and minor are non-negative
# integers below 2 ** 16.
LocatorPeerIdentifier: TypeAlias = MacAddress | tuple[UUID, Annotated[NonNegativeInt, Lt(2 ** 16)], Annotated[NonNegativeInt, Lt(2 ** 16)]]
@dataclass
class LocatorPeer:
    """A single peer (WiFi BSSID or iBeacon) known to the locator.

    ``xyz`` stays ``None`` until a position is assigned; ``Locator._rebuild``
    stores positions as integers scaled by 100.
    """
    identifier: LocatorPeerIdentifier
    frequencies: set[int] = field(default_factory=set)
    xyz: tuple[int, int, int] | None = None
@dataclass
class ScanDataValue:
rssi: Optional[int] = None
ibeacon_range: Optional[float] = None
distance: Optional[float] = None
@classmethod
def average(cls, items: Sequence[Self]):
rssi = [item.rssi for item in items if item.rssi]
ibeacon_range = [item.ibeacon_range for item in items if item.ibeacon_range is not None]
distance = [item.distance for item in items if item.distance is not None]
return cls(
rssi=(sum(rssi)//len(rssi)) if rssi else None,
ibeacon_range=(sum(ibeacon_range) // len(ibeacon_range)) if ibeacon_range else None,
distance=(sum(distance)/len(distance)) if distance else None,
)
# Measured values keyed by the integer peer id handed out by Locator.get_peer_id.
ScanData: TypeAlias = dict[int, ScanDataValue]
@dataclass
class LocatorPoint:
    """A measured reference point: its x/y coordinates and the scan values recorded there."""
    x: float
    y: float
    values: ScanData
@dataclass
class Locator:
    """Turns WiFi / iBeacon scan data into a position.

    Instances are built by ``rebuild`` and pickled to a cache file, from which
    ``load`` reads them back.
    """
    # all known peers; the index of a peer in this list is its integer peer id
    peers: list[LocatorPeer] = field(default_factory=list)
    # identifier -> index into ``peers``
    peer_lookup: dict[LocatorPeerIdentifier, int] = field(default_factory=dict)
    # positions of the peers that existed when ``_rebuild`` built this array, one entry per peer
    xyz: np.array = field(default_factory=(lambda: np.empty((0,))))
    # measured spaces by primary key; ``_rebuild`` only stores spaces that have points
    spaces: dict[int, "LocatorSpace"] = field(default_factory=dict)
@classmethod
def rebuild(cls, update, router):
    """Build a fresh locator and pickle it to the cache file of ``update``.

    ``update`` is passed to ``build_filename`` to determine the target file,
    ``router`` is handed to ``_rebuild``. Returns the newly built locator.
    """
    locator = cls()
    locator._rebuild(router)
    # the context manager closes (and thereby flushes) the file even if pickling fails
    with open(cls.build_filename(update), 'wb') as f:
        pickle.dump(locator, f)
    return locator
def _rebuild(self, router):
    """Fill this locator with peers, peer positions and measured spaces.

    First every ranging beacon is registered under all of its identifiers and
    given a position, then ``self.xyz`` is built from those peers, and finally
    the beacon measurements of every space are converted into ``LocatorSpace``
    objects.
    """
    ranging_data = get_nodes_and_ranging_beacons()
    for beacon in ranging_data.beacons.values():
        beacon_identifiers = list(beacon.wifi_bssids)
        has_ibeacon = (
            beacon.ibeacon_uuid
            and beacon.ibeacon_major is not None
            and beacon.ibeacon_minor is not None
        )
        if has_ibeacon:
            beacon_identifiers.append((beacon.ibeacon_uuid, beacon.ibeacon_major, beacon.ibeacon_minor))

        for identifier in beacon_identifiers:
            peer_id = self.get_peer_id(identifier, create=True)
            # coordinates are stored as integers scaled by 100
            position = (
                int(beacon.geometry.x * 100),
                int(beacon.geometry.y * 100),
                int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100),
            )
            self.peers[peer_id].xyz = position

    # built before the measurements are converted: peers that are first seen in a
    # measurement get appended to self.peers afterwards and have no row in here
    self.xyz = np.array(tuple(peer.xyz for peer in self.peers))

    for space in Space.objects.prefetch_related('beacon_measurements'):
        measured_points = tuple(
            LocatorPoint(
                x=measurement.geometry.x,
                y=measurement.geometry.y,
                values=self.convert_scans(measurement.data, create_peers=True),
            )
            for measurement in space.beacon_measurements.all()
        )
        locator_space = LocatorSpace.create(pk=space.pk, points=measured_points)
        # spaces without any measured point are left out
        if locator_space.points:
            self.spaces[space.pk] = locator_space
def get_peer_id(self, identifier: LocatorPeerIdentifier, create=False) -> Optional[int]:
    """Return the integer peer id of ``identifier``.

    Unknown identifiers yield ``None`` unless ``create`` is set, in which case
    a new peer is appended and its id (its index in ``self.peers``) returned.
    """
    known_id = self.peer_lookup.get(identifier)
    if known_id is not None or not create:
        return known_id

    new_id = len(self.peers)
    self.peers.append(LocatorPeer(identifier=identifier))
    self.peer_lookup[identifier] = new_id
    return new_id
def convert_wifi_scan(self, scan_data: list[LocateWifiPeerSchema], create_peers=False) -> ScanData:
    """Convert one WiFi scan into a mapping of peer id to rssi / distance.

    Entries whose SSID is not listed in ``settings.WIFI_SSIDS`` are skipped when
    that setting is non-empty. Unknown BSSIDs are skipped unless ``create_peers``
    is set.
    """
    converted = {}
    for entry in scan_data:
        ssid_allowed = not settings.WIFI_SSIDS or entry.ssid in settings.WIFI_SSIDS
        if not ssid_allowed:
            continue
        peer_id = self.get_peer_id(entry.bssid, create=create_peers)
        if peer_id is None:
            continue
        converted[peer_id] = ScanDataValue(rssi=entry.rssi, distance=entry.distance)
    return converted
def convert_ibeacon_scan(self, scan_data: list[LocateIBeaconPeerSchema], create_peers=False) -> ScanData:
    """Convert one iBeacon scan into a mapping of peer id to iBeacon range.

    Peers are identified by their (uuid, major, minor) triple; unknown ones are
    skipped unless ``create_peers`` is set.
    """
    converted = {}
    for entry in scan_data:
        identifier = (entry.uuid, entry.major, entry.minor)
        peer_id = self.get_peer_id(identifier, create=create_peers)
        if peer_id is None:
            continue
        converted[peer_id] = ScanDataValue(ibeacon_range=entry.distance)
    return converted
def convert_scans(self, scans_data: BeaconMeasurementDataSchema, create_peers=False) -> ScanData:
    """Convert all WiFi and iBeacon scans of a measurement and average them per peer.

    Every peer that shows up in at least one scan gets one averaged
    ``ScanDataValue``, computed over the scans that contain it.
    """
    per_scan = [
        self.convert_wifi_scan(scan, create_peers=create_peers)
        for scan in scans_data.wifi
    ]
    per_scan.extend(
        self.convert_ibeacon_scan(scan, create_peers=create_peers)
        for scan in scans_data.ibeacon
    )

    # union of the peer ids over all scans
    peer_ids = frozenset()
    for values in per_scan:
        peer_ids = peer_ids | frozenset(values.keys())

    averaged = {}
    for peer_id in peer_ids:
        samples = tuple(values[peer_id] for values in per_scan if peer_id in values)
        averaged[peer_id] = ScanDataValue.average(samples)
    return averaged
@classmethod
def build_filename(cls, update):
    """Return the path of the pickled locator below ``settings.CACHE_ROOT`` for ``update``."""
    cache_dir = settings.CACHE_ROOT / MapUpdate.build_cache_key(*update)
    return cache_dir / 'locator.pickle'
@classmethod
def load_nocache(cls, update):
    """Unpickle and return the locator stored in the cache file of ``update``.

    Always reads from disk; ``load`` is the cached variant.
    """
    # NOTE(review): unpickling executes code from the file, so this relies on the
    # cache file being one that ``rebuild`` wrote — confirm the cache dir is trusted
    with open(cls.build_filename(update), 'rb') as f:
        return pickle.load(f)
# context-local cache (asgiref ``Local``, or ``threading.local`` when asgiref is missing)
# holding the last loaded locator in ``data`` and the update it belongs to in ``update``
cached = LocalContext()


class NoUpdate:
    # sentinel used while nothing has been cached yet; never equal to a real update
    pass


@classmethod
def load(cls):
    """Return the locator for the last processed map update.

    The locator is re-read from disk only when the last processed update differs
    from the one the cached locator was loaded for.
    """
    from c3nav.mapdata.models import MapUpdate
    update = MapUpdate.last_processed_update()
    if getattr(cls.cached, 'update', cls.NoUpdate) != update:
        # load before touching the cache: if loading raises, the cache must not be
        # marked as current while still holding the old (or no) data
        data = cls.load_nocache(update)
        cls.cached.data = data
        cls.cached.update = update
    return cls.cached.data
def convert_raw_scan_data(self, raw_scan_data: list[LocateWifiPeerSchema]) -> ScanData:
    """Convert a raw WiFi scan without registering peers that are not known yet."""
    converted = self.convert_wifi_scan(raw_scan_data, create_peers=False)
    return converted
def get_xyz(self, identifier: LocatorPeerIdentifier) -> tuple[int, int, int] | None:
    """Return the stored position of the peer with ``identifier``, or ``None`` if the peer is unknown."""
    peer_id = self.get_peer_id(identifier)
    return None if peer_id is None else self.peers[peer_id].xyz
def get_all_nodes_xyz(self) -> dict[LocatorPeerIdentifier, tuple[float, float, float]]:
    """Return identifier -> position for the MAC-address peers among the first ``len(self.xyz)`` peers."""
    positions = {}
    # only the peers that have a row in self.xyz are considered
    for peer in self.peers[:len(self.xyz)]:
        if isinstance(peer.identifier, MacAddress):
            positions[peer.identifier] = peer.xyz
    return positions
def locate(self, raw_scan_data: list[LocateWifiPeerSchema], permissions=None):
    """Locate the sender of a raw WiFi scan.

    Returns ``None`` when the scan contains no known peer. Otherwise the range
    based result is used if there is one, and the rssi based result if not.
    """
    # todo: support for ibeacons
    scan_data = self.convert_raw_scan_data(raw_scan_data)
    if not scan_data:
        return None

    ranged = self.locate_range(scan_data, permissions)
    return ranged if ranged is not None else self.locate_rssi(scan_data, permissions)
def locate_rssi(self, scan_data: ScanData, permissions=None):
    """Locate by comparing the scan's rssi values with the measured points of every visible space.

    Returns the ``CustomLocation`` of the point with the lowest score (with that
    score stored on its ``score`` attribute), or ``None`` if no space yields a point.
    """
    router = Router.load()
    restrictions = router.get_restrictions(permissions)

    # the peer with the highest rssi has to be known to a space for that space to be considered
    strongest_peer_id, _ = max(scan_data.items(), key=lambda entry: entry[1].rssi)

    winner = None
    lowest_score = float('inf')
    for pk, space in self.spaces.items():
        # skip spaces that the given permissions do not allow
        if pk in restrictions.spaces:
            continue
        point, score = space.get_best_point(scan_data, needed_peer_id=strongest_peer_id)
        if point is None or not score < lowest_score:
            continue
        winner = CustomLocation(router.spaces[space.pk].level, point.x, point.y,
                                permissions=permissions, icon='my_location')
        lowest_score = score

    if winner is not None:
        winner.score = lowest_score

    return winner
@cached_property
def least_squares_func(self):
    """``scipy.optimize.least_squares``, imported on first access."""
    # this is effectively a lazy import to save memory… todo: do we need that?
    import scipy.optimize
    return scipy.optimize.least_squares
@cached_property
def norm_func(self):
    """``scipy.linalg.norm``, imported on first access."""
    # this is effectively a lazy import to save memory… todo: do we need that?
    import scipy.linalg
    return scipy.linalg.norm
def locate_range(self, scan_data: ScanData, permissions=None, orig_addr=None):
    """Estimate a position by trilateration from measured distances to known peers.

    Only peers that have a row in ``self.xyz`` and a measured distance are used.
    Returns ``None`` when fewer than three such peers remain, so that the caller
    can fall back to another method; otherwise returns a ``CustomLocation`` with
    an additional ``z`` attribute.

    ``orig_addr`` is only used for the debug output: if it identifies a peer with
    a known position, the result is compared against that position.

    NOTE(review): ``permissions`` is not used in here and the level is always the
    first ``Level`` (see the todo below). Distances are presumably in the same
    scaled-by-100 unit as ``self.xyz`` — confirm against the caller.
    """
    pprint(scan_data)

    # peers without a distance would put None into the numeric arrays below, so they
    # are left out; a scan without distances then simply yields no range result
    peer_ids = tuple(
        peer_id for peer_id, value in scan_data.items()
        if peer_id < len(self.xyz) and value.distance is not None
    )

    if len(peer_ids) < 3:
        # can't get a good result from just two beacons
        # todo: maybe we can at least give… something?
        print('less than 3 ranges, can\'t do ranging')
        return None

    # the 2D variant for exactly three peers was switched off (`and 0`), so this is always 3D
    print('3D trilateration')
    dimensions = 3

    relevant_xyz = self.xyz[list(peer_ids), :]

    # create 2d array with x, y, z, distance as rows
    np_ranges = np.hstack((
        relevant_xyz,
        np.array(tuple(scan_data[i].distance for i in peer_ids)).reshape((-1, 1)),
    ))

    print(np_ranges)

    measured_ranges = np_ranges[:, 3]
    print('a', measured_ranges)
    # measured_ranges[measured_ranges<1] = 1
    print('b', measured_ranges)

    def fmt(values):
        # comma separated list with two decimals, used by the debug output below
        return ", ".join(("%.2f" % i) for i in tuple(values))

    # rating the guess by calculating the distances
    def diff_func(guess):
        return self.norm_func(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges

    def cost_func(guess):
        result = np.abs(diff_func(guess))
        # absolute errors below 300 are replaced by error/3 + 200
        result[result < 300] = result[result < 300]/3+200
        return result

    # initial guess is the average of all used beacons
    initial_guess = np.average(np_ranges[:, :dimensions], axis=0)

    # the search is limited to the bounding box of all positioned peers plus a margin;
    # these positions live in self.xyz (there is no `beacon_positions` attribute)
    margin = np.array([200, 200, 100])[:dimensions]

    # here the magic happens
    results = self.least_squares_func(
        fun=cost_func,
        # jac="3-point",
        loss="linear",
        bounds=(
            np.min(self.xyz[:, :dimensions], axis=0) - margin,
            np.max(self.xyz[:, :dimensions], axis=0) + margin,
        ),
        x0=initial_guess,
    )

    # create result
    # todo: figure out level
    result_pos = results.x

    from c3nav.mapdata.models import Level
    location = CustomLocation(
        level=Level.objects.first(),
        x=result_pos[0]/100,
        y=result_pos[1]/100,
        permissions=(),
        icon='my_location'
    )
    location.z = result_pos[2]/100

    # everything below is debug output only
    pprint(relevant_xyz)

    orig_xyz = None
    print('orig_addr', orig_addr)
    if orig_addr:
        orig_xyz = self.get_xyz(orig_addr)
        if orig_xyz:
            orig_xyz = np.array(orig_xyz)

    print()
    print("result:", fmt(result_pos))
    if orig_xyz is not None:
        print("correct:", fmt(orig_xyz))
        print("diff:", fmt(orig_xyz-result_pos))
    print()
    print("measured ranges:", fmt(np_ranges[:, 3]))
    print("result ranges:", fmt(
        self.norm_func(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1)
    ))
    if orig_xyz is not None:
        print("correct ranges:", fmt(
            self.norm_func(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1)
        ))
    print()
    print("diff result-measured:", fmt(diff_func(result_pos)))
    if orig_xyz is not None:
        print("diff correct-measured:", fmt(diff_func(orig_xyz)))

    def print_cost(title, pos):
        cost = cost_func(pos)
        print(title, ", ".join(
            ("%.2f" % i) for i in cost
        ), '=', np.sum(cost**2))

    print_cost("cost:", result_pos)
    if orig_xyz is not None:
        print_cost("cost of correct position:", orig_xyz)
    if dimensions > 2:
        print("height:", result_pos[2])
    # print("scale:", (factor or results.x[3]))

    return location
# value stored in LocatorSpace.levels where a point has no rssi for a peer
no_signal = int(-90) ** 2


@dataclass
class LocatorSpace:
    """The measured points of one space, prepared for rssi comparison.

    ``levels`` has one row per point and one column per peer seen in this space
    (``peer_lookup`` maps peer id to column). A cell holds the squared rssi of
    that peer at that point, or ``no_signal`` if the point has no rssi for it.
    """
    pk: int
    points: list[LocatorPoint]
    peer_ids: frozenset[int]
    peer_lookup: dict[int, int]
    levels: np.array

    @classmethod
    def create(cls, pk: int, points: Sequence[LocatorPoint]):
        """Build the space with primary key ``pk`` from its measured ``points``."""
        seen_peers = frozenset()
        for point in points:
            seen_peers = seen_peers | frozenset(point.values.keys())

        columns = tuple(seen_peers)
        column_of = {peer_id: column for column, peer_id in enumerate(columns)}

        matrix = np.full((len(points), len(columns)), fill_value=no_signal, dtype=np.int64)
        for row, point in enumerate(points):
            for peer_id, value in point.values.items():
                if value.rssi is not None:  # todo: ibeaconrange
                    matrix[row, column_of[peer_id]] = int(value.rssi) ** 2

        return cls(
            pk=pk,
            points=list(points),
            peer_ids=seen_peers,
            peer_lookup=column_of,
            levels=matrix,
        )

    def get_best_point(self, scan_values: ScanData,
                       needed_peer_id=None) -> tuple[LocatorPoint, float] | tuple[None, None]:
        """Return the point of this space with the lowest score for ``scan_values``, and that score.

        Returns ``(None, None)`` if this space does not know ``needed_peer_id`` or
        no point qualifies. The score is the summed squared difference between the
        stored levels and the scanned rssi values, plus a penalty for every scanned
        peer this space does not know, divided by the number of scanned peers.

        NOTE(review): the stored levels are squared rssi values while the scanned
        values are used as-is — confirm that this mix is intended.
        """
        # check if this space knows the needed peer id, otherwise no results here
        if needed_peer_id not in self.peer_ids:
            return None, None

        # peers that this space knows
        known = frozenset(scan_values.keys()) & self.peer_ids

        # scanned peers that this space has never seen count against it
        penalty = sum(
            (value.rssi - no_signal) ** 2
            for peer_id, value in scan_values.items()
            if peer_id not in self.peer_ids
        )

        columns = tuple(self.peer_lookup[peer_id] for peer_id in known)
        observed = np.array(tuple(scan_values[peer_id].rssi for peer_id in known), dtype=np.int64)

        # candidate rows are those whose level for the needed peer is > 0
        # NOTE(review): the no_signal fill value is positive too, so this may keep
        # points without a value for the needed peer — confirm
        needed_column = self.levels[:, self.peer_lookup[needed_peer_id]]
        candidates = tuple(np.argwhere(needed_column > 0).ravel())
        if not candidates:
            return None, None

        rows = np.array(candidates, dtype=np.uint32).reshape((-1, 1))
        squared_error = (self.levels[rows, columns] - observed) ** 2
        scores = (np.sum(squared_error, axis=1) + penalty) / len(scan_values)

        winner = np.argmin(scores).ravel()[0]
        return self.points[candidates[winner]], scores[winner]