team-3/src/c3nav/control/forms.py

236 lines
9.5 KiB
Python
Raw Normal View History

import binascii
import hashlib
import hmac
import json
import time
2017-12-19 15:46:43 +01:00
from datetime import datetime, timedelta
2017-12-08 21:31:53 +01:00
from itertools import chain
2017-12-19 15:46:43 +01:00
import pytz
from django.contrib.auth.models import User
2017-12-08 21:31:53 +01:00
from django.db.models import Q
2017-12-08 21:54:04 +01:00
from django.forms import ChoiceField, Form, ModelForm
2017-12-08 21:31:53 +01:00
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext_lazy
2017-12-08 18:41:48 +01:00
from c3nav.control.models import UserPermissions
2017-12-10 15:23:53 +01:00
from c3nav.mapdata.forms import I18nModelFormMixin
2017-12-10 14:13:20 +01:00
from c3nav.mapdata.models.access import AccessPermissionToken, AccessPermissionTokenItem, AccessRestriction
2017-12-10 15:23:53 +01:00
from c3nav.site.models import Announcement
2017-12-08 18:41:48 +01:00
class UserPermissionsForm(ModelForm):
    """Model form for editing a UserPermissions object, minus a few excluded fields."""

    class Meta:
        model = UserPermissions
        # these fields are deliberately not exposed by this form
        exclude = (
            'user',
            'max_changeset_changes',
            'api_secret',
        )
