team-3/src/c3nav/mapdata/models/geometry/level.py

247 lines
11 KiB
Python
Raw Normal View History

from django.db import models
from django.utils.translation import ugettext_lazy as _
from shapely.geometry import JOIN_STYLE
2017-08-05 14:17:24 +02:00
from shapely.ops import cascaded_union
from c3nav.mapdata.fields import GeometryField
2017-08-05 12:13:15 +02:00
from c3nav.mapdata.models import Level
2017-07-13 23:49:00 +02:00
from c3nav.mapdata.models.access import AccessRestrictionMixin
2017-05-08 16:40:22 +02:00
from c3nav.mapdata.models.geometry.base import GeometryMixin
2017-05-10 18:03:57 +02:00
from c3nav.mapdata.models.locations import SpecificLocation
2017-08-05 14:17:24 +02:00
from c3nav.mapdata.utils.geometry import assert_multilinestring, assert_multipolygon
2017-06-11 14:43:14 +02:00
class LevelGeometryMixin(GeometryMixin):
level = models.ForeignKey('mapdata.Level', on_delete=models.CASCADE, verbose_name=_('level'))
class Meta:
abstract = True
def get_geojson_properties(self, *args, instance=None, **kwargs) -> dict:
result = super().get_geojson_properties(*args, **kwargs)
2017-06-11 14:43:14 +02:00
result['level'] = self.level_id
if hasattr(self, 'get_color'):
color = self.get_color(instance=instance)
if color:
result['color'] = color
if hasattr(self, 'opacity'):
result['opacity'] = self.opacity
2017-05-11 19:36:49 +02:00
return result
2017-06-11 14:43:14 +02:00
def _serialize(self, level=True, **kwargs):
2017-05-11 19:36:49 +02:00
result = super()._serialize(**kwargs)
2017-06-11 14:43:14 +02:00
if level:
result['level'] = self.level_id
return result
2017-06-11 14:43:14 +02:00
class Building(LevelGeometryMixin, models.Model):
"""
The outline of a building on a specific level
"""
geometry = GeometryField('polygon')
class Meta:
verbose_name = _('Building')
verbose_name_plural = _('Buildings')
default_related_name = 'buildings'
2017-06-11 14:43:14 +02:00
class Space(SpecificLocation, LevelGeometryMixin, models.Model):
"""
2017-06-11 14:43:14 +02:00
An accessible space. Shouldn't overlap with spaces on the same level.
"""
2017-05-09 09:36:08 +02:00
geometry = GeometryField('polygon')
outside = models.BooleanField(default=False, verbose_name=_('only outside of building'))
class Meta:
2017-05-09 13:48:55 +02:00
verbose_name = _('Space')
verbose_name_plural = _('Spaces')
default_related_name = 'spaces'
2017-07-13 23:49:00 +02:00
class Door(AccessRestrictionMixin, LevelGeometryMixin, models.Model):
"""
2017-05-09 13:48:55 +02:00
A connection between two spaces
"""
geometry = GeometryField('polygon')
class Meta:
verbose_name = _('Door')
verbose_name_plural = _('Doors')
default_related_name = 'doors'
2017-08-05 11:56:29 +02:00
class AltitudeArea(LevelGeometryMixin, models.Model):
"""
An altitude area
"""
geometry = GeometryField('polygon')
altitude = models.DecimalField(_('altitude'), null=False, max_digits=6, decimal_places=2)
class Meta:
verbose_name = _('Altitude Area')
verbose_name_plural = _('Altitude Areas')
default_related_name = 'altitudeareas'
2017-08-05 14:17:24 +02:00
@classmethod
def recalculate(cls):
# collect location areas
all_areas = []
for level in Level.objects.prefetch_related('buildings', 'doors', 'spaces', 'spaces__columns',
'spaces__obstacles', 'spaces__lineobstacles', 'spaces__holes',
'spaces__stairs', 'spaces__altitudemarkers'):
2017-08-05 14:17:24 +02:00
areas = []
stairs = []
spaces = {}
# collect all accessible areas on this level
2017-08-05 14:17:24 +02:00
buildings_geom = cascaded_union(tuple(building.geometry for building in level.buildings.all()))
for space in level.spaces.all():
if space.outside:
space.geometry = space.geometry.difference(buildings_geom)
spaces[space.pk] = space
area = space.geometry
buffered = space.geometry.buffer(0.0001)
remove = cascaded_union(tuple(c.geometry for c in space.columns.all()) +
tuple(o.geometry for o in space.obstacles.all()) +
tuple(o.buffered_geometry for o in space.lineobstacles.all()) +
tuple(h.geometry for h in space.holes.all()))
areas.extend(assert_multipolygon(space.geometry.difference(remove)))
for stair in space.stairs.all():
substairs = tuple(assert_multilinestring(stair.geometry.intersection(buffered).difference(remove)))
for substair in substairs:
substair.space = space.pk
stairs.extend(substairs)
2017-08-05 14:17:24 +02:00
areas = assert_multipolygon(cascaded_union(areas+list(door.geometry for door in level.doors.all())))
areas = [AltitudeArea(geometry=area, level=level) for area in areas]
space_areas = {space.pk: [] for space in level.spaces.all()}
# assign spaces to areas
2017-08-05 14:17:24 +02:00
for area in areas:
area.spaces = set()
area.connected_to = []
for space in level.spaces.all():
if area.geometry.intersects(space.geometry):
area.spaces.add(space.pk)
space_areas[space.pk].append(area)
2017-08-05 14:17:24 +02:00
# divide areas using stairs
2017-08-05 14:17:24 +02:00
for stair in stairs:
for area in space_areas[stair.space]:
2017-08-05 15:16:00 +02:00
if not stair.intersects(area.geometry):
2017-08-05 14:17:24 +02:00
continue
divided = assert_multipolygon(area.geometry.difference(stair.buffer(0.0001)))
if len(divided) > 2:
raise ValueError
area.geometry = divided[0]
if len(divided) == 2:
new_area = AltitudeArea(geometry=divided[1], level=level)
2017-08-05 15:16:00 +02:00
new_area.spaces = set()
2017-08-05 14:17:24 +02:00
new_area.connected_to = [area]
area.connected_to.append(new_area)
areas.append(new_area)
2017-08-05 15:16:00 +02:00
original_spaces = area.spaces
if len(area.spaces) == 1:
new_area.spaces = area.spaces
space_areas[next(iter(area.spaces))].append(new_area)
else:
# update area spaces
2017-08-05 15:16:00 +02:00
for subarea in (area, new_area):
spaces_before = subarea.spaces
2017-08-05 15:16:00 +02:00
subarea.spaces = set(space for space in original_spaces
if spaces[space].geometry.intersects(subarea.geometry))
for space in spaces_before-subarea.spaces:
space_areas[space].remove(subarea)
for space in subarea.spaces-spaces_before:
space_areas[space].append(subarea)
# update area connections
buffer_area = area.geometry.buffer(0.0005, join_style=JOIN_STYLE.mitre)
buffer_new_area = new_area.geometry.buffer(0.0005, join_style=JOIN_STYLE.mitre)
remove_area_connected_to = []
for other_area in area.connected_to:
if not buffer_area.intersects(other_area.geometry):
new_area.connected_to.append(other_area)
remove_area_connected_to.append(other_area)
other_area.connected_to.remove(area)
other_area.connected_to.append(new_area)
elif other_area != new_area and buffer_new_area.intersects(other_area.geometry):
new_area.connected_to.append(other_area)
other_area.connected_to.append(new_area)
for other_area in remove_area_connected_to:
area.connected_to.remove(other_area)
2017-08-05 14:17:24 +02:00
break
2017-08-05 15:17:15 +02:00
else:
raise ValueError
# give altitudes to areas
for space in level.spaces.all():
for altitudemarker in space.altitudemarkers.all():
for area in space_areas[space.pk]:
if area.geometry.contains(altitudemarker.geometry):
area.altitude = altitudemarker.altitude
break
else:
raise ValueError(space.title)
all_areas.extend(areas)
2017-08-05 16:18:19 +02:00
# give temporary ids to all areas
areas = all_areas
for i, area in enumerate(areas):
area.tmpid = i
2017-08-05 16:22:25 +02:00
for area in areas:
2017-08-05 16:18:19 +02:00
area.connected_to = set(area.tmpid for area in area.connected_to)
2017-08-05 16:22:25 +02:00
for space in space_areas.keys():
space_areas[space] = set(area.tmpid for area in space_areas[space])
2017-08-05 16:18:19 +02:00
# interpolate altitudes
areas_without_altitude = set(area.tmpid for area in areas if area.altitude is None)
while areas_without_altitude:
# find a area without an altitude that is connected
# to one with an altitude to start the chain
chain = []
for tmpid in areas_without_altitude:
area = areas[tmpid]
connected_with_altitude = area.connected_to-areas_without_altitude
if connected_with_altitude:
chain = [next(iter(connected_with_altitude)), tmpid]
current = area
break
else:
# there are no more chains possible
break
# continue chain as long as possible
while True:
connected_with_altitude = (current.connected_to-areas_without_altitude).difference(chain)
if connected_with_altitude:
2017-08-05 16:18:55 +02:00
# interpolate
2017-08-05 16:18:19 +02:00
area = areas[next(iter(connected_with_altitude))]
from_altitude = areas[chain[0]].altitude
delta_altitude = area.altitude-from_altitude
for i, tmpid in enumerate(chain[1:], 1):
areas[tmpid].altitude = from_altitude+delta_altitude*i/len(chain)
areas_without_altitude.difference_update(chain)
break
connected = current.connected_to.difference(chain)
if not connected:
2017-08-05 16:18:55 +02:00
# end of chain
2017-08-05 16:18:19 +02:00
altitude = areas[chain[0]].altitude
for i, tmpid in enumerate(chain[1:], 1):
areas[tmpid].altitude = altitude
areas_without_altitude.difference_update(chain)
break
2017-08-05 16:18:55 +02:00
# continue chain
2017-08-05 16:18:19 +02:00
current = areas[next(iter(connected))]
chain.append(current.tmpid)
print(len(areas_without_altitude))