merge RangeLocator into Locator and modernize code

This commit is contained in:
Laura Klünder 2023-12-07 02:15:32 +01:00
parent ae65a68830
commit 34af60782c
14 changed files with 373 additions and 438 deletions

View file

@ -9,7 +9,7 @@ from django.utils.functional import SimpleLazyObject, lazy
from ninja.security import APIKeyHeader
from c3nav import settings
from c3nav.api.exceptions import APIPermissionDenied, APIKeyInvalid
from c3nav.api.exceptions import APIKeyInvalid, APIPermissionDenied
from c3nav.api.models import Secret
from c3nav.api.schema import APIErrorSchema
from c3nav.control.middleware import UserPermissionsMiddleware

View file

@ -21,6 +21,7 @@ from c3nav.mapdata.fields import GeometryField
from c3nav.mapdata.forms import I18nModelFormMixin
from c3nav.mapdata.models import GraphEdge
from c3nav.mapdata.models.access import AccessPermission
from c3nav.routing.schemas import LocateRequestPeerSchema
class EditorFormBase(I18nModelFormMixin, ModelForm):
@ -244,9 +245,12 @@ class EditorFormBase(I18nModelFormMixin, ModelForm):
except json.JSONDecodeError:
raise ValidationError(_('Invalid JSON.'))
from c3nav.routing.locator import LocatorPoint
LocatorPoint.clean_scans(data)
if not isinstance(data, list):
raise ValidationError(_('Scan data is not a list.'))
for item in data:
# todo: catch pydantic validation error
LocateRequestPeerSchema.model_validate(item)
return data
def clean(self):

View file

@ -1,5 +1,5 @@
import json
from typing import Annotated, Optional, Union
from typing import Annotated, Union
from django.core.serializers.json import DjangoJSONEncoder
from django.shortcuts import redirect
@ -17,9 +17,9 @@ from c3nav.mapdata.api.base import api_etag, api_stats, can_access_geometry
from c3nav.mapdata.models import Source
from c3nav.mapdata.models.locations import DynamicLocation, LocationRedirect, Position
from c3nav.mapdata.schemas.filters import BySearchableFilter, RemoveGeometryFilter
from c3nav.mapdata.schemas.model_base import AnyLocationID, AnyPositionID, CustomLocationID, schema_definition
from c3nav.mapdata.schemas.model_base import AnyLocationID, AnyPositionID, CustomLocationID
from c3nav.mapdata.schemas.models import (AnyPositionStatusSchema, FullListableLocationSchema, FullLocationSchema,
LevelSchema, LocationDisplay, SlimListableLocationSchema, SlimLocationSchema,
LocationDisplay, SlimListableLocationSchema, SlimLocationSchema,
all_location_definitions, listable_location_definitions)
from c3nav.mapdata.schemas.responses import LocationGeometry, WithBoundsSchema
from c3nav.mapdata.utils.locations import (get_location_by_id_for_request, get_location_by_slug_for_request,
@ -274,6 +274,7 @@ def get_position_by_id(request, position_id: AnyPositionID):
raise API404()
return location.serialize_position()
class UpdatePositionSchema(Schema):
coordinates_id: Union[
Annotated[CustomLocationID, APIField(title="set coordinates")],

View file

@ -73,7 +73,8 @@ class FetchUpdatesResponseSchema(Schema):
title="null",
description="only for cross-origin requests",
)],
] = APIField(None,
] = APIField(
None,
title="user data",
description="user data of this request. ommited for cross-origin requests.",
)

View file

@ -185,10 +185,6 @@ class MapUpdate(models.Model):
from c3nav.routing.locator import Locator
locator = Locator.rebuild(new_updates[-1].to_tuple)
logger.info('Rebuilding range locator...')
from c3nav.routing.rangelocator import RangeLocator
range_locator = RangeLocator.rebuild(new_updates[-1].to_tuple, router)
for new_update in reversed(new_updates):
new_update.processed = True
new_update.save()

View file

@ -1,5 +1,5 @@
import re
from typing import Annotated, Any, Union, Optional
from typing import Annotated, Any, Optional, Union
from ninja import Schema
from pydantic import Field as APIField

View file

