From 3cb1fdeb6337057e7cb544ab5152a9c1d65fb96c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laura=20Kl=C3=BCnder?= Date: Tue, 18 Mar 2025 20:18:45 +0100 Subject: [PATCH] better typing for updates and geometryindexed --- src/c3nav/mapdata/models/update.py | 14 ++++-- .../mapdata/utils/cache/accessrestrictions.py | 43 +++++++++------- src/c3nav/mapdata/utils/cache/changes.py | 3 +- src/c3nav/mapdata/utils/cache/indexed.py | 50 +++++++++++-------- src/c3nav/mapdata/utils/cache/maphistory.py | 16 +++--- 5 files changed, 74 insertions(+), 52 deletions(-) diff --git a/src/c3nav/mapdata/models/update.py b/src/c3nav/mapdata/models/update.py index 970d7f19..1a1fac59 100644 --- a/src/c3nav/mapdata/models/update.py +++ b/src/c3nav/mapdata/models/update.py @@ -5,6 +5,7 @@ import time from contextlib import contextmanager, suppress, nullcontext from functools import cached_property from sqlite3 import DatabaseError +from typing import TypeAlias from django.conf import settings from django.core.cache import cache @@ -19,6 +20,9 @@ from c3nav.mapdata.utils.cache.changes import GeometryChangeTracker from c3nav.mapdata.utils.cache.local import per_request_cache +MapUpdateTuple: TypeAlias = tuple[int, int] + + class MapUpdate(models.Model): """ A map update. created whenever mapdata is changed. @@ -47,7 +51,7 @@ class MapUpdate(models.Model): self.was_processed = self.processed @classmethod - def last_update(cls, force=False): + def last_update(cls, force=False) -> MapUpdateTuple: if not force: last_update = per_request_cache.get('mapdata:last_update', None) if last_update is not None: @@ -62,7 +66,7 @@ class MapUpdate(models.Model): return last_update @classmethod - def last_processed_update(cls, force=False, lock=True): + def last_processed_update(cls, force=False, lock=True) -> MapUpdateTuple: if not force: last_processed_update = per_request_cache.get('mapdata:last_processed_update', None) if last_processed_update is not None: @@ -77,7 +81,7 @@ class MapUpdate(models.Model): return last_processed_update @classmethod - def last_processed_geometry_update(cls, force=False): + def last_processed_geometry_update(cls, force=False) -> MapUpdateTuple: if not force: last_processed_geometry_update = per_request_cache.get('mapdata:last_processed_geometry_update', None) if last_processed_geometry_update is not None: @@ -93,7 +97,7 @@ class MapUpdate(models.Model): return last_processed_geometry_update @property - def to_tuple(self): + def to_tuple(self) -> MapUpdateTuple: return self.pk, int(make_naive(self.datetime).timestamp()) @property @@ -113,7 +117,7 @@ class MapUpdate(models.Model): return cls.build_cache_key(*cls.last_processed_geometry_update()) @staticmethod - def build_cache_key(pk, timestamp): + def build_cache_key(pk: int, timestamp: int): return int_to_base36(pk)+'_'+int_to_base36(timestamp) @classmethod diff --git a/src/c3nav/mapdata/utils/cache/accessrestrictions.py b/src/c3nav/mapdata/utils/cache/accessrestrictions.py index c64a98bb..59cf3499 100644 --- a/src/c3nav/mapdata/utils/cache/accessrestrictions.py +++ b/src/c3nav/mapdata/utils/cache/accessrestrictions.py @@ -1,8 +1,12 @@ import operator import struct from functools import reduce +from os import PathLike +from typing import Self, Iterator import numpy as np +from scipy.linalg._decomp_interpolative import NDArray +from shapely import Polygon, MultiPolygon from c3nav.mapdata.utils.cache.indexed import LevelGeometryIndexed @@ -17,13 +21,13 @@ class AccessRestrictionAffected(LevelGeometryIndexed): variant_id = 2 variant_name = 'restrictions' - def __init__(self, restrictions=None, **kwargs): + def __init__(self, restrictions: list[int] = None, **kwargs): super().__init__(**kwargs) - self.restrictions = [] if restrictions is None else restrictions - self.restrictions_lookup = {restriction: i for i, restriction in enumerate(self.restrictions)} + self.restrictions: list[int] = [] if restrictions is None else restrictions + self.restrictions_lookup: dict[int, int] = {restriction: i for i, restriction in enumerate(self.restrictions)} @classmethod - def _read_metadata(cls, f, kwargs): + def _read_metadata(cls, f, kwargs: dict): restrictions = list(struct.unpack('<'+'I'*64, f.read(4*64))) while restrictions and restrictions[-1] == 0: restrictions.pop() @@ -33,21 +37,21 @@ class AccessRestrictionAffected(LevelGeometryIndexed): f.write(struct.pack('<'+'I'*64, *self.restrictions, *((0, )*(64-len(self.restrictions))))) @classmethod - def build(cls, access_restriction_affected): + def build(cls, access_restriction_affected) -> Self: result = cls() for restriction, area in access_restriction_affected.items(): result[area.buffer(1)].add(restriction) return result @classmethod - def open(cls, filename): + def open(cls, filename: str | bytes | PathLike) -> Self: try: instance = super().open(filename) except FileNotFoundError: instance = cls(restrictions=[], filename=filename) return instance - def _get_restriction_index(self, restriction, create=False): + def get_restriction_index(self, restriction: int, create=False) -> int: i = self.restrictions_lookup.get(restriction) if create and i is None: i = len(self.restrictions) @@ -55,7 +59,7 @@ class AccessRestrictionAffected(LevelGeometryIndexed): self.restrictions.append(restriction) return i - def __getitem__(self, selector): + def __getitem__(self, selector: tuple[slice, slice] | Polygon | MultiPolygon) -> "AccessRestrictionAffectedCells": return AccessRestrictionAffectedCells(self, selector) def __setitem__(self, selector, value): @@ -63,43 +67,44 @@ class AccessRestrictionAffected(LevelGeometryIndexed): class AccessRestrictionAffectedCells: - def __init__(self, parent: AccessRestrictionAffected, selector): + def __init__(self, parent: AccessRestrictionAffected, + selector: tuple[slice, slice] | Polygon | MultiPolygon): self.parent = parent self.selector = selector self.values = self._get_values() - def _get_values(self): + def _get_values(self) -> NDArray: return LevelGeometryIndexed.__getitem__(self.parent, self.selector) - def _set(self, values): + def _set(self, values: NDArray): self.values = values LevelGeometryIndexed.__setitem__(self.parent, self.selector, values) - def __contains__(self, restriction): - i = self.parent._get_restriction_index(restriction) + def __contains__(self, restriction: int): + i = self.parent.get_restriction_index(restriction) return (self.values & (2**i)).any() - def add(self, restriction): + def add(self, restriction: int): from shapely.geometry.base import BaseGeometry if not isinstance(self.selector, BaseGeometry): raise TypeError('Can only add restrictions with Geometry based selectors') # expand array - bounds = self.parent._get_geometry_bounds(self.selector) + bounds = self.parent.get_geometry_bounds(self.selector) self.parent.fit_bounds(*bounds) self.values = self._get_values() - i = self.parent._get_restriction_index(restriction, create=True) + i = self.parent.get_restriction_index(restriction, create=True) self._set(self.values | (2**i)) - def discard(self, restriction): + def discard(self, restriction: int): from shapely.geometry.base import BaseGeometry if not isinstance(self.selector, BaseGeometry): raise TypeError('Can only discard restrictions with Geometry based selectors') - i = self.parent._get_restriction_index(restriction) + i = self.parent.get_restriction_index(restriction) self._set(self.values & ((2**64-1) ^ (2**i))) - def __iter__(self): + def __iter__(self) -> Iterator[int]: all = reduce(operator.or_, self.values.tolist(), 0) yield from (restriction for i, restriction in enumerate(self.parent.restrictions) if (all & 2**i)) diff --git a/src/c3nav/mapdata/utils/cache/changes.py b/src/c3nav/mapdata/utils/cache/changes.py index eb05acd2..46ba466c 100644 --- a/src/c3nav/mapdata/utils/cache/changes.py +++ b/src/c3nav/mapdata/utils/cache/changes.py @@ -3,6 +3,7 @@ import os from django.db.models.signals import m2m_changed, post_delete from shapely.ops import unary_union +from c3nav.mapdata.models.update import MapUpdateTuple from c3nav.mapdata.utils.cache.maphistory import MapHistory from c3nav.mapdata.utils.models import get_submodels @@ -59,7 +60,7 @@ class GeometryChangeTracker: self._geometries_by_level.setdefault(level_id, []).append(other._get_unary_union(level_id)) self._unary_unions = {} - def save(self, last_update, new_update): + def save(self, last_update: MapUpdateTuple, new_update: MapUpdateTuple): self.finalize() for level_id, geometries in self._geometries_by_level.items(): diff --git a/src/c3nav/mapdata/utils/cache/indexed.py b/src/c3nav/mapdata/utils/cache/indexed.py index ebbf71a3..8a3fb159 100644 --- a/src/c3nav/mapdata/utils/cache/indexed.py +++ b/src/c3nav/mapdata/utils/cache/indexed.py @@ -1,5 +1,11 @@ import math import struct +from os import PathLike +from pathlib import Path +from typing import Self, Optional + +from scipy.linalg._decomp_interpolative import NDArray +from shapely import Polygon, MultiPolygon import numpy as np @@ -22,29 +28,30 @@ class GeometryIndexed: dtype = np.uint16 variant_id = 0 - def __init__(self, resolution=None, x=0, y=0, data=None, filename=None): + def __init__(self, resolution: Optional[int] = None, x: int = 0, y: int = 0, + data: NDArray = None, filename: str | bytes | PathLike = None): if resolution is None: from django.conf import settings resolution = settings.CACHE_RESOLUTION - self.resolution = resolution + self.resolution: int = resolution self.x = x self.y = y - self.data = data if data is not None else self._get_empty_array() + self.data: NDArray = data if data is not None else self._get_empty_array() self.filename = filename @classmethod - def _get_empty_array(cls): + def _get_empty_array(cls) -> NDArray: return np.empty((0, 0), dtype=cls.dtype) @classmethod - def open(cls, filename): + def open(cls, filename: str | bytes | PathLike) -> Self: with open(filename, 'rb') as f: instance = cls.read(f) instance.filename = filename return instance @classmethod - def read(cls, f): + def read(cls, f) -> Self: variant_id, resolution, x, y, width, height = struct.unpack(' tuple[int, int, int, int]: minx, miny, maxx, maxy = geometry.bounds return ( int(math.floor(minx / self.resolution)), @@ -90,7 +97,7 @@ class GeometryIndexed: int(math.ceil(maxy / self.resolution)), ) - def fit_bounds(self, minx, miny, maxx, maxy): + def fit_bounds(self, minx: int, miny: int, maxx: int, maxy: int): height, width = self.data.shape if self.data.size: @@ -110,9 +117,10 @@ class GeometryIndexed: self.x = minx self.y = miny - def get_geometry_cells(self, geometry, bounds=None): + def get_geometry_cells(self, geometry: Polygon | MultiPolygon, + bounds: Optional[tuple[int, int, int, int]] = None) -> NDArray: if bounds is None: - bounds = self._get_geometry_bounds(geometry) + bounds = self.get_geometry_bounds(geometry) minx, miny, maxx, maxy = bounds height, width = self.data.shape @@ -135,11 +143,11 @@ class GeometryIndexed: return cells @property - def bounds(self): + def bounds(self) -> tuple[int, int, int, int]: height, width = self.data.shape return self.x, self.y, self.x+width, self.y+height - def __getitem__(self, key): + def __getitem__(self, key: tuple[slice, slice] | Polygon | MultiPolygon) -> int: if isinstance(key, tuple): xx, yy = key @@ -158,15 +166,15 @@ class GeometryIndexed: from shapely.geometry.base import BaseGeometry if isinstance(key, BaseGeometry): - bounds = self._get_geometry_bounds(key) + bounds = self.get_geometry_bounds(key) return self.data[self.get_geometry_cells(key, bounds)] raise TypeError('GeometryIndexed index must be a shapely geometry or tuple, not %s' % type(key).__name__) - def __setitem__(self, key, value): + def __setitem__(self, key: Polygon | MultiPolygon, value: NDArray | int): from shapely.geometry.base import BaseGeometry if isinstance(key, BaseGeometry): - bounds = self._get_geometry_bounds(key) + bounds = self.get_geometry_bounds(key) self.fit_bounds(*bounds) cells = self.get_geometry_cells(key, bounds) self.data[cells] = value @@ -198,23 +206,23 @@ class LevelGeometryIndexed(GeometryIndexed): variant_name = None @classmethod - def level_filename(cls, level_id, mode): + def level_filename(cls, level_id: int, mode) -> Path: from django.conf import settings return settings.CACHE_ROOT / ('%s_%s_level_%d' % (cls.variant_name, mode, level_id)) @classmethod - def open_level(cls, level_id, mode, **kwargs): + def open_level(cls, level_id: int, mode, **kwargs) -> Self: # noinspection PyArgumentList return cls.open(cls.level_filename(level_id, mode), **kwargs) - def save_level(self, level_id, mode): + def save_level(self, level_id: int, mode): # noinspection PyArgumentList return self.save(self.level_filename(level_id, mode)) cached = LocalContext() @classmethod - def open_level_cached(cls, level_id, mode): + def open_level_cached(cls, level_id: int, mode) -> Self: from c3nav.mapdata.models import MapUpdate cache_key = MapUpdate.current_processed_cache_key() if getattr(cls.cached, 'key', None) != cache_key: diff --git a/src/c3nav/mapdata/utils/cache/maphistory.py b/src/c3nav/mapdata/utils/cache/maphistory.py index 16e20097..f603f07d 100644 --- a/src/c3nav/mapdata/utils/cache/maphistory.py +++ b/src/c3nav/mapdata/utils/cache/maphistory.py @@ -1,8 +1,12 @@ import struct from itertools import chain +from os import PathLike +from typing import Optional, Self +from shapely import Polygon, MultiPolygon import numpy as np +from c3nav.mapdata.models.update import MapUpdateTuple from c3nav.mapdata.utils.cache.indexed import LevelGeometryIndexed @@ -17,12 +21,12 @@ class MapHistory(LevelGeometryIndexed): variant_id = 1 variant_name = 'history' - def __init__(self, updates, **kwargs): + def __init__(self, updates: list[MapUpdateTuple], **kwargs): super().__init__(**kwargs) self.updates = updates @classmethod - def _read_metadata(cls, f, kwargs): + def _read_metadata(cls, f, kwargs: dict): num_updates = struct.unpack(' Self: try: instance = super().open(filename) except FileNotFoundError: @@ -44,7 +48,7 @@ class MapHistory(LevelGeometryIndexed): instance.save() return instance - def add_geometry(self, geometry, update): + def add_geometry(self, geometry: Polygon | MultiPolygon, update: MapUpdateTuple): if self.updates[-1] != update: self.updates.append(update) @@ -63,7 +67,7 @@ class MapHistory(LevelGeometryIndexed): self.simplify() super().write(*args, **kwargs) - def composite(self, other, mask_geometry): + def composite(self, other: Self, mask_geometry: Optional[Polygon | MultiPolygon]): if self.resolution != other.resolution: raise ValueError('Cannot composite with different resolutions.') @@ -98,7 +102,7 @@ class MapHistory(LevelGeometryIndexed): self.updates = new_updates self.simplify() - def last_update(self, minx, miny, maxx, maxy): + def last_update(self, minx: float, miny: float, maxx: float, maxy: float) -> MapUpdateTuple: cells = self[minx:maxx, miny:maxy] if cells.size: return self.updates[cells.max()]