more typehinting in route and router

This commit is contained in:
Laura Klünder 2024-12-17 22:28:52 +00:00
parent 317aa441f4
commit 387fdc62be
4 changed files with 148 additions and 93 deletions

View file

@ -220,6 +220,8 @@ def preview_route(request, slug, slug2):
from c3nav.mapdata.utils.geometry import unwrap_geom
origin = check_location(slug, None)
destination = check_location(slug2, None)
if origin is None or destination is None:
raise Http404()
visible_locations = visible_locations_for_request(request)
try:
route = Router.load().get_route(origin=origin,

View file

@ -1,10 +1,15 @@
# flake8: noqa
from collections import OrderedDict, deque
from dataclasses import dataclass
import typing
import numpy as np
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
if typing.TYPE_CHECKING:
from c3nav.routing.router import Router
def describe_location(location, locations):
if location.can_describe:
@ -18,7 +23,10 @@ def describe_location(location, locations):
return location
#@dataclass
class Route:
router: "Router"
def __init__(self, router, origin, destination, path_nodes, options,
origin_addition, destination_addition, origin_xyz, destination_xyz,
visible_locations):
@ -41,6 +49,7 @@ class Route:
if self.destination_addition and any(self.destination_addition):
nodes.append(self.destination_addition)
# calculate distances from origin and destination to the origin and destination nodes
if self.origin_xyz is not None:
node = nodes[0][0]
if not hasattr(node, 'xyz'):

View file

@ -2,24 +2,28 @@ import logging
import operator
import pickle
from collections import deque, namedtuple
from dataclasses import dataclass, field
from functools import reduce
from itertools import chain
from typing import Optional
from typing import Optional, TypeVar, Generic, Mapping, Any, Sequence, TypeAlias
import numpy as np
from django.conf import settings
from django.core.cache import cache
from django.utils.functional import cached_property
from django.utils.functional import cached_property, Promise
from shapely import prepared
from shapely.geometry import LineString, Point
from shapely.geometry import LineString, Point, Polygon, MultiPolygon
from shapely.ops import unary_union
from twisted.protocols.amp import Decimal
from c3nav.mapdata.models import AltitudeArea, Area, GraphEdge, Level, LocationGroup, MapUpdate, Space, WayType
from c3nav.mapdata.models.geometry.level import AltitudeAreaPoint
from c3nav.mapdata.models.geometry.space import POI, CrossDescription, LeaveDescription
from c3nav.mapdata.models.locations import CustomLocationProxyMixin
from c3nav.mapdata.models.locations import CustomLocationProxyMixin, Location
from c3nav.mapdata.utils.geometry import assert_multipolygon, get_rings, good_representative_point, unwrap_geom
from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.routing.exceptions import LocationUnreachable, NoRouteFound, NotYetRoutable
from c3nav.routing.models import RouteOptions
from c3nav.routing.route import Route
try:
@ -29,11 +33,16 @@ except ImportError:
logger = logging.getLogger('c3nav')
NodeConnectionsByNode: TypeAlias = dict[int, tuple["RouterNode", "RouterEdge"] | tuple[None, None]]
PointCompatible: TypeAlias = Point | CustomLocation | CustomLocationProxyMixin
EdgeIndex: TypeAlias = tuple[int, int]
class Router:
filename = settings.CACHE_ROOT / 'router'
def __init__(self, levels, spaces, areas, pois, groups, restrictions, nodes, edges, waytypes, graph):
def __init__(self, levels, spaces, areas, pois, groups, restrictions: dict[int, "RouterRestriction"],
nodes, edges, waytypes, graph):
self.levels = levels
self.spaces = spaces
self.areas = areas
@ -134,8 +143,12 @@ class Router:
area_clear_geom = unary_union(tuple(get_rings(subgeom.difference(obstacles_geom))))
if area_clear_geom.is_empty:
continue
area = RouterAltitudeArea(subgeom, area_clear_geom,
area.altitude, area.points)
area = RouterAltitudeArea(
geometry=subgeom,
clear_geometry=area_clear_geom,
altitude=area.altitude,
points=area.points
)
area_nodes = tuple(node for node in space_nodes if area.geometry_prep.intersects(node.point))
area.nodes = set(node.i for node in area_nodes)
for node in area_nodes:
@ -161,21 +174,27 @@ class Router:
# create fallback nodes
if not area.nodes and space_nodes:
fallback_point = good_representative_point(area.clear_geometry)
fallback_node = RouterNode(None, None, fallback_point.x, fallback_point.y,
space.pk, area.get_altitude(fallback_point))
fallback_node = RouterNode(
i=None,
pk=None,
x=fallback_point.x,
y=fallback_point.y,
space=space.pk,
altitude=area.get_altitude(fallback_point)
)
# todo: check waytypes here
for node in space_nodes:
line = LineString([(node.x, node.y), (fallback_node.x, fallback_node.y)])
if line.length < 5 and not clear_geom_prep.intersects(line):
area.fallback_nodes[node.i] = (
fallback_node,
RouterEdge(fallback_node, node, 0)
RouterEdge.create(from_node=fallback_node, to_node=node, waytype=0)
)
if not area.fallback_nodes:
nearest_node = min(space_nodes, key=lambda node: fallback_point.distance(node.point))
area.fallback_nodes[nearest_node.i] = (
fallback_node,
RouterEdge(fallback_node, nearest_node, 0)
RouterEdge.create(from_node=fallback_node, to_node=nearest_node, waytype=0)
)
for poi in space_obj.pois.all():
@ -238,10 +257,15 @@ class Router:
nodes_lookup = {node.pk: node.i for node in nodes}
# collect edges
edges = tuple(RouterEdge(from_node=nodes[nodes_lookup[edge.from_node_id]],
to_node=nodes[nodes_lookup[edge.to_node_id]],
waytype=waytypes_lookup[edge.waytype_id],
access_restriction=edge.access_restriction_id) for edge in GraphEdge.objects.all())
edges = tuple(
RouterEdge.create(
from_node=nodes[nodes_lookup[edge.from_node_id]],
to_node=nodes[nodes_lookup[edge.to_node_id]],
waytype=waytypes_lookup[edge.waytype_id],
access_restriction=edge.access_restriction_id
)
for edge in GraphEdge.objects.all()
)
edges = {(edge.from_node, edge.to_node): edge for edge in edges}
# build graph matrix
@ -295,7 +319,7 @@ class Router:
cls.cached.data = cls.load_nocache(update)
return cls.cached.data
def get_locations(self, location, restrictions):
def get_locations(self, location: Location, restrictions) -> "RouterLocation":
locations = ()
if isinstance(location, Level):
if location.access_restriction_id not in restrictions:
@ -350,7 +374,7 @@ class Router:
raise LocationUnreachable
return result
def space_for_point(self, level, point, restrictions) -> Optional['RouterSpace']:
def space_for_point(self, level: int, point: PointCompatible, restrictions) -> Optional['RouterSpace']:
point = Point(point.x, point.y)
level = self.levels[level]
excluded_spaces = restrictions.spaces if restrictions else ()
@ -366,7 +390,7 @@ class Router:
return None
return min(spaces, key=operator.itemgetter(1))[0]
def altitude_for_point(self, space: int, point: Point) -> float:
def altitude_for_point(self, space: int, point: PointCompatible) -> float:
return self.spaces[space].altitudearea_for_point(point).get_altitude(point)
def describe_custom_location(self, location):
@ -479,12 +503,13 @@ class Router:
predecessors.astype(np.int32).tobytes()), 600)
return distances, predecessors
def get_restrictions(self, permissions):
def get_restrictions(self, permissions: set[int]) -> "RouterRestrictionSet":
return RouterRestrictionSet({
pk: restriction for pk, restriction in self.restrictions.items() if pk not in permissions
})
def get_route(self, origin, destination, permissions, options, visible_locations):
def get_route(self, origin: Location, destination: Location, permissions: set[int],
options: RouteOptions, visible_locations: Mapping[int, Location]):
restrictions = self.get_restrictions(permissions)
# get possible origins and destinations
@ -534,13 +559,15 @@ class Router:
CustomLocationDescription = namedtuple('CustomLocationDescription', ('space', 'altitude',
'areas', 'near_area', 'near_poi', 'nearby'))
# todo: switch to new syntax… bound?
RouterProxiedType = TypeVar('RouterProxiedType')
# todo: make generic
class BaseRouterProxy:
def __init__(self, src):
self.src = src
self.nodes = set()
self.nodes_addition = {}
@dataclass
class BaseRouterProxy(Generic[RouterProxiedType]):
src: RouterProxiedType
nodes: set[int] = field(default_factory=set)
nodes_addition: NodeConnectionsByNode = field(default_factory=set)
@cached_property
def geometry_prep(self):
@ -557,22 +584,20 @@ class BaseRouterProxy:
return getattr(self.src, name)
class RouterLevel(BaseRouterProxy):
def __init__(self, level, spaces=None):
super().__init__(level)
self.spaces = spaces if spaces else set()
@dataclass
class RouterLevel(BaseRouterProxy[Level]):
spaces: set[int] = field(default_factory=set)
class RouterSpace(BaseRouterProxy):
def __init__(self, space, altitudeareas=None):
super().__init__(space)
self.areas = set()
self.pois = set()
self.altitudeareas = altitudeareas if altitudeareas else []
self.leave_descriptions = {}
self.cross_descriptions = {}
@dataclass
class RouterSpace(BaseRouterProxy[Space]):
areas: set[int] = field(default_factory=set)
pois: set[int] = field(default_factory=set)
altitudeareas: list["RouterAltitudeArea"] = field(default_factory=list)
leave_descriptions: dict[int, Promise] = field(default_factory=dict)
cross_descriptions: dict[tuple[int, int], Promise] = field(default_factory=dict)
def altitudearea_for_point(self, point):
def altitudearea_for_point(self, point: PointCompatible):
point = Point(point.x, point.y)
if not self.altitudeareas:
raise LocationUnreachable
@ -612,28 +637,28 @@ class RouterSpace(BaseRouterProxy):
return min(near, key=operator.itemgetter(1))[0], nearby
class RouterArea(BaseRouterProxy):
@dataclass
class RouterArea(BaseRouterProxy[Area]):
pass
class RouterPoint(BaseRouterProxy):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.altitude = None
@dataclass
class RouterPoint(BaseRouterProxy[Point]):
altitude: float | None = None
@cached_property
def xyz(self):
return np.array((self.x, self.y, self.altitude))
@dataclass
class RouterAltitudeArea:
def __init__(self, geometry, clear_geometry, altitude, points):
self.geometry = geometry
self.clear_geometry = clear_geometry
self.altitude = altitude
self.points = points
self.nodes = frozenset()
self.fallback_nodes = {}
geometry: Polygon | MultiPolygon
clear_geometry: Polygon | MultiPolygon
altitude: Decimal
points: Sequence[AltitudeAreaPoint]
nodes: frozenset[int] = field(default_factory=frozenset)
fallback_nodes: NodeConnectionsByNode = field(default_factory=dict)
@cached_property
def geometry_prep(self):
@ -643,11 +668,11 @@ class RouterAltitudeArea:
def clear_geometry_prep(self):
return prepared.prep(self.clear_geometry)
def get_altitude(self, point):
def get_altitude(self, point: PointCompatible):
# noinspection PyTypeChecker,PyCallByClass
return AltitudeArea.get_altitudes(self, (point.x, point.y))[0]
def nodes_for_point(self, point, all_nodes):
def nodes_for_point(self, point: PointCompatible, all_nodes) -> NodeConnectionsByNode:
point = Point(point.x, point.y)
nodes = {}
@ -672,19 +697,26 @@ class RouterAltitudeArea:
return result
@dataclass
class RouterNode:
def __init__(self, i, pk, x, y, space, altitude=None, areas=None):
self.i = i
self.pk = pk
self.x = x
self.y = y
self.space = space
self.altitude = altitude
self.areas = areas if areas else set()
i: int | None
pk: int | None
x: float
y: float
space: int
altitude: float
areas: set[int] = field(default_factory=set)
@classmethod
def from_graph_node(cls, node, i):
return cls(i, node.pk, node.geometry.x, node.geometry.y, node.space_id)
return cls(
i=i,
pk=node.pk,
x=node.geometry.x,
y=node.geometry.y,
space=node.space_id,
altitude=0,
)
@cached_property
def point(self):
@ -695,26 +727,34 @@ class RouterNode:
return np.array((self.x, self.y, self.altitude))
@dataclass
class RouterEdge:
def __init__(self, from_node, to_node, waytype, access_restriction=None, rise=None, distance=None):
self.from_node = from_node.i
self.to_node = to_node.i
self.waytype = waytype
self.access_restriction = access_restriction
if rise is not None:
self.rise = rise
elif to_node.altitude is None or from_node.altitude is None:
self.rise = None
else:
self.rise = (to_node.altitude - from_node.altitude)
self.distance = distance if distance is not None else np.linalg.norm(to_node.xyz - from_node.xyz)
from_node: int
to_node: int
waytype: int
access_restriction: int
rise: float | None
distance: float
@classmethod
def create(cls, from_node: "RouterNode", to_node: "RouterNode", waytype: int,
access_restriction: int | None = None):
return cls(
from_node=from_node.i,
to_node=to_node.i,
waytype=waytype,
access_restriction=access_restriction,
rise=(None if to_node.altitude is None or from_node.altitude is None
else (to_node.altitude - from_node.altitude)),
distance=np.linalg.norm(to_node.xyz - from_node.xyz),
)
@dataclass
class RouterWayType:
def __init__(self, waytype):
self.src = waytype
self.upwards_indices = deque()
self.nonupwards_indices = deque()
src: WayType
upwards_indices: deque[EdgeIndex] = field(default_factory=deque)
nonupwards_indices: deque[EdgeIndex] = field(default_factory=deque)
def __getattr__(self, name):
if name in ('__getstate__', '__setstate__'):
@ -730,12 +770,16 @@ class RouterWayType:
return duration
@dataclass
class RouterLocation:
def __init__(self, locations=()):
self.locations = locations
"""
Describes a Location selected as an origin or destination for a route. This might match multiple locations,
for example if we route to a group, in which case we select the nearest/best specific location.
"""
locations: tuple[RouterPoint]
@cached_property
def nodes(self):
def nodes(self) -> frozenset[int]:
return reduce(operator.or_, (location.nodes for location in self.locations), frozenset())
def get_location_for_node(self, node):
@ -745,23 +789,23 @@ class RouterLocation:
return None
@dataclass
class RouterRestriction:
def __init__(self, spaces=None):
self.spaces = spaces if spaces else set()
self.additional_nodes = set()
self.edges = deque()
spaces: set[int] = field(default_factory=set)
additional_nodes: set[int] = field(default_factory=set)
edges: deque[EdgeIndex] = field(default_factory=deque)
@dataclass
class RouterRestrictionSet:
def __init__(self, restrictions):
self.restrictions = restrictions
restrictions: dict[int, RouterRestriction]
@cached_property
def spaces(self):
def spaces(self) -> frozenset[int]:
return reduce(operator.or_, (restriction.spaces for restriction in self.restrictions.values()), frozenset())
@cached_property
def additional_nodes(self):
def additional_nodes(self) -> frozenset[int]:
return reduce(operator.or_, (restriction.additional_nodes
for restriction in self.restrictions.values()), frozenset())

View file

@ -52,16 +52,16 @@ if settings.METRICS:
from prometheus_client import Counter
def check_location(location: Optional[str], request) -> Optional[SpecificLocation]:
if location is None:
def check_location(location_slug: Optional[str], request) -> Optional[Location]:
if location_slug is None:
return None
location = get_location_by_slug_for_request(location, request)
location = get_location_by_slug_for_request(location_slug, request)
if location is None:
return None
if isinstance(location, LocationRedirect):
location: Location = location.target
result = location.target
if location is None:
return None