team-3/src/c3nav/routing/newapi/positioning.py

103 lines
3.3 KiB
Python
Raw Normal View History

2023-12-02 00:00:23 +01:00
from typing import Annotated, Optional
2023-11-27 22:59:59 +01:00
2023-12-03 18:25:26 +01:00
from django.core.exceptions import ValidationError
2023-11-27 22:59:59 +01:00
from ninja import Field as APIField
from ninja import Router as APIRouter
from ninja import Schema
2023-12-03 18:25:26 +01:00
from pydantic import NegativeInt, PositiveInt
from c3nav.api.newauth import auth_responses
2023-12-02 00:00:23 +01:00
from c3nav.api.utils import NonEmptyStr
2023-12-03 18:25:26 +01:00
from c3nav.mapdata.models.access import AccessPermission
2023-12-02 00:00:23 +01:00
from c3nav.mapdata.schemas.models import CustomLocationSchema
2023-12-03 18:25:26 +01:00
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.routing.locator import Locator
2023-11-27 22:59:59 +01:00
from c3nav.routing.rangelocator import RangeLocator
2023-12-02 00:00:23 +01:00
# String type for a wifi BSSID as accepted by this API: six colon-separated
# pairs of lowercase alphanumeric characters, e.g. "d4:f9:8d:2d:0d:f1".
# NOTE(review): the pattern admits any of a-z, not only the hex digits a-f,
# and rejects uppercase input -- confirm this is intended.
BSSIDSchema = Annotated[
    str,
    APIField(pattern=r"^[a-z0-9]{2}(:[a-z0-9]{2}){5}$", title="BSSID"),
]

# Router the positioning endpoints defined below are registered on.
positioning_api_router = APIRouter(tags=["positioning"])
2023-12-03 18:25:26 +01:00
# One access point as seen in a client-side wifi scan.
# (Documented with comments rather than a class docstring so the generated
# API schema description stays exactly as it was.)
class LocateRequestPeerSchema(Schema):
    # Identity of the access point.
    bssid: BSSIDSchema  # must match the BSSID pattern
    ssid: NonEmptyStr

    # Signal measurement; validation requires a value below zero.
    rssi: NegativeInt

    # Optional extras, both default to None when the client omits them.
    frequency: Optional[PositiveInt] = None  # must be > 0 when given
    distance: Optional[float] = None
# Request body of the locate endpoint: the list of scanned peers.
# The list itself is required; no minimum length is enforced here.
class LocateRequestSchema(Schema):
    peers: list[LocateRequestPeerSchema]
2023-12-02 00:00:23 +01:00
# Response body shared by the positioning endpoints.
class PositioningResult(Schema):
    # Always present in the response (no default), but may be null when no
    # position could be determined.
    location: Optional[CustomLocationSchema]
# POST /locate/ -- estimate a position from a wifi scan.
#
# request:    the incoming HTTP request; used to look up access permissions.
# parameters: validated request body carrying the scanned peers.
#
# Returns a dict matching PositioningResult. "location" is the serialized
# location, or None when the locator could not determine a position.
# A ValidationError raised while locating is propagated unchanged.
#
# NOTE: documented with comments instead of a docstring on purpose --
# django-ninja appears to publish view docstrings as the operation
# description in the generated API docs (confirm before converting).
@positioning_api_router.post('/locate/', summary="locate based on wifi scans",
                             response={200: PositioningResult, **auth_responses})
def get_position(request, parameters: LocateRequestSchema):
    try:
        location = Locator.load().locate(
            parameters.dict()["peers"],
            permissions=AccessPermission.get_for_request(request),
        )
        if location is not None:
            # todo: this will overload us probably, group these
            increment_cache_key('apistats__locate__%s' % location.pk)
    except ValidationError:
        # todo: validation error, seriously? this shouldn't happen anyways
        raise

    # The locator may return None (the check above already allows for it);
    # serializing unconditionally would raise AttributeError in that case.
    return {
        "location": location.serialize(simple_geometry=True) if location is not None else None,
    }
2023-11-26 17:55:23 +01:00
# GET /locate-test/ -- debugging endpoint that locates one hard-coded mesh
# node from its most recent LOCATE_RANGE_RESULTS message.
#
# request: the incoming HTTP request (unused). django-ninja passes it as the
#          first positional argument to every view, so the function has to
#          accept it; the None default keeps zero-argument calls working.
#
# Returns {"location": None} when the node does not exist; otherwise a dict
# with the reported ranges, the message datetime and the serialized location
# (or None when no location was found).
# NOTE(review): "ranges" and "datetime" are not fields of PositioningResult,
# the declared response schema -- confirm whether they reach the client.
@positioning_api_router.get('/locate-test/', summary="get dummy location for debugging",
                            response={200: PositioningResult, **auth_responses})
def locate_test(request=None):
    # Imported inside the function, as in the original -- presumably to avoid
    # an import cycle with the mesh app (confirm).
    from c3nav.mesh.messages import MeshMessageType
    from c3nav.mesh.models import MeshNode

    try:
        node = MeshNode.objects.prefetch_last_messages(MeshMessageType.LOCATE_RANGE_RESULTS).get(
            address="d4:f9:8d:2d:0d:f1"
        )
    except MeshNode.DoesNotExist:
        return {
            "location": None
        }

    msg = node.last_messages[MeshMessageType.LOCATE_RANGE_RESULTS]
    locator = RangeLocator.load()
    location = locator.locate(
        {
            r.peer: r.distance
            for r in msg.parsed.ranges
            # 0xFFFF entries are skipped -- presumably a "no measurement"
            # marker in the range report (confirm against the mesh protocol).
            if r.distance != 0xFFFF
        },
        None
    )
    return {
        "ranges": msg.parsed.tojson(msg.parsed)["ranges"],
        "datetime": msg.datetime,
        "location": location.serialize(simple_geometry=True) if location else None
    }
2023-11-27 22:59:59 +01:00
# Response type of the beacons-xyz endpoint: a mapping from BSSID to an
# (x, y, z) integer triple; the field titles give the unit as centimetres.
BeaconsXYZ = dict[
    BSSIDSchema,
    Annotated[
        tuple[
            Annotated[int, APIField(title="X (in cm)")],
            Annotated[int, APIField(title="Y (in cm)")],
            Annotated[int, APIField(title="Z (in cm)")],
        ],
        APIField(title="global XYZ coordinates"),
    ]
]
# GET /beacons-xyz/ -- return whatever RangeLocator.load().get_all_xyz()
# yields; the declared response type is BeaconsXYZ.
#
# request: the incoming HTTP request (unused). django-ninja passes it as the
#          first positional argument to every view, so the function has to
#          accept it; the None default keeps zero-argument calls working.
@positioning_api_router.get('/beacons-xyz/', summary="get calculated x y z for all beacons",
                            response={200: BeaconsXYZ, **auth_responses})
def beacons_xyz(request=None):
    return RangeLocator.load().get_all_xyz()