an experiment. lets don't use threading locks, hope it helps with performance

This commit is contained in:
Laura Klünder 2023-12-07 17:28:13 +01:00
parent 2e09b286f4
commit 372936fcad
8 changed files with 88 additions and 77 deletions

View file

@ -1,6 +1,5 @@
import operator import operator
import pickle import pickle
import threading
from collections import deque from collections import deque
from itertools import chain from itertools import chain
@ -17,6 +16,11 @@ from c3nav.mapdata.utils.cache import AccessRestrictionAffected, MapHistory
from c3nav.mapdata.utils.cache.package import CachePackage from c3nav.mapdata.utils.cache.package import CachePackage
from c3nav.mapdata.utils.geometry import get_rings, unwrap_geom from c3nav.mapdata.utils.geometry import get_rings, unwrap_geom
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
empty_geometry_collection = GeometryCollection() empty_geometry_collection = GeometryCollection()
@ -292,9 +296,7 @@ class LevelRenderData:
package.save_all() package.save_all()
cached = {} cached = LocalContext()
cache_key = None
cache_lock = threading.Lock()
@staticmethod @staticmethod
def _level_filename(pk): def _level_filename(pk):
@ -304,22 +306,21 @@ class LevelRenderData:
def get(cls, level): def get(cls, level):
# get the current render data from local variable if no new processed mapupdate exists. # get the current render data from local variable if no new processed mapupdate exists.
# this is much faster than any other possible cache # this is much faster than any other possible cache
with cls.cache_lock: cache_key = MapUpdate.current_processed_cache_key()
cache_key = MapUpdate.current_processed_cache_key() level_pk = str(level.pk if isinstance(level, Level) else level)
level_pk = str(level.pk if isinstance(level, Level) else level) if getattr(cls.cached, 'cache_key', None) != cache_key:
if cls.cache_key != cache_key: cls.cached.key = cache_key
cls.cache_key = cache_key cls.cached.data = {}
cls.cached = {} else:
else: result = cls.cached.data.get(level_pk, None)
result = cls.cached.get(level_pk, None) if result is not None:
if result is not None: return result
return result
pk = level.pk if isinstance(level, Level) else level pk = level.pk if isinstance(level, Level) else level
result = pickle.load(open(cls._level_filename(pk), 'rb')) result = pickle.load(open(cls._level_filename(pk), 'rb'))
cls.cached[level_pk] = result cls.cached.data[level_pk] = result
return result return result
def save(self, pk): def save(self, pk):
return pickle.dump(self, open(self._level_filename(pk), 'wb')) return pickle.dump(self, open(self._level_filename(pk), 'wb'))

View file

@ -1,9 +1,13 @@
import math import math
import struct import struct
import threading
import numpy as np import numpy as np
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
class GeometryIndexed: class GeometryIndexed:
# binary format (everything little-endian): # binary format (everything little-endian):
@ -207,23 +211,20 @@ class LevelGeometryIndexed(GeometryIndexed):
# noinspection PyArgumentList # noinspection PyArgumentList
return self.save(self.level_filename(level_id, mode)) return self.save(self.level_filename(level_id, mode))
cached = {} cached = LocalContext()
cache_key = None
cache_lock = threading.Lock()
@classmethod @classmethod
def open_level_cached(cls, level_id, mode): def open_level_cached(cls, level_id, mode):
with cls.cache_lock: from c3nav.mapdata.models import MapUpdate
from c3nav.mapdata.models import MapUpdate cache_key = MapUpdate.current_processed_cache_key()
cache_key = MapUpdate.current_processed_cache_key() if getattr(cls.cached, 'cache_key', None) != cache_key:
if cls.cache_key != cache_key: cls.cached.key = cache_key
cls.cache_key = cache_key cls.cached.data = {}
cls.cached = {} else:
else: result = cls.cached.data.get((level_id, mode), None)
result = cls.cached.get((level_id, mode), None) if result is not None:
if result is not None: return result
return result
result = cls.open_level(level_id, mode) result = cls.open_level(level_id, mode)
cls.cached[(level_id, mode)] = result cls.cached.data[(level_id, mode)] = result
return result return result

