better typing for updates and geometryindexed

This commit is contained in:
Laura Klünder 2025-03-18 20:18:45 +01:00
parent 4d527130e7
commit 3cb1fdeb63
5 changed files with 74 additions and 52 deletions

View file

@ -5,6 +5,7 @@ import time
from contextlib import contextmanager, suppress, nullcontext
from functools import cached_property
from sqlite3 import DatabaseError
from typing import TypeAlias
from django.conf import settings
from django.core.cache import cache
@ -19,6 +20,9 @@ from c3nav.mapdata.utils.cache.changes import GeometryChangeTracker
from c3nav.mapdata.utils.cache.local import per_request_cache
MapUpdateTuple: TypeAlias = tuple[int, int]
class MapUpdate(models.Model):
"""
A map update. created whenever mapdata is changed.
@ -47,7 +51,7 @@ class MapUpdate(models.Model):
self.was_processed = self.processed
@classmethod
def last_update(cls, force=False):
def last_update(cls, force=False) -> MapUpdateTuple:
if not force:
last_update = per_request_cache.get('mapdata:last_update', None)
if last_update is not None:
@ -62,7 +66,7 @@ class MapUpdate(models.Model):
return last_update
@classmethod
def last_processed_update(cls, force=False, lock=True):
def last_processed_update(cls, force=False, lock=True) -> MapUpdateTuple:
if not force:
last_processed_update = per_request_cache.get('mapdata:last_processed_update', None)
if last_processed_update is not None:
@ -77,7 +81,7 @@ class MapUpdate(models.Model):
return last_processed_update
@classmethod
def last_processed_geometry_update(cls, force=False):
def last_processed_geometry_update(cls, force=False) -> MapUpdateTuple:
if not force:
last_processed_geometry_update = per_request_cache.get('mapdata:last_processed_geometry_update', None)
if last_processed_geometry_update is not None:
@ -93,7 +97,7 @@ class MapUpdate(models.Model):
return last_processed_geometry_update
@property
def to_tuple(self):
def to_tuple(self) -> MapUpdateTuple:
return self.pk, int(make_naive(self.datetime).timestamp())
@property
@ -113,7 +117,7 @@ class MapUpdate(models.Model):
return cls.build_cache_key(*cls.last_processed_geometry_update())
@staticmethod
def build_cache_key(pk, timestamp):
def build_cache_key(pk: int, timestamp: int):
return int_to_base36(pk)+'_'+int_to_base36(timestamp)
@classmethod

View file

@ -1,8 +1,12 @@
import operator
import struct
from functools import reduce
from os import PathLike
from typing import Self, Iterator
import numpy as np
from scipy.linalg._decomp_interpolative import NDArray
from shapely import Polygon, MultiPolygon
from c3nav.mapdata.utils.cache.indexed import LevelGeometryIndexed
@ -17,13 +21,13 @@ class AccessRestrictionAffected(LevelGeometryIndexed):
variant_id = 2
variant_name = 'restrictions'
def __init__(self, restrictions=None, **kwargs):
def __init__(self, restrictions: list[int] = None, **kwargs):
super().__init__(**kwargs)
self.restrictions = [] if restrictions is None else restrictions
self.restrictions_lookup = {restriction: i for i, restriction in enumerate(self.restrictions)}
self.restrictions: list[int] = [] if restrictions is None else restrictions
self.restrictions_lookup: dict[int, int] = {restriction: i for i, restriction in enumerate(self.restrictions)}
@classmethod
def _read_metadata(cls, f, kwargs):
def _read_metadata(cls, f, kwargs: dict):
restrictions = list(struct.unpack('<'+'I'*64, f.read(4*64)))
while restrictions and restrictions[-1] == 0:
restrictions.pop()
@ -33,21 +37,21 @@ class AccessRestrictionAffected(LevelGeometryIndexed):
f.write(struct.pack('<'+'I'*64, *self.restrictions, *((0, )*(64-len(self.restrictions)))))
@classmethod
def build(cls, access_restriction_affected):
def build(cls, access_restriction_affected) -> Self:
result = cls()
for restriction, area in access_restriction_affected.items():
result[area.buffer(1)].add(restriction)
return result
@classmethod
def open(cls, filename):
def open(cls, filename: str | bytes | PathLike) -> Self:
try:
instance = super().open(filename)
except FileNotFoundError:
instance = cls(restrictions=[], filename=filename)
return instance
def _get_restriction_index(self, restriction, create=False):
def get_restriction_index(self, restriction: int, create=False) -> int:
i = self.restrictions_lookup.get(restriction)
if create and i is None:
i = len(self.restrictions)
@ -55,7 +59,7 @@ class AccessRestrictionAffected(LevelGeometryIndexed):
self.restrictions.append(restriction)
return i
def __getitem__(self, selector):
def __getitem__(self, selector: tuple[slice, slice] | Polygon | MultiPolygon) -> "AccessRestrictionAffectedCells":
return AccessRestrictionAffectedCells(self, selector)
def __setitem__(self, selector, value):
@ -63,43 +67,44 @@ class AccessRestrictionAffected(LevelGeometryIndexed):
class AccessRestrictionAffectedCells:
def __init__(self, parent: AccessRestrictionAffected, selector):
def __init__(self, parent: AccessRestrictionAffected,
selector: tuple[slice, slice] | Polygon | MultiPolygon):
self.parent = parent
self.selector = selector
self.values = self._get_values()
def _get_values(self):
def _get_values(self) -> NDArray:
return LevelGeometryIndexed.__getitem__(self.parent, self.selector)
def _set(self, values):
def _set(self, values: NDArray):
self.values = values
LevelGeometryIndexed.__setitem__(self.parent, self.selector, values)
def __contains__(self, restriction):
i = self.parent._get_restriction_index(restriction)
def __contains__(self, restriction: int):
i = self.parent.get_restriction_index(restriction)
return (self.values & (2**i)).any()
def add(self, restriction):
def add(self, restriction: int):
from shapely.geometry.base import BaseGeometry
if not isinstance(self.selector, BaseGeometry):
raise TypeError('Can only add restrictions with Geometry based selectors')
# expand array
bounds = self.parent._get_geometry_bounds(self.selector)
bounds = self.parent.get_geometry_bounds(self.selector)
self.parent.fit_bounds(*bounds)
self.values = self._get_values()
i = self.parent._get_restriction_index(restriction, create=True)
i = self.parent.get_restriction_index(restriction, create=True)
self._set(self.values | (2**i))
def discard(self, restriction):
def discard(self, restriction: int):
from shapely.geometry.base import BaseGeometry
if not isinstance(self.selector, BaseGeometry):
raise TypeError('Can only discard restrictions with Geometry based selectors')
i = self.parent._get_restriction_index(restriction)
i = self.parent.get_restriction_index(restriction)
self._set(self.values & ((2**64-1) ^ (2**i)))
def __iter__(self):
def __iter__(self) -> Iterator[int]:
all = reduce(operator.or_, self.values.tolist(), 0)
yield from (restriction for i, restriction in enumerate(self.parent.restrictions) if (all & 2**i))

View file

@ -3,6 +3,7 @@ import os
from django.db.models.signals import m2m_changed, post_delete
from shapely.ops import unary_union
from c3nav.mapdata.models.update import MapUpdateTuple
from c3nav.mapdata.utils.cache.maphistory import MapHistory
from c3nav.mapdata.utils.models import get_submodels
@ -59,7 +60,7 @@ class GeometryChangeTracker:
self._geometries_by_level.setdefault(level_id, []).append(other._get_unary_union(level_id))
self._unary_unions = {}
def save(self, last_update, new_update):
def save(self, last_update: MapUpdateTuple, new_update: MapUpdateTuple):
self.finalize()
for level_id, geometries in self._geometries_by_level.items():

View file

@ -1,5 +1,11 @@
import math
import struct
from os import PathLike
from pathlib import Path
from typing import Self, Optional
from scipy.linalg._decomp_interpolative import NDArray
from shapely import Polygon, MultiPolygon
import numpy as np
@ -22,29 +28,30 @@ class GeometryIndexed:
dtype = np.uint16
variant_id = 0
def __init__(self, resolution=None, x=0, y=0, data=None, filename=None):
def __init__(self, resolution: Optional[int] = None, x: int = 0, y: int = 0,
data: NDArray = None, filename: str | bytes | PathLike = None):
if resolution is None:
from django.conf import settings
resolution = settings.CACHE_RESOLUTION
self.resolution = resolution
self.resolution: int = resolution
self.x = x
self.y = y
self.data = data if data is not None else self._get_empty_array()
self.data: NDArray = data if data is not None else self._get_empty_array()
self.filename = filename
@classmethod
def _get_empty_array(cls):
def _get_empty_array(cls) -> NDArray:
return np.empty((0, 0), dtype=cls.dtype)
@classmethod
def open(cls, filename):
def open(cls, filename: str | bytes | PathLike) -> Self:
with open(filename, 'rb') as f:
instance = cls.read(f)
instance.filename = filename
return instance
@classmethod
def read(cls, f):
def read(cls, f) -> Self:
variant_id, resolution, x, y, width, height = struct.unpack('<BBhhHH', f.read(10))
if variant_id != cls.variant_id:
raise ValueError('variant id does not match')
@ -61,10 +68,10 @@ class GeometryIndexed:
return cls(**kwargs)
@classmethod
def _read_metadata(cls, f, kwargs):
def _read_metadata(cls, f, kwargs: dict):
pass
def save(self, filename=None):
def save(self, filename: str | bytes | PathLike = None):
if filename is None:
filename = self.filename
if filename is None:
@ -81,7 +88,7 @@ class GeometryIndexed:
def _write_metadata(self, f):
pass
def _get_geometry_bounds(self, geometry):
def get_geometry_bounds(self, geometry: Polygon | MultiPolygon) -> tuple[int, int, int, int]:
minx, miny, maxx, maxy = geometry.bounds
return (
int(math.floor(minx / self.resolution)),
@ -90,7 +97,7 @@ class GeometryIndexed:
int(math.ceil(maxy / self.resolution)),
)
def fit_bounds(self, minx, miny, maxx, maxy):
def fit_bounds(self, minx: int, miny: int, maxx: int, maxy: int):
height, width = self.data.shape
if self.data.size:
@ -110,9 +117,10 @@ class GeometryIndexed:
self.x = minx
self.y = miny
def get_geometry_cells(self, geometry, bounds=None):
def get_geometry_cells(self, geometry: Polygon | MultiPolygon,
bounds: Optional[tuple[int, int, int, int]] = None) -> NDArray:
if bounds is None:
bounds = self._get_geometry_bounds(geometry)
bounds = self.get_geometry_bounds(geometry)
minx, miny, maxx, maxy = bounds
height, width = self.data.shape
@ -135,11 +143,11 @@ class GeometryIndexed:
return cells
@property
def bounds(self):
def bounds(self) -> tuple[int, int, int, int]:
height, width = self.data.shape
return self.x, self.y, self.x+width, self.y+height
def __getitem__(self, key):
def __getitem__(self, key: tuple[slice, slice] | Polygon | MultiPolygon) -> int:
if isinstance(key, tuple):
xx, yy = key
@ -158,15 +166,15 @@ class GeometryIndexed:
from shapely.geometry.base import BaseGeometry
if isinstance(key, BaseGeometry):
bounds = self._get_geometry_bounds(key)
bounds = self.get_geometry_bounds(key)
return self.data[self.get_geometry_cells(key, bounds)]
raise TypeError('GeometryIndexed index must be a shapely geometry or tuple, not %s' % type(key).__name__)
def __setitem__(self, key, value):
def __setitem__(self, key: Polygon | MultiPolygon, value: NDArray | int):
from shapely.geometry.base import BaseGeometry
if isinstance(key, BaseGeometry):
bounds = self._get_geometry_bounds(key)
bounds = self.get_geometry_bounds(key)
self.fit_bounds(*bounds)
cells = self.get_geometry_cells(key, bounds)
self.data[cells] = value
@ -198,23 +206,23 @@ class LevelGeometryIndexed(GeometryIndexed):
variant_name = None
@classmethod
def level_filename(cls, level_id, mode):
def level_filename(cls, level_id: int, mode) -> Path:
from django.conf import settings
return settings.CACHE_ROOT / ('%s_%s_level_%d' % (cls.variant_name, mode, level_id))
@classmethod
def open_level(cls, level_id, mode, **kwargs):
def open_level(cls, level_id: int, mode, **kwargs) -> Self:
# noinspection PyArgumentList
return cls.open(cls.level_filename(level_id, mode), **kwargs)
def save_level(self, level_id, mode):
def save_level(self, level_id: int, mode):
# noinspection PyArgumentList
return self.save(self.level_filename(level_id, mode))
cached = LocalContext()
@classmethod
def open_level_cached(cls, level_id, mode):
def open_level_cached(cls, level_id: int, mode) -> Self:
from c3nav.mapdata.models import MapUpdate
cache_key = MapUpdate.current_processed_cache_key()
if getattr(cls.cached, 'key', None) != cache_key:

View file

@ -1,8 +1,12 @@
import struct
from itertools import chain
from os import PathLike
from typing import Optional, Self
from shapely import Polygon, MultiPolygon
import numpy as np
from c3nav.mapdata.models.update import MapUpdateTuple
from c3nav.mapdata.utils.cache.indexed import LevelGeometryIndexed
@ -17,12 +21,12 @@ class MapHistory(LevelGeometryIndexed):
variant_id = 1
variant_name = 'history'
def __init__(self, updates, **kwargs):
def __init__(self, updates: list[MapUpdateTuple], **kwargs):
super().__init__(**kwargs)
self.updates = updates
@classmethod
def _read_metadata(cls, f, kwargs):
def _read_metadata(cls, f, kwargs: dict):
num_updates = struct.unpack('<H', f.read(2))[0]
updates = struct.unpack('<'+'II'*num_updates, f.read(num_updates*8))
updates = list(zip(updates[0::2], updates[1::2]))
@ -33,7 +37,7 @@ class MapHistory(LevelGeometryIndexed):
f.write(struct.pack('<'+'II'*len(self.updates), *chain(*self.updates)))
@classmethod
def open(cls, filename, default_update=None):
def open(cls, filename: str | bytes | PathLike, default_update: Optional[MapUpdateTuple] = None) -> Self:
try:
instance = super().open(filename)
except FileNotFoundError:
@ -44,7 +48,7 @@ class MapHistory(LevelGeometryIndexed):
instance.save()
return instance
def add_geometry(self, geometry, update):
def add_geometry(self, geometry: Polygon | MultiPolygon, update: MapUpdateTuple):
if self.updates[-1] != update:
self.updates.append(update)
@ -63,7 +67,7 @@ class MapHistory(LevelGeometryIndexed):
self.simplify()
super().write(*args, **kwargs)
def composite(self, other, mask_geometry):
def composite(self, other: Self, mask_geometry: Optional[Polygon | MultiPolygon]):
if self.resolution != other.resolution:
raise ValueError('Cannot composite with different resolutions.')
@ -98,7 +102,7 @@ class MapHistory(LevelGeometryIndexed):
self.updates = new_updates
self.simplify()
def last_update(self, minx, miny, maxx, maxy):
def last_update(self, minx: float, miny: float, maxx: float, maxy: float) -> MapUpdateTuple:
cells = self[minx:maxx, miny:maxy]
if cells.size:
return self.updates[cells.max()]