@ -21,7 +21,7 @@ from c3nav.mesh.messages import (MESH_BROADCAST_ADDRESS, MESH_NONE_ADDRESS, MESH
MeshMessage, MeshMessageType, OTAApplyMessage, OTASettingMessage)
from c3nav.mesh.models import MeshNode, MeshUplink, NodeMessage, OTARecipientStatus, OTAUpdate, OTAUpdateRecipient
from c3nav.mesh.utils import MESH_ALL_OTA_GROUP, MESH_ALL_UPLINKS_GROUP, UPLINK_PING, get_mesh_uplink_group
from c3nav.routing.rangelocator import RangeLocator
from c3nav.routing.locator import Locator
class Unknown:
@ -738,13 +738,18 @@ class MeshUIConsumer(AsyncJsonWebsocketConsumer):
@database_sync_to_async
def locator(self, msg, orig_addr=None):
locator = RangeLocator.load()
return locator.locate(
locator = Locator.load()
return locator.locate_range(
locator.convert_raw_scan_data([
{
r["peer"]: r["distance"]
"bssid": r["peer"],
"ssid": "",
"rssi": r["rssi"],
"distance": r["distance"]
}
for r in msg["ranges"]
if r["distance"] != 0xFFFF
},
]),
permissions=None,
orig_addr=orig_addr,
)

View file

@ -23,7 +23,7 @@ from c3nav.mesh.messages import ConfigFirmwareMessage, ConfigHardwareMessage
from c3nav.mesh.messages import MeshMessage as MeshMessage
from c3nav.mesh.messages import MeshMessageType
from c3nav.mesh.utils import MESH_ALL_OTA_GROUP, UPLINK_TIMEOUT
from c3nav.routing.rangelocator import RangeLocator
from c3nav.routing.locator import Locator
FirmwareLookup = namedtuple('FirmwareLookup', ('sha256_hash', 'chip', 'project_name', 'version', 'idf_version'))
@ -338,7 +338,7 @@ class MeshNode(models.Model):
return dst_node.get_uplink()
def get_locator_xyz(self):
locator = RangeLocator.load()
locator = Locator.load()
return locator.get_xyz(self.address)

View file

@ -19,9 +19,9 @@ class MeshRangingView(TemplateView):
template_name = "mesh/mesh_ranging.html"
def get_context_data(self, **kwargs):
from c3nav.routing.rangelocator import RangeLocator
from c3nav.routing.locator import Locator
return {
"ranging_form": RangingForm(self.request.GET or None),
"node_names": get_node_names(),
"nodes_xyz": RangeLocator.load().get_all_xyz(),
"nodes_xyz": Locator.load().get_all_xyz(),
}

View file

