redeem access permission signed data
This commit is contained in:
parent
a1c3caaad3
commit
1b2c37ef2b
3 changed files with 125 additions and 12 deletions
|
@ -3,9 +3,11 @@ import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from datetime import timedelta
|
from datetime import datetime, timedelta
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
|
||||||
|
import pytz
|
||||||
|
from django.contrib.auth.models import User
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.forms import ChoiceField, Form, ModelForm
|
from django.forms import ChoiceField, Form, ModelForm
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
@ -25,14 +27,17 @@ class UserPermissionsForm(ModelForm):
|
||||||
|
|
||||||
|
|
||||||
class AccessPermissionForm(Form):
|
class AccessPermissionForm(Form):
|
||||||
def __init__(self, request, *args, **kwargs):
|
def __init__(self, request=None, author=None, expire_date=None, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
# remember author if this form is saved
|
# remember author if this form is saved
|
||||||
self.author = request.user
|
self.author = author or request.user
|
||||||
|
author_permissions = request.user_permissions if request else author.permissions
|
||||||
|
|
||||||
|
self.expire_date = expire_date
|
||||||
|
|
||||||
# determine which access permissions the author can grant
|
# determine which access permissions the author can grant
|
||||||
if not request.user_permissions.grant_all_access:
|
if not author_permissions.grant_all_access:
|
||||||
self.author_access_permissions = {
|
self.author_access_permissions = {
|
||||||
pk: expire_date for pk, expire_date in self.author.accesspermissions.filter(
|
pk: expire_date for pk, expire_date in self.author.accesspermissions.filter(
|
||||||
Q(can_grant=True) & (Q(expire_date__isnull=True) | Q(expire_date__lt=timezone.now()))
|
Q(can_grant=True) & (Q(expire_date__isnull=True) | Q(expire_date__lt=timezone.now()))
|
||||||
|
@ -91,7 +96,7 @@ class AccessPermissionForm(Form):
|
||||||
self.fields['expires'] = ChoiceField(required=False, initial='60', choices=expire_choices)
|
self.fields['expires'] = ChoiceField(required=False, initial='60', choices=expire_choices)
|
||||||
|
|
||||||
# if applicable, add field to grant pass on permissions
|
# if applicable, add field to grant pass on permissions
|
||||||
if request.user_permissions.grant_all_access:
|
if author_permissions.grant_all_access:
|
||||||
choices = [('0', '---')]*6 + [('1', _('can pass on'))] + [('0', '---')]*3
|
choices = [('0', '---')]*6 + [('1', _('can pass on'))] + [('0', '---')]*3
|
||||||
self.fields['can_grant'] = ChoiceField(required=False, initial='60', choices=choices)
|
self.fields['can_grant'] = ChoiceField(required=False, initial='60', choices=choices)
|
||||||
|
|
||||||
|
@ -108,11 +113,12 @@ class AccessPermissionForm(Form):
|
||||||
def save(self, user):
|
def save(self, user):
|
||||||
self._save_code(self._create_code(), user)
|
self._save_code(self._create_code(), user)
|
||||||
|
|
||||||
def get_token(self):
|
def get_token(self, unique_key=None):
|
||||||
# create an AccessPermissionToken from this form and return it
|
# create an AccessPermissionToken from this form and return it
|
||||||
restrictions = []
|
restrictions = []
|
||||||
|
default_expire_date = self.expire_date or self.cleaned_data['expires']
|
||||||
for restriction in self.cleaned_data['access_restrictions']:
|
for restriction in self.cleaned_data['access_restrictions']:
|
||||||
expire_date = self.cleaned_data['expires']
|
expire_date = default_expire_date
|
||||||
author_expire_date = self.author_access_permissions.get(restriction.pk)
|
author_expire_date = self.author_access_permissions.get(restriction.pk)
|
||||||
# make sure that each permission is not granted for a longer time than the author has it
|
# make sure that each permission is not granted for a longer time than the author has it
|
||||||
if author_expire_date is not None:
|
if author_expire_date is not None:
|
||||||
|
@ -121,7 +127,8 @@ class AccessPermissionForm(Form):
|
||||||
title=restriction.title))
|
title=restriction.title))
|
||||||
return AccessPermissionToken(author=self.author,
|
return AccessPermissionToken(author=self.author,
|
||||||
can_grant=self.cleaned_data.get('can_grant', '0') == '1',
|
can_grant=self.cleaned_data.get('can_grant', '0') == '1',
|
||||||
restrictions=tuple(restrictions))
|
restrictions=tuple(restrictions),
|
||||||
|
unique_key=unique_key)
|
||||||
|
|
||||||
def get_signed_data(self, key=None):
|
def get_signed_data(self, key=None):
|
||||||
if not self.author.permissions.api_secret:
|
if not self.author.permissions.api_secret:
|
||||||
|
@ -136,10 +143,87 @@ class AccessPermissionForm(Form):
|
||||||
data['key'] = key
|
data['key'] = key
|
||||||
data = json.dumps(data, separators=(',', ':'))
|
data = json.dumps(data, separators=(',', ':'))
|
||||||
signature = hmac.new(self.author.permissions.api_secret.encode(),
|
signature = hmac.new(self.author.permissions.api_secret.encode(),
|
||||||
msg=data.encode(),
|
msg=data.encode(), digestmod=hashlib.sha256).digest()
|
||||||
digestmod=hashlib.sha256).digest()
|
|
||||||
return '%s:%s' % (data, binascii.b2a_base64(signature).strip().decode())
|
return '%s:%s' % (data, binascii.b2a_base64(signature).strip().decode())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load_signed_data(cls, signed_data: str):
|
||||||
|
if ':' not in signed_data:
|
||||||
|
raise SignedPermissionDataError('Invalid data.')
|
||||||
|
|
||||||
|
raw_data, signature = signed_data.rsplit(':', 1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = json.loads(raw_data)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
raise SignedPermissionDataError('Invalid JSON.')
|
||||||
|
|
||||||
|
try:
|
||||||
|
restrictions = data.pop('id')
|
||||||
|
author_id = data.pop('author')
|
||||||
|
issue_time = data.pop('time')
|
||||||
|
valid_until = data.pop('valid_until')
|
||||||
|
unique_key = data.pop('key', None)
|
||||||
|
except KeyError as e:
|
||||||
|
raise SignedPermissionDataError('Missing %s.' % str(e))
|
||||||
|
|
||||||
|
for unknown_key in data:
|
||||||
|
raise SignedPermissionDataError('Unknown value: %s' % unknown_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
issue_time = int(issue_time)
|
||||||
|
except ValueError:
|
||||||
|
raise SignedPermissionDataError('Invalid time.')
|
||||||
|
|
||||||
|
try:
|
||||||
|
valid_until = int(valid_until) if valid_until is not None else None
|
||||||
|
except ValueError:
|
||||||
|
raise SignedPermissionDataError('Invalid valid_until.')
|
||||||
|
else:
|
||||||
|
valid_until = valid_until and datetime.utcfromtimestamp(valid_until).replace(tzinfo=pytz.utc)
|
||||||
|
|
||||||
|
try:
|
||||||
|
author_id = int(author_id)
|
||||||
|
except ValueError:
|
||||||
|
raise SignedPermissionDataError('Invalid author.')
|
||||||
|
|
||||||
|
if unique_key is not None and not isinstance(unique_key, str):
|
||||||
|
raise SignedPermissionDataError('key has to be null or a string.')
|
||||||
|
|
||||||
|
if issue_time > time.time()+5:
|
||||||
|
raise SignedPermissionDataError('time cannot be in the future.')
|
||||||
|
if issue_time < time.time()-60:
|
||||||
|
raise SignedPermissionDataError('token has expired.')
|
||||||
|
if unique_key is not None and not (1 <= len(unique_key) <= 32):
|
||||||
|
raise SignedPermissionDataError('key has to be 1-32 characters')
|
||||||
|
|
||||||
|
try:
|
||||||
|
author = User.objects.select_related('permissions').get(pk=author_id)
|
||||||
|
except User.DoesNotExist:
|
||||||
|
raise SignedPermissionDataError('Author does not exist.')
|
||||||
|
|
||||||
|
try:
|
||||||
|
api_secret = author.permissions.api_secret
|
||||||
|
except AttributeError:
|
||||||
|
raise SignedPermissionDataError('Author has no API secret.')
|
||||||
|
|
||||||
|
verify_signature = binascii.b2a_base64(hmac.new(api_secret.encode(),
|
||||||
|
msg=raw_data.encode(), digestmod=hashlib.sha256).digest())
|
||||||
|
print(verify_signature, signature)
|
||||||
|
if signature != verify_signature.strip().decode():
|
||||||
|
raise SignedPermissionDataError('Invalid signature.')
|
||||||
|
|
||||||
|
form = cls(author=author, expire_date=valid_until, data={
|
||||||
|
'access_restrictions': str(restrictions),
|
||||||
|
})
|
||||||
|
if not form.is_valid():
|
||||||
|
raise SignedPermissionDataError(' '.join(form.errors))
|
||||||
|
return form.get_token(unique_key=unique_key)
|
||||||
|
|
||||||
|
|
||||||
|
class SignedPermissionDataError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class AnnouncementForm(I18nModelFormMixin, ModelForm):
|
class AnnouncementForm(I18nModelFormMixin, ModelForm):
|
||||||
class Meta:
|
class Meta:
|
||||||
|
|
|
@ -164,7 +164,8 @@ def grant_access(request):
|
||||||
token = form.get_token()
|
token = form.get_token()
|
||||||
token.save()
|
token.save()
|
||||||
if settings.DEBUG and request.user_permissions.api_secret:
|
if settings.DEBUG and request.user_permissions.api_secret:
|
||||||
print(form.get_signed_data())
|
signed_data = form.get_signed_data()
|
||||||
|
print(signed_data)
|
||||||
return redirect(reverse('control.access.qr', kwargs={'token': token.token}))
|
return redirect(reverse('control.access.qr', kwargs={'token': token.token}))
|
||||||
else:
|
else:
|
||||||
form = AccessPermissionForm(request=request)
|
form = AccessPermissionForm(request=request)
|
||||||
|
|
|
@ -7,6 +7,7 @@ from django.contrib import messages
|
||||||
from django.contrib.auth import login, logout
|
from django.contrib.auth import login, logout
|
||||||
from django.contrib.auth.decorators import login_required
|
from django.contrib.auth.decorators import login_required
|
||||||
from django.contrib.auth.forms import AuthenticationForm, PasswordChangeForm, UserCreationForm
|
from django.contrib.auth.forms import AuthenticationForm, PasswordChangeForm, UserCreationForm
|
||||||
|
from django.contrib.auth.views import redirect_to_login
|
||||||
from django.core.serializers.json import DjangoJSONEncoder
|
from django.core.serializers.json import DjangoJSONEncoder
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
from django.http import HttpResponse, HttpResponseBadRequest
|
from django.http import HttpResponse, HttpResponseBadRequest
|
||||||
|
@ -20,6 +21,7 @@ from django.views.decorators.cache import cache_control, never_cache
|
||||||
from django.views.decorators.clickjacking import xframe_options_exempt
|
from django.views.decorators.clickjacking import xframe_options_exempt
|
||||||
from django.views.decorators.http import etag
|
from django.views.decorators.http import etag
|
||||||
|
|
||||||
|
from c3nav.control.forms import AccessPermissionForm, SignedPermissionDataError
|
||||||
from c3nav.mapdata.models import Location, Source
|
from c3nav.mapdata.models import Location, Source
|
||||||
from c3nav.mapdata.models.access import AccessPermissionToken
|
from c3nav.mapdata.models.access import AccessPermissionToken
|
||||||
from c3nav.mapdata.models.locations import LocationRedirect, SpecificLocation
|
from c3nav.mapdata.models.locations import LocationRedirect, SpecificLocation
|
||||||
|
@ -50,6 +52,32 @@ def check_location(location: Optional[str], request) -> Optional[SpecificLocatio
|
||||||
|
|
||||||
def map_index(request, mode=None, slug=None, slug2=None, details=None, options=None,
|
def map_index(request, mode=None, slug=None, slug2=None, details=None, options=None,
|
||||||
level=None, x=None, y=None, zoom=None, embed=None):
|
level=None, x=None, y=None, zoom=None, embed=None):
|
||||||
|
|
||||||
|
# check for access token
|
||||||
|
access_signed_data = request.GET.get('access')
|
||||||
|
if access_signed_data:
|
||||||
|
try:
|
||||||
|
token = AccessPermissionForm.load_signed_data(access_signed_data)
|
||||||
|
except SignedPermissionDataError as e:
|
||||||
|
return HttpResponse(str(e).encode(), content_type='text/plain', status=400)
|
||||||
|
|
||||||
|
num_restrictions = len(token.restrictions)
|
||||||
|
with transaction.atomic():
|
||||||
|
token.save()
|
||||||
|
|
||||||
|
if not request.user.is_authenticated:
|
||||||
|
messages.info(request, _('You need to log in to unlock areas.'))
|
||||||
|
request.session['redeem_token_on_login'] = str(token.token)
|
||||||
|
token.redeem()
|
||||||
|
return redirect('site.login')
|
||||||
|
|
||||||
|
token.redeem(request.user)
|
||||||
|
token.save()
|
||||||
|
|
||||||
|
messages.success(request, ungettext_lazy('Area successfully unlocked.',
|
||||||
|
'Areas successfully unlocked.', num_restrictions))
|
||||||
|
return redirect('site.index')
|
||||||
|
|
||||||
origin = None
|
origin = None
|
||||||
destination = None
|
destination = None
|
||||||
routing = False
|
routing = False
|
||||||
|
@ -260,7 +288,7 @@ def access_redeem_view(request, token):
|
||||||
messages.info(request, _('You need to log in to unlock areas.'))
|
messages.info(request, _('You need to log in to unlock areas.'))
|
||||||
request.session['redeem_token_on_login'] = str(token.token)
|
request.session['redeem_token_on_login'] = str(token.token)
|
||||||
token.redeem()
|
token.redeem()
|
||||||
return redirect('site.login')
|
return redirect_to_login(request.path_info, 'site.login')
|
||||||
|
|
||||||
token.redeem(request.user)
|
token.redeem(request.user)
|
||||||
token.save()
|
token.save()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue