team-3/src/c3nav/editor/api/geometries.py

328 lines
14 KiB
Python
Raw Normal View History

from dataclasses import dataclass
from itertools import chain
2023-12-11 19:02:19 +01:00
from typing import TYPE_CHECKING, Sequence
from django.db.models import Prefetch, Q
from shapely import prepared
2023-12-03 17:57:15 +01:00
from shapely.ops import unary_union
from c3nav.api.exceptions import API404, APIPermissionDenied
from c3nav.editor.utils import LevelChildEditUtils, SpaceChildEditUtils
from c3nav.mapdata.utils.geometry import unwrap_geom
def space_sorting_func(space):
groups = tuple(space.groups.all())
if not groups:
return (0, 0, 0)
return (1, groups[0].category.priority, groups[0].hierarchy, groups[0].priority)
def _get_geometries_for_one_level(level):
buildings = level.buildings.all()
buildings_geom = unary_union([unwrap_geom(building.geometry) for building in buildings])
spaces = {space.pk: space for space in level.spaces.all()}
holes_geom = []
for space in spaces.values():
if space.outside:
space.geometry = space.geometry.difference(buildings_geom)
columns = [column.geometry for column in space.columns.all()]
if columns:
columns_geom = unary_union([unwrap_geom(column.geometry) for column in space.columns.all()])
space.geometry = space.geometry.difference(columns_geom)
holes = [unwrap_geom(hole.geometry) for hole in space.holes.all()]
if holes:
space_holes_geom = unary_union(holes)
holes_geom.append(space_holes_geom.intersection(unwrap_geom(space.geometry)))
space.geometry = space.geometry.difference(space_holes_geom)
for building in buildings:
building.original_geometry = building.geometry
if holes_geom:
holes_geom = unary_union(holes_geom)
holes_geom_prep = prepared.prep(holes_geom)
for obj in buildings:
if holes_geom_prep.intersects(unwrap_geom(obj.geometry)):
obj.geometry = obj.geometry.difference(holes_geom)
results = []
results.extend(buildings)
for door in level.doors.all():
results.append(door)
results.extend(sorted(spaces.values(), key=space_sorting_func))
return results
2023-12-11 19:02:19 +01:00
if TYPE_CHECKING:
from c3nav.mapdata.models import Level
@dataclass(slots=True)
class LevelsForLevel:
levels: Sequence[int] # IDs of all levels to render for this level, in order, including the level itself
levels_on_top: Sequence[int] # IDs of levels that are on top of this level (on_top_of field)
levels_under: Sequence[int] # IDs of the level below this level plus levels on top of it (on_top_of field)
@classmethod
def for_level(cls, request, level: "Level", special_if_on_top=False): # add typing
# noinspection PyPep8Naming
Level = request.changeset.wrap_model('Level')
levels_under = ()
levels_on_top = ()
lower_level = level.lower(Level).first()
primary_levels = (level,) + ((lower_level,) if lower_level else ())
secondary_levels = Level.objects.filter(on_top_of__in=primary_levels).values_list('pk', 'on_top_of')
if lower_level:
levels_under = tuple(pk for pk, on_top_of in secondary_levels if on_top_of == lower_level.pk)
if True:
levels_on_top = tuple(pk for pk, on_top_of in secondary_levels if on_top_of == level.pk)
levels = tuple(chain([level.pk], levels_under, levels_on_top))
if special_if_on_top and level.on_top_of_id is not None:
levels = tuple(chain([level.pk], levels_on_top))
levels_under = (level.on_top_of_id, )
levels_on_top = ()
return cls(
levels=levels,
levels_under=levels_under,
levels_on_top=levels_on_top,
)
def area_sorting_func(area):
groups = tuple(area.groups.all())
if not groups:
return (0, 0, 0)
return (1, groups[0].category.priority, groups[0].hierarchy, groups[0].priority)
def conditional_geojson(obj, update_cache_key_match):
if update_cache_key_match and not obj._affected_by_changeset:
return obj.get_geojson_key()
result = obj.to_geojson(instance=obj)
result['properties']['changed'] = obj._affected_by_changeset
return result
# noinspection PyPep8Naming
def get_level_geometries_result(request, level_id: int, update_cache_key: str, update_cache_key_match: True):
Level = request.changeset.wrap_model('Level')
Space = request.changeset.wrap_model('Space')
Column = request.changeset.wrap_model('Column')
Hole = request.changeset.wrap_model('Hole')
AltitudeMarker = request.changeset.wrap_model('AltitudeMarker')
Building = request.changeset.wrap_model('Building')
Door = request.changeset.wrap_model('Door')
LocationGroup = request.changeset.wrap_model('LocationGroup')
WifiMeasurement = request.changeset.wrap_model('WifiMeasurement')
RangingBeacon = request.changeset.wrap_model('RangingBeacon')
try:
level = Level.objects.filter(Level.q_for_request(request)).get(pk=level_id)
except Level.DoesNotExist:
raise API404('Level not found')
edit_utils = LevelChildEditUtils(level, request) # todo: what's happening here?
if not edit_utils.can_access_child_base_mapdata:
raise APIPermissionDenied()
levels_for_level = LevelsForLevel.for_level(request, level)
# don't prefetch groups for now as changesets do not yet work with m2m-prefetches
levels = Level.objects.filter(pk__in=levels_for_level.levels).filter(Level.q_for_request(request))
2023-12-19 14:48:11 +01:00
graphnodes_qs = request.changeset.wrap_model('GraphNode').objects.all()
levels = levels.prefetch_related(
Prefetch('spaces', Space.objects.filter(Space.q_for_request(request)).only(
'geometry', 'level', 'outside'
)),
Prefetch('doors', Door.objects.filter(Door.q_for_request(request)).only('geometry', 'level')),
Prefetch('spaces__columns', Column.objects.filter(
Q(access_restriction__isnull=True) | ~Column.q_for_request(request)
).only('geometry', 'space')),
Prefetch('spaces__groups', LocationGroup.objects.only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_spaces'
)),
Prefetch('buildings', Building.objects.only('geometry', 'level')),
Prefetch('spaces__holes', Hole.objects.only('geometry', 'space')),
Prefetch('spaces__altitudemarkers', AltitudeMarker.objects.only('geometry', 'space')),
Prefetch('spaces__wifi_measurements', WifiMeasurement.objects.only('geometry', 'space')),
Prefetch('spaces__ranging_beacons', RangingBeacon.objects.only('geometry', 'space')),
2023-12-19 14:48:11 +01:00
Prefetch('spaces__graphnodes', graphnodes_qs)
)
levels = {s.pk: s for s in levels}
level = levels[level.pk]
levels_under = [levels[pk] for pk in levels_for_level.levels_under]
levels_on_top = [levels[pk] for pk in levels_for_level.levels_on_top]
2023-12-19 14:48:11 +01:00
# todo: permissions?
graphnodes = tuple(chain(*(space.graphnodes.all()
for space in chain(*(level.spaces.all() for level in levels.values())))))
graphnodes_lookup = {node.pk: node for node in graphnodes}
2023-12-19 14:48:11 +01:00
graphedges = request.changeset.wrap_model('GraphEdge').objects.all()
graphedges = graphedges.filter(Q(from_node__in=graphnodes) | Q(to_node__in=graphnodes))
graphedges = graphedges.select_related('waytype')
# this is faster because we only deserialize graphnode geometries once
2023-12-19 14:48:11 +01:00
missing_graphnodes = graphnodes_qs.filter(pk__in=set(chain(*((edge.from_node_id, edge.to_node_id)
for edge in graphedges))))
graphnodes_lookup.update({node.pk: node for node in missing_graphnodes})
for edge in graphedges:
edge._from_node_cache = graphnodes_lookup[edge.from_node_id]
edge._to_node_cache = graphnodes_lookup[edge.to_node_id]
2023-12-19 14:48:11 +01:00
graphedges = [edge for edge in graphedges if edge.from_node.space_id != edge.to_node.space_id]
results = chain(
*(_get_geometries_for_one_level(level) for level in levels_under),
_get_geometries_for_one_level(level),
*(_get_geometries_for_one_level(level) for level in levels_on_top),
*(space.altitudemarkers.all() for space in level.spaces.all()),
*(space.wifi_measurements.all() for space in level.spaces.all()),
*(space.ranging_beacons.all() for space in level.spaces.all()),
2023-12-19 14:48:11 +01:00
graphedges,
graphnodes,
)
return list(chain(
[('update_cache_key', update_cache_key)],
(conditional_geojson(obj, update_cache_key_match) for obj in results)
))
def get_space_geometries_result(request, space_id: int, update_cache_key: str, update_cache_key_match: bool):
Space = request.changeset.wrap_model('Space')
Area = request.changeset.wrap_model('Area')
POI = request.changeset.wrap_model('POI')
Door = request.changeset.wrap_model('Door')
LocationGroup = request.changeset.wrap_model('LocationGroup')
space_q_for_request = Space.q_for_request(request)
qs = Space.objects.filter(space_q_for_request)
try:
space = qs.select_related('level', 'level__on_top_of').get(pk=space_id)
except Space.DoesNotExist:
raise API404('space not found')
level = space.level
edit_utils = SpaceChildEditUtils(space, request)
if not edit_utils.can_access_child_base_mapdata:
raise APIPermissionDenied
if request.user_permissions.can_access_base_mapdata:
doors = [door for door in level.doors.filter(Door.q_for_request(request)).all()
if unwrap_geom(door.geometry).intersects(unwrap_geom(space.geometry))]
doors_space_geom = unary_union(
[unwrap_geom(door.geometry) for door in doors] +
[unwrap_geom(space.geometry)]
)
levels_for_level = LevelsForLevel.for_level(request, level.primary_level, special_if_on_top=True)
other_spaces = Space.objects.filter(space_q_for_request, level__pk__in=levels_for_level.levels).only(
'geometry', 'level'
).prefetch_related(
Prefetch('groups', LocationGroup.objects.only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_spaces'
).filter(color__isnull=False))
)
space = next(s for s in other_spaces if s.pk == space.pk)
other_spaces = [s for s in other_spaces
if s.geometry.intersects(doors_space_geom) and s.pk != space.pk]
all_other_spaces = other_spaces
other_spaces_lower = [s for s in other_spaces if s.level_id in levels_for_level.levels_under]
other_spaces_upper = [s for s in other_spaces if s.level_id in levels_for_level.levels_on_top]
other_spaces = [s for s in other_spaces if s.level_id == level.pk]
space.bounds = True
# deactivated for performance reasons
buildings = level.buildings.all()
# buildings_geom = unary_union([building.geometry for building in buildings])
# for other_space in other_spaces:
# if other_space.outside:
# other_space.geometry = other_space.geometry.difference(buildings_geom)
for other_space in chain(other_spaces, other_spaces_lower, other_spaces_upper):
other_space.opacity = 0.4
other_space.color = '#ffffff'
for building in buildings:
building.opacity = 0.5
else:
buildings = []
doors = []
other_spaces = []
other_spaces_lower = []
other_spaces_upper = []
all_other_spaces = []
# todo: permissions
if request.user_permissions.can_access_base_mapdata:
graph_nodes = request.changeset.wrap_model('GraphNode').objects.all()
graph_nodes = graph_nodes.filter((Q(space__in=all_other_spaces)) | Q(space__pk=space.pk))
space_graph_nodes = tuple(node for node in graph_nodes if node.space_id == space.pk)
graph_edges = request.changeset.wrap_model('GraphEdge').objects.all()
space_graphnodes_ids = tuple(node.pk for node in space_graph_nodes)
graph_edges = graph_edges.filter(Q(from_node__pk__in=space_graphnodes_ids) |
2023-12-11 19:02:19 +01:00
Q(to_node__pk__in=space_graphnodes_ids))
graph_edges = graph_edges.select_related('from_node', 'to_node', 'waytype').only(
'from_node__geometry', 'to_node__geometry', 'waytype__color'
)
else:
graph_nodes = []
graph_edges = []
areas = space.areas.filter(Area.q_for_request(request)).only(
'geometry', 'space'
).prefetch_related(
Prefetch('groups', LocationGroup.objects.order_by(
'-category__priority', '-hierarchy', '-priority'
).only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_areas'
))
)
for area in areas:
area.opacity = 0.5
areas = sorted(areas, key=area_sorting_func)
results = chain(
buildings,
other_spaces_lower,
doors,
other_spaces,
[space],
areas,
space.holes.all().only('geometry', 'space'),
space.stairs.all().only('geometry', 'space'),
space.ramps.all().only('geometry', 'space'),
space.obstacles.all().only('geometry', 'space', 'color'),
space.lineobstacles.all().only('geometry', 'width', 'space', 'color'),
space.columns.all().only('geometry', 'space'),
space.altitudemarkers.all().only('geometry', 'space'),
space.wifi_measurements.all().only('geometry', 'space'),
space.ranging_beacons.all().only('geometry', 'space'),
space.pois.filter(POI.q_for_request(request)).only('geometry', 'space').prefetch_related(
Prefetch('groups', LocationGroup.objects.only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_pois'
).filter(color__isnull=False))
),
other_spaces_upper,
graph_edges,
graph_nodes
)
return list(chain(
[('update_cache_key', update_cache_key)],
(conditional_geojson(obj, update_cache_key_match) for obj in results)
))