@ -4,57 +4,19 @@ from django.core.exceptions import ValidationError
from ninja import Field as APIField
from ninja import Router as APIRouter
from ninja import Schema
from pydantic import NegativeInt, PositiveInt
from c3nav.api.auth import auth_responses
from c3nav.api.utils import NonEmptyStr
from c3nav.mapdata.models.access import AccessPermission
from c3nav.mapdata.schemas.models import CustomLocationSchema
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.routing.locator import Locator
from c3nav.routing.rangelocator import RangeLocator
from c3nav.routing.schemas import LocateRequestPeerSchema
BSSIDSchema = Annotated[str, APIField(pattern=r"^[a-z0-9]{2}(:[a-z0-9]{2}){5}$", title="BSSID")]
positioning_api_router = APIRouter(tags=["positioning"])
class LocateRequestPeerSchema(Schema):
bssid: BSSIDSchema = APIField(
title="BSSID",
description="BSSID of the peer",
example="c3:42:13:37:ac:ab",
)
ssid: NonEmptyStr = APIField(
title="SSID",
description="(E)SSID of the peer",
example="c3nav-locate",
)
rssi: NegativeInt = APIField(
title="RSSI",
description="RSSI in dBm",
example=-42,
)
frequency: Union[
PositiveInt,
Annotated[None, APIField(title="null", description="frequency not given")]
] = APIField(
default=None,
title="frequency",
description="frequency in KHz",
example=2472,
)
distance: Union[
float,
Annotated[None, APIField(title="null", description="distance was not measured")]
] = APIField(
default=None,
title="distance",
description="measured distance in meters",
example=8.32
)
class LocateRequestSchema(Schema):
peers: list[LocateRequestPeerSchema] = APIField(
title="list of visible/measured location beacons",
@ -106,13 +68,18 @@ def locate_test():
}
msg = node.last_messages[MeshMessageType.LOCATE_RANGE_RESULTS]
locator = RangeLocator.load()
location = locator.locate(
locator = Locator.load()
location = locator.locate_range(
locator.convert_raw_scan_data([
{
r.peer: r.distance
"bssid": r.peer,
"ssid": "",
"rssi": r.rssi,
"distance": r.distance,
}
for r in msg.parsed.ranges
if r.distance != 0xFFFF
},
]),
None
)
return {
@ -139,4 +106,4 @@ BeaconsXYZ = dict[
description="get xyz coordinates for all known positioning beacons",
response={200: BeaconsXYZ, **auth_responses})
def beacons_xyz():
return RangeLocator.load().get_all_xyz()
return Locator.load().get_all_xyz()

View file

@ -1,5 +1,5 @@
from enum import StrEnum
from typing import Annotated, Optional, Union
from typing import Annotated, Union
from django.core.exceptions import ValidationError
from django.urls import reverse

View file

@ -1,43 +1,126 @@
import operator
import pickle
import re
import threading
from collections import deque, namedtuple
from dataclasses import dataclass, field
from functools import reduce
from pprint import pprint
from typing import Optional, Self, Sequence, TypeAlias
import numpy as np
import scipy
from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from scipy.optimize import least_squares
from c3nav.mapdata.models import MapUpdate, Space
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.routing.router import Router
from c3nav.routing.schemas import LocateRequestPeerSchema
BSSID: TypeAlias = str
class Locator:
filename = settings.CACHE_ROOT / 'locator'
@dataclass
class LocatorPeer:
bssid: BSSID
frequencies: set[int] = field(default_factory=set)
xyz: Optional[tuple[int, int, int]] = None
def __init__(self, stations, spaces):
self.stations = stations
self.spaces = spaces
@dataclass
class ScanDataValue:
rssi: int
distance: Optional[float] = None
@classmethod
def rebuild(cls, update):
stations = LocatorStations()
spaces = {}
def average(cls, items: Sequence[Self]):
rssi = [item.rssi for item in items]
distance = [item.distance for item in items if item.distance is not None]
return cls(
rssi=(sum(rssi)//len(rssi)),
distance=(sum(distance)/len(distance)) if distance else None,
)
ScanData: TypeAlias = dict[int, ScanDataValue]
@dataclass
class LocatorPoint:
x: float
y: float
values: ScanData
@dataclass
class Locator:
peers: list[LocatorPeer] = field(default_factory=list)
peer_lookup: dict[BSSID, int] = field(default_factory=dict)
xyz: np.array = np.empty((0,))
spaces: dict[int, "LocatorSpace"] = field(default_factory=dict)
@classmethod
def rebuild(cls, update, router):
locator = cls()
locator._rebuild(router)
pickle.dump(locator, open(cls.build_filename(update), 'wb'))
return locator
def _rebuild(self, router):
for beacon in RangingBeacon.objects.all():
peer_id = self.get_peer_id(beacon.bssid, create=True)
self.peers[peer_id].xyz = (
int(beacon.geometry.x * 100),
int(beacon.geometry.y * 100),
int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100),
)
self.xyz = np.array(tuple(peer.xyz for peer in self.peers))
for space in Space.objects.prefetch_related('wifi_measurements'):
new_space = LocatorSpace(
pk=space.pk,
points=(LocatorPoint.from_measurement(measurement, stations)
for measurement in space.wifi_measurements.all())
points=tuple(
LocatorPoint(
x=measurement.x,
y=measurement.x,
values=self.convert_scans(measurement.data),
)
for measurement in space.wifi_measurements.all()
)
)
if new_space.points:
spaces[space.pk] = new_space
self.spaces[space.pk] = new_space
locator = cls(stations, spaces)
pickle.dump(locator, open(cls.build_filename(update), 'wb'))
return locator
def get_peer_id(self, bssid: BSSID, create=False) -> Optional[int]:
peer_id = self.peer_lookup.get(bssid, None)
if peer_id is None and create:
peer = LocatorPeer(bssid=bssid)
peer_id = len(self.peers)
self.peer_lookup[bssid] = peer_id
self.peers.append(peer)
return peer_id
def convert_scan(self, scan_data, create_peers=False) -> ScanData:
result = {}
for scan_value in scan_data:
if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS:
continue
peer_id = self.get_peer_id(scan_value['bssid'], create=create_peers)
if peer_id is not None:
result[peer_id] = ScanDataValue(rssi=scan_value["rssi"], distance=scan_value["distance"])
return result
def convert_scans(self, scans_data, create_peers=False) -> ScanData:
converted = []
for scan in scans_data:
converted.append(self.convert_scan(scan, create_peers=create_peers))
peer_ids = reduce(operator.or_, (frozenset(values.keys()) for values in converted), frozenset())
return {
peer_id: ScanDataValue.average(
tuple(values[peer_id] for values in converted if peer_id in values)
)
for peer_id in peer_ids
}
@classmethod
def build_filename(cls, update):
@ -61,27 +144,44 @@ class Locator:
cls.cached = cls.load_nocache(update)
return cls.cached
def locate(self, scan, permissions=None):
router = Router.load()
restrictions = router.get_restrictions(permissions)
def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestPeerSchema]) -> ScanData:
return self.convert_scan([d.dict() for d in raw_scan_data], create_peers=False)
scan = LocatorPoint.clean_scan(scan, ignore_invalid_stations=True)
scan_values = LocatorPoint.convert_scan(scan, self.stations, create=False)
if not scan_values:
def get_xyz(self, address: BSSID) -> tuple[int, int, int] | None:
i = self.get_peer_id(address)
if i is None:
return None
return self.peers[i].xyz
def get_all_xyz(self) -> dict[BSSID, float]:
return {
peer: peer.xyz for peer in self.peers[:len(self.xyz)]
}
def locate(self, raw_scan_data: list[LocateRequestPeerSchema], permissions=None):
scan_data = self.convert_raw_scan_data(raw_scan_data)
if not scan_data:
return None
# convert scan values
scan_values = {station_id: value**2 for station_id, value in scan_values.items()}
result = self.locate_range(scan_data, permissions)
if result is not None:
return result
return self.locate_rssi(scan_data, permissions)
def locate_rssi(self, scan_data: ScanData, permissions=None):
router = Router.load()
restrictions = router.get_restrictions(permissions)
# get visible spaces
spaces = tuple(space for pk, space in self.spaces.items() if pk not in restrictions.spaces)
# find best point
best_station_id = max(scan_values.items(), key=operator.itemgetter(1))[0]
best_peer_id = max(scan_data.items(), key=operator.itemgetter(1))[0]
best_location = None
best_score = float('inf')
for space in spaces:
point, score = space.get_best_point(scan_values, needed_station_id=best_station_id)
point, score = space.get_best_point(scan_data, needed_peer_id=best_peer_id)
if point is None:
continue
if score < best_score:
@ -94,165 +194,186 @@ class Locator:
return best_location
def locate_range(self, scan_data: ScanData, permissions=None, orig_addr=None):
pprint(scan_data)
class LocatorStations:
def __init__(self):
self.stations = []
self.stations_lookup = {}
peer_ids = tuple(i for i in scan_data if i < len(self.xyz))
def get(self, bssid, ssid, frequency, create=False):
# yes, we're not looking up frequencies, because they can change… yep…
station_id = self.stations_lookup.get((bssid, None), None)
if station_id is not None:
station = self.stations[station_id]
station.frequencies.add(frequency)
elif create:
station = LocatorStation(bssid, ssid, {frequency})
station_id = len(self.stations)
self.stations_lookup[(bssid, None)] = station_id
self.stations.append(station)
return station_id
if len(peer_ids) < 3:
# can't get a good result from just two beacons
# todo: maybe we can at least give… something?
print('less than 3 ranges, can\'t do ranging')
return None
if len(peer_ids) == 3 and 0:
print('2D trilateration')
dimensions = 2
else:
print('3D trilateration')
dimensions = 3
relevant_xyz = self.xyz[peer_ids, :]
# create 2d array with x, y, z, distance as rows
np_ranges = np.hstack((
relevant_xyz,
np.array(tuple(scan_data[i].distance for i in peer_ids)).reshape((-1, 1)),
))
print(np_ranges)
measured_ranges = np_ranges[:, 3]
print('a', measured_ranges)
# measured_ranges[measured_ranges<1] = 1
print('b', measured_ranges)
# rating the guess by calculating the distances
def diff_func(guess):
result = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges
# print(result)
return result
# factors = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) / measured_ranges
# return factors - np.mean(factors)
def cost_func(guess):
result = np.abs(diff_func(guess))
result[result < 300] = result[result < 300]/3+200
return result
# initial guess i the average of all beacons, with scale 1
initial_guess = np.average(np_ranges[:, :dimensions], axis=0)
# here the magic happens
results = least_squares(
fun=cost_func,
# jac="3-point",
loss="linear",
bounds=(
np.min(self.beacon_positions[:, :dimensions], axis=0) - np.array([200, 200, 100])[:dimensions],
np.max(self.beacon_positions[:, :dimensions], axis=0) + np.array([200, 200, 100])[:dimensions],
),
x0=initial_guess,
)
# create result
# todo: figure out level
result_pos = results.x
from c3nav.mapdata.models import Level
location = CustomLocation(
level=Level.objects.first(),
x=result_pos[0]/100,
y=result_pos[1]/100,
permissions=(),
icon='my_location'
)
location.z = result_pos[2]/100
pprint(relevant_xyz)
orig_xyz = None
print('orig_addr', orig_addr)
if orig_addr:
orig_xyz = self.get_xyz(orig_addr)
if orig_xyz:
orig_xyz = np.array(orig_xyz)
print()
print("result:", ", ".join(("%.2f" % i) for i in tuple(result_pos)))
if orig_xyz is not None:
print("correct:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz)))
print("diff:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz-result_pos)))
print()
print("measured ranges:", ", ".join(("%.2f" % i) for i in tuple(np_ranges[:, 3])))
print("result ranges:", ", ".join(
("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1))
))
if orig_xyz is not None:
print("correct ranges:", ", ".join(
("%.2f" % i)
for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1))
))
print()
print("diff result-measured:", ", ".join(
("%.2f" % i) for i in
tuple(diff_func(result_pos))
))
if orig_xyz is not None:
print("diff correct-measured:", ", ".join(
("%.2f" % i) for i in
tuple(diff_func(orig_xyz))
))
def print_cost(title, pos):
cost = cost_func(pos)
print(title, ", ".join(
("%.2f" % i) for i in cost
), '=', np.sum(cost**2))
print_cost("cost:", result_pos)
if orig_xyz is not None:
print_cost("cost of correct position:", orig_xyz)
if dimensions > 2:
print("height:", result_pos[2])
# print("scale:", (factor or results.x[3]))
return location
no_signal = int(-90)**2
@dataclass
class LocatorSpace:
no_signal = int(-90)**2
pk: int
points: list[LocatorPoint]
peer_ids: frozenset[int]
peer_lookup: dict[int, int]
levels: np.array
def __init__(self, pk, points):
self.pk = pk
self.points = tuple(points)
self.stations_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in self.points), frozenset())
self.stations = tuple(self.stations_set)
self.stations_lookup = {station_id: i for i, station_id in enumerate(self.stations)}
@classmethod
def create(cls, pk: int, points: Sequence[LocatorPoint]):
peer_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in points), frozenset())
peers = tuple(peer_set)
peer_lookup = {peer_id: i for i, peer_id in enumerate(peers)}
levels = np.full((len(points), len(peers)), fill_value=no_signal, dtype=np.int64)
for i, point in enumerate(points):
for peer_id, value in point.values.items():
levels[i][peer_lookup[peer_id]] = int(value)**2
self.levels = np.full((len(self.points), len(self.stations)), fill_value=self.no_signal, dtype=np.int64)
for i, point in enumerate(self.points):
for station_id, value in point.values.items():
self.levels[i][self.stations_lookup[station_id]] = int(value)**2
return cls(
pk=pk,
points=list(points),
peer_ids=peer_set,
peer_lookup=peer_lookup,
levels=levels,
)
def get_best_point(self, scan_values, needed_station_id=None):
# check if this space knows the needed station id, otherwise no results here
if needed_station_id not in self.stations_set:
def get_best_point(self, scan_values: ScanData,
needed_peer_id=None) -> tuple[LocatorPoint, float] | tuple[None, None]:
# check if this space knows the needed peer id, otherwise no results here
if needed_peer_id not in self.peer_ids:
return None, None
# stations that this space knows
station_ids = frozenset(scan_values.keys()) & self.stations_set
# peers that this space knows
peer_id = frozenset(scan_values.keys()) & self.peer_ids
penalty = 0
for station_id, value in scan_values.items():
if station_id not in self.stations_set:
penalty += (value - self.no_signal)**2
for peer_id, value in scan_values.items():
if peer_id not in self.peer_ids:
penalty += (value.rssi - no_signal)**2
stations = tuple(self.stations_lookup[station_id] for station_id in station_ids)
values = np.array(tuple(scan_values[station_id] for station_id in station_ids), dtype=np.int64)
peers = tuple(self.peer_lookup[peer_id] for peer_id in peer_id)
values = np.array(tuple(scan_values[peer_id] for peer_id in peer_id), dtype=np.int64)
# acceptable points need to have a value for the needed_station_id
# points = tuple(
# np.argwhere(self.levels[:, self.stations_lookup[needed_station_id]] < self.no_signal).ravel()
# )
# temporary: don't filter these points by needed station id! the noc is still having fun deploying new stationg
# acceptable points need to have a value for the needed_peer_id
points = tuple(
np.argwhere(self.levels[:, self.stations_lookup[needed_station_id]] > 0).ravel()
np.argwhere(self.levels[:, self.peer_lookup[needed_peer_id]] > 0).ravel()
)
if not points:
return None, None
scores = (np.sum(
(self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), stations] - values)**2,
(self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), peers] - values)**2,
axis=1
)+penalty) / len(scan_values)
best_point_i = np.argmin(scores).ravel()[0]
best_point = points[best_point_i]
return self.points[best_point], scores[best_point_i]
class LocatorPoint(namedtuple('LocatorPoint', ('x', 'y', 'values'))):
@classmethod
def from_measurement(cls, measurement, stations: LocatorStations):
return cls(x=measurement.geometry.x, y=measurement.geometry.y,
values=cls.convert_scans(measurement.data, stations, create=True))
@classmethod
def convert_scan(cls, scan, stations: LocatorStations, create=False):
values = {}
for scan_value in scan:
if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS:
continue
station_id = stations.get(bssid=scan_value['bssid'], ssid=scan_value['ssid'],
frequency=scan_value['frequency'], create=create)
if station_id is not None:
# todo: convert to something more or less linear
values[station_id] = scan_value['level']
return values
@classmethod
def convert_scans(cls, scans, stations: LocatorStations, create=False):
values_list = deque()
for scan in scans:
values_list.append(cls.convert_scan(scan, stations, create))
station_ids = reduce(operator.or_, (frozenset(values.keys()) for values in values_list), frozenset())
return {
station_id: cls.average(tuple(values[station_id] for values in values_list if station_id in values))
for station_id in station_ids
}
@staticmethod
def average(items):
return sum(items) / len(items)
valid_frequencies = frozenset((
2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462, 2467, 2472, 2484,
5180, 5190, 5200, 5210, 5220, 5230, 5240, 5250, 5260, 5270, 5280, 5290, 5300, 5310, 5320,
5500, 5510, 5520, 5530, 5540, 5550, 5560, 5570, 5580, 5590, 5600, 5610, 5620, 5630, 5640,
5660, 5670, 5680, 5690, 5700, 5710, 5720, 5745, 5755, 5765, 5775, 5785, 5795, 5805, 5825
))
needed_keys = frozenset(('bssid', 'ssid', 'level', 'frequency'))
allowed_keys = needed_keys | frozenset(('last', ))
@classmethod
def clean_scans(cls, data, ignore_invalid_stations=False):
if not isinstance(data, list):
raise ValidationError(_('Invalid Scan. Scans list list not a list.'))
return tuple(cls.clean_scan(scan) for scan in data)
@classmethod
def clean_scan(cls, data, ignore_invalid_stations=False):
if not isinstance(data, list):
raise ValidationError(_('Invalid Scan. Scan not a list.'))
cleaned_scan = deque()
for scan_value in data:
try:
cleaned_scan.append(cls.clean_scan_value(scan_value))
except ValidationError:
if not ignore_invalid_stations:
raise
return tuple(cleaned_scan)
@classmethod
def clean_scan_value(cls, data):
if not isinstance(data, dict):
raise ValidationError(_('Invalid Scan. Scan value not a dictionary.'))
keys = frozenset(data.keys())
if (keys - cls.allowed_keys) or (cls.needed_keys - keys):
raise ValidationError(_('Invalid Scan. Missing or forbidden keys.'))
if not isinstance(data['bssid'], str):
raise ValidationError(_('Invalid Scan. BSSID not a String.'))
data['bssid'] = data['bssid'].upper()
if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', data['bssid']):
raise ValidationError(_('Invalid Scan. Invalid BSSID.'))
if not isinstance(data['level'], int) or not (-1 >= data['level'] >= -100):
raise ValidationError(_('Invalid Scan. Invalid RSSI/Level.'))
if data['frequency'] not in cls.valid_frequencies:
raise ValidationError(_('Invalid Scan. Not an allowed frequency.'))
if 'last' in keys and (not isinstance(data['last'], int) or data['last'] <= 0):
raise ValidationError(_('Invalid Scan. Invalid last timestamp.'))
return data
class LocatorStation:
def __init__(self, bssid, ssid, frequencies=()):
self.bssid = bssid
self.ssid = ssid
self.frequencies = set(frequencies)
def __repr__(self):
return 'LocatorStation(%r, %r, frequencies=%r)' % (self.bssid, self.ssid, self.frequencies)