View file

@ -12,6 +12,7 @@ class NoneFromCache:
class LocalCacheProxy: class LocalCacheProxy:
# django cache, buffered using a LRU cache # django cache, buffered using a LRU cache
# only usable for stuff that never changes, obviously # only usable for stuff that never changes, obviously
# todo: ensure thread-safety, compatible with async + daphne etc
def __init__(self, maxsize=128): def __init__(self, maxsize=128):
self._maxsize = maxsize self._maxsize = maxsize
self._mapupdate = None self._mapupdate = None

View file

@ -1,15 +1,19 @@
import os import os
import struct import struct
import threading
from collections import namedtuple from collections import namedtuple
from io import BytesIO from io import BytesIO
from tarfile import TarFile, TarInfo from tarfile import TarFile, TarInfo
from typing import BinaryIO from typing import BinaryIO, Self
from pyzstd import CParameter, ZstdError, ZstdFile from pyzstd import CParameter, ZstdError, ZstdFile
from c3nav.mapdata.utils.cache import AccessRestrictionAffected, GeometryIndexed, MapHistory from c3nav.mapdata.utils.cache import AccessRestrictionAffected, GeometryIndexed, MapHistory
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
ZSTD_MAGIC_NUMBER = b"\x28\xb5\x2f\xfd" ZSTD_MAGIC_NUMBER = b"\x28\xb5\x2f\xfd"
CachePackageLevel = namedtuple('CachePackageLevel', ('history', 'restrictions')) CachePackageLevel = namedtuple('CachePackageLevel', ('history', 'restrictions'))
@ -109,23 +113,20 @@ class CachePackage:
package = settings.CACHE_ROOT / 'package.tar' package = settings.CACHE_ROOT / 'package.tar'
return cls.read(package.open('rb')) return cls.read(package.open('rb'))
cached = None cached = LocalContext()
cache_key = None
cache_lock = threading.Lock()
@classmethod @classmethod
def open_cached(cls): def open_cached(cls) -> Self:
with cls.cache_lock: from c3nav.mapdata.models import MapUpdate
from c3nav.mapdata.models import MapUpdate cache_key = MapUpdate.current_processed_cache_key()
cache_key = MapUpdate.current_processed_cache_key() if getattr(cls.cached, 'cache_key', None) != cache_key:
if cls.cache_key != cache_key: cls.cached.key = cache_key
cls.cache_key = cache_key cls.cached.data = None
cls.cached = None
if cls.cached is None: if cls.cached.data is None:
cls.cached = cls.open() cls.cached.data = cls.open()
return cls.cached return cls.cached.data
def bounds_valid(self, minx, miny, maxx, maxy): def bounds_valid(self, minx, miny, maxx, maxy):
return (minx <= self.bounds[2] and maxx >= self.bounds[0] and return (minx <= self.bounds[2] and maxx >= self.bounds[0] and

View file

@ -1,6 +1,5 @@
import operator import operator
import pickle import pickle
import threading
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import reduce from functools import reduce
from pprint import pprint from pprint import pprint
@ -17,6 +16,11 @@ from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.routing.router import Router from c3nav.routing.router import Router
from c3nav.routing.schemas import LocateRequestPeerSchema from c3nav.routing.schemas import LocateRequestPeerSchema
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
BSSID: TypeAlias = str BSSID: TypeAlias = str
@ -130,18 +134,18 @@ class Locator:
def load_nocache(cls, update): def load_nocache(cls, update):
return pickle.load(open(cls.build_filename(update), 'rb')) return pickle.load(open(cls.build_filename(update), 'rb'))
cached = None cached = LocalContext()
cache_update = None
cache_lock = threading.Lock() class NoUpdate:
pass
@classmethod @classmethod
def load(cls): def load(cls):
from c3nav.mapdata.models import MapUpdate from c3nav.mapdata.models import MapUpdate
update = MapUpdate.last_processed_update() update = MapUpdate.last_processed_update()
if cls.cache_update != update: if getattr(cls.cached, 'update', cls.NoUpdate) != update:
with cls.cache_lock: cls.cached.update = update
cls.cache_update = update cls.cached.data = cls.load_nocache(update)
cls.cached = cls.load_nocache(update)
return cls.cached return cls.cached
def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestPeerSchema]) -> ScanData: def convert_raw_scan_data(self, raw_scan_data: list[LocateRequestPeerSchema]) -> ScanData:

