team-3/src/c3nav/editor/models.py

360 lines
15 KiB
Python
Raw Normal View History

2017-06-13 03:31:10 +02:00
import json
2017-06-12 14:52:08 +02:00
import typing
from django.apps import apps
from django.conf import settings
2017-06-12 14:52:08 +02:00
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
2017-06-12 16:59:57 +02:00
from django.db.models import Q
2017-06-13 15:31:54 +02:00
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext_lazy
from c3nav.editor.wrappers import ModelInstanceWrapper, ModelWrapper
class ChangeSet(models.Model):
created = models.DateTimeField(auto_now_add=True, verbose_name=_('created'))
2017-06-12 16:59:57 +02:00
author = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT, verbose_name=_('Author'))
proposed = models.DateTimeField(null=True, verbose_name=_('proposed'))
applied = models.DateTimeField(null=True, verbose_name=_('applied'))
applied_by = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT,
related_name='applied_changesets', verbose_name=_('applied by'))
class Meta:
verbose_name = _('Change Set')
verbose_name_plural = _('Change Sets')
default_related_name = 'changesets'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.default_author = None
self.parsed = False
self.created_objects = {}
self.updated_existing = {}
self.deleted_existing = {}
self.m2m_add_existing = {}
self.m2m_remove_existing = {}
def parse_changes(self):
if self.parsed:
return
for change in self.changes.all():
self._parse_change(change)
self.parsed = True
def _parse_change(self, change):
if change.action == 'delchange':
raise NotImplementedError
model = change.model_class
if change.action == 'create':
self.created_objects.setdefault(model, {})[change.pk] = {}
elif change.action == 'delete':
if change.existing_object_pk is not None:
self.deleted_existing.setdefault(model, set()).add(change.existing_object_pk)
else:
self.created_objects[model].pop(change.pk)
return
name = change.field_name
value = json.loads(change.field_value)
if change.existing_object_pk is None:
if change.action == 'update':
self.created_objects[model][name] = value
elif change.action == 'm2m_add':
self.created_objects[model].setdefault(name, set()).add(value)
elif change.action == 'm2m_remove':
self.created_objects[model][name].remove(value)
return
if change.action == 'update':
value = json.loads(change.field_value)
self.updated_existing.setdefault(model, {}).setdefault(change.obj_pk, {})[name] = value
elif change.action == 'm2m_add':
m2m_remove_existing = self.m2m_remove_existing.get(model, {}).get(change.obj_pk, ())
if value in m2m_remove_existing:
m2m_remove_existing.remove(value)
else:
self.m2m_add_existing.setdefault(model, {}).setdefault(change.obj_pk, set()).add(value)
elif change.action == 'm2m_remove':
m2m_add_existing = self.m2m_add_existing.get(model, {}).get(change.obj_pk, ())
if value in m2m_add_existing:
m2m_add_existing.remove(value)
else:
self.m2m_remove_existing.setdefault(model, {}).setdefault(change.obj_pk, set()).add(value)
def get_changed_values(self, model, name):
return tuple((pk, values[name])
for pk, values in self.updated_existing.get(model, {}).items() if name in values)
@classmethod
2017-06-13 14:42:08 +02:00
def qs_base(cls, hide_applied=True):
qs = cls.objects.prefetch_related('changes').select_related('author')
if hide_applied:
qs = qs.filter(applied__isnull=True)
return qs
@classmethod
def qs_for_request(cls, request):
qs = cls.qs_base()
if request.user.is_authenticated():
qs = qs.filter(Q(author__isnull=True) | Q(author=request.user))
else:
qs = qs.filter(author__isnull=True)
return qs
@classmethod
def get_for_request(cls, request):
2017-06-13 14:42:08 +02:00
qs = cls.qs_for_request(request)
changeset_pk = request.session.get('changeset_pk', None)
if changeset_pk is not None:
changeset = qs.filter(pk=changeset_pk).first()
if changeset is not None:
changeset.default_author = request.user
if changeset.author_id is None and request.user.is_authenticated():
changeset.author = request.user
changeset.save()
return changeset
new_changeset = cls()
if request.user.is_authenticated():
2017-06-13 14:42:08 +02:00
changeset = qs.filter(Q(author=request.user)).order_by('-created').first()
if changeset is not None:
request.session['changeset_pk'] = changeset.pk
changeset.default_author = request.user
return changeset
new_changeset.author = request.user
new_changeset.save()
request.session['changeset_pk'] = new_changeset.pk
new_changeset.default_author = request.user
return new_changeset
2017-06-13 15:31:54 +02:00
def get_absolute_url(self):
return reverse('editor.changesets.detail', kwargs={'pk': self.pk})
@cached_property
def undeleted_changes_count(self):
return len([True for change in self.changes.all() if change.deletes_change_id is None])
2017-06-13 15:31:54 +02:00
@property
def title(self):
return _('Changeset #%d') % self.pk
@property
def count_display(self):
return ungettext_lazy('%(num)d Change', '%(num)d Changes', 'num') % {'num': self.undeleted_changes_count}
def wrap(self, obj, author=None):
self.parse_changes()
if author is None:
author = self.default_author
2017-06-13 15:31:54 +02:00
if author is not None and not author.is_authenticated():
author = None
if isinstance(obj, str):
return ModelWrapper(self, apps.get_model('mapdata', obj), author)
2017-06-13 03:31:10 +02:00
if isinstance(obj, type) and issubclass(obj, models.Model):
return ModelWrapper(self, obj, author)
if isinstance(obj, models.Model):
return ModelWrapper(self, type(obj), author).create_wrapped_model_class()(self, obj, author)
raise ValueError
def _new_change(self, author, **kwargs):
self.parse_changes()
change = Change(changeset=self)
change.changeset_id = self.pk
2017-06-13 03:31:10 +02:00
author = self.default_author if author is None else author
if author is not None and author.is_authenticated():
change.author = author
for name, value in kwargs.items():
setattr(change, name, value)
2017-06-13 03:31:10 +02:00
change.save()
2017-06-13 16:25:01 +02:00
self._parse_change(change)
return change
2017-06-13 03:31:10 +02:00
def add_create(self, obj, author=None):
change = self._new_change(author=author, action='create', model_class=type(obj._obj))
obj.pk = 'c%d' % change.pk
2017-06-13 14:21:01 +02:00
def _add_value(self, action, obj, name, value, author=None):
return self._new_change(author=author, action=action, obj=obj,
2017-06-13 03:31:10 +02:00
field_name=name, field_value=json.dumps(value, ensure_ascii=False))
2017-06-13 14:21:01 +02:00
def add_update(self, obj, name, value, author=None):
return self._add_value('update', obj, name, value, author)
def add_m2m_add(self, obj, name, value, author=None):
return self._add_value('m2m_add', obj, name, value, author)
def add_m2m_remove(self, obj, name, value, author=None):
return self._add_value('m2m_remove', obj, name, value, author)
2017-06-13 03:39:12 +02:00
def add_delete(self, obj, author=None):
return self._new_change(author=author, action='delete', obj=obj)
class Change(models.Model):
2017-06-12 14:52:08 +02:00
ACTIONS = (
('delchange', _('delete change')),
('create', _('create object')),
('delete', _('delete object')),
('update', _('update attribute')),
('m2m_add', _('add many to many relation')),
('m2m_remove', _('add many to many relation')),
)
changeset = models.ForeignKey(ChangeSet, on_delete=models.CASCADE, verbose_name=_('Change Set'))
2017-06-12 18:26:24 +02:00
author = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.PROTECT, verbose_name=_('Author'))
created = models.DateTimeField(auto_now_add=True, verbose_name=_('created'))
2017-06-12 14:52:08 +02:00
action = models.CharField(max_length=16, choices=ACTIONS, verbose_name=_('action'))
deletes_change = models.OneToOneField('Change', null=True, on_delete=models.CASCADE, related_name='deleted_by',
verbose_name=_('deletes change'))
model_name = models.CharField(max_length=50, null=True, verbose_name=_('model name'))
existing_object_pk = models.PositiveIntegerField(null=True, verbose_name=_('id of existing object'))
created_object = models.ForeignKey('Change', null=True, on_delete=models.CASCADE, related_name='changed_by',
verbose_name=_('changed object'))
field_name = models.CharField(max_length=50, null=True, verbose_name=_('field name'))
field_value = models.TextField(null=True, verbose_name=_('new field value'))
class Meta:
verbose_name = _('Change')
verbose_name_plural = _('Changes')
default_related_name = 'changes'
2017-06-13 15:31:54 +02:00
ordering = ['created', 'pk']
2017-06-12 14:52:08 +02:00
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._set_object = None
@property
2017-06-13 14:49:20 +02:00
def model_class(self) -> typing.Optional[typing.Type[models.Model]]:
2017-06-12 14:52:08 +02:00
if self.model_name is None:
2017-06-13 03:31:10 +02:00
return None
2017-06-12 14:52:08 +02:00
return apps.get_model('mapdata', self.model_name)
@model_class.setter
2017-06-13 03:31:10 +02:00
def model_class(self, value: typing.Optional[typing.Type[models.Model]]):
if value is None:
self.model_name = None
return
2017-06-12 14:52:08 +02:00
if not issubclass(value, models.Model):
raise ValueError('value is not a django model')
if value._meta.abstract:
raise ValueError('value is an abstract model')
if value._meta.app_label != 'mapdata':
raise ValueError('value is not a mapdata model')
self.model_name = value.__name__
@property
def obj_pk(self) -> typing.Union[int, str]:
if self._set_object is not None:
return self._set_object.pk
if self.existing_object_pk is not None:
return self.existing_object_pk
if self.created_object is not None:
return 'c'+str(self.created_object.changeset_id)
raise TypeError('existing_model_pk or created_object have to be set.')
2017-06-12 14:52:08 +02:00
@property
2017-06-13 15:31:54 +02:00
def obj(self) -> ModelInstanceWrapper:
2017-06-13 03:31:10 +02:00
if self._set_object is not None:
return self._set_object
2017-06-12 14:52:08 +02:00
if self.existing_object_pk is not None:
if self.created_object is not None:
raise TypeError('existing_object_pk and created_object can not both be set.')
2017-06-13 15:31:54 +02:00
self._set_object = self.changeset.wrap(self.model_class.objects.get(pk=self.existing_object_pk))
2017-06-13 03:31:10 +02:00
# noinspection PyTypeChecker
2017-06-12 14:52:08 +02:00
return self._set_object
elif self.created_object is not None:
if self.created_object.model_class != self.model_class:
raise TypeError('created_object model and change model do not match.')
if self.created_object.changeset_id != self.changeset_id:
raise TypeError('created_object belongs to a different changeset.')
2017-06-13 03:31:10 +02:00
raise NotImplementedError
2017-06-12 14:52:08 +02:00
raise TypeError('existing_model_pk or created_object have to be set.')
2017-06-12 18:29:21 +02:00
@obj.setter
2017-06-13 14:49:20 +02:00
def obj(self, value: typing.Union[models.Model, ModelInstanceWrapper]):
2017-06-13 14:21:01 +02:00
if not isinstance(value, ModelInstanceWrapper):
value = self.changeset.wrap(value)
if isinstance(value.pk, str):
2017-06-13 03:31:10 +02:00
if value._changeset.id != self.changeset.pk:
2017-06-12 14:52:08 +02:00
raise ValueError('value is a Change instance but belongs to a different changeset.')
2017-06-13 03:31:10 +02:00
self.model_class = type(value._obj)
self.created_object = Change.objects.get(pk=value.pk[1:])
self.created_object_id = int(value.pk[1:])
2017-06-12 14:52:08 +02:00
self.existing_object_pk = None
2017-06-13 03:31:10 +02:00
self._set_object = value
2017-06-12 14:52:08 +02:00
return
model_class_before = self.model_class
2017-06-13 03:31:10 +02:00
self.model_class = type(value._obj) if isinstance(value, ModelInstanceWrapper) else type(value)
2017-06-12 14:52:08 +02:00
if value.pk is None:
self.model_class = model_class_before
raise ValueError('object is not saved yet and cannot be referenced')
self.existing_object_pk = value.pk
self.created_object = None
2017-06-13 03:31:10 +02:00
self._set_object = value
2017-06-12 14:52:08 +02:00
def clean(self):
if self.action == 'delchange':
if self.deletes_change is None:
raise ValidationError('deletes_change has to be set if action is delchange.')
if self.deletes_change.changeset_id != self.changeset_id:
raise ValidationError('deletes_change refers to a change from a different changeset.')
for field_name in ('model_name', 'existing_object_pk', 'created_object', 'field_name', 'field_value'):
if getattr(self, field_name) is not None:
raise ValidationError('%s must not be set if action is delchange.' % field_name)
return
if self.deletes_change is not None:
raise ValidationError('deletes_change can only be set if action is delchange.')
if self.model_name is None:
raise ValidationError('model_name has to be set if action is not delchange.')
try:
2017-06-13 14:49:20 +02:00
# noinspection PyUnusedLocal
2017-06-12 18:29:21 +02:00
tmp = self.model_class if self.action == 'create' else self.obj # noqa
2017-06-12 14:52:08 +02:00
except TypeError as e:
raise ValidationError(str(e))
except ObjectDoesNotExist:
2017-06-12 18:26:24 +02:00
raise ValidationError('existing object does not exist.')
2017-06-12 14:52:08 +02:00
if self.action in ('create', 'delete'):
for field_name in ('field_name', 'field_value'):
if getattr(self, field_name) is not None:
raise ValidationError('%s must not be set if action is create or delete.' % field_name)
2017-06-12 14:59:56 +02:00
def save(self, *args, **kwargs):
2017-06-13 03:31:10 +02:00
self.clean()
if self.pk is not None:
raise TypeError('change objects can not be edited.')
if self.changeset.proposed is not None or self.changeset.applied is not None:
raise TypeError('can not add change object to uneditable changeset.')
super().save(*args, **kwargs)
def delete(self, *args, **kwargs):
raise TypeError('change objects can not be deleted directly.')
def __repr__(self):
2017-06-13 03:31:10 +02:00
result = '<Change #%s on ChangeSet #%s: ' % (str(self.pk), str(self.changeset_id))
if self.action == 'create':
2017-06-13 03:31:10 +02:00
result += 'Create '+repr(self.model_class.__name__)
elif self.action == 'update':
result += 'Update object '+repr(self.obj)+': '+self.field_name+'='+self.field_value
elif self.action == 'delete':
result += 'Delete object '+repr(self.obj)
2017-06-13 14:21:01 +02:00
elif self.action == 'm2m_add':
result += 'Update (m2m) object '+repr(self.obj)+': '+self.field_name+'.add('+self.field_value+')'
elif self.action == 'm2m_remove':
result += 'Update (m2m) object '+repr(self.obj)+': '+self.field_name+'.remove('+self.field_value+')'
result += '>'
return result