View file

@ -1,204 +0,0 @@
import pickle
import threading
from dataclasses import dataclass
from pprint import pprint
from typing import Annotated, Self
import numpy as np
import scipy
from django.conf import settings
from scipy.optimize import least_squares
from c3nav.mapdata.models import MapUpdate
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.routing.router import Router
@dataclass
class RangeLocator:
filename = settings.CACHE_ROOT / 'rangelocator'
beacon_positions: np.array
beacon_lookup: dict[str: int]
@classmethod
def rebuild(cls, update, router):
beacons = RangingBeacon.objects.all()
locator = cls(
beacon_positions=np.array(tuple(
(
int(beacon.geometry.x * 100),
int(beacon.geometry.y * 100),
int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100),
)
for beacon in beacons
)),
beacon_lookup={beacon.bssid: i for i, beacon in enumerate(beacons)}
)
pickle.dump(locator, open(cls.build_filename(update), 'wb'))
return locator
@classmethod
def build_filename(cls, update):
return settings.CACHE_ROOT / ('rangelocator_%s.pickle' % MapUpdate.build_cache_key(*update))
@classmethod
def load_nocache(cls, update):
return pickle.load(open(cls.build_filename(update), 'rb'))
def get_xyz(self, address) -> tuple[int, int, int] | None:
try:
i = self.beacon_lookup[address]
except KeyError:
return None
return tuple(self.beacon_positions[i])
def get_all_xyz(self):
return {
address: tuple(self.beacon_positions[i].tolist())
for address, i in self.beacon_lookup.items()
}
cached = None
cache_update = None
cache_lock = threading.Lock()
@classmethod
def load(cls) -> Self:
from c3nav.mapdata.models import MapUpdate
update = MapUpdate.last_processed_update()
if cls.cache_update != update:
with cls.cache_lock:
cls.cache_update = update
cls.cached = cls.load_nocache(update)
return cls.cached
def locate(self, ranges: dict[str, int], permissions=None, orig_addr=None):
pprint(ranges)
# get the i and peer for every peer that we actually know
relevant_ranges = tuple(
(i, distance) for i, distance in (
(self.beacon_lookup.get(bssid, None), distance) for bssid, distance in ranges.items()
) if i is not None
)
relevant_positions = self.beacon_positions[tuple(i for i, _ in relevant_ranges), :]
mean = np.mean(relevant_positions, axis=0)
relevant_positions = relevant_positions
# create 2d array with x, y, z, distance as rows
np_ranges = np.hstack((
relevant_positions,
np.array(tuple(distance for i, distance in relevant_ranges)).reshape((-1, 1)),
))
print(np_ranges)
if np_ranges.shape[0] < 3:
# can't get a good result from just two beacons
# todo: maybe we can at least give… something?
print('less than 3 ranges, can\'t do ranging')
return None
if np_ranges.shape[0] == 3 and 0:
print('2D trilateration')
dimensions = 2
else:
print('3D trilateration')
dimensions = 3
measured_ranges = np_ranges[:, 3]
print('a', measured_ranges)
#measured_ranges[measured_ranges<1] = 1
print('b', measured_ranges)
# rating the guess by calculating the distances
def diff_func(guess):
result = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges
#print(result)
return result
# factors = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) / measured_ranges
# return factors - np.mean(factors)
def cost_func(guess):
result = np.abs(diff_func(guess))
result[result<300] = result[result<300]/3+200
return result
# initial guess i the average of all beacons, with scale 1
initial_guess = np.average(np_ranges[:, :dimensions], axis=0)
# here the magic happens
results = least_squares(
fun=cost_func,
#jac="3-point",
loss="linear",
bounds=(
np.min(self.beacon_positions[:, :dimensions], axis=0) - np.array([200, 200, 100])[:dimensions],
np.max(self.beacon_positions[:, :dimensions], axis=0) + np.array([200, 200, 100])[:dimensions],
),
x0=initial_guess,
)
# create result
# todo: figure out level
result_pos = results.x
from c3nav.mapdata.models import Level
location = CustomLocation(
level=Level.objects.first(),
x=result_pos[0]/100,
y=result_pos[1]/100,
permissions=(),
icon='my_location'
)
location.z = result_pos[2]/100
pprint(relevant_ranges)
orig_xyz = None
print('orig_addr', orig_addr)
if orig_addr:
orig_xyz = self.get_xyz(orig_addr)
if orig_xyz:
orig_xyz = np.array(orig_xyz)
print()
print("result:", ", ".join(("%.2f" % i) for i in tuple(result_pos)))
if orig_xyz is not None:
print("correct:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz)))
print("diff:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz-result_pos)))
print()
print("measured ranges:", ", ".join(("%.2f" % i) for i in tuple(np_ranges[:, 3])))
print("result ranges:", ", ".join(
("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1))
))
if orig_xyz is not None:
print("correct ranges:", ", ".join(
("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1))
))
print()
print("diff result-measured:", ", ".join(
("%.2f" % i) for i in
tuple(diff_func(result_pos))
))
if orig_xyz is not None:
print("diff correct-measured:", ", ".join(
("%.2f" % i) for i in
tuple(diff_func(orig_xyz))
))
def print_cost(title, pos):
cost = cost_func(pos)
print(title, ", ".join(
("%.2f" % i) for i in cost
), '=', np.sum(cost**2))
print_cost("cost:", result_pos)
if orig_xyz is not None:
print_cost("cost of correct position:", orig_xyz)
if dimensions > 2:
print("height:", result_pos[2])
# print("scale:", (factor or results.x[3]))
return location

View file

@ -0,0 +1,44 @@
from typing import Annotated, Union
from ninja import Schema
from pydantic import Field as APIField
from pydantic import NegativeInt, PositiveInt
from c3nav.api.utils import NonEmptyStr
from c3nav.routing.api.positioning import BSSIDSchema
class LocateRequestPeerSchema(Schema):
bssid: BSSIDSchema = APIField(
title="BSSID",
description="BSSID of the peer",
example="c3:42:13:37:ac:ab",
)
ssid: NonEmptyStr = APIField(
title="SSID",
description="(E)SSID of the peer",
example="c3nav-locate",
)
rssi: NegativeInt = APIField(
title="RSSI",
description="RSSI in dBm",
example=-42,
)
frequency: Union[
PositiveInt,
Annotated[None, APIField(title="null", description="frequency not given")]
] = APIField(
default=None,
title="frequency",
description="frequency in KHz",
example=2472,
)
distance: Union[
float,
Annotated[None, APIField(title="null", description="distance was not measured")]
] = APIField(
default=None,
title="distance",
description="measured distance in meters",
example=8.32
)