use redis lock in process_updates if possible
This commit is contained in:
parent
22b1cdd198
commit
c1c5dde682
1 changed files with 28 additions and 2 deletions
|
@ -2,6 +2,7 @@ import logging
|
||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
from contextlib import contextmanager, suppress
|
from contextlib import contextmanager, suppress
|
||||||
|
from sqlite3 import DatabaseError
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
|
@ -84,12 +85,37 @@ class MapUpdate(models.Model):
|
||||||
def _changed_geometries_filename(self):
|
def _changed_geometries_filename(self):
|
||||||
return os.path.join(settings.CACHE_ROOT, 'changed_geometries', 'update_%d.pickle' % self.pk)
|
return os.path.join(settings.CACHE_ROOT, 'changed_geometries', 'update_%d.pickle' % self.pk)
|
||||||
|
|
||||||
|
class ProcessUpdatesAlreadyRunning(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@contextmanager
|
||||||
|
def get_updates_to_process(cls):
|
||||||
|
queryset = cls.objects.filter(processed=False)
|
||||||
|
with transaction.atomic():
|
||||||
|
if settings.HAS_REDIS:
|
||||||
|
import redis
|
||||||
|
lock_aquired = None
|
||||||
|
lock = redis.Redis().lock('mapupdate:process_updates_lock')
|
||||||
|
try:
|
||||||
|
have_lock = lock.acquire(blocking=False, timeout=1800)
|
||||||
|
if not have_lock:
|
||||||
|
raise cls.ProcessUpdatesAlreadyRunning
|
||||||
|
yield tuple(queryset)
|
||||||
|
finally:
|
||||||
|
if lock_aquired:
|
||||||
|
lock.release()
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
yield tuple(queryset.select_for_update(nowait=True))
|
||||||
|
except DatabaseError:
|
||||||
|
raise cls.ProcessUpdatesAlreadyRunning
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def process_updates(cls):
|
def process_updates(cls):
|
||||||
logger = logging.getLogger('c3nav')
|
logger = logging.getLogger('c3nav')
|
||||||
|
|
||||||
with transaction.atomic():
|
with cls.get_updates_to_process() as new_updates:
|
||||||
new_updates = tuple(cls.objects.filter(processed=False).select_for_update(nowait=True))
|
|
||||||
if not new_updates:
|
if not new_updates:
|
||||||
return ()
|
return ()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue