team-3/src/c3nav/editor/models/changeset.py

323 lines
12 KiB
Python
Raw Normal View History

from collections import OrderedDict
from itertools import chain
2017-06-12 14:52:08 +02:00
from django.apps import apps
from django.conf import settings
2017-06-29 15:06:14 +02:00
from django.core.cache import cache
from django.db import models
from django.db.models import Max, Q
2017-06-13 15:31:54 +02:00
from django.urls import reverse
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext_lazy
from c3nav.editor.models.changedobject import ChangedObject
from c3nav.editor.utils import is_created_pk
from c3nav.editor.wrappers import ModelWrapper
2017-06-21 13:54:00 +02:00
from c3nav.mapdata.models import LocationSlug
from c3nav.mapdata.models.locations import LocationRedirect
2017-06-22 21:33:22 +02:00
from c3nav.mapdata.utils.models import get_submodels
class ChangeSet(models.Model):
created = models.DateTimeField(auto_now_add=True, verbose_name=_('created'))
2017-06-12 16:59:57 +02:00
author = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT, verbose_name=_('Author'))
title = models.CharField(max_length=100, default='', verbose_name=_('Title'))
description = models.TextField(max_length=1000, default='', verbose_name=_('Description'))
session_id = models.CharField(unique=True, null=True, max_length=32)
proposed = models.DateTimeField(null=True, verbose_name=_('proposed'))
2017-06-29 17:01:44 +02:00
assigned_to = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT,
related_name='assigned_changesets', verbose_name=_('assigned to'))
applied = models.DateTimeField(null=True, verbose_name=_('applied'))
class Meta:
verbose_name = _('Change Set')
verbose_name_plural = _('Change Sets')
default_related_name = 'changesets'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.changed_objects = None
self.created_objects = {}
self.updated_existing = {}
self.deleted_existing = {}
2017-06-16 19:19:54 +02:00
self.m2m_added = {}
self.m2m_removed = {}
2017-06-29 15:06:14 +02:00
self.last_change_cache = None
2017-06-21 13:31:41 +02:00
"""
Get Changesets for Request/Session/User
"""
@classmethod
def qs_base(cls, hide_applied=True):
qs = cls.objects.select_related('author')
if hide_applied:
qs = qs.filter(applied__isnull=True)
return qs
@classmethod
def qs_for_request(cls, request):
2017-06-21 18:00:12 +02:00
"""
Returns a base QuerySet to get only changesets the current user is allowed to see
"""
2017-06-21 13:31:41 +02:00
qs = cls.qs_base()
if request.user.is_authenticated:
qs = qs.filter(author=request.user)
else:
qs = qs.filter(author__isnull=True)
return qs
@classmethod
def get_for_request(cls, request):
2017-06-21 18:00:12 +02:00
"""
Get the changeset for the current request.
If a changeset is associated with the session id, it will be returned.
Otherwise, if the user is authenticated, the last created queryset
for this user will be returned and the session id will be added to it.
If both fails, an empty unsaved changeset will be returned which will
be automatically saved when a change is added to it.
In any case, the default autor for changes added to the queryset during
this request will be set to the current user.
"""
2017-06-21 13:31:41 +02:00
qs = cls.qs_for_request(request)
if request.session.session_key is not None:
changeset = qs.filter(session_id=request.session.session_key).first()
if changeset is not None:
if changeset.author_id is None and request.user.is_authenticated:
changeset.author = request.user
changeset.save()
return changeset
new_changeset = cls()
new_changeset.request = request
if request.session.session_key is None:
request.session.save()
2017-06-21 13:31:41 +02:00
if request.user.is_authenticated:
changeset = qs.filter(Q(author=request.user)).order_by('-created').first()
if changeset is not None:
changeset.session_id = request.session.session_key
changeset.save()
return changeset
new_changeset.author = request.user
new_changeset.session_id = request.session.session_key
return new_changeset
"""
Wrap Objects
"""
def wrap_model(self, model):
if isinstance(model, str):
model = apps.get_model('mapdata', model)
assert isinstance(model, type) and issubclass(model, models.Model)
return ModelWrapper(self, model)
2017-06-21 13:31:41 +02:00
def wrap_instance(self, instance):
assert isinstance(instance, models.Model)
return self.wrap_model(instance.__class__).create_wrapped_model_class()(self, instance)
2017-06-20 15:39:22 +02:00
def relevant_changed_objects(self):
2017-06-29 14:51:46 +02:00
return self.changed_objects_set.exclude(stale=True).exclude(existing_object_pk__isnull=True, deleted=True)
def fill_changes_cache(self, include_deleted_created=False):
2017-06-21 18:00:12 +02:00
"""
Get all changed objects and fill this ChangeSet's changes cache.
Only executable once, if something is changed later the cache will be automatically updated.
This method gets called automatically when the cache is needed.
Only call it if you need to set include_deleted_created to True.
:param include_deleted_created: Fetch created objects that were deleted.
:rtype: True if the method was executed, else False
2017-06-21 18:00:12 +02:00
"""
2017-06-27 16:10:28 +02:00
if self.changed_objects is not None:
return False
if self.pk is None:
self.changed_objects = {}
return False
2017-06-20 15:39:22 +02:00
if include_deleted_created:
2017-06-29 14:51:46 +02:00
qs = self.changed_objects_set.exclude(stale=True)
2017-06-20 15:39:22 +02:00
else:
qs = self.relevant_changed_objects()
self.changed_objects = {}
for change in qs:
change.update_changeset_cache()
2017-06-29 15:06:14 +02:00
self._last_change_cache = max(change.last_update for change in qs)
return True
2017-06-16 19:19:54 +02:00
2017-06-21 13:48:13 +02:00
"""
Analyse Changes
"""
def get_objects(self):
if self.changed_objects is None:
2017-06-21 13:48:13 +02:00
raise TypeError
# collect pks of relevant objects
object_pks = {}
for change in chain(*(objects.values() for objects in self.changed_objects.values())):
change.add_relevant_object_pks(object_pks)
2017-06-21 13:48:13 +02:00
# retrieve relevant objects
objects = {}
for model, pks in object_pks.items():
created_pks = set(pk for pk in pks if is_created_pk(pk))
existing_pks = pks - created_pks
model_objects = {}
if existing_pks:
for obj in model.objects.filter(pk__in=existing_pks):
if model == LocationSlug:
obj = obj.get_child()
model_objects[obj.pk] = obj
if created_pks:
for pk in created_pks:
model_objects[pk] = self.get_created_object(model, pk, allow_deleted=True)._obj
objects[model] = model_objects
# add LocationSlug objects as their correct model
for pk, obj in objects.get(LocationSlug, {}).items():
objects.setdefault(obj.__class__, {})[pk] = obj
2017-06-21 13:48:13 +02:00
return objects
2017-06-21 13:31:41 +02:00
"""
Lookup changes and created objects
"""
2017-06-21 19:13:23 +02:00
def get_changed_values(self, model: models.Model, name: str) -> tuple:
2017-06-21 18:00:12 +02:00
"""
Get all changes values for a specific field on existing models
:param model: model class
:param name: field name
:return: returns a dictionary with primary keys as keys and new values as values
"""
2017-06-15 17:53:00 +02:00
r = tuple((pk, values[name]) for pk, values in self.updated_existing.get(model, {}).items() if name in values)
return r
def get_changed_object(self, obj) -> ChangedObject:
model = obj.__class__
pk = obj.pk
if pk is None:
return ChangedObject(changeset=self, model_class=model)
self.fill_changes_cache()
objects = tuple(obj for obj in ((submodel, self.changed_objects.get(submodel, {}).get(pk, None))
for submodel in get_submodels(model)) if obj[1] is not None)
if len(objects) > 1:
raise model.MultipleObjectsReturned
if objects:
return objects[0][1]
if is_created_pk(pk):
raise model.DoesNotExist
return ChangedObject(changeset=self, model_class=model, existing_object_pk=pk)
def get_created_object(self, model, pk, get_foreign_objects=False, allow_deleted=False):
2017-06-21 18:00:12 +02:00
"""
Gets a created model instance.
:param model: model class
:param pk: primary key
:param get_foreign_objects: whether to fetch foreign objects and not just set their id to field.attname
:param allow_deleted: return created objects that have already been deleted (needs get_history=True)
:return: a wrapped model instance
"""
self.fill_changes_cache()
2017-06-16 13:00:47 +02:00
if issubclass(model, ModelWrapper):
model = model._obj
2017-06-27 17:07:54 +02:00
obj = self.get_changed_object(model(pk=pk))
if obj.deleted and not allow_deleted:
raise model.DoesNotExist
2017-06-27 17:07:54 +02:00
return obj.get_obj(get_foreign_objects=get_foreign_objects)
2017-06-16 13:00:47 +02:00
2017-06-21 18:00:12 +02:00
def get_created_pks(self, model) -> set:
"""
Returns a set with the primary keys of created objects from this model
"""
self.fill_changes_cache()
if issubclass(model, ModelWrapper):
model = model._obj
return set(self.created_objects.get(model, {}).keys())
2017-06-21 13:31:41 +02:00
"""
Methods for display
"""
@property
def changed_objects_count(self):
2017-06-21 18:00:12 +02:00
"""
Get the number of changed objects.
2017-06-21 18:00:12 +02:00
"""
self.fill_changes_cache()
count = 0
changed_locationslug_pks = set()
for model, objects in self.changed_objects.items():
if issubclass(model, LocationSlug):
if model == LocationRedirect:
continue
changed_locationslug_pks.update(objects.keys())
2017-06-29 16:42:47 +02:00
count += sum(1 for obj in objects.values() if not obj.is_created or not obj.deleted)
2017-06-29 14:27:42 +02:00
count += len(set(obj.updated_fields['target']
for obj in self.changed_objects.get(LocationRedirect, {}).values()) - changed_locationslug_pks)
return count
2017-06-21 13:31:41 +02:00
@property
def count_display(self):
2017-06-21 18:00:12 +02:00
"""
Get %d changed objects display text.
2017-06-21 18:00:12 +02:00
"""
2017-06-21 13:31:41 +02:00
if self.pk is None:
return _('No changed objects')
return (ungettext_lazy('%(num)d changed object', '%(num)d changed objects', 'num') %
{'num': self.changed_objects_count})
2017-06-21 13:31:41 +02:00
@property
2017-06-29 15:06:14 +02:00
def last_change(self):
last_change = cache.get('changeset:%s:last_change' % self.pk)
if last_change is None:
# was not in cache, calculate it
try:
last_change = self.changed_objects_set.aggregate(Max('last_update'))['last_update__max']
except ChangedObject.DoesNotExist:
last_change = self.created
elif self.last_change_cache is None or self.last_change_cache <= last_change:
# was in cache and our local value (if we had one) is not newer
return last_change
else:
# our local value is newer
last_change = self.last_change_cache
2017-06-29 15:06:14 +02:00
# update cache
cache.set('changeset:%s:last_change' % self.pk, last_change, 900)
return last_change
2017-06-21 13:31:41 +02:00
@property
def cache_key(self):
if self.pk is None:
return None
2017-06-29 15:06:14 +02:00
return str(self.pk)+'-'+str(self.last_change)
2017-06-21 13:31:41 +02:00
def get_absolute_url(self):
if self.pk is None:
return ''
return reverse('editor.changesets.detail', kwargs={'pk': self.pk})
def serialize(self):
return OrderedDict((
('id', self.pk),
('author', self.author_id),
('created', None if self.created is None else self.created.isoformat()),
('proposed', None if self.proposed is None else self.proposed.isoformat()),
('applied', None if self.applied is None else self.applied.isoformat()),
('applied_by', None if self.applied_by_id is None else self.applied_by_id),
('changes', tuple(change.serialize() for change in self.changes.all())),
))