team-3/src/c3nav/routing/locator.py

384 lines
13 KiB
Python
Raw Normal View History

2017-12-25 16:41:59 +01:00
import operator
import pickle
from dataclasses import dataclass, field
2017-12-25 16:41:59 +01:00
from functools import reduce
from pprint import pprint
from typing import Optional, Self, Sequence, TypeAlias
2017-12-25 16:41:59 +01:00
import numpy as np
import scipy
2017-12-25 16:41:59 +01:00
from django.conf import settings
from scipy.optimize import least_squares
2017-12-25 16:41:59 +01:00
from c3nav.mapdata.models import MapUpdate, Space
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.routing.router import Router
from c3nav.routing.schemas import LocateRequestPeerSchema
2017-12-25 16:41:59 +01:00
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
BSSID: TypeAlias = str
@dataclass
class LocatorPeer:
bssid: BSSID
frequencies: set[int] = field(default_factory=set)
xyz: Optional[tuple[int, int, int]] = None
2017-12-25 16:41:59 +01:00
@dataclass
class ScanDataValue:
rssi: int
distance: Optional[float] = None
2017-12-25 16:41:59 +01:00
@classmethod
def average(cls, items: Sequence[Self]):
rssi = [item.rssi for item in items]
distance = [item.distance for item in items if item.distance is not None]
return cls(
rssi=(sum(rssi)//len(rssi)),
distance=(sum(distance)/len(distance)) if distance else None,
)
ScanData: TypeAlias = dict[int, ScanDataValue]
@dataclass
class LocatorPoint:
x: float
y: float
values: ScanData
@dataclass
class Locator:
peers: list[LocatorPeer] = field(default_factory=list)
peer_lookup: dict[BSSID, int] = field(default_factory=dict)
2023-12-11 19:02:19 +01:00
xyz: np.array = field(default_factory=(lambda: np.empty((0,))))
spaces: dict[int, "LocatorSpace"] = field(default_factory=dict)
@classmethod
def rebuild(cls, update, router):
locator = cls()
locator._rebuild(router)
pickle.dump(locator, open(cls.build_filename(update), 'wb'))
return locator
def _rebuild(self, router):
for beacon in RangingBeacon.objects.all():
peer_id = self.get_peer_id(beacon.bssid, create=True)
self.peers[peer_id].xyz = (
int(beacon.geometry.x * 100),
int(beacon.geometry.y * 100),
int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100),
)
self.xyz = np.array(tuple(peer.xyz for peer in self.peers))
2017-12-25 16:41:59 +01:00
for space in Space.objects.prefetch_related('wifi_measurements'):
new_space = LocatorSpace.create(
2018-12-24 18:25:26 +01:00
pk=space.pk,
points=tuple(
LocatorPoint(
x=measurement.geometry.x,
y=measurement.geometry.y,
2023-12-27 21:32:00 +01:00
values=self.convert_scans(measurement.data, create_peers=True),
)
for measurement in space.wifi_measurements.all()
)
2017-12-25 16:41:59 +01:00
)
if new_space.points:
self.spaces[space.pk] = new_space
def get_peer_id(self, bssid: BSSID, create=False) -> Optional[int]:
peer_id = self.peer_lookup.get(bssid, None)
if peer_id is None and create:
peer = LocatorPeer(bssid=bssid)
peer_id = len(self.peers)
self.peer_lookup[bssid] = peer_id
self.peers.append(peer)
return peer_id
def convert_scan(self, scan_data, create_peers=False) -> ScanData:
result = {}
for scan_value in scan_data:
if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS:
continue
peer_id = self.get_peer_id(scan_value['bssid'], create=create_peers)
if peer_id is not None:
2023-12-27 21:36:39 +01:00
result[peer_id] = ScanDataValue(rssi=scan_value["rssi"], distance=scan_value.get("distance", None))
return result
def convert_scans(self, scans_data, create_peers=False) -> ScanData:
converted = []
for scan in scans_data:
converted.append(self.convert_scan(scan, create_peers=create_peers))
peer_ids = reduce(operator.or_, (frozenset(values.keys()) for values in converted), frozenset())
return {
peer_id: ScanDataValue.average(
tuple(values[peer_id] for values in converted if peer_id in values)
)
for peer_id in peer_ids
}
2017-12-25 16:41:59 +01:00
@classmethod
def build_filename(cls, update):
return settings.CACHE_ROOT / ('locator_%s.pickle' % MapUpdate.build_cache_key(*update))
@classmethod
def load_nocache(cls, update):
return pickle.load(open(cls.build_filename(update), 'rb'))
cached = LocalContext()
class NoUpdate:
pass
@classmethod
def load(cls):
from c3nav.mapdata.models import MapUpdate
update = MapUpdate.last_processed_update()
if getattr(cls.cached, 'update', cls.NoUpdate) != update:
cls.cached.update = update
cls.cached.data = cls.load_nocache(update)
2023-12-07 22:15:50 +01:00
return cls.cached.data
def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestPeerSchema]) -> ScanData:
2023-12-27 17:44:20 +01:00
return self.convert_scan(raw_scan_data, create_peers=False)
def get_xyz(self, address: BSSID) -> tuple[int, int, int] | None:
i = self.get_peer_id(address)
if i is None:
return None
return self.peers[i].xyz
def get_all_xyz(self) -> dict[BSSID, float]:
return {
peer: peer.xyz for peer in self.peers[:len(self.xyz)]
}
def locate(self, raw_scan_data: list[LocateRequestPeerSchema], permissions=None):
scan_data = self.convert_raw_scan_data(raw_scan_data)
if not scan_data:
return None
result = self.locate_range(scan_data, permissions)
if result is not None:
return result
return self.locate_rssi(scan_data, permissions)
def locate_rssi(self, scan_data: ScanData, permissions=None):
router = Router.load()
restrictions = router.get_restrictions(permissions)
2018-12-24 18:25:26 +01:00
# get visible spaces
2018-12-24 18:25:26 +01:00
spaces = tuple(space for pk, space in self.spaces.items() if pk not in restrictions.spaces)
2018-12-24 18:25:26 +01:00
# find best point
best_peer_id = max(scan_data.items(), key=lambda v: v[1].rssi)[0]
best_location = None
best_score = float('inf')
2018-12-24 18:25:26 +01:00
for space in spaces:
point, score = space.get_best_point(scan_data, needed_peer_id=best_peer_id)
2017-12-26 15:28:41 +01:00
if point is None:
continue
if score < best_score:
2018-12-24 18:25:26 +01:00
location = CustomLocation(router.spaces[space.pk].level, point.x, point.y,
permissions=permissions, icon='my_location')
best_location = location
best_score = score
2019-12-28 17:18:32 +01:00
best_location.score = best_score
return best_location
def locate_range(self, scan_data: ScanData, permissions=None, orig_addr=None):
pprint(scan_data)
2017-12-25 16:41:59 +01:00
peer_ids = tuple(i for i in scan_data if i < len(self.xyz))
2017-12-25 16:41:59 +01:00
if len(peer_ids) < 3:
# can't get a good result from just two beacons
# todo: maybe we can at least give… something?
print('less than 3 ranges, can\'t do ranging')
return None
2017-12-25 16:41:59 +01:00
if len(peer_ids) == 3 and 0:
print('2D trilateration')
dimensions = 2
else:
print('3D trilateration')
dimensions = 3
relevant_xyz = self.xyz[peer_ids, :]
# create 2d array with x, y, z, distance as rows
np_ranges = np.hstack((
relevant_xyz,
np.array(tuple(scan_data[i].distance for i in peer_ids)).reshape((-1, 1)),
))
print(np_ranges)
measured_ranges = np_ranges[:, 3]
print('a', measured_ranges)
# measured_ranges[measured_ranges<1] = 1
print('b', measured_ranges)
# rating the guess by calculating the distances
def diff_func(guess):
result = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges
# print(result)
return result
# factors = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) / measured_ranges
# return factors - np.mean(factors)
def cost_func(guess):
result = np.abs(diff_func(guess))
result[result < 300] = result[result < 300]/3+200
return result
# initial guess i the average of all beacons, with scale 1
initial_guess = np.average(np_ranges[:, :dimensions], axis=0)
# here the magic happens
results = least_squares(
fun=cost_func,
# jac="3-point",
loss="linear",
bounds=(
np.min(self.beacon_positions[:, :dimensions], axis=0) - np.array([200, 200, 100])[:dimensions],
np.max(self.beacon_positions[:, :dimensions], axis=0) + np.array([200, 200, 100])[:dimensions],
),
x0=initial_guess,
)
2017-12-25 16:41:59 +01:00
# create result
# todo: figure out level
result_pos = results.x
from c3nav.mapdata.models import Level
location = CustomLocation(
level=Level.objects.first(),
x=result_pos[0]/100,
y=result_pos[1]/100,
permissions=(),
icon='my_location'
)
location.z = result_pos[2]/100
pprint(relevant_xyz)
orig_xyz = None
print('orig_addr', orig_addr)
if orig_addr:
orig_xyz = self.get_xyz(orig_addr)
if orig_xyz:
orig_xyz = np.array(orig_xyz)
print()
print("result:", ", ".join(("%.2f" % i) for i in tuple(result_pos)))
if orig_xyz is not None:
print("correct:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz)))
print("diff:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz-result_pos)))
print()
print("measured ranges:", ", ".join(("%.2f" % i) for i in tuple(np_ranges[:, 3])))
print("result ranges:", ", ".join(
("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1))
))
if orig_xyz is not None:
print("correct ranges:", ", ".join(
("%.2f" % i)
for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1))
))
print()
print("diff result-measured:", ", ".join(
("%.2f" % i) for i in
tuple(diff_func(result_pos))
))
if orig_xyz is not None:
print("diff correct-measured:", ", ".join(
("%.2f" % i) for i in
tuple(diff_func(orig_xyz))
))
def print_cost(title, pos):
cost = cost_func(pos)
print(title, ", ".join(
("%.2f" % i) for i in cost
), '=', np.sum(cost**2))
print_cost("cost:", result_pos)
if orig_xyz is not None:
print_cost("cost of correct position:", orig_xyz)
if dimensions > 2:
print("height:", result_pos[2])
# print("scale:", (factor or results.x[3]))
return location
no_signal = int(-90)**2
@dataclass
2017-12-25 16:41:59 +01:00
class LocatorSpace:
pk: int
points: list[LocatorPoint]
peer_ids: frozenset[int]
peer_lookup: dict[int, int]
levels: np.array
@classmethod
def create(cls, pk: int, points: Sequence[LocatorPoint]):
peer_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in points), frozenset())
peers = tuple(peer_set)
peer_lookup = {peer_id: i for i, peer_id in enumerate(peers)}
levels = np.full((len(points), len(peers)), fill_value=no_signal, dtype=np.int64)
for i, point in enumerate(points):
for peer_id, value in point.values.items():
levels[i][peer_lookup[peer_id]] = int(value.rssi)**2
return cls(
pk=pk,
points=list(points),
peer_ids=peer_set,
peer_lookup=peer_lookup,
levels=levels,
)
def get_best_point(self, scan_values: ScanData,
needed_peer_id=None) -> tuple[LocatorPoint, float] | tuple[None, None]:
# check if this space knows the needed peer id, otherwise no results here
if needed_peer_id not in self.peer_ids:
2018-12-24 18:25:26 +01:00
return None, None
# peers that this space knows
peer_ids = frozenset(scan_values.keys()) & self.peer_ids
2018-12-24 18:25:26 +01:00
penalty = 0
for peer_id, value in scan_values.items():
if peer_id not in self.peer_ids:
penalty += (value.rssi - no_signal)**2
peers = tuple(self.peer_lookup[peer_id] for peer_id in peer_ids)
values = np.array(tuple(scan_values[peer_id] for peer_id in peer_ids), dtype=np.int64)
# acceptable points need to have a value for the needed_peer_id
2018-12-24 18:25:26 +01:00
points = tuple(
np.argwhere(self.levels[:, self.peer_lookup[needed_peer_id]] > 0).ravel()
)
2018-12-24 18:25:26 +01:00
if not points:
2017-12-26 15:28:41 +01:00
return None, None
2018-12-24 19:14:53 +01:00
scores = (np.sum(
(self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), peers] - values)**2,
2018-12-24 18:25:26 +01:00
axis=1
2018-12-24 19:14:53 +01:00
)+penalty) / len(scan_values)
2018-12-24 18:25:26 +01:00
best_point_i = np.argmin(scores).ravel()[0]
best_point = points[best_point_i]
2018-12-24 19:14:53 +01:00
return self.points[best_point], scores[best_point_i]