replace Change with ChangedObject (wrapper integration still missing)
This commit is contained in:
parent
0ef51ac147
commit
9d5811bc15
12 changed files with 457 additions and 855 deletions
|
@ -1,309 +0,0 @@
|
|||
import json
|
||||
import typing
|
||||
from collections import OrderedDict
|
||||
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
||||
from django.db import models
|
||||
from django.db.models import FieldDoesNotExist, Q
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from c3nav.editor.utils import get_current_obj, is_created_pk
|
||||
from c3nav.editor.wrappers import ModelInstanceWrapper
|
||||
|
||||
|
||||
class Change(models.Model):
|
||||
ACTIONS = (
|
||||
('create', _('create object')),
|
||||
('delete', _('delete object')),
|
||||
('update', _('update attribute')),
|
||||
('restore', _('restore attribute')),
|
||||
('m2m_add', _('add many to many relation')),
|
||||
('m2m_remove', _('add many to many relation')),
|
||||
)
|
||||
changeset = models.ForeignKey('editor.ChangeSet', on_delete=models.CASCADE, verbose_name=_('Change Set'))
|
||||
author = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT, verbose_name=_('Author'))
|
||||
created = models.DateTimeField(auto_now_add=True, verbose_name=_('created'))
|
||||
action = models.CharField(max_length=16, choices=ACTIONS, verbose_name=_('action'))
|
||||
discarded_by = models.OneToOneField('Change', null=True, on_delete=models.CASCADE, related_name='discards',
|
||||
verbose_name=_('discarded by other change'))
|
||||
model_name = models.CharField(max_length=50, verbose_name=_('model name'))
|
||||
existing_object_pk = models.PositiveIntegerField(null=True, verbose_name=_('id of existing object'))
|
||||
created_object = models.ForeignKey('Change', null=True, on_delete=models.CASCADE, related_name='changed_by',
|
||||
verbose_name=_('changed object'))
|
||||
field_name = models.CharField(max_length=50, null=True, verbose_name=_('field name'))
|
||||
field_value = models.TextField(null=True, verbose_name=_('new field value'))
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('Change')
|
||||
verbose_name_plural = _('Changes')
|
||||
default_related_name = 'changes'
|
||||
ordering = ['created', 'pk']
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._set_object = None
|
||||
|
||||
@property
|
||||
def model_class(self) -> typing.Optional[typing.Type[models.Model]]:
|
||||
if not self.model_name:
|
||||
return None
|
||||
return apps.get_model('mapdata', self.model_name)
|
||||
|
||||
@model_class.setter
|
||||
def model_class(self, value: typing.Optional[typing.Type[models.Model]]):
|
||||
if value is None:
|
||||
self.model_name = None
|
||||
return
|
||||
if not issubclass(value, models.Model):
|
||||
raise ValueError('value is not a django model')
|
||||
if value._meta.abstract:
|
||||
raise ValueError('value is an abstract model')
|
||||
if value._meta.app_label != 'mapdata':
|
||||
raise ValueError('value is not a mapdata model')
|
||||
self.model_name = value.__name__
|
||||
|
||||
@property
|
||||
def obj_pk(self) -> typing.Union[int, str]:
|
||||
if self.existing_object_pk is not None:
|
||||
return self.existing_object_pk
|
||||
if self.created_object_id is not None:
|
||||
return 'c'+str(self.created_object_id)
|
||||
if self.action == 'create':
|
||||
return 'c'+str(self.pk)
|
||||
raise TypeError('existing_model_pk or created_object have to be set.')
|
||||
|
||||
def other_changes(self):
|
||||
"""
|
||||
get queryset of other active changes on the same object
|
||||
"""
|
||||
qs = self.changeset.changes.filter(~Q(pk=self.pk), model_name=self.model_name, discarded_by__isnull=True)
|
||||
if self.existing_object_pk is not None:
|
||||
return qs.filter(existing_object_pk=self.existing_object_pk)
|
||||
if self.action == 'create':
|
||||
return qs.filter(created_object_id=self.pk)
|
||||
return qs.filter(Q(pk=self.created_object_id) | Q(created_object_id=self.created_object_id))
|
||||
|
||||
@property
|
||||
def obj(self) -> ModelInstanceWrapper:
|
||||
if self._set_object is not None:
|
||||
return self._set_object
|
||||
|
||||
if self.existing_object_pk is not None:
|
||||
if self.created_object is not None:
|
||||
raise TypeError('existing_object_pk and created_object can not both be set.')
|
||||
self._set_object = self.changeset.wrap(self.model_class.objects.get(pk=self.existing_object_pk))
|
||||
# noinspection PyTypeChecker
|
||||
return self._set_object
|
||||
elif self.created_object is not None:
|
||||
if self.created_object.model_class != self.model_class:
|
||||
raise TypeError('created_object model and change model do not match.')
|
||||
if self.created_object.changeset_id != self.changeset_id:
|
||||
raise TypeError('created_object belongs to a different changeset.')
|
||||
return self.changeset.get_created_object(self.model_class, self.created_object_id)
|
||||
raise TypeError('existing_model_pk or created_object have to be set.')
|
||||
|
||||
@obj.setter
|
||||
def obj(self, value: typing.Union[models.Model, ModelInstanceWrapper]):
|
||||
if not isinstance(value, ModelInstanceWrapper):
|
||||
value = self.changeset.wrap(value)
|
||||
|
||||
if is_created_pk(value.pk):
|
||||
if value._changeset.id != self.changeset.pk:
|
||||
raise ValueError('value is a Change instance but belongs to a different changeset.')
|
||||
self.model_class = type(value._obj)
|
||||
self.created_object = Change.objects.get(pk=value.pk[1:])
|
||||
self.created_object_id = int(value.pk[1:])
|
||||
self.existing_object_pk = None
|
||||
self._set_object = value
|
||||
return
|
||||
|
||||
model_class_before = self.model_class
|
||||
self.model_class = type(value._obj) if isinstance(value, ModelInstanceWrapper) else type(value)
|
||||
if value.pk is None:
|
||||
self.model_class = model_class_before
|
||||
raise ValueError('object is not saved yet and cannot be referenced')
|
||||
self.existing_object_pk = value.pk
|
||||
self.created_object = None
|
||||
self._set_object = value
|
||||
|
||||
@property
|
||||
def field(self):
|
||||
return self.model_class._meta.get_field(self.field_name)
|
||||
|
||||
def check_apply_problem(self):
|
||||
if self.discarded_by_id is not None or self.action in ('create', 'restore'):
|
||||
return None
|
||||
|
||||
try:
|
||||
model = self.model_class
|
||||
except LookupError:
|
||||
return _('Type %(model_name)s does not exist any more.' % {'model_name': self.model_name})
|
||||
|
||||
try:
|
||||
obj = get_current_obj(model, self.obj_pk)
|
||||
except model.DoesNotExist:
|
||||
return _('This %(model_title)s does not exist any more.' % {'model_title': model._meta.verbose_name})
|
||||
|
||||
if self.action == 'delete':
|
||||
return None
|
||||
|
||||
if self.action == 'update' and self.field_name.startswith('title_'):
|
||||
return None
|
||||
|
||||
try:
|
||||
field = self.field
|
||||
except FieldDoesNotExist:
|
||||
return _('This field does not exist any more.')
|
||||
value = json.loads(self.field_value)
|
||||
|
||||
if self.action == 'update':
|
||||
if field.many_to_many:
|
||||
return _('This is a m2m field, so it can\'t be updated.')
|
||||
if field.many_to_one:
|
||||
if value is None and not field.null:
|
||||
return _('This field has to be set.')
|
||||
try:
|
||||
self.changeset.wrap(field.related_model).objects.get(pk=value)
|
||||
except ObjectDoesNotExist:
|
||||
return (_('Referenced %(model_title)s does not exist any more.') %
|
||||
{'model_title': field.related_model._meta.verbose_name})
|
||||
return None
|
||||
if field.is_relation:
|
||||
raise NotImplementedError
|
||||
try:
|
||||
field.clean(value, obj)
|
||||
except ValidationError as e:
|
||||
return str(e)
|
||||
return None
|
||||
|
||||
if self.action in ('m2m_add', 'm2m_remove'):
|
||||
if not field.many_to_many:
|
||||
return _('This is not a m2m field, so it can\'t be trated as such.')
|
||||
try:
|
||||
self.changeset.wrap(field.related_model).objects.get(pk=value)
|
||||
except ObjectDoesNotExist:
|
||||
return _('Referenced object does not exist any more.')
|
||||
return None
|
||||
|
||||
@property
|
||||
def can_restore(self):
|
||||
if self.discarded_by_id is not None:
|
||||
return False
|
||||
|
||||
if self.action == 'delete':
|
||||
return not is_created_pk(self.obj_pk)
|
||||
|
||||
if self.action == 'create':
|
||||
return False
|
||||
|
||||
try:
|
||||
obj = get_current_obj(self.model_class, self.obj_pk)
|
||||
field = self.field
|
||||
except (LookupError, ObjectDoesNotExist, FieldDoesNotExist):
|
||||
return True
|
||||
|
||||
if field.many_to_one:
|
||||
return field.null
|
||||
|
||||
if field.name == 'geometry':
|
||||
return False
|
||||
|
||||
try:
|
||||
field.clean(field.get_prep_value(getattr(obj, field.name)), obj)
|
||||
except ValidationError:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def check_has_no_effect(self):
|
||||
if self.discarded_by_id is not None or self.action in ('create', 'restore'):
|
||||
return False
|
||||
|
||||
if self.action == 'delete':
|
||||
if is_created_pk(self.obj_pk):
|
||||
return False
|
||||
|
||||
try:
|
||||
current_obj = get_current_obj(self.model_class, self.obj_pk)
|
||||
except (LookupError, ObjectDoesNotExist):
|
||||
return not is_created_pk(self.obj_pk)
|
||||
|
||||
if self.action == 'delete':
|
||||
return False
|
||||
|
||||
if self.field_name.startswith('title_'):
|
||||
return self.field_value == current_obj.titles.get(self.field_name[6:], '')
|
||||
|
||||
try:
|
||||
field = self.field
|
||||
except FieldDoesNotExist:
|
||||
return True
|
||||
|
||||
if self.action == 'update':
|
||||
if field.many_to_one:
|
||||
return self.field_value == getattr(current_obj, field.attname)
|
||||
if field.is_relation:
|
||||
raise NotImplementedError
|
||||
return self.field_value == field.get_prep_value(getattr(current_obj, field.name))
|
||||
|
||||
if self.action == 'm2m_add':
|
||||
if is_created_pk(self.field_value):
|
||||
return False
|
||||
return getattr(current_obj, field.name).filter(pk=self.field_value).exists()
|
||||
|
||||
if self.action == 'm2m_remove':
|
||||
if is_created_pk(self.field_value):
|
||||
return True
|
||||
return not getattr(current_obj, field.name).filter(pk=self.field_value).exists()
|
||||
|
||||
def restore(self, author):
|
||||
if not self.can_restore:
|
||||
return
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
if self.pk is not None:
|
||||
raise TypeError('change objects can not be edited (use update to set discarded_by)')
|
||||
if self.changeset.proposed is not None or self.changeset.applied is not None:
|
||||
raise TypeError('can not add change object to uneditable changeset.')
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
raise TypeError('change objects can not be deleted directly.')
|
||||
|
||||
def __repr__(self):
|
||||
result = '<Change #%s on ChangeSet #%s: ' % (str(self.pk), str(self.changeset_id))
|
||||
if self.action == 'create':
|
||||
result += 'Create '+repr(self.model_name)
|
||||
elif self.action == 'update':
|
||||
result += ('Update '+repr(self.model_name)+' #'+str(self.obj_pk)+': ' +
|
||||
self.field_name+'='+self.field_value)
|
||||
elif self.action == 'restore':
|
||||
result += ('Restore '+repr(self.model_name)+' #'+str(self.obj_pk)+': '+self.field_name)
|
||||
elif self.action == 'delete':
|
||||
result += 'Delete object '+repr(self.model_name)+' #'+str(self.obj_pk)
|
||||
elif self.action == 'm2m_add':
|
||||
result += ('Update (m2m) object '+repr(self.model_name)+' #'+str(self.obj_pk)+': ' +
|
||||
self.field_name+'.add('+self.field_value+')')
|
||||
elif self.action == 'm2m_remove':
|
||||
result += ('Update (m2m) object '+repr(self.model_name)+' #'+str(self.obj_pk)+': ' +
|
||||
self.field_name+'.remove('+self.field_value+')')
|
||||
result += '>'
|
||||
return result
|
||||
|
||||
def serialize(self):
|
||||
result = OrderedDict((
|
||||
('id', self.pk),
|
||||
('author', self.author_id),
|
||||
('created', None if self.created is None else self.created.isoformat()),
|
||||
('action', self.action),
|
||||
('object_type', self.model_class.__name__.lower()),
|
||||
('object_id', ('c'+str(self.pk)) if self.action == 'create' else self.obj_pk),
|
||||
))
|
||||
if self.action in ('update', 'm2m_add', 'm2m_remove'):
|
||||
result.update(OrderedDict((
|
||||
('name', self.field_name),
|
||||
('value', json.loads(self.field_value)),
|
||||
)))
|
||||
return result
|
157
src/c3nav/editor/models/changedobject.py
Normal file
157
src/c3nav/editor/models/changedobject.py
Normal file
|
@ -0,0 +1,157 @@
|
|||
import typing
|
||||
from itertools import chain
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import models
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from c3nav.editor.utils import is_created_pk
|
||||
from c3nav.editor.wrappers import ModelInstanceWrapper
|
||||
from c3nav.mapdata.fields import JSONField
|
||||
|
||||
|
||||
class ChangedObject(models.Model):
|
||||
changeset = models.ForeignKey('editor.ChangeSet', on_delete=models.CASCADE, verbose_name=_('Change Set'))
|
||||
created = models.DateTimeField(auto_now_add=True, verbose_name=_('created'))
|
||||
last_update = models.DateTimeField(auto_now=True, verbose_name=_('last update'))
|
||||
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
|
||||
existing_object_pk = models.PositiveIntegerField(null=True, verbose_name=_('id of existing object'))
|
||||
updated_fields = JSONField(default={}, verbose_name=_('updated fields'))
|
||||
m2m_added = JSONField(default={}, verbose_name=_('added m2m values'))
|
||||
m2m_removed = JSONField(default={}, verbose_name=_('removed m2m values'))
|
||||
deleted = models.BooleanField(default=False, verbose_name=_('new field value'))
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('Changed object')
|
||||
verbose_name_plural = _('Changed objects')
|
||||
default_related_name = 'changed_objects_set'
|
||||
unique_together = ('changeset', 'content_type', 'existing_object_pk')
|
||||
ordering = ['created', 'pk']
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
model_class = kwargs.pop('model_class', None)
|
||||
super().__init__(*args, **kwargs)
|
||||
self._set_object = None
|
||||
self._m2m_added_cache = {name: set(values) for name, values in self.m2m_added}
|
||||
self._m2m_removed_cache = {name: set(values) for name, values in self.m2m_added}
|
||||
if model_class is not None:
|
||||
self.model_class = model_class
|
||||
|
||||
@property
|
||||
def model_class(self) -> typing.Optional[typing.Type[models.Model]]:
|
||||
return self.content_type.model_class()
|
||||
|
||||
@model_class.setter
|
||||
def model_class(self, value: typing.Optional[typing.Type[models.Model]]):
|
||||
self.content_type = ContentType.objects.get_for_model(value)
|
||||
|
||||
@property
|
||||
def obj_pk(self) -> typing.Union[int, str]:
|
||||
if not self.is_created:
|
||||
return self.existing_object_pk
|
||||
return 'c'+str(self.pk)
|
||||
|
||||
@property
|
||||
def obj(self) -> ModelInstanceWrapper:
|
||||
return self.get_obj(get_foreign_objects=True)
|
||||
|
||||
@property
|
||||
def is_created(self):
|
||||
return self.existing_object_pk is None
|
||||
|
||||
def get_obj(self, get_foreign_objects=False) -> ModelInstanceWrapper:
|
||||
model = self.model_class
|
||||
|
||||
if not self.is_created:
|
||||
if self._set_object is None:
|
||||
self._set_object = self.changeset.wrap_instance(model.objects.get(pk=self.existing_object_pk))
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
return self._set_object
|
||||
|
||||
pk = self.obj_pk
|
||||
|
||||
obj = model()
|
||||
obj.pk = pk
|
||||
if hasattr(model._meta.pk, 'related_model'):
|
||||
setattr(obj, model._meta.pk.related_model._meta.pk.attname, pk)
|
||||
obj._state.adding = False
|
||||
|
||||
for name, value in self.updated_fields.items():
|
||||
if name.startswith('title_'):
|
||||
if value:
|
||||
obj.titles[name[6:]] = value
|
||||
continue
|
||||
|
||||
field = model._meta.get_field(name)
|
||||
|
||||
if field.many_to_many:
|
||||
continue
|
||||
|
||||
if field.many_to_one:
|
||||
setattr(obj, field.attname, value)
|
||||
if is_created_pk(value):
|
||||
setattr(obj, field.get_cache_name(), self.changeset.get_created_object(field.related_model, value))
|
||||
elif get_foreign_objects:
|
||||
related_obj = self.changeset.wrap_model(field.related_model).objects.get(pk=value)
|
||||
setattr(obj, field.get_cache_name(), related_obj)
|
||||
continue
|
||||
|
||||
setattr(obj, name, field.to_python(value))
|
||||
return self.changeset.wrap_instance(obj)
|
||||
|
||||
def add_relevant_object_pks(self, object_pks):
|
||||
object_pks.setdefault(self.model_class, set()).add(self.obj_pk)
|
||||
for name, value in self.updated_fields.items():
|
||||
if name.startswith('title_'):
|
||||
continue
|
||||
field = self.model_class._meta.get_field(name)
|
||||
if field.is_relation:
|
||||
object_pks.setdefault(field.related_model, set()).add(value)
|
||||
|
||||
for name, value in chain(self._m2m_added_cache.items(), self._m2m_removed_cache.items()):
|
||||
field = self.model_class._meta.get_field(name)
|
||||
object_pks.setdefault(field.related_model, set()).update(value)
|
||||
|
||||
def update_changeset_cache(self):
|
||||
if self.pk is None:
|
||||
return
|
||||
|
||||
model = self.model_class
|
||||
pk = self.obj_pk
|
||||
|
||||
self.changeset.changed_objects.setdefault(model, {})[pk] = self
|
||||
|
||||
if self.is_created:
|
||||
if not self.deleted:
|
||||
self.changeset.created_objects.setdefault(model, {})[pk] = self.updated_fields
|
||||
self.changeset.ever_created_objects.setdefault(model, {})[pk] = self.updated_fields
|
||||
else:
|
||||
if not self.deleted:
|
||||
self.changeset.updated_existing.setdefault(model, {})[pk] = self.updated_fields
|
||||
self.changeset.deleted_existing.setdefault(model, set()).discard(pk)
|
||||
else:
|
||||
self.changeset.updated_existing.setdefault(model, {}).pop(pk, None)
|
||||
self.changeset.deleted_existing.setdefault(model, set()).add(pk)
|
||||
|
||||
if not self.deleted:
|
||||
self.changeset.m2m_added.get(model, {})[pk] = self._m2m_added_cache
|
||||
self.changeset.m2m_removed.get(model, {})[pk] = self._m2m_removed_cache
|
||||
else:
|
||||
self.changeset.m2m_added.get(model, {}).pop(pk, None)
|
||||
self.changeset.m2m_removed.get(model, {}).pop(pk, None)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.m2m_added = {name: tuple(values) for name, values in self._m2m_added_cache}
|
||||
self.m2m_removed = {name: tuple(values) for name, values in self._m2m_added_cache}
|
||||
if self.changeset.proposed is not None or self.changeset.applied is not None:
|
||||
raise TypeError('can not add change object to uneditable changeset.')
|
||||
super().save(*args, **kwargs)
|
||||
if not self.changeset.fill_changes_cache():
|
||||
self.update_changeset_cache()
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
raise TypeError('change objects can not be deleted directly.')
|
||||
|
||||
def __repr__(self):
|
||||
return '<ChangedObject #%s on ChangeSet #%s>' % (str(self.pk), str(self.changeset_id))
|
|
@ -1,18 +1,19 @@
|
|||
import json
|
||||
from collections import OrderedDict
|
||||
from itertools import chain
|
||||
from operator import attrgetter
|
||||
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.core.serializers.json import DjangoJSONEncoder
|
||||
from django.db import models, transaction
|
||||
from django.db.models import Q
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import models
|
||||
from django.db.models import Max, Q
|
||||
from django.urls import reverse
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.translation import ungettext_lazy
|
||||
|
||||
from c3nav.editor.models.change import Change
|
||||
from c3nav.editor.utils import get_current_obj, get_field_value, is_created_pk
|
||||
from c3nav.editor.wrappers import ModelInstanceWrapper, ModelWrapper
|
||||
from c3nav.editor.models.changedobject import ChangedObject
|
||||
from c3nav.editor.utils import is_created_pk
|
||||
from c3nav.editor.wrappers import ModelWrapper
|
||||
from c3nav.mapdata.models import LocationSlug
|
||||
from c3nav.mapdata.models.locations import LocationRedirect
|
||||
from c3nav.mapdata.utils.models import get_submodels
|
||||
|
@ -35,7 +36,8 @@ class ChangeSet(models.Model):
|
|||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.default_author = None
|
||||
self.changes_qs = None
|
||||
self.changed_objects = None
|
||||
|
||||
self.ever_created_objects = {}
|
||||
self.created_objects = {}
|
||||
self.updated_existing = {}
|
||||
|
@ -111,129 +113,53 @@ class ChangeSet(models.Model):
|
|||
"""
|
||||
Wrap Objects
|
||||
"""
|
||||
def wrap(self, obj, author=None):
|
||||
"""
|
||||
Wrap the given object in a changeset wrapper.
|
||||
:param obj: A model, a model instance or the name of a model as a string.
|
||||
:param author: Author to which ne changes will be assigned. This changesets default author (if set) if None.
|
||||
"""
|
||||
self.parse_changes()
|
||||
if author is None:
|
||||
author = self.default_author
|
||||
if author is not None and not author.is_authenticated:
|
||||
author = None
|
||||
if isinstance(obj, str):
|
||||
return ModelWrapper(self, apps.get_model('mapdata', obj), author)
|
||||
if isinstance(obj, type) and issubclass(obj, models.Model):
|
||||
return ModelWrapper(self, obj, author)
|
||||
if isinstance(obj, models.Model):
|
||||
return ModelWrapper(self, type(obj), author).create_wrapped_model_class()(self, obj, author)
|
||||
raise ValueError
|
||||
def wrap_model(self, model):
|
||||
if isinstance(model, str):
|
||||
model = apps.get_model('mapdata', model)
|
||||
assert isinstance(model, type) and issubclass(model, models.Model)
|
||||
return ModelWrapper(self, model)
|
||||
|
||||
"""
|
||||
Parse Changes
|
||||
"""
|
||||
def relevant_changes(self):
|
||||
"""
|
||||
Get all changes of this queryset that have not been discarded and do not restore original data.
|
||||
You should not call this, but instead call parse_changes(), it will store the result in self.changes_qs.
|
||||
"""
|
||||
qs = self.changes.filter(discarded_by__isnull=True).exclude(action='restore')
|
||||
qs = qs.exclude(action='delete', created_object_id__isnull=False)
|
||||
return qs
|
||||
def wrap_instance(self, instance):
|
||||
assert isinstance(instance, models.Model)
|
||||
return self.wrap_model(instance.__class__).create_wrapped_model_class()(self, instance)
|
||||
|
||||
def parse_changes(self, get_history=False):
|
||||
"""
|
||||
Parse changes of this changeset so they can be reflected when querying data.
|
||||
Only executable once, if changes are added later they are automatically parsed.
|
||||
This method gets automatically called when parsed changes are needed or when adding a new change.
|
||||
The queryset used/created by this method can be found in changes_qs afterwards.
|
||||
:param get_history: Whether to get all changes (True) or only relevant ones (False)
|
||||
"""
|
||||
if self.pk is None or self.changes_qs is not None:
|
||||
return
|
||||
def relevant_changed_objects(self):
|
||||
return self.changed_objects_set.exclude(existing_object_pk__isnull=True, deleted=True)
|
||||
|
||||
if get_history:
|
||||
self.changes_qs = self.changes.all()
|
||||
def fill_changes_cache(self, include_deleted_created=False):
|
||||
"""
|
||||
Get all changed objects and fill this ChangeSet's changes cache.
|
||||
Only executable once, if something is changed later the cache will be automatically updated.
|
||||
This method gets called automatically when the cache is needed.
|
||||
Only call it if you need to set include_deleted_created to True.
|
||||
:param include_deleted_created: Fetch created objects that were deleted.
|
||||
:rtype: True if the method was executed, else False
|
||||
"""
|
||||
if self.pk is None or self.changed_objects is not None:
|
||||
return False
|
||||
|
||||
if include_deleted_created:
|
||||
qs = self.changed_objects_set.all()
|
||||
else:
|
||||
self.changes_qs = self.relevant_changes()
|
||||
qs = self.relevant_changed_objects()
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
for change in self.changes_qs:
|
||||
self._parse_change(change)
|
||||
self.changed_objects = {}
|
||||
for change in qs:
|
||||
change.update_changeset_cache()
|
||||
|
||||
def _parse_change(self, change):
|
||||
self._last_change_pk = change.pk
|
||||
|
||||
model = change.model_class
|
||||
pk = change.obj_pk
|
||||
if change.action == 'create':
|
||||
new = {}
|
||||
self.created_objects.setdefault(model, {})[pk] = new
|
||||
self.ever_created_objects.setdefault(model, {})[pk] = new
|
||||
return
|
||||
elif change.action == 'delete':
|
||||
if not is_created_pk(pk):
|
||||
self.deleted_existing.setdefault(model, set()).add(pk)
|
||||
else:
|
||||
self.created_objects[model].pop(pk)
|
||||
self.m2m_added.get(model, {}).pop(pk, None)
|
||||
self.m2m_removed.get(model, {}).pop(pk, None)
|
||||
return
|
||||
|
||||
name = change.field_name
|
||||
|
||||
if change.action == 'restore' and change.field_value is None:
|
||||
if is_created_pk(pk):
|
||||
self.created_objects[model][pk].pop(name, None)
|
||||
else:
|
||||
self.updated_existing.setdefault(model, {}).setdefault(pk, {}).pop(name, None)
|
||||
return
|
||||
|
||||
value = json.loads(change.field_value)
|
||||
if change.action == 'update':
|
||||
if is_created_pk(pk):
|
||||
self.created_objects[model][pk][name] = value
|
||||
else:
|
||||
self.updated_existing.setdefault(model, {}).setdefault(pk, {})[name] = value
|
||||
|
||||
if change.action == 'restore':
|
||||
self.m2m_removed.get(model, {}).get(pk, {}).get(name, set()).discard(value)
|
||||
self.m2m_added.get(model, {}).get(pk, {}).get(name, set()).discard(value)
|
||||
elif change.action == 'm2m_add':
|
||||
m2m_removed = self.m2m_removed.get(model, {}).get(pk, {}).get(name, ())
|
||||
if value in m2m_removed:
|
||||
m2m_removed.remove(value)
|
||||
self.m2m_added.setdefault(model, {}).setdefault(pk, {}).setdefault(name, set()).add(value)
|
||||
elif change.action == 'm2m_remove':
|
||||
m2m_added = self.m2m_added.get(model, {}).get(pk, {}).get(name, ())
|
||||
if value in m2m_added:
|
||||
m2m_added.discard(value)
|
||||
self.m2m_removed.setdefault(model, {}).setdefault(pk, {}).setdefault(name, set()).add(value)
|
||||
return True
|
||||
|
||||
"""
|
||||
Analyse Changes
|
||||
"""
|
||||
def get_objects(self):
|
||||
if self.changes_qs is None:
|
||||
if self.changed_objects is None:
|
||||
raise TypeError
|
||||
|
||||
# collect pks of relevant objects
|
||||
object_pks = {}
|
||||
for change in self.changes_qs:
|
||||
object_pks.setdefault(change.model_class, set()).add(change.obj_pk)
|
||||
model = None
|
||||
if change.action == 'update':
|
||||
if change.model_class == LocationRedirect:
|
||||
if change.field_name == 'target':
|
||||
object_pks.setdefault(LocationSlug, set()).add(json.loads(change.field_value))
|
||||
continue
|
||||
elif not change.field_name.startswith('title_'):
|
||||
model = getattr(change.field, 'related_model', None)
|
||||
if change.action in ('m2m_add', 'm2m_remove'):
|
||||
model = change.field.related_model
|
||||
if model is not None:
|
||||
object_pks.setdefault(model, set()).add(json.loads(change.field_value))
|
||||
for change in chain(self.changed_objects.values()):
|
||||
change.add_relevant_object_pks(object_pks)
|
||||
|
||||
# retrieve relevant objects
|
||||
objects = {}
|
||||
|
@ -266,58 +192,38 @@ class ChangeSet(models.Model):
|
|||
r = tuple((pk, values[name]) for pk, values in self.updated_existing.get(model, {}).items() if name in values)
|
||||
return r
|
||||
|
||||
def get_created_object(self, model, pk, author=None, get_foreign_objects=False, allow_deleted=False):
|
||||
def get_changed_object(self, model, pk=None):
|
||||
self.fill_changes_cache()
|
||||
|
||||
objects = tuple(obj for obj in ((submodel, self.changed_objects.get(submodel, {}).get(pk, None))
|
||||
for submodel in get_submodels(model)) if obj[1] is not None)
|
||||
if len(objects) > 1:
|
||||
raise model.MultipleObjectsReturned
|
||||
if objects:
|
||||
return objects[0]
|
||||
|
||||
if is_created_pk(pk):
|
||||
raise model.DoesNotExist
|
||||
|
||||
return ChangedObject(changeset=self, model_class=model, existing_obj_pk=pk)
|
||||
|
||||
def get_created_object(self, model, pk, get_foreign_objects=False, allow_deleted=False):
|
||||
"""
|
||||
Gets a created model instance.
|
||||
:param model: model class
|
||||
:param pk: primary key
|
||||
:param author: overwrite default author for changes made to that model
|
||||
:param get_foreign_objects: whether to fetch foreign objects and not just set their id to field.attname
|
||||
:param allow_deleted: return created objects that have already been deleted (needs get_history=True)
|
||||
:return: a wrapped model instance
|
||||
"""
|
||||
self.parse_changes()
|
||||
self.fill_changes_cache()
|
||||
if issubclass(model, ModelWrapper):
|
||||
model = model._obj
|
||||
|
||||
objects = self.ever_created_objects if allow_deleted else self.created_objects
|
||||
|
||||
objects = tuple(obj for obj in ((submodel, objects.get(submodel, {}).get(pk, None))
|
||||
for submodel in get_submodels(model)) if obj[1] is not None)
|
||||
if not objects:
|
||||
object = self.get_changed_object(model, pk)
|
||||
if object.deleted and not allow_deleted:
|
||||
raise model.DoesNotExist
|
||||
if len(objects) > 1:
|
||||
raise model.MultipleObjectsReturned
|
||||
|
||||
model, data = objects[0]
|
||||
|
||||
obj = model()
|
||||
obj.pk = pk
|
||||
if hasattr(model._meta.pk, 'related_model'):
|
||||
setattr(obj, model._meta.pk.related_model._meta.pk.attname, pk)
|
||||
obj._state.adding = False
|
||||
|
||||
for name, value in data.items():
|
||||
if name.startswith('title_'):
|
||||
if value:
|
||||
obj.titles[name[6:]] = value
|
||||
continue
|
||||
|
||||
field = model._meta.get_field(name)
|
||||
|
||||
if field.many_to_many:
|
||||
continue
|
||||
|
||||
if field.many_to_one:
|
||||
setattr(obj, field.attname, value)
|
||||
if is_created_pk(value):
|
||||
setattr(obj, field.get_cache_name(), self.get_created_object(field.related_model, value))
|
||||
elif get_foreign_objects:
|
||||
setattr(obj, field.get_cache_name(), self.wrap(field.related_model.objects.get(pk=value)))
|
||||
continue
|
||||
|
||||
setattr(obj, name, field.to_python(value))
|
||||
return self.wrap(obj, author=author)
|
||||
return object.get_obj(get_foreign_objects=get_foreign_objects)
|
||||
|
||||
def get_created_pks(self, model) -> set:
|
||||
"""
|
||||
|
@ -327,144 +233,29 @@ class ChangeSet(models.Model):
|
|||
model = model._obj
|
||||
return set(self.created_objects.get(model, {}).keys())
|
||||
|
||||
"""
|
||||
add changes
|
||||
"""
|
||||
def _new_change(self, author, **kwargs):
|
||||
if self.pk is None:
|
||||
if self.session_id is None:
|
||||
try:
|
||||
# noinspection PyUnresolvedReferences
|
||||
session = self.request.session
|
||||
if session.session_key is None:
|
||||
session.save()
|
||||
self.session_id = session.session_key
|
||||
except AttributeError:
|
||||
pass # ok, so lets keep it this way
|
||||
self.save()
|
||||
self.parse_changes()
|
||||
change = Change(changeset=self)
|
||||
change.changeset_id = self.pk
|
||||
author = self.default_author if author is None else author
|
||||
if author is not None and author.is_authenticated:
|
||||
change.author = author
|
||||
for name, value in kwargs.items():
|
||||
setattr(change, name, value)
|
||||
change.save()
|
||||
self._parse_change(change)
|
||||
return change
|
||||
|
||||
def add_create(self, obj, author=None):
|
||||
"""
|
||||
Creates a new object in this changeset. Called when a new ModelInstanceWrapper is saved.
|
||||
"""
|
||||
change = self._new_change(author=author, action='create', model_class=type(obj._obj))
|
||||
obj.pk = 'c%d' % change.pk
|
||||
|
||||
def _add_value(self, action, obj, name, value, author=None):
|
||||
return self._new_change(author=author, action=action, obj=obj, field_name=name,
|
||||
field_value=json.dumps(value, ensure_ascii=False, cls=DjangoJSONEncoder))
|
||||
|
||||
def add_restore(self, obj, name, value=None, author=None):
|
||||
"""
|
||||
Restore a models field value (= remove it from the changeset).
|
||||
"""
|
||||
return self._new_change(author=author, action='restore', obj=obj, field_name=name, field_value=value)
|
||||
|
||||
def add_update(self, obj, name, value, author=None):
|
||||
"""
|
||||
Update a models field value. Called when a ModelInstanceWrapper is saved.
|
||||
"""
|
||||
if isinstance(obj, ModelInstanceWrapper):
|
||||
obj = obj._obj
|
||||
model = type(obj)
|
||||
with transaction.atomic():
|
||||
current_obj = get_current_obj(model, obj.pk)
|
||||
current_value = get_field_value(current_obj, name)
|
||||
|
||||
if current_value != value:
|
||||
change = self._add_value('update', obj, name, value, author)
|
||||
else:
|
||||
change = self.add_restore(obj, name, author)
|
||||
change.other_changes().filter(field_name=name).update(discarded_by=change)
|
||||
return change
|
||||
|
||||
def add_m2m_add(self, obj, name, value, author=None):
|
||||
"""
|
||||
Add an object to a m2m relation. Called by ManyRelatedManagerWrapper.
|
||||
"""
|
||||
if isinstance(obj, ModelInstanceWrapper):
|
||||
obj = obj._obj
|
||||
with transaction.atomic():
|
||||
if is_created_pk(obj.pk) or is_created_pk(value) or not getattr(obj, name).filter(pk=value).exists():
|
||||
change = self._add_value('m2m_add', obj, name, value, author)
|
||||
else:
|
||||
change = self.add_restore(obj, name, value, author)
|
||||
change.other_changes().filter(field_name=name, field_value=change.field_value).update(discarded_by=change)
|
||||
return change
|
||||
|
||||
def add_m2m_remove(self, obj, name, value, author=None):
|
||||
"""
|
||||
Remove an object from a m2m reltation. Called by ManyRelatedManagerWrapper.
|
||||
"""
|
||||
if isinstance(obj, ModelInstanceWrapper):
|
||||
obj = obj._obj
|
||||
with transaction.atomic():
|
||||
if is_created_pk(obj.pk) or is_created_pk(value) or not getattr(obj, name).filter(pk=value).exists():
|
||||
change = self.add_restore(obj, name, value, author)
|
||||
else:
|
||||
change = self._add_value('m2m_remove', obj, name, value, author)
|
||||
change.other_changes().filter(field_name=name, field_value=change.field_value).update(discarded_by=change)
|
||||
return change
|
||||
|
||||
def add_delete(self, obj, author=None):
|
||||
"""
|
||||
Delete an object. Called by ModelInstanceWrapper.delete().
|
||||
"""
|
||||
with transaction.atomic():
|
||||
change = self._new_change(author=author, action='delete', obj=obj)
|
||||
change.other_changes().update(discarded_by=change)
|
||||
return change
|
||||
|
||||
"""
|
||||
Methods for display
|
||||
"""
|
||||
@property
|
||||
def changes_count(self):
|
||||
def changed_objects_count(self):
|
||||
"""
|
||||
Get the number of relevant changes. Does not need a query if changes are already parsed.
|
||||
Get the number of changed objects. Does not need a query if cache is already filled.
|
||||
"""
|
||||
if self.changes_qs is None:
|
||||
return self.relevant_changes().exclude(model_name='LocationRedirect', action='update').count()
|
||||
if self.changed_objects is None:
|
||||
location_redirect_type = ContentType.objects.get_for_model(LocationRedirect)
|
||||
return self.relevant_changed_objects().exclude(content_type=location_redirect_type).count()
|
||||
|
||||
result = 0
|
||||
|
||||
for model, objects in self.created_objects.items():
|
||||
result += len(objects)
|
||||
if model == LocationRedirect:
|
||||
continue
|
||||
result += sum(len(values) for values in objects.values())
|
||||
|
||||
for objects in self.updated_existing.values():
|
||||
result += sum(len(values) for values in objects.values())
|
||||
|
||||
result += sum(len(objs) for objs in self.deleted_existing.values())
|
||||
|
||||
for m2m in self.m2m_added, self.m2m_removed:
|
||||
for objects in m2m.values():
|
||||
for obj in objects.values():
|
||||
result += sum(len(values) for values in obj.values())
|
||||
|
||||
return result
|
||||
return sum((len(objects) for model, objects in self.changed_objects.items() if model != LocationRedirect))
|
||||
|
||||
@property
|
||||
def count_display(self):
|
||||
"""
|
||||
Get “%d changes” display text.
|
||||
Get “%d changed objects” display text.
|
||||
"""
|
||||
if self.pk is None:
|
||||
return _('No changes')
|
||||
return ungettext_lazy('%(num)d change', '%(num)d changes', 'num') % {'num': self.changes_count}
|
||||
return _('No changed objects')
|
||||
return (ungettext_lazy('%(num)d changed object', '%(num)d changed objects', 'num') %
|
||||
{'num': self.changed_objects_count})
|
||||
|
||||
@property
|
||||
def title(self):
|
||||
|
@ -472,11 +263,18 @@ class ChangeSet(models.Model):
|
|||
return ''
|
||||
return _('Changeset #%d') % self.pk
|
||||
|
||||
@property
|
||||
def last_update(self):
|
||||
if self.changed_objects is None:
|
||||
return self.relevant_changed_objects().aggregate(Max('last_update'))['last_update__max']
|
||||
|
||||
return max(chain(*self.changed_objects.values()), key=attrgetter('last_update'))
|
||||
|
||||
@property
|
||||
def cache_key(self):
|
||||
if self.pk is None:
|
||||
return None
|
||||
return str(self.pk)+'-'+str(self._last_change_pk)
|
||||
return str(self.pk)+'-'+str(self.last_update)
|
||||
|
||||
def get_absolute_url(self):
|
||||
if self.pk is None:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue