diff --git a/src/c3nav/mapdata/management/commands/importnoc.py b/src/c3nav/mapdata/management/commands/importnoc.py index c2d187ab..8535ceaf 100644 --- a/src/c3nav/mapdata/management/commands/importnoc.py +++ b/src/c3nav/mapdata/management/commands/importnoc.py @@ -80,7 +80,7 @@ class Command(BaseCommand): if result.space == new_space and distance(unwrap_geom(result.geometry), point) < 0.20: altitude_quest = False - result.comment = name + result.ap_name = name result.space = new_space result.geometry = point result.altitude = 0 diff --git a/src/c3nav/mapdata/management/commands/importpoc.py b/src/c3nav/mapdata/management/commands/importpoc.py index b8c752b8..bff65cbe 100644 --- a/src/c3nav/mapdata/management/commands/importpoc.py +++ b/src/c3nav/mapdata/management/commands/importpoc.py @@ -1,133 +1,94 @@ -import hashlib +from typing import Literal import requests from django.conf import settings from django.core.management.base import BaseCommand from pydantic import BaseModel +from pydantic.type_adapter import TypeAdapter +from pydantic_extra_types.mac_address import MacAddress from shapely import distance +from shapely.geometry import shape, Point -from c3nav.mapdata.models import MapUpdate, Space, Level +from c3nav.api.schema import PointSchema +from c3nav.mapdata.models import MapUpdate, Level from c3nav.mapdata.models.geometry.space import RangingBeacon -from c3nav.mapdata.models.report import Report from c3nav.mapdata.utils.cache.changes import changed_geometries from c3nav.mapdata.utils.geometry import unwrap_geom -from shapely.ops import nearest_points, unary_union +from c3nav.mapdata.utils.importer import PointImportHelper -class NocImportItem(BaseModel): +class PocImportItemProperties(BaseModel): + level: str + mac: MacAddress + name: str + + +class PocImportItem(BaseModel): """ Something imported from the NOC """ - lat: float | int - lng: float | int - layer: str - type: str = "unknown" + type: Literal["Point"] = "Point" + geometry: PointSchema + properties: PocImportItemProperties class Command(BaseCommand): help = 'import APs from noc' def handle(self, *args, **options): - r = requests.get(settings.NOC_BASE+"/api/markers/get") + r = requests.get(settings.POC_BASE+"antenna-locations") r.raise_for_status() - items = {name: NocImportItem.model_validate(item) - for name, item in r.json()["markers"].items() - if not name.startswith("__polyline")} + items = TypeAdapter(list[PocImportItem]).validate_python(r.json()) with MapUpdate.lock(): changed_geometries.reset() self.do_import(items) MapUpdate.objects.create(type='importnoc') - def do_import(self, items: dict[str, NocImportItem]): - spaces_for_level = {} - levels = tuple(Level.objects.values_list("pk", flat=True)) - lower_levels_for_level = {pk: levels[:i] for i, pk in enumerate(levels)} - - for space in Space.objects.select_related('level').prefetch_related('holes'): - spaces_for_level.setdefault(space.level_id, []).append(space) + def do_import(self, items: list[PocImportItem]): + import_helper = PointImportHelper() beacons_so_far: dict[str, RangingBeacon] = { - **{m.import_tag: m for m in RangingBeacon.objects.filter(import_tag__startswith="noc:")}, + **{m.import_tag: m for m in RangingBeacon.objects.filter(import_tag__startswith="poc:", + beacon_type=RangingBeacon.BeaconType.DECT)}, } - for name, item in items.items(): - import_tag = f"noc:{name}" + levels_by_level_index = {str(level.level_index): level for level in Level.objects.all()} - if item.type != "AP": - continue + for item in items: + import_tag = f"poc:{item.properties.name}" # determine geometry - converter = settings.NOC_LAYERS.get(item.layer, None) - if not converter: - print(f"ERROR: {name} has invalid layer: {item.layer}") + level_id = levels_by_level_index[item.properties.level] + + point: Point = shape(item.geometry.model_dump()) # nowa + + new_space, point = import_helper.get_point_and_space( + level_id=level_id, + point=point, + name=item.properties.name, + ) + + if new_space is None: continue - new_geometry = converter.convert(item.lat, item.lng) - - # determine space - possible_spaces = [space for space in spaces_for_level[converter.level_id] - if space.geometry.intersects(new_geometry)] - if not possible_spaces: - possible_spaces = [space for space in spaces_for_level[converter.level_id] - if distance(unwrap_geom(space.geometry), new_geometry) < 0.3] - if len(possible_spaces) == 1: - new_space = possible_spaces[0] - the_distance = distance(unwrap_geom(new_space.geometry), new_geometry) - print(f"SUCCESS: {name} is {the_distance:.02f}m away from {new_space.title}") - elif len(possible_spaces) == 2: - new_space = min(possible_spaces, key=lambda s: distance(unwrap_geom(s.geometry), new_geometry)) - print(f"WARNING: {name} could be in multiple spaces ({possible_spaces}, picking {new_space}...") - else: - print(f"ERROR: {name} is not within any space (NOC: {(item.lat, item.lng)}, NAV: {new_geometry}") - continue - - # move point into space if needed - new_space_geometry = new_space.geometry.difference( - unary_union([unwrap_geom(hole.geometry) for hole in new_space.columns.all()]) - ) - if not new_space_geometry.intersects(new_geometry): - new_geometry = nearest_points(new_space_geometry.buffer(-0.05), new_geometry)[0] - elif len(possible_spaces) == 1: - new_space = possible_spaces[0] - print(f"SUCCESS: {name} is in {new_space.title}") - else: - print(f"WARNING: {name} could be in multiple spaces, picking one...") - new_space = possible_spaces[0] - - lower_levels = lower_levels_for_level[new_space.level_id] - for lower_level in reversed(lower_levels): - # let's go through the lower levels - if not unary_union([unwrap_geom(h.geometry) for h in new_space.holes.all()]).intersects(new_geometry): - # current selected spacae is fine, that's it - break - print(f"NOTE: {name} is in a hole, looking lower...") - - # find a lower space - possible_spaces = [space for space in spaces_for_level[lower_level] - if space.geometry.intersects(new_geometry)] - if possible_spaces: - new_space = possible_spaces[0] - print(f"NOTE: {name} moved to lower space {new_space}") - else: - print(f"WARNING: {name} couldn't find a lower space, still in a hole") - # find existing location result = beacons_so_far.pop(import_tag, None) # build resulting object altitude_quest = True if not result: - result = RangingBeacon(import_tag=import_tag) + result = RangingBeacon(import_tag=import_tag, beacon_type=RangingBeacon.BeaconType.EVENT_WIFI) else: - if result.space == new_space and distance(unwrap_geom(result.geometry), new_geometry) < 0.03: + if result.space == new_space and distance(unwrap_geom(result.geometry), point) < 0.03: continue - if result.space == new_space and distance(unwrap_geom(result.geometry), new_geometry) < 0.20: + if result.space == new_space and distance(unwrap_geom(result.geometry), point) < 0.20: altitude_quest = False - result.comment = name + result.ap_name = item.properties.name + result.addresses = [item.properties.mac.lower()] result.space = new_space - result.geometry = new_geometry + result.geometry = point result.altitude = 0 if altitude_quest: result.altitude_quest = True