process updates asynchronously

This commit is contained in:
Laura Klünder 2017-11-10 16:32:58 +01:00
parent 5d34790876
commit ea22c7a927
4 changed files with 102 additions and 13 deletions

View file

@ -0,0 +1,22 @@
from django.core.management.base import BaseCommand
from django.utils.formats import date_format
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext_lazy
from c3nav.mapdata.models import MapUpdate
class Command(BaseCommand):
help = 'process unprocessed map updates'
def handle(self, *args, **options):
updates = MapUpdate.process_updates()
print()
print(ungettext_lazy('%d map update processed.', '%d map updates processed.', len(updates)) % len(updates))
if updates:
print(_('Last processed Update: %(date)s (#%(id)d)') % {
'date': date_format(updates[-1].datetime, 'DATETIME_FORMAT'),
'id': updates[-1].pk,
})

View file

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.6 on 2017-11-10 14:52
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mapdata', '0043_auto_20171110_1451'),
]
operations = [
migrations.AddField(
model_name='mapupdate',
name='changed_geometries',
field=models.BinaryField(null=True),
),
migrations.AddField(
model_name='mapupdate',
name='processed',
field=models.BooleanField(default=True),
),
migrations.AlterField(
model_name='mapupdate',
name='processed',
field=models.BooleanField(default=False),
),
]

View file

@ -1,3 +1,4 @@
import pickle
from contextlib import contextmanager
from django.conf import settings
@ -15,6 +16,8 @@ class MapUpdate(models.Model):
datetime = models.DateTimeField(auto_now_add=True, db_index=True)
user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT)
type = models.CharField(max_length=32)
processed = models.BooleanField(default=False)
changed_geometries = models.BinaryField(null=True)
class Meta:
verbose_name = _('Map update')
@ -22,6 +25,10 @@ class MapUpdate(models.Model):
default_related_name = 'mapupdates'
get_latest_by = 'datetime'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.was_processed = self.processed
@classmethod
def last_update(cls):
last_update = cache.get('mapdata:last_update', None)
@ -29,10 +36,21 @@ class MapUpdate(models.Model):
return last_update
with cls.lock():
last_update = cls.objects.latest()
result = last_update.pk, int(make_naive(last_update.datetime).timestamp())
result = last_update.to_tuple
cache.set('mapdata:last_update', result, 900)
return result
@classmethod
def last_processed_update(cls):
last_processed_update = cache.get('mapdata:last_processed_update', None)
if last_processed_update is not None:
return last_processed_update
with cls.lock():
last_processed_update = cls.objects.filter(processed=True).latest()
result = last_processed_update.to_tuple
cache.set('mapdata:last_processed_update', result, 900)
return result
@property
def to_tuple(self):
return self.pk, int(make_naive(self.datetime).timestamp())
@ -45,6 +63,10 @@ class MapUpdate(models.Model):
def current_cache_key(cls):
return cls.build_cache_key(*cls.last_update())
@classmethod
def current_processed_cache_key(cls):
return cls.build_cache_key(*cls.last_processed_update())
@staticmethod
def build_cache_key(pk, timestamp):
return int_to_base36(pk)+'_'+int_to_base36(timestamp)
@ -55,21 +77,36 @@ class MapUpdate(models.Model):
with transaction.atomic():
yield cls.objects.select_for_update().earliest()
@classmethod
def process_updates(cls):
with cls.lock():
new_updates = tuple(cls.objects.filter(processed=False))
if not new_updates:
return ()
from c3nav.mapdata.models import AltitudeArea
AltitudeArea.recalculate()
from c3nav.mapdata.render.data import LevelRenderData
LevelRenderData.rebuild()
last_unprocessed_update = cls.objects.filter(processed=False).latest().to_tuple
for new_update in new_updates:
pickle.loads(new_update.changed_geometries).save(last_unprocessed_update, new_update.to_tuple)
new_update.processed = True
new_update.save()
cache.set('mapdata:last_processed_update', new_updates[-1].to_tuple, 900)
return new_updates
def save(self, **kwargs):
if self.pk is not None:
if self.pk is not None and (self.was_processed or not self.processed):
raise TypeError
last_map_update = MapUpdate.last_update()
from c3nav.mapdata.models import AltitudeArea
AltitudeArea.recalculate()
from c3nav.mapdata.cache import changed_geometries
self.changed_geometries = pickle.dumps(changed_geometries)
super().save(**kwargs)
from c3nav.mapdata.cache import changed_geometries
changed_geometries.save(last_map_update, self.to_tuple)
from c3nav.mapdata.render.data import LevelRenderData
LevelRenderData.rebuild()
cache.set('mapdata:last_update', self.to_tuple, 900)

View file

@ -292,7 +292,7 @@ class LevelRenderData:
@classmethod
def get(cls, level):
with cls.cache_lock:
cache_key = MapUpdate.current_cache_key()
cache_key = MapUpdate.current_processed_cache_key()
level_pk = str(level.pk if isinstance(level, Level) else level)
if cls.cache_key != cache_key:
cls.cache_key = cache_key