improve locating a bit by using the pointplacementhelper

This commit is contained in:
Laura Klünder 2024-12-29 15:50:13 +01:00
parent 652223085b
commit 8112abad31
4 changed files with 57 additions and 34 deletions

View file

@ -6,7 +6,7 @@ from shapely import distance
from c3nav.mapdata.models import MapUpdate
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.utils.importer import PointImportHelper
from c3nav.mapdata.utils.placement import PointPlacementHelper
from c3nav.mapdata.utils.cache.changes import changed_geometries
from c3nav.mapdata.utils.geometry import unwrap_geom
@ -37,7 +37,7 @@ class Command(BaseCommand):
MapUpdate.objects.create(type='importnoc')
def do_import(self, items: dict[str, NocImportItem]):
import_helper = PointImportHelper()
import_helper = PointPlacementHelper()
beacons_so_far: dict[str, RangingBeacon] = {
**{m.import_tag: m for m in RangingBeacon.objects.filter(import_tag__startswith="noc:",

View file

@ -14,7 +14,7 @@ from c3nav.mapdata.models import MapUpdate, Level
from c3nav.mapdata.models.geometry.space import RangingBeacon
from c3nav.mapdata.utils.cache.changes import changed_geometries
from c3nav.mapdata.utils.geometry import unwrap_geom
from c3nav.mapdata.utils.importer import PointImportHelper
from c3nav.mapdata.utils.placement import PointPlacementHelper
class PocImportItemProperties(BaseModel):
@ -46,7 +46,7 @@ class Command(BaseCommand):
MapUpdate.objects.create(type='importnoc')
def do_import(self, items: list[PocImportItem]):
import_helper = PointImportHelper()
import_helper = PointPlacementHelper()
beacons_so_far: dict[str, RangingBeacon] = {
**{m.import_tag: m for m in RangingBeacon.objects.filter(import_tag__startswith="poc:",

View file

@ -1,11 +1,14 @@
from typing import Optional
from shapely import Point, distance
from shapely.ops import unary_union, nearest_points
from c3nav.mapdata.models import Level, Space
from c3nav.mapdata.utils.geometry import unwrap_geom
from c3nav.routing.router import RouterRestrictionSet
class PointImportHelper:
class PointPlacementHelper:
def __init__(self):
self.spaces_for_level = {}
self.levels = tuple(Level.objects.values_list("pk", flat=True))
@ -14,23 +17,30 @@ class PointImportHelper:
for space in Space.objects.select_related('level').prefetch_related('holes'):
self.spaces_for_level.setdefault(space.level_id, []).append(space)
def get_point_and_space(self, level_id: int, point: Point, name: str):
def get_point_and_space(self, level_id: int, point: Point, name: Optional[str] = None,
restrictions: Optional[RouterRestrictionSet] = None, max_space_distance=1.5):
# determine space
restricted_spaces = restrictions.spaces if restrictions else ()
possible_spaces = [space for space in self.spaces_for_level[level_id]
if space.geometry.intersects(point)]
if space.pk not in restricted_spaces and space.geometry.intersects(point)]
if not possible_spaces:
possible_spaces = [space for space in self.spaces_for_level[level_id]
if distance(unwrap_geom(space.geometry), point) < 1.5]
if (space.pk not in restricted_spaces
and distance(unwrap_geom(space.geometry), point) < max_space_distance)]
if len(possible_spaces) == 1:
new_space = possible_spaces[0]
the_distance = distance(unwrap_geom(new_space.geometry), point)
print(f"SUCCESS: {name} is {the_distance:.02f}m away from {new_space.title}")
if name:
print(f"SUCCESS: {name} is {the_distance:.02f}m away from {new_space.title}")
elif len(possible_spaces) > 1:
new_space = min(possible_spaces, key=lambda s: distance(unwrap_geom(s.geometry), point))
print(f"WARNING: {name} could be in multiple spaces ({possible_spaces}, picking {new_space}, "
f"which is {distance(unwrap_geom(new_space.geometry), point)}m away...")
if name:
print(f"WARNING: {name} could be in multiple spaces ({possible_spaces}, picking {new_space}, "
f"which is {distance(unwrap_geom(new_space.geometry), point)}m away...")
else:
print(f"ERROR: {name} is not within any space on level {level_id} ({point})")
if name:
print(f"ERROR: {name} is not within any space on level {level_id} ({point})")
return None, None
# move point into space if needed
@ -41,9 +51,11 @@ class PointImportHelper:
point = nearest_points(new_space_geometry.buffer(-0.05), point)[0]
elif len(possible_spaces) == 1:
new_space = possible_spaces[0]
print(f"SUCCESS: {name} is in {new_space.title}")
if name:
print(f"SUCCESS: {name} is in {new_space.title}")
else:
print(f"WARNING: {name} could be in multiple spaces, picking one...")
if name:
print(f"WARNING: {name} could be in multiple spaces, picking one...")
new_space = possible_spaces[0]
lower_levels = self.lower_levels_for_level[new_space.level_id]
@ -52,15 +64,18 @@ class PointImportHelper:
if not unary_union([unwrap_geom(h.geometry) for h in new_space.holes.all()]).intersects(point):
# current selected spacae is fine, that's it
break
print(f"NOTE: {name} is in a hole, looking lower...")
if name:
print(f"NOTE: {name} is in a hole, looking lower...")
# find a lower space
possible_spaces = [space for space in self.spaces_for_level[lower_level]
if space.geometry.intersects(point)]
if space.pk not in restricted_spaces and space.geometry.intersects(point)]
if possible_spaces:
new_space = possible_spaces[0]
print(f"NOTE: {name} moved to lower space {new_space}")
if name:
print(f"NOTE: {name} moved to lower space {new_space}")
else:
print(f"WARNING: {name} couldn't find a lower space, still in a hole")
if name:
print(f"WARNING: {name} couldn't find a lower space, still in a hole")
return new_space, point

View file

@ -12,9 +12,11 @@ from annotated_types import Lt
from django.conf import settings
from pydantic.types import NonNegativeInt
from pydantic_extra_types.mac_address import MacAddress
from shapely import Point
from c3nav.mapdata.models import MapUpdate, Space
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.mapdata.utils.placement import PointPlacementHelper
from c3nav.mesh.utils import get_nodes_and_ranging_beacons
from c3nav.routing.router import Router
from c3nav.routing.schemas import LocateWifiPeerSchema, BeaconMeasurementDataSchema, LocateIBeaconPeerSchema
@ -82,6 +84,7 @@ class Locator:
peer_lookup: dict[TypedIdentifier, int] = field(default_factory=dict)
xyz: np.array = field(default_factory=(lambda: np.empty((0,))))
spaces: dict[int, "LocatorSpace"] = field(default_factory=dict)
placement_helper: Optional[PointPlacementHelper] = None
@classmethod
def rebuild(cls, update, router):
@ -127,6 +130,8 @@ class Locator:
if new_space.points:
self.spaces[space.pk] = new_space
self.placement_helper = PointPlacementHelper()
def get_peer_id(self, identifier: TypedIdentifier, create=False) -> Optional[int]:
peer_id = self.peer_lookup.get(identifier, None)
if peer_id is None and create:
@ -239,6 +244,7 @@ class Locator:
return None
router = Router.load()
restrictions = router.get_restrictions(permissions)
# get visible spaces
best_ap_id = max(scan_data_we_can_use, key=lambda item: item[1].rssi)[0]
@ -261,17 +267,8 @@ class Locator:
the_sum = sum((value.rssi + 90) for peer_id, value in deduplicized_scan_data_in_the_same_room[:3])
level = router.levels[space.level_id]
if level.on_top_of_id:
level = router.levels[level.on_top_of_id]
if not the_sum:
point = space.point
return CustomLocation(
level=level,
x=point.x,
y=point.y,
permissions=permissions,
icon='my_location'
)
else:
x = 0
y = 0
@ -279,13 +276,24 @@ class Locator:
for peer_id, value in deduplicized_scan_data_in_the_same_room[:3]:
x += float(self.peers[peer_id].xyz[0]) * (value.rssi+90) / the_sum
y += float(self.peers[peer_id].xyz[1]) * (value.rssi+90) / the_sum
return CustomLocation(
level=level,
x=x/100,
y=y/100,
permissions=permissions,
icon='my_location'
)
point = Point(x/100, y/100)
new_space, new_point = self.placement_helper.get_point_and_space(
level_id=level.pk, point=point, restrictions=restrictions,
max_space_distance=20,
)
level = router.levels[new_space.level_id]
if level.on_top_of_id:
level = router.levels[level.on_top_of_id]
return CustomLocation(
level=level,
x=x / 100,
y=y / 100,
permissions=permissions,
icon='my_location'
)
def locate_rssi(self, scan_data: ScanData, permissions=None):
router = Router.load()