import operator import pickle from dataclasses import dataclass, field from functools import reduce from pprint import pprint from typing import Optional, Self, Sequence, TypeAlias import numpy as np import scipy from django.conf import settings from scipy.optimize import least_squares from c3nav.mapdata.models import MapUpdate, Space from c3nav.mapdata.models.geometry.space import RangingBeacon from c3nav.mapdata.utils.locations import CustomLocation from c3nav.routing.router import Router from c3nav.routing.schemas import LocateRequestPeerSchema try: from asgiref.local import Local as LocalContext except ImportError: from threading import local as LocalContext BSSID: TypeAlias = str @dataclass class LocatorPeer: bssid: BSSID frequencies: set[int] = field(default_factory=set) xyz: Optional[tuple[int, int, int]] = None @dataclass class ScanDataValue: rssi: int distance: Optional[float] = None @classmethod def average(cls, items: Sequence[Self]): rssi = [item.rssi for item in items] distance = [item.distance for item in items if item.distance is not None] return cls( rssi=(sum(rssi)//len(rssi)), distance=(sum(distance)/len(distance)) if distance else None, ) ScanData: TypeAlias = dict[int, ScanDataValue] @dataclass class LocatorPoint: x: float y: float values: ScanData @dataclass class Locator: peers: list[LocatorPeer] = field(default_factory=list) peer_lookup: dict[BSSID, int] = field(default_factory=dict) xyz: np.array = field(default_factory=(lambda: np.empty((0,)))) spaces: dict[int, "LocatorSpace"] = field(default_factory=dict) @classmethod def rebuild(cls, update, router): locator = cls() locator._rebuild(router) pickle.dump(locator, open(cls.build_filename(update), 'wb')) return locator def _rebuild(self, router): for beacon in RangingBeacon.objects.all(): peer_id = self.get_peer_id(beacon.bssid, create=True) self.peers[peer_id].xyz = ( int(beacon.geometry.x * 100), int(beacon.geometry.y * 100), int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100), ) self.xyz = np.array(tuple(peer.xyz for peer in self.peers)) for space in Space.objects.prefetch_related('wifi_measurements'): new_space = LocatorSpace.create( pk=space.pk, points=tuple( LocatorPoint( x=measurement.geometry.x, y=measurement.geometry.y, values=self.convert_scans(measurement.data, create_peers=True), ) for measurement in space.wifi_measurements.all() ) ) if new_space.points: self.spaces[space.pk] = new_space def get_peer_id(self, bssid: BSSID, create=False) -> Optional[int]: peer_id = self.peer_lookup.get(bssid, None) if peer_id is None and create: peer = LocatorPeer(bssid=bssid) peer_id = len(self.peers) self.peer_lookup[bssid] = peer_id self.peers.append(peer) return peer_id def convert_scan(self, scan_data, create_peers=False) -> ScanData: result = {} for scan_value in scan_data: if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS: continue peer_id = self.get_peer_id(scan_value['bssid'], create=create_peers) if peer_id is not None: result[peer_id] = ScanDataValue(rssi=scan_value["rssi"], distance=scan_value.get("distance", None)) return result def convert_scans(self, scans_data, create_peers=False) -> ScanData: converted = [] for scan in scans_data: converted.append(self.convert_scan(scan, create_peers=create_peers)) peer_ids = reduce(operator.or_, (frozenset(values.keys()) for values in converted), frozenset()) return { peer_id: ScanDataValue.average( tuple(values[peer_id] for values in converted if peer_id in values) ) for peer_id in peer_ids } @classmethod def build_filename(cls, update): return settings.CACHE_ROOT / ('locator_%s.pickle' % MapUpdate.build_cache_key(*update)) @classmethod def load_nocache(cls, update): return pickle.load(open(cls.build_filename(update), 'rb')) cached = LocalContext() class NoUpdate: pass @classmethod def load(cls): from c3nav.mapdata.models import MapUpdate update = MapUpdate.last_processed_update() if getattr(cls.cached, 'update', cls.NoUpdate) != update: cls.cached.update = update cls.cached.data = cls.load_nocache(update) return cls.cached.data def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestPeerSchema]) -> ScanData: return self.convert_scan(raw_scan_data, create_peers=False) def get_xyz(self, address: BSSID) -> tuple[int, int, int] | None: i = self.get_peer_id(address) if i is None: return None return self.peers[i].xyz def get_all_xyz(self) -> dict[BSSID, float]: return { peer: peer.xyz for peer in self.peers[:len(self.xyz)] } def locate(self, raw_scan_data: list[LocateRequestPeerSchema], permissions=None): scan_data = self.convert_raw_scan_data(raw_scan_data) if not scan_data: return None result = self.locate_range(scan_data, permissions) if result is not None: return result return self.locate_rssi(scan_data, permissions) def locate_rssi(self, scan_data: ScanData, permissions=None): router = Router.load() restrictions = router.get_restrictions(permissions) # get visible spaces spaces = tuple(space for pk, space in self.spaces.items() if pk not in restrictions.spaces) # find best point best_peer_id = max(scan_data.items(), key=operator.itemgetter(1))[0] best_location = None best_score = float('inf') for space in spaces: point, score = space.get_best_point(scan_data, needed_peer_id=best_peer_id) if point is None: continue if score < best_score: location = CustomLocation(router.spaces[space.pk].level, point.x, point.y, permissions=permissions, icon='my_location') best_location = location best_score = score best_location.score = best_score return best_location def locate_range(self, scan_data: ScanData, permissions=None, orig_addr=None): pprint(scan_data) peer_ids = tuple(i for i in scan_data if i < len(self.xyz)) if len(peer_ids) < 3: # can't get a good result from just two beacons # todo: maybe we can at least give… something? print('less than 3 ranges, can\'t do ranging') return None if len(peer_ids) == 3 and 0: print('2D trilateration') dimensions = 2 else: print('3D trilateration') dimensions = 3 relevant_xyz = self.xyz[peer_ids, :] # create 2d array with x, y, z, distance as rows np_ranges = np.hstack(( relevant_xyz, np.array(tuple(scan_data[i].distance for i in peer_ids)).reshape((-1, 1)), )) print(np_ranges) measured_ranges = np_ranges[:, 3] print('a', measured_ranges) # measured_ranges[measured_ranges<1] = 1 print('b', measured_ranges) # rating the guess by calculating the distances def diff_func(guess): result = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges # print(result) return result # factors = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) / measured_ranges # return factors - np.mean(factors) def cost_func(guess): result = np.abs(diff_func(guess)) result[result < 300] = result[result < 300]/3+200 return result # initial guess i the average of all beacons, with scale 1 initial_guess = np.average(np_ranges[:, :dimensions], axis=0) # here the magic happens results = least_squares( fun=cost_func, # jac="3-point", loss="linear", bounds=( np.min(self.beacon_positions[:, :dimensions], axis=0) - np.array([200, 200, 100])[:dimensions], np.max(self.beacon_positions[:, :dimensions], axis=0) + np.array([200, 200, 100])[:dimensions], ), x0=initial_guess, ) # create result # todo: figure out level result_pos = results.x from c3nav.mapdata.models import Level location = CustomLocation( level=Level.objects.first(), x=result_pos[0]/100, y=result_pos[1]/100, permissions=(), icon='my_location' ) location.z = result_pos[2]/100 pprint(relevant_xyz) orig_xyz = None print('orig_addr', orig_addr) if orig_addr: orig_xyz = self.get_xyz(orig_addr) if orig_xyz: orig_xyz = np.array(orig_xyz) print() print("result:", ", ".join(("%.2f" % i) for i in tuple(result_pos))) if orig_xyz is not None: print("correct:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz))) print("diff:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz-result_pos))) print() print("measured ranges:", ", ".join(("%.2f" % i) for i in tuple(np_ranges[:, 3]))) print("result ranges:", ", ".join( ("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1)) )) if orig_xyz is not None: print("correct ranges:", ", ".join( ("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1)) )) print() print("diff result-measured:", ", ".join( ("%.2f" % i) for i in tuple(diff_func(result_pos)) )) if orig_xyz is not None: print("diff correct-measured:", ", ".join( ("%.2f" % i) for i in tuple(diff_func(orig_xyz)) )) def print_cost(title, pos): cost = cost_func(pos) print(title, ", ".join( ("%.2f" % i) for i in cost ), '=', np.sum(cost**2)) print_cost("cost:", result_pos) if orig_xyz is not None: print_cost("cost of correct position:", orig_xyz) if dimensions > 2: print("height:", result_pos[2]) # print("scale:", (factor or results.x[3])) return location no_signal = int(-90)**2 @dataclass class LocatorSpace: pk: int points: list[LocatorPoint] peer_ids: frozenset[int] peer_lookup: dict[int, int] levels: np.array @classmethod def create(cls, pk: int, points: Sequence[LocatorPoint]): peer_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in points), frozenset()) peers = tuple(peer_set) peer_lookup = {peer_id: i for i, peer_id in enumerate(peers)} levels = np.full((len(points), len(peers)), fill_value=no_signal, dtype=np.int64) for i, point in enumerate(points): for peer_id, value in point.values.items(): levels[i][peer_lookup[peer_id]] = int(value)**2 return cls( pk=pk, points=list(points), peer_ids=peer_set, peer_lookup=peer_lookup, levels=levels, ) def get_best_point(self, scan_values: ScanData, needed_peer_id=None) -> tuple[LocatorPoint, float] | tuple[None, None]: # check if this space knows the needed peer id, otherwise no results here if needed_peer_id not in self.peer_ids: return None, None # peers that this space knows peer_id = frozenset(scan_values.keys()) & self.peer_ids penalty = 0 for peer_id, value in scan_values.items(): if peer_id not in self.peer_ids: penalty += (value.rssi - no_signal)**2 peers = tuple(self.peer_lookup[peer_id] for peer_id in peer_id) values = np.array(tuple(scan_values[peer_id] for peer_id in peer_id), dtype=np.int64) # acceptable points need to have a value for the needed_peer_id points = tuple( np.argwhere(self.levels[:, self.peer_lookup[needed_peer_id]] > 0).ravel() ) if not points: return None, None scores = (np.sum( (self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), peers] - values)**2, axis=1 )+penalty) / len(scan_values) best_point_i = np.argmin(scores).ravel()[0] best_point = points[best_point_i] return self.points[best_point], scores[best_point_i]