team-3/src/c3nav/routing/api/positioning.py

121 lines
4.2 KiB
Python
Raw Normal View History

from typing import Annotated, Union
2023-11-27 22:59:59 +01:00
2024-03-30 22:12:27 +01:00
from django.conf import settings
2023-12-03 18:25:26 +01:00
from django.core.exceptions import ValidationError
2023-11-27 22:59:59 +01:00
from ninja import Field as APIField
from ninja import Router as APIRouter
from pydantic_extra_types.mac_address import MacAddress
2023-12-03 21:55:08 +01:00
from c3nav.api.auth import auth_responses
from c3nav.api.schema import BaseSchema
2023-12-03 18:25:26 +01:00
from c3nav.mapdata.models.access import AccessPermission
2023-12-02 00:00:23 +01:00
from c3nav.mapdata.schemas.models import CustomLocationSchema
2024-12-28 22:58:25 +01:00
from c3nav.mapdata.tasks import update_ap_names_bssid_mapping
2023-12-03 18:25:26 +01:00
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.routing.locator import Locator
from c3nav.routing.schemas import LocateWifiPeerSchema, LocateIBeaconPeerSchema
positioning_api_router = APIRouter(tags=["positioning"])
class LocateRequestSchema(BaseSchema):
    # Request body for the locate endpoint: the wireless measurements the
    # client currently sees. Both fields are declared without a default.
    wifi_peers: list[LocateWifiPeerSchema] = APIField(title="list of visible/measured wifi location beacons")
    ibeacon_peers: list[LocateIBeaconPeerSchema] = APIField(title="list of visible/measured location iBeacons")
2023-12-02 00:00:23 +01:00
class PositioningResult(BaseSchema):
    # Response body of the positioning endpoints in this module.
    # `location` is either a custom location or null; null means that the
    # position could not be determined.
    location: Union[
        Annotated[CustomLocationSchema, APIField(title="location")],
        Annotated[None, APIField(title="null", description="position could not be determined")]
    ] = APIField(
        title="location",
        description="positioning result",
    )
2023-12-02 00:00:23 +01:00
@positioning_api_router.post(
    '/locate/',
    summary="determine position",
    description="determine position based on wireless measurements "
                "(including ranging, if available)",
    response={200: PositioningResult, **auth_responses},
)
def get_position(request, parameters: LocateRequestSchema):
    """
    Determine the client's position from the submitted wifi measurements.

    Only ``parameters.wifi_peers`` is handed to the locator; the
    ``ibeacon_peers`` of the request are not used in this function.

    If the requesting user has the ``passive_ap_name_scanning`` permission,
    the access point names reported alongside the measurements are grouped
    into an ``ap_name -> {bssid, ...}`` mapping, which is handed to
    ``update_ap_names_bssid_mapping.delay(...)``.

    :param request: the incoming request; read for access permissions,
        ``user_permissions`` and ``user``
    :param parameters: the validated request body
    :return: a dict with the single key ``"location"``, whose value is
        whatever the locator returned (possibly ``None``)
    :raises ValidationError: re-raised unchanged if it occurs while locating
    """
    locator = Locator.load()
    try:
        location = locator.locate(
            parameters.wifi_peers,
            permissions=AccessPermission.get_for_request(request),
        )
    except ValidationError:
        # todo: validation error, seriously? this shouldn't happen anyways
        raise
    else:
        if location is not None:
            # todo: this will overload us probably, group these
            increment_cache_key('apistats__locate__%s' % location.pk)

    if request.user_permissions.passive_ap_name_scanning:
        # collect every BSSID that was seen announcing a given AP name;
        # peers without an AP name contribute nothing
        names_to_bssids = {}
        for peer in parameters.wifi_peers:
            if peer.ap_name:
                names_to_bssids.setdefault(peer.ap_name, set()).add(peer.bssid)

        # nothing to report if no peer carried an AP name
        if names_to_bssids:
            update_ap_names_bssid_mapping.delay(map_name=names_to_bssids, user=request.user)

    return {"location": location}
2023-11-26 17:55:23 +01:00
2024-03-30 22:12:27 +01:00
# Only when metrics are enabled: register the 'locate' stat (keyed by
# 'location') with the stats collector. The import stays inside the branch,
# so the metrics module is only loaded if it is actually needed.
if settings.METRICS:
    from c3nav.mapdata.metrics import APIStatsCollector

    APIStatsCollector.add_stat('locate', 'location')
@positioning_api_router.get(
    '/locate-test/',
    summary="debug position",
    description="outputs a location for debugging purposes",
    response={200: PositioningResult, **auth_responses},
)
def locate_test(request):
    """
    Debug endpoint: locate one hard-coded mesh node from its last range results.

    Looks up the mesh node with a fixed address, takes its most recent
    ``LOCATE_RANGE_RESULTS`` message and feeds the contained ranges through
    the locator.

    :param request: the incoming request (not read by this function)
    :return: ``{"location": None}`` if the node does not exist; otherwise a
        dict with the keys ``"ranges"``, ``"datetime"`` and ``"location"``.
        NOTE(review): the declared 200 response schema only has a
        ``location`` field -- confirm what happens to the other two keys.
    """
    from c3nav.mesh.messages import MeshMessageType
    from c3nav.mesh.models import MeshNode

    results_type = MeshMessageType.LOCATE_RANGE_RESULTS
    try:
        node = MeshNode.objects.prefetch_last_messages(results_type).get(address="d4:f9:8d:2d:0d:f1")
    except MeshNode.DoesNotExist:
        return {"location": None}

    msg = node.last_messages[results_type]
    locator = Locator.load()

    # build the raw scan entries, leaving out ranges whose distance is 0xFFFF
    # (presumably the "no valid distance" marker -- TODO confirm)
    raw_scan = []
    for measured in msg.parsed.ranges:
        if measured.distance == 0xFFFF:
            continue
        raw_scan.append({
            "bssid": measured.peer,
            "ssid": "",
            "rssi": measured.rssi,
            "distance": measured.distance,
        })

    location = locator.locate_range(locator.convert_raw_scan_data(raw_scan), None)

    return {
        "ranges": msg.parsed.model_dump(mode="json")["ranges"],
        "datetime": msg.datetime,
        "location": location,
    }
2023-11-27 22:59:59 +01:00
# Type alias: maps a beacon's MAC address to its global (X, Y, Z)
# coordinates, each an integer in centimetres.
BeaconsXYZ = dict[
    MacAddress,
    Annotated[
        tuple[
            Annotated[int, APIField(title="X (in cm)")],
            Annotated[int, APIField(title="Y (in cm)")],
            Annotated[int, APIField(title="Z (in cm)")],
        ],
        APIField(title="global XYZ coordinates"),
    ],
]