team-3/src/c3nav/mapdata/models/geometry/level.py
2024-08-18 21:48:17 +02:00

677 lines
30 KiB
Python

import logging
from collections import deque, namedtuple
from decimal import Decimal
from itertools import chain, combinations
from operator import attrgetter, itemgetter
from typing import Sequence, Optional
import numpy as np
from django.core.validators import MinValueValidator
from django.db import models
from django.db.models import CheckConstraint, Q
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.text import format_lazy
from django.utils.translation import gettext_lazy as _
from pydantic import Field as APIField
from scipy.interpolate._rbfinterp import RBFInterpolator
from shapely import prepared
from shapely.affinity import scale
from shapely.geometry import JOIN_STYLE, LineString, MultiPolygon, Point
from shapely.geometry.polygon import orient
from django_pydantic_field import SchemaField
from shapely.ops import unary_union
from c3nav.api.schema import BaseSchema
from c3nav.mapdata.fields import GeometryField, I18nField
from c3nav.mapdata.grid import grid
from c3nav.mapdata.models import Level
from c3nav.mapdata.models.access import AccessRestrictionMixin
from c3nav.mapdata.models.geometry.base import GeometryMixin
from c3nav.mapdata.models.locations import SpecificLocation
from c3nav.mapdata.utils.cache.changes import changed_geometries
from c3nav.mapdata.utils.geometry import (assert_multilinestring, assert_multipolygon, clean_cut_polygon,
cut_polygon_with_line, unwrap_geom)
class LevelGeometryMixin(GeometryMixin):
level = models.ForeignKey('mapdata.Level', on_delete=models.CASCADE, verbose_name=_('level'))
class Meta:
abstract = True
def get_geojson_properties(self, *args, instance=None, **kwargs) -> dict:
result = super().get_geojson_properties(*args, **kwargs)
result['level'] = self.level_id
if hasattr(self, 'get_color'):
from c3nav.mapdata.render.theme import ColorManager
color = self.get_color(ColorManager.for_theme(None), instance=instance)
if color:
result['color'] = color
if hasattr(self, 'opacity'):
result['opacity'] = self.opacity
return result
def _serialize(self, level=True, **kwargs):
result = super()._serialize(**kwargs)
if level:
result['level'] = self.level_id
return result
def details_display(self, **kwargs):
result = super().details_display(**kwargs)
result['display'].insert(3, (
_('Level'),
{
'id': self.level_id,
'slug': self.level.get_slug(),
'title': self.level.title,
'can_search': self.level.can_search,
},
))
result['level'] = self.level_id
return result
@property
def subtitle(self):
base_subtitle = super().subtitle
level = getattr(self, '_level_cache', None)
if level is not None:
return format_lazy(_('{category}, {level}'),
category=base_subtitle,
level=level.title)
return base_subtitle
def register_change(self, force=False):
force = force or self.all_geometry_changed
if force or self.geometry_changed:
changed_geometries.register(self.level_id, self.geometry if force else self.get_changed_geometry())
def register_delete(self):
changed_geometries.register(self.level_id, self.geometry)
@classmethod
def q_for_request(cls, request, prefix='', allow_none=False):
return (
super().q_for_request(request, prefix=prefix, allow_none=allow_none) &
Level.q_for_request(request, prefix=prefix+'level__', allow_none=allow_none)
)
def save(self, *args, **kwargs):
self.register_change()
super().save(*args, **kwargs)
class Building(LevelGeometryMixin, models.Model):
"""
The outline of a building on a specific level
"""
geometry = GeometryField('polygon')
class Meta:
verbose_name = _('Building')
verbose_name_plural = _('Buildings')
default_related_name = 'buildings'
class Space(LevelGeometryMixin, SpecificLocation, models.Model):
"""
An accessible space. Shouldn't overlap with spaces on the same level.
"""
geometry = GeometryField('polygon')
height = models.DecimalField(_('height'), max_digits=6, decimal_places=2, null=True, blank=True,
validators=[MinValueValidator(Decimal('0'))])
outside = models.BooleanField(default=False, verbose_name=_('only outside of building'))
enter_description = I18nField(_('Enter description'), blank=True, fallback_language=None)
base_mapdata_accessible = models.BooleanField(default=False,
verbose_name=_('always accessible (overwrites base mapdata setting)'))
class Meta:
verbose_name = _('Space')
verbose_name_plural = _('Spaces')
default_related_name = 'spaces'
def _serialize(self, geometry=True, **kwargs):
result = super()._serialize(geometry=geometry, **kwargs)
result['outside'] = self.outside
result['height'] = None if self.height is None else float(str(self.height))
return result
@property
def grid_square(self):
if "geometry" in self.get_deferred_fields():
return None
return grid.get_squares_for_bounds(self.geometry.bounds) or ''
def details_display(self, editor_url=True, **kwargs):
result = super().details_display(**kwargs)
result['display'].extend([
(_('height'), self.height),
(_('outside only'), _('Yes') if self.outside else _('No')),
])
if editor_url:
result['editor_url'] = reverse('editor.spaces.detail', kwargs={'level': self.level_id, 'pk': self.pk})
return result
class Door(LevelGeometryMixin, AccessRestrictionMixin, models.Model):
"""
A connection between two spaces
"""
geometry = GeometryField('polygon')
class Meta:
verbose_name = _('Door')
verbose_name_plural = _('Doors')
default_related_name = 'doors'
class ItemWithValue:
def __init__(self, obj, func):
self.obj = obj
self._func = func
@cached_property
def value(self):
return self._func()
class AltitudeAreaPoint(BaseSchema):
coordinates: tuple[float, float] = APIField(
example=[1, 2.5]
)
altitude: float
RampConnectedTo = namedtuple('RampConnectedTo', ('area', 'intersections'))
class AltitudeArea(LevelGeometryMixin, models.Model):
"""
An altitude area
"""
geometry: MultiPolygon = GeometryField('multipolygon')
altitude = models.DecimalField(_('altitude'), null=True, max_digits=6, decimal_places=2)
points: Sequence[AltitudeAreaPoint] = SchemaField(schema=list[AltitudeAreaPoint], null=True)
constraints = (
CheckConstraint(check=(Q(points__isnull=True, altitude__isnull=False) |
Q(points__isnull=False, altitude__isnull=True)),
name="altitudearea_needs_precisely_one_of_altitude_or_points"),
)
class Meta:
verbose_name = _('Altitude Area')
verbose_name_plural = _('Altitude Areas')
default_related_name = 'altitudeareas'
ordering = ('altitude', )
def __str__(self):
return f'<Altitudearea #{self.pk} // Level #{self.level_id}, Bounds: {self.geometry.bounds}>'
def get_altitudes(self, points):
points = np.asanyarray(points).reshape((-1, 2))
if self.altitude is not None:
return np.full((points.shape[0],), fill_value=float(self.altitude))
if len(self.points) == 1:
raise ValueError
max_altitude = max(p.altitude for p in self.points)
min_altitude = min(p.altitude for p in self.points)
if len(self.points) == 2:
slope = np.array(self.points[1].coordinates) - np.array(self.points[0].coordinates)
distances = (
(np.sum(((points - np.array(self.points[0].coordinates)) * slope), axis=1)
/ (slope ** 2).sum()).clip(0, 1)
)
altitudes = self.points[0].altitude + distances*(self.points[1].altitude-self.points[0].altitude)
else:
altitudes = RBFInterpolator(
np.array([p.coordinates for p in self.points]),
np.array([p.altitude for p in self.points])
)(points)
return np.clip(altitudes, a_min=min_altitude, a_max=max_altitude)
@classmethod
def recalculate(cls):
# collect location areas
all_areas: list[AltitudeArea] = [] # all non-ramp altitude areas of the entire map
all_ramps: list[AltitudeArea] = [] # all ramp altitude areas of the entire map
space_areas: dict[int, list[AltitudeArea]] = {} # all non-ramp altitude areas present in the given space
space_ramps: dict[int, list[AltitudeArea]] = {} # all ramp altitude areas present in the given space
spaces: dict[int, list[Space]] = {} # all spaces by space id
levels = Level.objects.prefetch_related('buildings', 'doors', 'spaces', 'spaces__columns',
'spaces__obstacles', 'spaces__lineobstacles', 'spaces__holes',
'spaces__stairs', 'spaces__ramps',
'spaces__altitudemarkers__groundaltitude')
logger = logging.getLogger('c3nav')
for level in levels:
areas = [] # all altitude areas on this level that aren't ramps
ramps = [] # all altitude areas on this level that are ramps
stairs = [] # all stairs on this level
# collect all accessible areas on this level
buildings_geom = unary_union(tuple(unwrap_geom(building.geometry) for building in level.buildings.all()))
for space in level.spaces.all():
spaces[space.pk] = space
space.orig_geometry = space.geometry
if space.outside:
space.geometry = space.geometry.difference(buildings_geom)
space_accessible = space.geometry.difference(
unary_union(
tuple(unwrap_geom(c.geometry) for c in space.columns.all() if c.access_restriction_id is None) +
tuple(unwrap_geom(o.geometry) for o in space.obstacles.all() if o.altitude == 0) +
tuple(o.buffered_geometry for o in space.lineobstacles.all() if o.altitude == 0) +
tuple(unwrap_geom(h.geometry) for h in space.holes.all()))
)
space_ramps_geom = unary_union(tuple(unwrap_geom(r.geometry) for r in space.ramps.all()))
areas.append(space_accessible.difference(space_ramps_geom))
for geometry in assert_multipolygon(space_accessible.intersection(space_ramps_geom)):
ramp = AltitudeArea(geometry=geometry, level=level)
ramp.geometry_prep = prepared.prep(geometry)
ramp.space = space.pk
ramp.markers = []
ramps.append(ramp)
space_ramps.setdefault(space.pk, []).append(ramp)
areas = tuple(orient(polygon) for polygon in assert_multipolygon(
unary_union(areas+list(unwrap_geom(door.geometry) for door in level.doors.all()))
))
# collect all stairs on this level
for space in level.spaces.all():
space_buffer = space.geometry.buffer(0.001, join_style=JOIN_STYLE.mitre)
for stair in space.stairs.all():
stairs.extend(assert_multilinestring(
stair.geometry.intersection(space_buffer)
))
# divide areas using stairs
for stair in stairs:
areas = cut_polygon_with_line(areas, stair)
# create altitudearea objects
areas = [AltitudeArea(geometry=clean_cut_polygon(area), level=level)
for area in areas]
# prepare area geometries
for area in areas:
area.geometry_prep = prepared.prep(area.geometry)
# assign spaces to areas
space_areas.update({space.pk: [] for space in level.spaces.all()})
for area in areas:
area.spaces = set()
area.geometry_prep = prepared.prep(unwrap_geom(area.geometry))
for space in level.spaces.all():
if area.geometry_prep.intersects(unwrap_geom(space.geometry)):
area.spaces.add(space.pk)
space_areas[space.pk].append(area)
# give altitudes to areas
for space in level.spaces.all():
for altitudemarker in space.altitudemarkers.select_related('groundaltitude').all():
for area in space_areas[space.pk]:
if area.geometry_prep.contains(unwrap_geom(altitudemarker.geometry)):
area.altitude = altitudemarker.altitude
break
else:
for ramp in space_ramps[space.pk]:
if ramp.geometry_prep.contains(unwrap_geom(altitudemarker.geometry)):
ramp.markers.append(altitudemarker)
break
else:
logger.error(
_('AltitudeMarker #%(marker_id)d in Space #%(space_id)d on Level %(level_label)s '
'is not placed in an accessible area') % {'marker_id': altitudemarker.pk,
'space_id': space.pk,
'level_label': level.short_label})
# determine altitude area connections
for area in areas:
area.connected_to = []
for area, other_area in combinations(areas, 2):
if area.geometry_prep.intersects(other_area.geometry):
area.connected_to.append(other_area)
other_area.connected_to.append(area)
# determine ramp connections
for ramp in ramps:
ramp.connected_to = []
buffered = ramp.geometry.buffer(0.001)
for area in areas:
if area.geometry_prep.intersects(buffered):
intersections = []
for area_polygon in assert_multipolygon(area.geometry):
for ring in chain([area_polygon.exterior], area_polygon.interiors):
if ring.intersects(buffered):
intersections.append(ring.intersection(buffered))
ramp.connected_to.append(RampConnectedTo(area, intersections))
num_altitudes = len(ramp.connected_to) + len(ramp.markers)
if num_altitudes != 2:
if num_altitudes == 0:
logger.warning('A ramp in space #%d has no altitudes!' % ramp.space)
elif num_altitudes == 1:
logger.warning('A ramp in space #%d has only one altitude!' % ramp.space)
# add areas to global areas
all_areas.extend(areas)
all_ramps.extend(ramps)
# give temporary ids to all areas
areas = all_areas
ramps = all_ramps
for i, area in enumerate(areas):
area.tmpid = i
for area in areas:
area.connected_to = set(area.tmpid for area in area.connected_to)
for space in space_areas.keys():
space_areas[space] = set(area.tmpid for area in space_areas[space])
areas_without_altitude = set(area.tmpid for area in areas if area.altitude is None)
# interpolate altitudes
logger.info('Interpolating altitudes...')
areas_with_altitude = [i for i in range(len(areas)) if i not in areas_without_altitude]
for i, tmpid in enumerate(areas_with_altitude):
areas[tmpid].i = i
csgraph = np.zeros((len(areas), len(areas)), dtype=bool)
for area in areas:
for connected_tmpid in area.connected_to:
csgraph[area.tmpid, connected_tmpid] = True
repeat = True
from scipy.sparse.csgraph._shortest_path import dijkstra
while repeat:
repeat = False
# noinspection PyTupleAssignmentBalance
distances, predecessors = dijkstra(csgraph, directed=False, return_predecessors=True, unweighted=True)
np_areas_with_altitude = np.array(areas_with_altitude, dtype=np.uint32)
relevant_distances = distances[np_areas_with_altitude[:, None], np_areas_with_altitude]
# noinspection PyTypeChecker
for from_i, to_i in np.argwhere(np.logical_and(relevant_distances < np.inf, relevant_distances > 1)):
from_area = areas[areas_with_altitude[from_i]]
to_area = areas[areas_with_altitude[to_i]]
if from_area.altitude == to_area.altitude:
continue
path = [to_area.tmpid]
while path[-1] != from_area.tmpid:
path.append(predecessors[from_area.tmpid, path[-1]])
from_altitude = from_area.altitude
delta_altitude = (to_area.altitude-from_altitude)/(len(path)-1)
if set(path[1:-1]).difference(areas_without_altitude):
continue
for i, tmpid in enumerate(reversed(path[1:-1]), start=1):
area = areas[tmpid]
area.altitude = Decimal(from_altitude+delta_altitude*i).quantize(Decimal('1.00'))
areas_without_altitude.discard(tmpid)
area.i = len(areas_with_altitude)
areas_with_altitude.append(tmpid)
for from_tmpid, to_tmpid in zip(path[:-1], path[1:]):
csgraph[from_tmpid, to_tmpid] = False
csgraph[to_tmpid, from_tmpid] = False
repeat = True
# remaining areas: copy altitude from connected areas if any
repeat = True
while repeat:
repeat = False
for tmpid in tuple(areas_without_altitude):
area = areas[tmpid]
connected_with_altitude = area.connected_to-areas_without_altitude
if connected_with_altitude:
area.altitude = areas[next(iter(connected_with_altitude))].altitude
areas_without_altitude.discard(tmpid)
repeat = True
# remaining areas which belong to a room that has an altitude somewhere
for contained_areas in space_areas.values():
contained_areas_with_altitude = contained_areas - areas_without_altitude
contained_areas_without_altitude = contained_areas - contained_areas_with_altitude
if contained_areas_with_altitude and contained_areas_without_altitude:
altitude_areas = {}
for tmpid in contained_areas_with_altitude:
area = areas[tmpid]
altitude_areas.setdefault(area.altitude, []).append(area.geometry)
for altitude in altitude_areas.keys():
altitude_areas[altitude] = unary_union(altitude_areas[altitude])
for tmpid in contained_areas_without_altitude:
area = areas[tmpid]
area.altitude = min(altitude_areas.items(), key=lambda aa: aa[1].distance(area.geometry))[0]
areas_without_altitude.difference_update(contained_areas_without_altitude)
# last fallback: level base_altitude
for tmpid in areas_without_altitude:
area = areas[tmpid]
area.altitude = area.level.base_altitude
# prepare per-level operations
level_areas = {}
for area in areas:
level_areas.setdefault(area.level, set()).add(area.tmpid)
# make sure there is only one altitude area per altitude per level
for level in levels:
areas_by_altitude = {}
for tmpid in level_areas.get(level, []):
area = areas[tmpid]
areas_by_altitude.setdefault(area.altitude, []).append(area.geometry)
level_areas[level] = [AltitudeArea(level=level, geometry=unary_union(geometries), altitude=altitude)
for altitude, geometries in areas_by_altitude.items()]
# renumber joined areas
areas = list(chain(*(a for a in level_areas.values())))
for i, area in enumerate(areas):
area.tmpid = i
# finalize ramps
for ramp in ramps:
if not ramp.connected_to:
for area in space_areas[ramp.space]:
ramp.altitude = areas[area].altitude
break
else:
ramp.altitude = ramp.level.base_altitude
continue
if len(ramp.connected_to) == 1:
ramp.altitude = ramp.connected_to[0].area.altitude
continue
points = []
for connected_to in ramp.connected_to:
for intersection in connected_to.intersections:
points.extend([AltitudeAreaPoint(coordinates=coords, altitude=float(connected_to.area.altitude))
for coords in intersection.coords])
points.extend([AltitudeAreaPoint(coordinates=marker.geometry.coords, altitude=float(marker.altitude))
for marker in ramp.markers])
ramp.points = points
ramp.tmpid = len(areas)
areas.append(ramp)
level_areas[ramp.level].append(ramp)
#
# we have altitude areas, but they only cover accessible space for now
# however, we need obstacles to be part of altitude areas too.
# this is where we do that.
#
for level in levels:
logger.info('Assign remaining space (%s)...' % level.title)
for space in level.spaces.all():
space.geometry = space.orig_geometry
buildings_geom = unary_union(tuple(unwrap_geom(b.geometry) for b in level.buildings.all()))
doors_geom = unary_union(tuple(unwrap_geom(d.geometry) for d in level.doors.all()))
space_geom = unary_union(tuple((unwrap_geom(s.geometry)
if not s.outside
else s.geometry.difference(buildings_geom))
for s in level.spaces.all()))
# accessible area on this level is doors + spaces - holes
accessible_area = unary_union((doors_geom, space_geom))
for space in level.spaces.all():
accessible_area = accessible_area.difference(space.geometry.intersection(
unary_union(tuple(unwrap_geom(h.geometry) for h in space.holes.all()))
))
# areas mean altitude areas (including ramps) here
our_areas = level_areas.get(level, [])
for area in our_areas:
area.orig_geometry = area.geometry
area.orig_geometry_prep = prepared.prep(area.geometry)
area.polygons_to_add = deque()
stairs = []
for space in level.spaces.all():
space_geom = space.geometry
if space.outside:
space_geom = space_geom.difference(buildings_geom)
space_geom_prep = prepared.prep(unwrap_geom(space_geom))
holes_geom = unary_union(tuple(unwrap_geom(h.geometry) for h in space.holes.all()))
# remaining_space means remaining space (=obstacles) that still needs to be added to altitude areas
remaining_space = (
tuple(unwrap_geom(o.geometry) for o in space.obstacles.all()) +
tuple(o.buffered_geometry for o in space.lineobstacles.all())
)
# make sure to remove everything outside the space the obstacles are in as well as holes
remaining_space = tuple(g.intersection(unwrap_geom(space_geom)).difference(holes_geom)
for g in remaining_space
if space_geom_prep.intersects(unwrap_geom(g)))
# we need this to be a list of simple normal polygons
remaining_space = tuple(chain(*(
assert_multipolygon(g) for g in remaining_space if not g.is_empty
)))
if not remaining_space:
# if there are no remaining spaces? great, we're done here.
continue
cuts = []
for cut in chain(*(assert_multilinestring(stair.geometry) for stair in space.stairs.all()),
(ramp.geometry.exterior for ramp in space.ramps.all())):
for coord1, coord2 in zip(tuple(cut.coords)[:-1], tuple(cut.coords)[1:]):
line = space_geom.intersection(LineString([coord1, coord2]))
if line.is_empty:
continue
factor = (line.length + 2) / line.length
line = scale(line, xfact=factor, yfact=factor)
centroid = line.centroid
line = min(assert_multilinestring(space_geom.intersection(line)),
key=lambda line_: line_.centroid.distance(centroid), default=None)
cuts.append(scale(line, xfact=1.01, yfact=1.01))
remaining_space = tuple(
orient(polygon) for polygon in remaining_space
)
for cut in cuts:
remaining_space = tuple(chain(*(cut_polygon_with_line(geom, cut)
for geom in remaining_space)))
remaining_space = MultiPolygon(remaining_space)
for polygon in assert_multipolygon(remaining_space):
polygon = clean_cut_polygon(polygon).buffer(0)
buffered = polygon.buffer(0.001)
center = polygon.centroid
touches = tuple(ItemWithValue(area, lambda: buffered.intersection(area.orig_geometry).area)
for area in our_areas
if area.orig_geometry_prep.intersects(buffered))
if len(touches) == 1:
area = touches[0].obj
elif touches:
min_touches = sum((t.value for t in touches), 0)/4
area = max(touches, key=lambda item: (
item.value > min_touches,
item.obj.points is not None,
(max(p.altitude for p in item.obj.points)
if item.obj.altitude is None else item.obj.altitude),
item.value
)).obj
else:
area = min(our_areas,
key=lambda a: a.orig_geometry.distance(center)-(0 if a.points is None else 0.6))
area.polygons_to_add.append(polygon)
for i_area, area in enumerate(our_areas):
if area.polygons_to_add:
area.geometry = unary_union((area.geometry.buffer(0), *area.polygons_to_add))
del area.polygons_to_add
for level in levels:
level_areas[level] = set(area.tmpid for area in level_areas.get(level, []))
# save to database
areas_to_save = set(range(len(areas)))
all_candidates = AltitudeArea.objects.select_related('level')
for candidate in all_candidates:
candidate.area = candidate.geometry.area
candidate.geometry_prep = prepared.prep(unwrap_geom(candidate.geometry))
all_candidates = sorted(all_candidates, key=attrgetter('area'), reverse=True)
num_modified = 0
num_deleted = 0
num_created = 0
field = AltitudeArea._meta.get_field('geometry')
for candidate in all_candidates:
new_area = None
if candidate.points is None:
for tmpid in level_areas.get(candidate.level, set()):
area = areas[tmpid]
if area.points is None and area.altitude == candidate.altitude:
new_area = area
break
else:
potential_areas = [areas[tmpid] for tmpid in level_areas.get(candidate.level, set())]
potential_areas = [area for area in potential_areas
if ((candidate.altitude, set(p.altitude for p in (candidate.points or ()))) ==
(area.altitude, set(p.altitude for p in (area.points or ()))))]
potential_areas = [(area, area.geometry.intersection(unwrap_geom(candidate.geometry)).area)
for area in potential_areas
if candidate.geometry_prep.intersects(unwrap_geom(area.geometry))]
if potential_areas:
new_area = max(potential_areas, key=itemgetter(1))[0]
if new_area is None:
candidate.delete()
num_deleted += 1
continue
if not field.get_final_value(new_area.geometry).equals_exact(unwrap_geom(candidate.geometry), 0.00001):
num_modified += 1
candidate.geometry = new_area.geometry
candidate.altitude = new_area.altitude
candidate.points = new_area.points
candidate.save()
areas_to_save.discard(new_area.tmpid)
level_areas[new_area.level].discard(new_area.tmpid)
for tmpid in areas_to_save:
num_created += 1
areas[tmpid].save()
logger = logging.getLogger('c3nav')
logger.info(_('%d altitude areas built.') % len(areas))
logger.info(_('%(num_modified)d modified, %(num_deleted)d deleted, %(num_created)d created.') %
{'num_modified': num_modified, 'num_deleted': num_deleted, 'num_created': num_created})