diff --git a/src/c3nav/api/auth.py b/src/c3nav/api/auth.py index e70563ca..3b462ce1 100644 --- a/src/c3nav/api/auth.py +++ b/src/c3nav/api/auth.py @@ -9,7 +9,7 @@ from django.utils.functional import SimpleLazyObject, lazy from ninja.security import APIKeyHeader from c3nav import settings -from c3nav.api.exceptions import APIPermissionDenied, APIKeyInvalid +from c3nav.api.exceptions import APIKeyInvalid, APIPermissionDenied from c3nav.api.models import Secret from c3nav.api.schema import APIErrorSchema from c3nav.control.middleware import UserPermissionsMiddleware diff --git a/src/c3nav/editor/forms.py b/src/c3nav/editor/forms.py index a0a27f7e..43a3730b 100644 --- a/src/c3nav/editor/forms.py +++ b/src/c3nav/editor/forms.py @@ -21,6 +21,7 @@ from c3nav.mapdata.fields import GeometryField from c3nav.mapdata.forms import I18nModelFormMixin from c3nav.mapdata.models import GraphEdge from c3nav.mapdata.models.access import AccessPermission +from c3nav.routing.schemas import LocateRequestPeerSchema class EditorFormBase(I18nModelFormMixin, ModelForm): @@ -244,9 +245,12 @@ class EditorFormBase(I18nModelFormMixin, ModelForm): except json.JSONDecodeError: raise ValidationError(_('Invalid JSON.')) - from c3nav.routing.locator import LocatorPoint - LocatorPoint.clean_scans(data) + if not isinstance(data, list): + raise ValidationError(_('Scan data is not a list.')) + for item in data: + # todo: catch pydantic validation error + LocateRequestPeerSchema.model_validate(item) return data def clean(self): diff --git a/src/c3nav/mapdata/api/map.py b/src/c3nav/mapdata/api/map.py index dcab9ed1..35c22c2a 100644 --- a/src/c3nav/mapdata/api/map.py +++ b/src/c3nav/mapdata/api/map.py @@ -1,5 +1,5 @@ import json -from typing import Annotated, Optional, Union +from typing import Annotated, Union from django.core.serializers.json import DjangoJSONEncoder from django.shortcuts import redirect @@ -17,9 +17,9 @@ from c3nav.mapdata.api.base import api_etag, api_stats, can_access_geometry from c3nav.mapdata.models import Source from c3nav.mapdata.models.locations import DynamicLocation, LocationRedirect, Position from c3nav.mapdata.schemas.filters import BySearchableFilter, RemoveGeometryFilter -from c3nav.mapdata.schemas.model_base import AnyLocationID, AnyPositionID, CustomLocationID, schema_definition +from c3nav.mapdata.schemas.model_base import AnyLocationID, AnyPositionID, CustomLocationID from c3nav.mapdata.schemas.models import (AnyPositionStatusSchema, FullListableLocationSchema, FullLocationSchema, - LevelSchema, LocationDisplay, SlimListableLocationSchema, SlimLocationSchema, + LocationDisplay, SlimListableLocationSchema, SlimLocationSchema, all_location_definitions, listable_location_definitions) from c3nav.mapdata.schemas.responses import LocationGeometry, WithBoundsSchema from c3nav.mapdata.utils.locations import (get_location_by_id_for_request, get_location_by_slug_for_request, @@ -274,6 +274,7 @@ def get_position_by_id(request, position_id: AnyPositionID): raise API404() return location.serialize_position() + class UpdatePositionSchema(Schema): coordinates_id: Union[ Annotated[CustomLocationID, APIField(title="set coordinates")], diff --git a/src/c3nav/mapdata/api/updates.py b/src/c3nav/mapdata/api/updates.py index 5063cc2f..3ecd9f86 100644 --- a/src/c3nav/mapdata/api/updates.py +++ b/src/c3nav/mapdata/api/updates.py @@ -73,7 +73,8 @@ class FetchUpdatesResponseSchema(Schema): title="null", description="only for cross-origin requests", )], - ] = APIField(None, + ] = APIField( + None, title="user data", description="user data of this request. ommited for cross-origin requests.", ) diff --git a/src/c3nav/mapdata/models/update.py b/src/c3nav/mapdata/models/update.py index de879191..65dfcefc 100644 --- a/src/c3nav/mapdata/models/update.py +++ b/src/c3nav/mapdata/models/update.py @@ -185,10 +185,6 @@ class MapUpdate(models.Model): from c3nav.routing.locator import Locator locator = Locator.rebuild(new_updates[-1].to_tuple) - logger.info('Rebuilding range locator...') - from c3nav.routing.rangelocator import RangeLocator - range_locator = RangeLocator.rebuild(new_updates[-1].to_tuple, router) - for new_update in reversed(new_updates): new_update.processed = True new_update.save() diff --git a/src/c3nav/mapdata/schemas/model_base.py b/src/c3nav/mapdata/schemas/model_base.py index c8a4540e..f91a73a7 100644 --- a/src/c3nav/mapdata/schemas/model_base.py +++ b/src/c3nav/mapdata/schemas/model_base.py @@ -1,5 +1,5 @@ import re -from typing import Annotated, Any, Union, Optional +from typing import Annotated, Any, Optional, Union from ninja import Schema from pydantic import Field as APIField diff --git a/src/c3nav/mesh/consumers.py b/src/c3nav/mesh/consumers.py index 9cef2e4c..0a100884 100644 --- a/src/c3nav/mesh/consumers.py +++ b/src/c3nav/mesh/consumers.py @@ -21,7 +21,7 @@ from c3nav.mesh.messages import (MESH_BROADCAST_ADDRESS, MESH_NONE_ADDRESS, MESH MeshMessage, MeshMessageType, OTAApplyMessage, OTASettingMessage) from c3nav.mesh.models import MeshNode, MeshUplink, NodeMessage, OTARecipientStatus, OTAUpdate, OTAUpdateRecipient from c3nav.mesh.utils import MESH_ALL_OTA_GROUP, MESH_ALL_UPLINKS_GROUP, UPLINK_PING, get_mesh_uplink_group -from c3nav.routing.rangelocator import RangeLocator +from c3nav.routing.locator import Locator class Unknown: @@ -738,13 +738,18 @@ class MeshUIConsumer(AsyncJsonWebsocketConsumer): @database_sync_to_async def locator(self, msg, orig_addr=None): - locator = RangeLocator.load() - return locator.locate( - { - r["peer"]: r["distance"] + locator = Locator.load() + return locator.locate_range( + locator.convert_raw_scan_data([ + { + "bssid": r["peer"], + "ssid": "", + "rssi": r["rssi"], + "distance": r["distance"] + } for r in msg["ranges"] if r["distance"] != 0xFFFF - }, + ]), permissions=None, orig_addr=orig_addr, ) diff --git a/src/c3nav/mesh/models.py b/src/c3nav/mesh/models.py index 9422be78..681def67 100644 --- a/src/c3nav/mesh/models.py +++ b/src/c3nav/mesh/models.py @@ -23,7 +23,7 @@ from c3nav.mesh.messages import ConfigFirmwareMessage, ConfigHardwareMessage from c3nav.mesh.messages import MeshMessage as MeshMessage from c3nav.mesh.messages import MeshMessageType from c3nav.mesh.utils import MESH_ALL_OTA_GROUP, UPLINK_TIMEOUT -from c3nav.routing.rangelocator import RangeLocator +from c3nav.routing.locator import Locator FirmwareLookup = namedtuple('FirmwareLookup', ('sha256_hash', 'chip', 'project_name', 'version', 'idf_version')) @@ -338,7 +338,7 @@ class MeshNode(models.Model): return dst_node.get_uplink() def get_locator_xyz(self): - locator = RangeLocator.load() + locator = Locator.load() return locator.get_xyz(self.address) diff --git a/src/c3nav/mesh/views/misc.py b/src/c3nav/mesh/views/misc.py index 09403c13..24a10aee 100644 --- a/src/c3nav/mesh/views/misc.py +++ b/src/c3nav/mesh/views/misc.py @@ -19,9 +19,9 @@ class MeshRangingView(TemplateView): template_name = "mesh/mesh_ranging.html" def get_context_data(self, **kwargs): - from c3nav.routing.rangelocator import RangeLocator + from c3nav.routing.locator import Locator return { "ranging_form": RangingForm(self.request.GET or None), "node_names": get_node_names(), - "nodes_xyz": RangeLocator.load().get_all_xyz(), + "nodes_xyz": Locator.load().get_all_xyz(), } diff --git a/src/c3nav/routing/api/positioning.py b/src/c3nav/routing/api/positioning.py index 91f0ba0d..081e8a9d 100644 --- a/src/c3nav/routing/api/positioning.py +++ b/src/c3nav/routing/api/positioning.py @@ -4,57 +4,19 @@ from django.core.exceptions import ValidationError from ninja import Field as APIField from ninja import Router as APIRouter from ninja import Schema -from pydantic import NegativeInt, PositiveInt from c3nav.api.auth import auth_responses -from c3nav.api.utils import NonEmptyStr from c3nav.mapdata.models.access import AccessPermission from c3nav.mapdata.schemas.models import CustomLocationSchema from c3nav.mapdata.utils.cache.stats import increment_cache_key from c3nav.routing.locator import Locator -from c3nav.routing.rangelocator import RangeLocator +from c3nav.routing.schemas import LocateRequestPeerSchema BSSIDSchema = Annotated[str, APIField(pattern=r"^[a-z0-9]{2}(:[a-z0-9]{2}){5}$", title="BSSID")] positioning_api_router = APIRouter(tags=["positioning"]) -class LocateRequestPeerSchema(Schema): - bssid: BSSIDSchema = APIField( - title="BSSID", - description="BSSID of the peer", - example="c3:42:13:37:ac:ab", - ) - ssid: NonEmptyStr = APIField( - title="SSID", - description="(E)SSID of the peer", - example="c3nav-locate", - ) - rssi: NegativeInt = APIField( - title="RSSI", - description="RSSI in dBm", - example=-42, - ) - frequency: Union[ - PositiveInt, - Annotated[None, APIField(title="null", description="frequency not given")] - ] = APIField( - default=None, - title="frequency", - description="frequency in KHz", - example=2472, - ) - distance: Union[ - float, - Annotated[None, APIField(title="null", description="distance was not measured")] - ] = APIField( - default=None, - title="distance", - description="measured distance in meters", - example=8.32 - ) - - class LocateRequestSchema(Schema): peers: list[LocateRequestPeerSchema] = APIField( title="list of visible/measured location beacons", @@ -106,13 +68,18 @@ def locate_test(): } msg = node.last_messages[MeshMessageType.LOCATE_RANGE_RESULTS] - locator = RangeLocator.load() - location = locator.locate( - { - r.peer: r.distance + locator = Locator.load() + location = locator.locate_range( + locator.convert_raw_scan_data([ + { + "bssid": r.peer, + "ssid": "", + "rssi": r.rssi, + "distance": r.distance, + } for r in msg.parsed.ranges if r.distance != 0xFFFF - }, + ]), None ) return { @@ -139,4 +106,4 @@ BeaconsXYZ = dict[ description="get xyz coordinates for all known positioning beacons", response={200: BeaconsXYZ, **auth_responses}) def beacons_xyz(): - return RangeLocator.load().get_all_xyz() + return Locator.load().get_all_xyz() diff --git a/src/c3nav/routing/api/routing.py b/src/c3nav/routing/api/routing.py index 699cee58..8dd3ac08 100644 --- a/src/c3nav/routing/api/routing.py +++ b/src/c3nav/routing/api/routing.py @@ -1,5 +1,5 @@ from enum import StrEnum -from typing import Annotated, Optional, Union +from typing import Annotated, Union from django.core.exceptions import ValidationError from django.urls import reverse diff --git a/src/c3nav/routing/locator.py b/src/c3nav/routing/locator.py index e52d0cbd..5c30f4d4 100644 --- a/src/c3nav/routing/locator.py +++ b/src/c3nav/routing/locator.py @@ -1,43 +1,126 @@ import operator import pickle -import re import threading -from collections import deque, namedtuple +from dataclasses import dataclass, field from functools import reduce +from pprint import pprint +from typing import Optional, Self, Sequence, TypeAlias import numpy as np +import scipy from django.conf import settings -from django.core.exceptions import ValidationError -from django.utils.translation import gettext_lazy as _ +from scipy.optimize import least_squares from c3nav.mapdata.models import MapUpdate, Space +from c3nav.mapdata.models.geometry.space import RangingBeacon from c3nav.mapdata.utils.locations import CustomLocation from c3nav.routing.router import Router +from c3nav.routing.schemas import LocateRequestPeerSchema + +BSSID: TypeAlias = str -class Locator: - filename = settings.CACHE_ROOT / 'locator' +@dataclass +class LocatorPeer: + bssid: BSSID + frequencies: set[int] = field(default_factory=set) + xyz: Optional[tuple[int, int, int]] = None - def __init__(self, stations, spaces): - self.stations = stations - self.spaces = spaces + +@dataclass +class ScanDataValue: + rssi: int + distance: Optional[float] = None @classmethod - def rebuild(cls, update): - stations = LocatorStations() - spaces = {} + def average(cls, items: Sequence[Self]): + rssi = [item.rssi for item in items] + distance = [item.distance for item in items if item.distance is not None] + return cls( + rssi=(sum(rssi)//len(rssi)), + distance=(sum(distance)/len(distance)) if distance else None, + ) + + +ScanData: TypeAlias = dict[int, ScanDataValue] + + +@dataclass +class LocatorPoint: + x: float + y: float + values: ScanData + + +@dataclass +class Locator: + peers: list[LocatorPeer] = field(default_factory=list) + peer_lookup: dict[BSSID, int] = field(default_factory=dict) + xyz: np.array = np.empty((0,)) + spaces: dict[int, "LocatorSpace"] = field(default_factory=dict) + + @classmethod + def rebuild(cls, update, router): + locator = cls() + locator._rebuild(router) + pickle.dump(locator, open(cls.build_filename(update), 'wb')) + return locator + + def _rebuild(self, router): + for beacon in RangingBeacon.objects.all(): + peer_id = self.get_peer_id(beacon.bssid, create=True) + self.peers[peer_id].xyz = ( + int(beacon.geometry.x * 100), + int(beacon.geometry.y * 100), + int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100), + ) + self.xyz = np.array(tuple(peer.xyz for peer in self.peers)) + for space in Space.objects.prefetch_related('wifi_measurements'): new_space = LocatorSpace( pk=space.pk, - points=(LocatorPoint.from_measurement(measurement, stations) - for measurement in space.wifi_measurements.all()) + points=tuple( + LocatorPoint( + x=measurement.x, + y=measurement.x, + values=self.convert_scans(measurement.data), + ) + for measurement in space.wifi_measurements.all() + ) ) if new_space.points: - spaces[space.pk] = new_space + self.spaces[space.pk] = new_space - locator = cls(stations, spaces) - pickle.dump(locator, open(cls.build_filename(update), 'wb')) - return locator + def get_peer_id(self, bssid: BSSID, create=False) -> Optional[int]: + peer_id = self.peer_lookup.get(bssid, None) + if peer_id is None and create: + peer = LocatorPeer(bssid=bssid) + peer_id = len(self.peers) + self.peer_lookup[bssid] = peer_id + self.peers.append(peer) + return peer_id + + def convert_scan(self, scan_data, create_peers=False) -> ScanData: + result = {} + for scan_value in scan_data: + if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS: + continue + peer_id = self.get_peer_id(scan_value['bssid'], create=create_peers) + if peer_id is not None: + result[peer_id] = ScanDataValue(rssi=scan_value["rssi"], distance=scan_value["distance"]) + return result + + def convert_scans(self, scans_data, create_peers=False) -> ScanData: + converted = [] + for scan in scans_data: + converted.append(self.convert_scan(scan, create_peers=create_peers)) + peer_ids = reduce(operator.or_, (frozenset(values.keys()) for values in converted), frozenset()) + return { + peer_id: ScanDataValue.average( + tuple(values[peer_id] for values in converted if peer_id in values) + ) + for peer_id in peer_ids + } @classmethod def build_filename(cls, update): @@ -61,27 +144,44 @@ class Locator: cls.cached = cls.load_nocache(update) return cls.cached - def locate(self, scan, permissions=None): - router = Router.load() - restrictions = router.get_restrictions(permissions) + def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestPeerSchema]) -> ScanData: + return self.convert_scan([d.dict() for d in raw_scan_data], create_peers=False) - scan = LocatorPoint.clean_scan(scan, ignore_invalid_stations=True) - scan_values = LocatorPoint.convert_scan(scan, self.stations, create=False) - if not scan_values: + def get_xyz(self, address: BSSID) -> tuple[int, int, int] | None: + i = self.get_peer_id(address) + if i is None: + return None + return self.peers[i].xyz + + def get_all_xyz(self) -> dict[BSSID, float]: + return { + peer: peer.xyz for peer in self.peers[:len(self.xyz)] + } + + def locate(self, raw_scan_data: list[LocateRequestPeerSchema], permissions=None): + scan_data = self.convert_raw_scan_data(raw_scan_data) + if not scan_data: return None - # convert scan values - scan_values = {station_id: value**2 for station_id, value in scan_values.items()} + result = self.locate_range(scan_data, permissions) + if result is not None: + return result + + return self.locate_rssi(scan_data, permissions) + + def locate_rssi(self, scan_data: ScanData, permissions=None): + router = Router.load() + restrictions = router.get_restrictions(permissions) # get visible spaces spaces = tuple(space for pk, space in self.spaces.items() if pk not in restrictions.spaces) # find best point - best_station_id = max(scan_values.items(), key=operator.itemgetter(1))[0] + best_peer_id = max(scan_data.items(), key=operator.itemgetter(1))[0] best_location = None best_score = float('inf') for space in spaces: - point, score = space.get_best_point(scan_values, needed_station_id=best_station_id) + point, score = space.get_best_point(scan_data, needed_peer_id=best_peer_id) if point is None: continue if score < best_score: @@ -94,165 +194,186 @@ class Locator: return best_location + def locate_range(self, scan_data: ScanData, permissions=None, orig_addr=None): + pprint(scan_data) -class LocatorStations: - def __init__(self): - self.stations = [] - self.stations_lookup = {} + peer_ids = tuple(i for i in scan_data if i < len(self.xyz)) - def get(self, bssid, ssid, frequency, create=False): - # yes, we're not looking up frequencies, because they can change… yep… - station_id = self.stations_lookup.get((bssid, None), None) - if station_id is not None: - station = self.stations[station_id] - station.frequencies.add(frequency) - elif create: - station = LocatorStation(bssid, ssid, {frequency}) - station_id = len(self.stations) - self.stations_lookup[(bssid, None)] = station_id - self.stations.append(station) - return station_id + if len(peer_ids) < 3: + # can't get a good result from just two beacons + # todo: maybe we can at least give… something? + print('less than 3 ranges, can\'t do ranging') + return None + + if len(peer_ids) == 3 and 0: + print('2D trilateration') + dimensions = 2 + else: + print('3D trilateration') + dimensions = 3 + + relevant_xyz = self.xyz[peer_ids, :] + + # create 2d array with x, y, z, distance as rows + np_ranges = np.hstack(( + relevant_xyz, + np.array(tuple(scan_data[i].distance for i in peer_ids)).reshape((-1, 1)), + )) + + print(np_ranges) + + measured_ranges = np_ranges[:, 3] + print('a', measured_ranges) + # measured_ranges[measured_ranges<1] = 1 + print('b', measured_ranges) + + # rating the guess by calculating the distances + def diff_func(guess): + result = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges + # print(result) + return result + # factors = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) / measured_ranges + # return factors - np.mean(factors) + + def cost_func(guess): + result = np.abs(diff_func(guess)) + result[result < 300] = result[result < 300]/3+200 + return result + + # initial guess i the average of all beacons, with scale 1 + initial_guess = np.average(np_ranges[:, :dimensions], axis=0) + + # here the magic happens + results = least_squares( + fun=cost_func, + # jac="3-point", + loss="linear", + bounds=( + np.min(self.beacon_positions[:, :dimensions], axis=0) - np.array([200, 200, 100])[:dimensions], + np.max(self.beacon_positions[:, :dimensions], axis=0) + np.array([200, 200, 100])[:dimensions], + ), + x0=initial_guess, + ) + + # create result + # todo: figure out level + result_pos = results.x + from c3nav.mapdata.models import Level + location = CustomLocation( + level=Level.objects.first(), + x=result_pos[0]/100, + y=result_pos[1]/100, + permissions=(), + icon='my_location' + ) + location.z = result_pos[2]/100 + + pprint(relevant_xyz) + + orig_xyz = None + print('orig_addr', orig_addr) + if orig_addr: + orig_xyz = self.get_xyz(orig_addr) + if orig_xyz: + orig_xyz = np.array(orig_xyz) + + print() + print("result:", ", ".join(("%.2f" % i) for i in tuple(result_pos))) + if orig_xyz is not None: + print("correct:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz))) + print("diff:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz-result_pos))) + print() + print("measured ranges:", ", ".join(("%.2f" % i) for i in tuple(np_ranges[:, 3]))) + print("result ranges:", ", ".join( + ("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1)) + )) + if orig_xyz is not None: + print("correct ranges:", ", ".join( + ("%.2f" % i) + for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1)) + )) + print() + print("diff result-measured:", ", ".join( + ("%.2f" % i) for i in + tuple(diff_func(result_pos)) + )) + if orig_xyz is not None: + print("diff correct-measured:", ", ".join( + ("%.2f" % i) for i in + tuple(diff_func(orig_xyz)) + )) + + def print_cost(title, pos): + cost = cost_func(pos) + print(title, ", ".join( + ("%.2f" % i) for i in cost + ), '=', np.sum(cost**2)) + print_cost("cost:", result_pos) + if orig_xyz is not None: + print_cost("cost of correct position:", orig_xyz) + if dimensions > 2: + print("height:", result_pos[2]) + # print("scale:", (factor or results.x[3])) + + return location +no_signal = int(-90)**2 + + +@dataclass class LocatorSpace: - no_signal = int(-90)**2 + pk: int + points: list[LocatorPoint] + peer_ids: frozenset[int] + peer_lookup: dict[int, int] + levels: np.array - def __init__(self, pk, points): - self.pk = pk - self.points = tuple(points) - self.stations_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in self.points), frozenset()) - self.stations = tuple(self.stations_set) - self.stations_lookup = {station_id: i for i, station_id in enumerate(self.stations)} + @classmethod + def create(cls, pk: int, points: Sequence[LocatorPoint]): + peer_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in points), frozenset()) + peers = tuple(peer_set) + peer_lookup = {peer_id: i for i, peer_id in enumerate(peers)} + levels = np.full((len(points), len(peers)), fill_value=no_signal, dtype=np.int64) + for i, point in enumerate(points): + for peer_id, value in point.values.items(): + levels[i][peer_lookup[peer_id]] = int(value)**2 - self.levels = np.full((len(self.points), len(self.stations)), fill_value=self.no_signal, dtype=np.int64) - for i, point in enumerate(self.points): - for station_id, value in point.values.items(): - self.levels[i][self.stations_lookup[station_id]] = int(value)**2 + return cls( + pk=pk, + points=list(points), + peer_ids=peer_set, + peer_lookup=peer_lookup, + levels=levels, + ) - def get_best_point(self, scan_values, needed_station_id=None): - # check if this space knows the needed station id, otherwise no results here - if needed_station_id not in self.stations_set: + def get_best_point(self, scan_values: ScanData, + needed_peer_id=None) -> tuple[LocatorPoint, float] | tuple[None, None]: + # check if this space knows the needed peer id, otherwise no results here + if needed_peer_id not in self.peer_ids: return None, None - # stations that this space knows - station_ids = frozenset(scan_values.keys()) & self.stations_set + # peers that this space knows + peer_id = frozenset(scan_values.keys()) & self.peer_ids penalty = 0 - for station_id, value in scan_values.items(): - if station_id not in self.stations_set: - penalty += (value - self.no_signal)**2 + for peer_id, value in scan_values.items(): + if peer_id not in self.peer_ids: + penalty += (value.rssi - no_signal)**2 - stations = tuple(self.stations_lookup[station_id] for station_id in station_ids) - values = np.array(tuple(scan_values[station_id] for station_id in station_ids), dtype=np.int64) + peers = tuple(self.peer_lookup[peer_id] for peer_id in peer_id) + values = np.array(tuple(scan_values[peer_id] for peer_id in peer_id), dtype=np.int64) - # acceptable points need to have a value for the needed_station_id - # points = tuple( - # np.argwhere(self.levels[:, self.stations_lookup[needed_station_id]] < self.no_signal).ravel() - # ) - # temporary: don't filter these points by needed station id! the noc is still having fun deploying new stationg + # acceptable points need to have a value for the needed_peer_id points = tuple( - np.argwhere(self.levels[:, self.stations_lookup[needed_station_id]] > 0).ravel() + np.argwhere(self.levels[:, self.peer_lookup[needed_peer_id]] > 0).ravel() ) if not points: return None, None scores = (np.sum( - (self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), stations] - values)**2, + (self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), peers] - values)**2, axis=1 )+penalty) / len(scan_values) best_point_i = np.argmin(scores).ravel()[0] best_point = points[best_point_i] return self.points[best_point], scores[best_point_i] - - -class LocatorPoint(namedtuple('LocatorPoint', ('x', 'y', 'values'))): - @classmethod - def from_measurement(cls, measurement, stations: LocatorStations): - return cls(x=measurement.geometry.x, y=measurement.geometry.y, - values=cls.convert_scans(measurement.data, stations, create=True)) - - @classmethod - def convert_scan(cls, scan, stations: LocatorStations, create=False): - values = {} - for scan_value in scan: - if settings.WIFI_SSIDS and scan_value['ssid'] not in settings.WIFI_SSIDS: - continue - station_id = stations.get(bssid=scan_value['bssid'], ssid=scan_value['ssid'], - frequency=scan_value['frequency'], create=create) - if station_id is not None: - # todo: convert to something more or less linear - values[station_id] = scan_value['level'] - return values - - @classmethod - def convert_scans(cls, scans, stations: LocatorStations, create=False): - values_list = deque() - for scan in scans: - values_list.append(cls.convert_scan(scan, stations, create)) - - station_ids = reduce(operator.or_, (frozenset(values.keys()) for values in values_list), frozenset()) - return { - station_id: cls.average(tuple(values[station_id] for values in values_list if station_id in values)) - for station_id in station_ids - } - - @staticmethod - def average(items): - return sum(items) / len(items) - - valid_frequencies = frozenset(( - 2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462, 2467, 2472, 2484, - 5180, 5190, 5200, 5210, 5220, 5230, 5240, 5250, 5260, 5270, 5280, 5290, 5300, 5310, 5320, - 5500, 5510, 5520, 5530, 5540, 5550, 5560, 5570, 5580, 5590, 5600, 5610, 5620, 5630, 5640, - 5660, 5670, 5680, 5690, 5700, 5710, 5720, 5745, 5755, 5765, 5775, 5785, 5795, 5805, 5825 - )) - needed_keys = frozenset(('bssid', 'ssid', 'level', 'frequency')) - allowed_keys = needed_keys | frozenset(('last', )) - - @classmethod - def clean_scans(cls, data, ignore_invalid_stations=False): - if not isinstance(data, list): - raise ValidationError(_('Invalid Scan. Scans list list not a list.')) - return tuple(cls.clean_scan(scan) for scan in data) - - @classmethod - def clean_scan(cls, data, ignore_invalid_stations=False): - if not isinstance(data, list): - raise ValidationError(_('Invalid Scan. Scan not a list.')) - cleaned_scan = deque() - for scan_value in data: - try: - cleaned_scan.append(cls.clean_scan_value(scan_value)) - except ValidationError: - if not ignore_invalid_stations: - raise - return tuple(cleaned_scan) - - @classmethod - def clean_scan_value(cls, data): - if not isinstance(data, dict): - raise ValidationError(_('Invalid Scan. Scan value not a dictionary.')) - keys = frozenset(data.keys()) - if (keys - cls.allowed_keys) or (cls.needed_keys - keys): - raise ValidationError(_('Invalid Scan. Missing or forbidden keys.')) - if not isinstance(data['bssid'], str): - raise ValidationError(_('Invalid Scan. BSSID not a String.')) - data['bssid'] = data['bssid'].upper() - if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', data['bssid']): - raise ValidationError(_('Invalid Scan. Invalid BSSID.')) - if not isinstance(data['level'], int) or not (-1 >= data['level'] >= -100): - raise ValidationError(_('Invalid Scan. Invalid RSSI/Level.')) - if data['frequency'] not in cls.valid_frequencies: - raise ValidationError(_('Invalid Scan. Not an allowed frequency.')) - if 'last' in keys and (not isinstance(data['last'], int) or data['last'] <= 0): - raise ValidationError(_('Invalid Scan. Invalid last timestamp.')) - return data - - -class LocatorStation: - def __init__(self, bssid, ssid, frequencies=()): - self.bssid = bssid - self.ssid = ssid - self.frequencies = set(frequencies) - - def __repr__(self): - return 'LocatorStation(%r, %r, frequencies=%r)' % (self.bssid, self.ssid, self.frequencies) diff --git a/src/c3nav/routing/rangelocator.py b/src/c3nav/routing/rangelocator.py deleted file mode 100644 index 43044324..00000000 --- a/src/c3nav/routing/rangelocator.py +++ /dev/null @@ -1,204 +0,0 @@ -import pickle -import threading -from dataclasses import dataclass -from pprint import pprint -from typing import Annotated, Self - -import numpy as np -import scipy -from django.conf import settings -from scipy.optimize import least_squares - -from c3nav.mapdata.models import MapUpdate -from c3nav.mapdata.models.geometry.space import RangingBeacon -from c3nav.mapdata.utils.locations import CustomLocation -from c3nav.routing.router import Router - - -@dataclass -class RangeLocator: - filename = settings.CACHE_ROOT / 'rangelocator' - - beacon_positions: np.array - beacon_lookup: dict[str: int] - - @classmethod - def rebuild(cls, update, router): - beacons = RangingBeacon.objects.all() - - locator = cls( - beacon_positions=np.array(tuple( - ( - int(beacon.geometry.x * 100), - int(beacon.geometry.y * 100), - int((router.altitude_for_point(beacon.space_id, beacon.geometry) + float(beacon.altitude)) * 100), - ) - for beacon in beacons - )), - beacon_lookup={beacon.bssid: i for i, beacon in enumerate(beacons)} - ) - pickle.dump(locator, open(cls.build_filename(update), 'wb')) - return locator - - @classmethod - def build_filename(cls, update): - return settings.CACHE_ROOT / ('rangelocator_%s.pickle' % MapUpdate.build_cache_key(*update)) - - @classmethod - def load_nocache(cls, update): - return pickle.load(open(cls.build_filename(update), 'rb')) - - def get_xyz(self, address) -> tuple[int, int, int] | None: - try: - i = self.beacon_lookup[address] - except KeyError: - return None - return tuple(self.beacon_positions[i]) - - def get_all_xyz(self): - return { - address: tuple(self.beacon_positions[i].tolist()) - for address, i in self.beacon_lookup.items() - } - - cached = None - cache_update = None - cache_lock = threading.Lock() - - @classmethod - def load(cls) -> Self: - from c3nav.mapdata.models import MapUpdate - update = MapUpdate.last_processed_update() - if cls.cache_update != update: - with cls.cache_lock: - cls.cache_update = update - cls.cached = cls.load_nocache(update) - return cls.cached - - def locate(self, ranges: dict[str, int], permissions=None, orig_addr=None): - pprint(ranges) - # get the i and peer for every peer that we actually know - relevant_ranges = tuple( - (i, distance) for i, distance in ( - (self.beacon_lookup.get(bssid, None), distance) for bssid, distance in ranges.items() - ) if i is not None - ) - - relevant_positions = self.beacon_positions[tuple(i for i, _ in relevant_ranges), :] - mean = np.mean(relevant_positions, axis=0) - relevant_positions = relevant_positions - - # create 2d array with x, y, z, distance as rows - np_ranges = np.hstack(( - relevant_positions, - np.array(tuple(distance for i, distance in relevant_ranges)).reshape((-1, 1)), - )) - - print(np_ranges) - - if np_ranges.shape[0] < 3: - # can't get a good result from just two beacons - # todo: maybe we can at least give… something? - print('less than 3 ranges, can\'t do ranging') - return None - - if np_ranges.shape[0] == 3 and 0: - print('2D trilateration') - dimensions = 2 - else: - print('3D trilateration') - dimensions = 3 - - measured_ranges = np_ranges[:, 3] - print('a', measured_ranges) - #measured_ranges[measured_ranges<1] = 1 - print('b', measured_ranges) - - # rating the guess by calculating the distances - def diff_func(guess): - result = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) - measured_ranges - #print(result) - return result - # factors = scipy.linalg.norm(np_ranges[:, :dimensions] - guess[:dimensions], axis=1) / measured_ranges - # return factors - np.mean(factors) - - def cost_func(guess): - result = np.abs(diff_func(guess)) - result[result<300] = result[result<300]/3+200 - return result - - # initial guess i the average of all beacons, with scale 1 - initial_guess = np.average(np_ranges[:, :dimensions], axis=0) - - # here the magic happens - results = least_squares( - fun=cost_func, - #jac="3-point", - loss="linear", - bounds=( - np.min(self.beacon_positions[:, :dimensions], axis=0) - np.array([200, 200, 100])[:dimensions], - np.max(self.beacon_positions[:, :dimensions], axis=0) + np.array([200, 200, 100])[:dimensions], - ), - x0=initial_guess, - ) - - # create result - # todo: figure out level - result_pos = results.x - from c3nav.mapdata.models import Level - location = CustomLocation( - level=Level.objects.first(), - x=result_pos[0]/100, - y=result_pos[1]/100, - permissions=(), - icon='my_location' - ) - location.z = result_pos[2]/100 - - pprint(relevant_ranges) - - orig_xyz = None - print('orig_addr', orig_addr) - if orig_addr: - orig_xyz = self.get_xyz(orig_addr) - if orig_xyz: - orig_xyz = np.array(orig_xyz) - - print() - print("result:", ", ".join(("%.2f" % i) for i in tuple(result_pos))) - if orig_xyz is not None: - print("correct:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz))) - print("diff:", ", ".join(("%.2f" % i) for i in tuple(orig_xyz-result_pos))) - print() - print("measured ranges:", ", ".join(("%.2f" % i) for i in tuple(np_ranges[:, 3]))) - print("result ranges:", ", ".join( - ("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - result_pos[:dimensions], axis=1)) - )) - if orig_xyz is not None: - print("correct ranges:", ", ".join( - ("%.2f" % i) for i in tuple(scipy.linalg.norm(np_ranges[:, :dimensions] - orig_xyz[:dimensions], axis=1)) - )) - print() - print("diff result-measured:", ", ".join( - ("%.2f" % i) for i in - tuple(diff_func(result_pos)) - )) - if orig_xyz is not None: - print("diff correct-measured:", ", ".join( - ("%.2f" % i) for i in - tuple(diff_func(orig_xyz)) - )) - - def print_cost(title, pos): - cost = cost_func(pos) - print(title, ", ".join( - ("%.2f" % i) for i in cost - ), '=', np.sum(cost**2)) - print_cost("cost:", result_pos) - if orig_xyz is not None: - print_cost("cost of correct position:", orig_xyz) - if dimensions > 2: - print("height:", result_pos[2]) - # print("scale:", (factor or results.x[3])) - - return location diff --git a/src/c3nav/routing/schemas.py b/src/c3nav/routing/schemas.py new file mode 100644 index 00000000..5ad080fc --- /dev/null +++ b/src/c3nav/routing/schemas.py @@ -0,0 +1,44 @@ +from typing import Annotated, Union + +from ninja import Schema +from pydantic import Field as APIField +from pydantic import NegativeInt, PositiveInt + +from c3nav.api.utils import NonEmptyStr +from c3nav.routing.api.positioning import BSSIDSchema + + +class LocateRequestPeerSchema(Schema): + bssid: BSSIDSchema = APIField( + title="BSSID", + description="BSSID of the peer", + example="c3:42:13:37:ac:ab", + ) + ssid: NonEmptyStr = APIField( + title="SSID", + description="(E)SSID of the peer", + example="c3nav-locate", + ) + rssi: NegativeInt = APIField( + title="RSSI", + description="RSSI in dBm", + example=-42, + ) + frequency: Union[ + PositiveInt, + Annotated[None, APIField(title="null", description="frequency not given")] + ] = APIField( + default=None, + title="frequency", + description="frequency in KHz", + example=2472, + ) + distance: Union[ + float, + Annotated[None, APIField(title="null", description="distance was not measured")] + ] = APIField( + default=None, + title="distance", + description="measured distance in meters", + example=8.32 + )