View file

@ -1,4 +1,3 @@
import threading
from collections import OrderedDict from collections import OrderedDict
from django import forms from django import forms
@ -10,6 +9,11 @@ from django.utils.translation import gettext_lazy as _
from c3nav.mapdata.models import MapUpdate, WayType from c3nav.mapdata.models import MapUpdate, WayType
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
class RouteOptions(models.Model): class RouteOptions(models.Model):
user = models.OneToOneField(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, primary_key=True) user = models.OneToOneField(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, primary_key=True)
@ -20,9 +24,7 @@ class RouteOptions(models.Model):
verbose_name_plural = _('Route options') verbose_name_plural = _('Route options')
default_related_name = 'routeoptions' default_related_name = 'routeoptions'
fields_cached = None fields_cached = LocalContext()
fields_cache_key = None
fields_cache_lock = threading.Lock()
@classmethod @classmethod
def build_fields(cls): def build_fields(cls):
@ -58,11 +60,10 @@ class RouteOptions(models.Model):
@classmethod @classmethod
def get_fields(cls): def get_fields(cls):
cache_key = MapUpdate.current_cache_key() cache_key = MapUpdate.current_cache_key()
if cls.fields_cache_key != cache_key: if getattr(cls.fields_cached, 'cache_key', None) != cache_key:
with cls.fields_cache_lock: cls.fields_cached.key = cache_key
cls.fields_cache_key = cache_key cls.fields_cached.data = cls.build_fields()
cls.fields_cached = cls.build_fields() return cls.fields_cached.data
return cls.fields_cached
@staticmethod @staticmethod
def get_cache_key(pk): def get_cache_key(pk):

View file

@ -1,7 +1,6 @@
import logging import logging
import operator import operator
import pickle import pickle
import threading
from collections import deque, namedtuple from collections import deque, namedtuple
from functools import reduce from functools import reduce
from itertools import chain from itertools import chain
@ -24,6 +23,11 @@ from c3nav.mapdata.utils.locations import CustomLocation
from c3nav.routing.exceptions import LocationUnreachable, NoRouteFound, NotYetRoutable from c3nav.routing.exceptions import LocationUnreachable, NoRouteFound, NotYetRoutable
from c3nav.routing.route import Route from c3nav.routing.route import Route
try:
from asgiref.local import Local as LocalContext
except ImportError:
from threading import local as LocalContext
logger = logging.getLogger('c3nav') logger = logging.getLogger('c3nav')
@ -278,18 +282,18 @@ class Router:
def load_nocache(cls, update): def load_nocache(cls, update):
return pickle.load(open(cls.build_filename(update), 'rb')) return pickle.load(open(cls.build_filename(update), 'rb'))
cached = None cached = LocalContext()
cache_update = None
cache_lock = threading.Lock() class NoUpdate:
pass
@classmethod @classmethod
def load(cls): def load(cls):
from c3nav.mapdata.models import MapUpdate from c3nav.mapdata.models import MapUpdate
update = MapUpdate.last_processed_update() update = MapUpdate.last_processed_update()
if cls.cache_update != update: if getattr(cls.cached, 'update', cls.NoUpdate) != update:
with cls.cache_lock: cls.cached.update = update
cls.cache_update = update cls.cached.data = cls.load_nocache(update)
cls.cached = cls.load_nocache(update)
return cls.cached return cls.cached
def get_locations(self, location, restrictions): def get_locations(self, location, restrictions):

View file

@ -199,8 +199,6 @@ class TileServer:
self.cache_package = pickle.load(f) self.cache_package = pickle.load(f)
return self.cache_package return self.cache_package
cache_lock = multiprocessing.Lock()
@property @property
def cache(self): def cache(self):
cache = self.get_cache_client() cache = self.get_cache_client()