2017-12-08 21:31:53 +01:00
class AccessPermissionForm(Form):
    """
    Form for handing out access permissions on behalf of an author.

    The form offers the access restrictions the author is able to grant, an
    optional expiry and (for authors with ``grant_all_access``) whether the
    permissions can be passed on. A validated form can be turned into an
    AccessPermissionToken via get_token(), or into a signed string via
    get_signed_data() which load_signed_data() verifies and converts back.
    """

    def __init__(self, request=None, author=None, expire_date=None, *args, **kwargs):
        """
        :param request: current request; supplies the author (``request.user``) and
                        their permissions (``request.user_permissions``) if given
        :param author: user granting the permissions, used when no request is given
        :param expire_date: if set, overrides the expiry chosen in the form when building the token
        """
        super().__init__(*args, **kwargs)

        # remember author if this form is saved
        self.author = author or request.user
        author_permissions = request.user_permissions if request else author.permissions
        self.expire_date = expire_date

        # determine which access permissions the author can grant
        if not author_permissions.grant_all_access:
            # only permissions that never expire or have not expired yet can be passed on
            self.author_access_permissions = {
                pk: author_expire_date for pk, author_expire_date in self.author.accesspermissions.filter(
                    Q(can_grant=True) & (Q(expire_date__isnull=True) | Q(expire_date__gt=timezone.now()))
                ).values_list('access_restriction_id', 'expire_date')
            }
            access_restrictions = AccessRestriction.objects.filter(
                pk__in=self.author_access_permissions.keys()
            )
        else:
            self.author_access_permissions = {}
            access_restrictions = AccessRestriction.objects.all()

        self.access_restrictions = {
            access_restriction.pk: access_restriction
            for access_restriction in access_restrictions
        }

        # maps each submitted choice value to the restrictions it stands for
        self.access_restriction_choices = {
            'all': self.access_restrictions.values(),
            **{str(pk): (access_restriction, ) for pk, access_restriction in self.access_restrictions.items()}
        }

        # construct choice field for access permissions
        choices = [('', _('choose permissions…')),
                   ('all', ungettext_lazy('everything possible (%d permission)',
                                          'everything possible (%d permissions)',
                                          len(access_restrictions)) % len(access_restrictions))]
        choices.append((_('Access Permissions'), tuple(
            (str(pk), access_restriction.title)
            for pk, access_restriction in self.access_restrictions.items()
        )))
        self.fields['access_restrictions'] = ChoiceField(choices=choices, required=True)

        # construct choices for the expire field (values are minutes from now)
        expire_choices = [
            ('', _('never')),
        ]
        for minutes in range(15, 60, 15):
            expire_choices.append(
                (str(minutes), ungettext_lazy('in %d minute', 'in %d minutes', minutes) % minutes))

        for hours in chain(range(1, 6), range(6, 24, 6)):
            expire_choices.append(
                (str(hours*60), ungettext_lazy('in %d hour', 'in %d hours', hours) % hours)
            )
        # index 5 places the 90 minute entry between the 1 hour and 2 hour entries
        expire_choices.insert(
            5, (str(90), _('in 1½ hour'))
        )
        for days in range(1, 14):
            expire_choices.append(
                (str(days*24*60), ungettext_lazy('in %d day', 'in %d days', days) % days)
            )

        self.fields['expires'] = ChoiceField(required=False, initial='60', choices=expire_choices)

        # if applicable, add field to grant pass on permissions
        if author_permissions.grant_all_access:
            # the single real option is surrounded by '---' placeholders
            # NOTE(review): presumably to make accidental selection harder - confirm
            choices = [('0', '---')]*6 + [('1', _('can pass on'))] + [('0', '---')]*3
            # NOTE(review): initial='60' matches none of the choices above - confirm intent
            self.fields['can_grant'] = ChoiceField(required=False, initial='60', choices=choices)

    def clean_access_restrictions(self):
        """Translate the submitted choice value into the access restrictions it stands for."""
        data = self.cleaned_data['access_restrictions']
        return self.access_restriction_choices[data]

    def clean_expires(self):
        """Convert the chosen number of minutes into an absolute datetime; None means never."""
        data = self.cleaned_data['expires']
        if data == '':
            return None
        return timezone.now()+timedelta(minutes=int(data))

    def save(self, user):
        # NOTE(review): _save_code and _create_code are not defined in this class - verify they exist
        self._save_code(self._create_code(), user)

    def get_token(self, unique_key=None):
        """
        Create an AccessPermissionToken from this (validated) form and return it.

        :param unique_key: passed through to the token unchanged
        """
        restrictions = []
        default_expire_date = self.expire_date or self.cleaned_data['expires']
        for restriction in self.cleaned_data['access_restrictions']:
            expire_date = default_expire_date
            author_expire_date = self.author_access_permissions.get(restriction.pk)
            # make sure that each permission is not granted for a longer time than the author has it
            if author_expire_date is not None:
                expire_date = author_expire_date if expire_date is None else min(expire_date, author_expire_date)
            restrictions.append(AccessPermissionTokenItem(pk=restriction.pk, expire_date=expire_date,
                                                          title=restriction.title))
        return AccessPermissionToken(author=self.author,
                                     can_grant=self.cleaned_data.get('can_grant', '0') == '1',
                                     restrictions=tuple(restrictions),
                                     unique_key=unique_key)

    def get_signed_data(self, key=None):
        """
        Serialize this (validated) form to ``<json>:<base64 hmac-sha256 signature>``.

        The signature is created with the author's api secret.

        :param key: optional value stored as ``key`` in the signed data
        :raises ValueError: if the author has no api secret
        """
        api_secret = self.author.permissions.api_secret
        if not api_secret:
            raise ValueError('Author has no api secret.')

        # "never" is stored as null; timestamp() is portable and honours the tzinfo,
        # unlike the platform specific strftime('%s')
        expires = self.cleaned_data['expires']
        data = {
            'id': self.data['access_restrictions'],
            'time': int(time.time()),
            'valid_until': None if expires is None else int(expires.timestamp()),
            'author': self.author.pk,
        }
        if key is not None:
            data['key'] = key
        data = json.dumps(data, separators=(',', ':'))
        signature = hmac.new(api_secret.encode(),
                             msg=data.encode(), digestmod=hashlib.sha256).digest()
        return '%s:%s' % (data, binascii.b2a_base64(signature).strip().decode())

    @classmethod
    def load_signed_data(cls, signed_data: str):
        """
        Verify data created by get_signed_data() and return the resulting AccessPermissionToken.

        The data is only accepted if its issue time is at most 5 seconds in the
        future and at most 60 seconds in the past.

        :param signed_data: string of the form ``<json>:<base64 signature>``
        :raises SignedPermissionDataError: if the data is malformed, outdated or not correctly signed
        """
        if ':' not in signed_data:
            raise SignedPermissionDataError('Invalid data.')

        # the base64 signature contains no colon, so the last colon separates it from the JSON
        raw_data, signature = signed_data.rsplit(':', 1)

        try:
            data = json.loads(raw_data)
        except json.JSONDecodeError:
            raise SignedPermissionDataError('Invalid JSON.')

        # valid JSON that is not an object (list, number, ...) cannot carry the needed values
        if not isinstance(data, dict):
            raise SignedPermissionDataError('Invalid data.')

        try:
            restrictions = data.pop('id')
            author_id = data.pop('author')
            issue_time = data.pop('time')
            valid_until = data.pop('valid_until')
            unique_key = data.pop('key', None)
        except KeyError as e:
            raise SignedPermissionDataError('Missing %s.' % str(e))

        # everything known has been popped, anything left is unexpected
        for unknown_key in data:
            raise SignedPermissionDataError('Unknown value: %s' % unknown_key)

        # TypeError covers JSON values like null, lists or objects
        try:
            issue_time = int(issue_time)
        except (TypeError, ValueError):
            raise SignedPermissionDataError('Invalid time.')

        try:
            # compare with None so that a timestamp of 0 is not mistaken for "never expires"
            if valid_until is not None:
                valid_until = datetime.utcfromtimestamp(int(valid_until)).replace(tzinfo=pytz.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            raise SignedPermissionDataError('Invalid valid_until.')

        try:
            author_id = int(author_id)
        except (TypeError, ValueError):
            raise SignedPermissionDataError('Invalid author.')

        if unique_key is not None and not isinstance(unique_key, str):
            raise SignedPermissionDataError('key has to be null or a string.')

        now = time.time()
        if issue_time > now+5:
            raise SignedPermissionDataError('time cannot be in the future.')

        if issue_time < now-60:
            raise SignedPermissionDataError('token has expired.')

        if unique_key is not None and not (1 <= len(unique_key) <= 32):
            raise SignedPermissionDataError('key has to be 1-32 characters')

        try:
            author = User.objects.select_related('permissions').get(pk=author_id)
        except User.DoesNotExist:
            raise SignedPermissionDataError('Author does not exist.')

        try:
            api_secret = author.permissions.api_secret
        except AttributeError:
            raise SignedPermissionDataError('Author has no API secret.')
        # an unset secret must never be accepted as a signing key
        if not api_secret:
            raise SignedPermissionDataError('Author has no API secret.')

        expected_signature = binascii.b2a_base64(hmac.new(api_secret.encode(),
                                                          msg=raw_data.encode(),
                                                          digestmod=hashlib.sha256).digest()).strip()
        # constant time comparison on bytes; str arguments would have to be ASCII-only
        if not hmac.compare_digest(signature.encode(), expected_signature):
            raise SignedPermissionDataError('Invalid signature.')

        form = cls(author=author, expire_date=valid_until, data={
            'access_restrictions': str(restrictions),
        })
        if not form.is_valid():
            raise SignedPermissionDataError(' '.join(form.errors))
        return form.get_token(unique_key=unique_key)
class SignedPermissionDataError(Exception):
    """Raised when signed permission data is malformed, outdated or not correctly signed."""
2017-12-10 15:23:53 +01:00
class AnnouncementForm(I18nModelFormMixin, ModelForm):
    """Model form for an Announcement's text, active flag and end of activity."""

    class Meta:
        model = Announcement
        fields = (
            'text',
            'active',
            'active_until',
        )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # pre-fill the "active until" field with the time the form was created
        active_until_field = self.fields['active_until']
        active_until_field.initial = timezone.now()