pointimporthelper

This commit is contained in:
Laura Klünder 2024-12-28 15:35:15 +01:00
parent cfed6d9ed7
commit 6d78a0c491
3 changed files with 82 additions and 62 deletions

View file

@ -1,17 +1,14 @@
import hashlib
import requests import requests
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from pydantic import BaseModel from pydantic import BaseModel
from shapely import distance from shapely import distance
from c3nav.mapdata.models import MapUpdate, Space, Level from c3nav.mapdata.models import MapUpdate
from c3nav.mapdata.models.geometry.space import RangingBeacon from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.models.report import Report from c3nav.mapdata.utils.importer import PointImportHelper
from c3nav.mapdata.utils.cache.changes import changed_geometries from c3nav.mapdata.utils.cache.changes import changed_geometries
from c3nav.mapdata.utils.geometry import unwrap_geom from c3nav.mapdata.utils.geometry import unwrap_geom
from shapely.ops import nearest_points, unary_union
class NocImportItem(BaseModel): class NocImportItem(BaseModel):
@ -40,12 +37,7 @@ class Command(BaseCommand):
MapUpdate.objects.create(type='importnoc') MapUpdate.objects.create(type='importnoc')
def do_import(self, items: dict[str, NocImportItem]): def do_import(self, items: dict[str, NocImportItem]):
spaces_for_level = {} import_helper = PointImportHelper()
levels = tuple(Level.objects.values_list("pk", flat=True))
lower_levels_for_level = {pk: levels[:i] for i, pk in enumerate(levels)}
for space in Space.objects.select_related('level').prefetch_related('holes'):
spaces_for_level.setdefault(space.level_id, []).append(space)
beacons_so_far: dict[str, RangingBeacon] = { beacons_so_far: dict[str, RangingBeacon] = {
**{m.import_tag: m for m in RangingBeacon.objects.filter(import_tag__startswith="noc:", **{m.import_tag: m for m in RangingBeacon.objects.filter(import_tag__startswith="noc:",
@ -64,54 +56,16 @@ class Command(BaseCommand):
print(f"ERROR: {name} has invalid layer: {item.layer}") print(f"ERROR: {name} has invalid layer: {item.layer}")
continue continue
new_geometry = converter.convert(item.lat, item.lng) point = converter.convert(item.lat, item.lng)
# determine space new_space, point = import_helper.get_point_and_space(
possible_spaces = [space for space in spaces_for_level[converter.level_id] level_id=converter.level_id,
if space.geometry.intersects(new_geometry)] point=point,
if not possible_spaces: name=name,
possible_spaces = [space for space in spaces_for_level[converter.level_id]
if distance(unwrap_geom(space.geometry), new_geometry) < 0.3]
if len(possible_spaces) == 1:
new_space = possible_spaces[0]
the_distance = distance(unwrap_geom(new_space.geometry), new_geometry)
print(f"SUCCESS: {name} is {the_distance:.02f}m away from {new_space.title}")
elif len(possible_spaces) == 2:
new_space = min(possible_spaces, key=lambda s: distance(unwrap_geom(s.geometry), new_geometry))
print(f"WARNING: {name} could be in multiple spaces ({possible_spaces}, picking {new_space}...")
else:
print(f"ERROR: {name} is not within any space (NOC: {(item.lat, item.lng)}, NAV: {new_geometry}")
continue
# move point into space if needed
new_space_geometry = new_space.geometry.difference(
unary_union([unwrap_geom(hole.geometry) for hole in new_space.columns.all()])
) )
if not new_space_geometry.intersects(new_geometry):
new_geometry = nearest_points(new_space_geometry.buffer(-0.05), new_geometry)[0]
elif len(possible_spaces) == 1:
new_space = possible_spaces[0]
print(f"SUCCESS: {name} is in {new_space.title}")
else:
print(f"WARNING: {name} could be in multiple spaces, picking one...")
new_space = possible_spaces[0]
lower_levels = lower_levels_for_level[new_space.level_id] if new_space is None:
for lower_level in reversed(lower_levels): continue
# let's go through the lower levels
if not unary_union([unwrap_geom(h.geometry) for h in new_space.holes.all()]).intersects(new_geometry):
# current selected spacae is fine, that's it
break
print(f"NOTE: {name} is in a hole, looking lower...")
# find a lower space
possible_spaces = [space for space in spaces_for_level[lower_level]
if space.geometry.intersects(new_geometry)]
if possible_spaces:
new_space = possible_spaces[0]
print(f"NOTE: {name} moved to lower space {new_space}")
else:
print(f"WARNING: {name} couldn't find a lower space, still in a hole")
# find existing location # find existing location
result = beacons_so_far.pop(import_tag, None) result = beacons_so_far.pop(import_tag, None)
@ -121,14 +75,14 @@ class Command(BaseCommand):
if not result: if not result:
result = RangingBeacon(import_tag=import_tag, beacon_type=RangingBeacon.BeaconType.EVENT_WIFI) result = RangingBeacon(import_tag=import_tag, beacon_type=RangingBeacon.BeaconType.EVENT_WIFI)
else: else:
if result.space == new_space and distance(unwrap_geom(result.geometry), new_geometry) < 0.03: if result.space == new_space and distance(unwrap_geom(result.geometry), point) < 0.03:
continue continue
if result.space == new_space and distance(unwrap_geom(result.geometry), new_geometry) < 0.20: if result.space == new_space and distance(unwrap_geom(result.geometry), point) < 0.20:
altitude_quest = False altitude_quest = False
result.comment = name result.comment = name
result.space = new_space result.space = new_space
result.geometry = new_geometry result.geometry = point
result.altitude = 0 result.altitude = 0
if altitude_quest: if altitude_quest:
result.altitude_quest = True result.altitude_quest = True

View file

@ -51,5 +51,6 @@ def process_map_updates(self):
@app.task(bind=True, max_retries=10) @app.task(bind=True, max_retries=10)
def delete_map_cache_key(self, cache_key): def delete_map_cache_key(self, cache_key):
if hasattr(cache, 'keys'):
for key in cache.keys(f'*{cache_key}*'): for key in cache.keys(f'*{cache_key}*'):
cache.delete(key) cache.delete(key)

View file

@ -0,0 +1,65 @@
from shapely import Point, distance
from shapely.ops import unary_union, nearest_points
from c3nav.mapdata.models import Level, Space
from c3nav.mapdata.utils.geometry import unwrap_geom
class PointImportHelper:
def __init__(self):
self.spaces_for_level = {}
self.levels = tuple(Level.objects.values_list("pk", flat=True))
self.lower_levels_for_level = {pk: self.levels[:i] for i, pk in enumerate(self.levels)}
for space in Space.objects.select_related('level').prefetch_related('holes'):
self.spaces_for_level.setdefault(space.level_id, []).append(space)
def get_point_and_space(self, level_id: int, point: Point, name: str):
# determine space
possible_spaces = [space for space in self.spaces_for_level[level_id]
if space.geometry.intersects(point)]
if not possible_spaces:
possible_spaces = [space for space in self.spaces_for_level[level_id]
if distance(unwrap_geom(space.geometry), point) < 0.3]
if len(possible_spaces) == 1:
new_space = possible_spaces[0]
the_distance = distance(unwrap_geom(new_space.geometry), point)
print(f"SUCCESS: {name} is {the_distance:.02f}m away from {new_space.title}")
elif len(possible_spaces) == 2:
new_space = min(possible_spaces, key=lambda s: distance(unwrap_geom(s.geometry), point))
print(f"WARNING: {name} could be in multiple spaces ({possible_spaces}, picking {new_space}...")
else:
print(f"ERROR: {name} is not within any space ({point})")
return None, None
# move point into space if needed
new_space_geometry = new_space.geometry.difference(
unary_union([unwrap_geom(hole.geometry) for hole in new_space.columns.all()])
)
if not new_space_geometry.intersects(point):
point = nearest_points(new_space_geometry.buffer(-0.05), point)[0]
elif len(possible_spaces) == 1:
new_space = possible_spaces[0]
print(f"SUCCESS: {name} is in {new_space.title}")
else:
print(f"WARNING: {name} could be in multiple spaces, picking one...")
new_space = possible_spaces[0]
lower_levels = self.lower_levels_for_level[new_space.level_id]
for lower_level in reversed(lower_levels):
# let's go through the lower levels
if not unary_union([unwrap_geom(h.geometry) for h in new_space.holes.all()]).intersects(point):
# current selected spacae is fine, that's it
break
print(f"NOTE: {name} is in a hole, looking lower...")
# find a lower space
possible_spaces = [space for space in self.spaces_for_level[lower_level]
if space.geometry.intersects(point)]
if possible_spaces:
new_space = possible_spaces[0]
print(f"NOTE: {name} moved to lower space {new_space}")
else:
print(f"WARNING: {name} couldn't find a lower space, still in a hole")
return new_space, point