api endpoint to show ranging beacons on map

This commit is contained in:
Laura Klünder 2024-03-30 16:37:48 +01:00
parent 43ee0a07a4
commit 7798206fe2
2 changed files with 61 additions and 3 deletions

View file

@ -8,13 +8,15 @@ from ninja import Router as APIRouter
from ninja import Schema, UploadedFile
from ninja.pagination import paginate
from pydantic import PositiveInt, field_validator
from shapely.geometry.geo import mapping
from c3nav.api.auth import APIKeyAuth, auth_permission_responses, auth_responses, validate_responses
from c3nav.api.exceptions import API404, APIConflict, APIRequestValidationFailed
from c3nav.api.schema import BaseSchema
from c3nav.mesh.schemas import BoardType, ChipType, FirmwareImage
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mesh.messages import MeshMessageType, MeshMessage
from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, NodeMessage
from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, NodeMessage, MeshNode
from c3nav.mesh.schemas import BoardType, ChipType, FirmwareImage, RangingBeaconGeoFeature
mesh_api_router = APIRouter(tags=["mesh"], auth=APIKeyAuth(permissions={"mesh_control"}))
@ -243,3 +245,40 @@ def messages_list(request, filters: Query[MessagesFilter]):
if filters.time_until:
qs = qs.filter(datetime__lte=filters.time_until)
return qs.order_by('-datetime')
@mesh_api_router.get(
'/map/{level_id}/', summary="ranging beacons map",
description="query and filter all received mesh messages",
response={200: list[RangingBeaconGeoFeature], **auth_permission_responses},
openapi_extra={"security": [{"APIKeyAuth": ["mesh_control"]}]}
)
def mesh_map(request, level_id: int):
beacons = RangingBeacon.objects.filter(space__level__id=level_id)
beacon_ids = set(beacon.id for beacon in beacons)
nodes_for_beacons = {
node.ranging_beacon.id: node
for node in MeshNode.objects.all().prefetch_last_messages().prefetch_ranging_beacon()
if node.ranging_beacon and node.ranging_beacon.id in beacon_ids
}
result = []
for beacon in RangingBeacon.objects.filter(space__level__id=level_id):
node = nodes_for_beacons.get(beacon.id, None)
node_uplink = None if node is None else node.get_uplink()
result.append({
"type": "Feature",
"geometry": mapping(beacon.geometry),
"properties": {
"node_number": beacon.node_number,
"wifi_bssid": beacon.wifi_bssid,
"comment": beacon.comment,
"mesh_node": None if node is None else {
"address": node.address,
"uplink": None if node_uplink is None else node_uplink.node_id,
},
}
})
return result

View file

@ -1,7 +1,7 @@
import re
from dataclasses import dataclass, field
from enum import unique
from typing import Annotated, BinaryIO, ClassVar, Literal, Self, Union
from typing import Annotated, BinaryIO, ClassVar, Literal, Self, Union, Optional
from annotated_types import Gt, Le, Lt, MaxLen, Ge
from pydantic import NegativeInt, PositiveInt
@ -9,6 +9,7 @@ from pydantic.main import BaseModel
from pydantic.types import Discriminator, NonNegativeInt, NonPositiveInt
from pydantic_extra_types.mac_address import MacAddress
from c3nav.api.schema import BaseSchema, PointSchema
from c3nav.mesh.cformats import AsDefinition, AsHex, CName, ExistingCStruct, discriminator_value, \
CEnum, TwoNibblesEncodable
@ -282,3 +283,21 @@ class FirmwareImage(BaseModel):
def from_file(cls, file: BinaryIO) -> Self:
result, data = cls.decode(file.read(FirmwareImage.get_min_size()))
return result
class MeshNodeGeoFeatureProperties(BaseSchema):
address: MacAddress
uplink: Optional[MacAddress]
class RangingBeaconGeoFeatureProperties(BaseSchema):
node_number: Optional[int]
wifi_bssid: Optional[MacAddress]
comment: Optional[str]
mesh_node: Optional[MeshNodeGeoFeatureProperties]
class RangingBeaconGeoFeature(BaseSchema):
type: Literal["Feature"]
geometry: PointSchema
properties: RangingBeaconGeoFeatureProperties