get access permissions without signing in

This commit is contained in:
Laura Klünder 2023-12-24 15:40:48 +01:00
parent fef1fb2c06
commit d5ec23a7fb
5 changed files with 148 additions and 71 deletions

View file

@ -118,7 +118,7 @@ def user_detail(request, user): # todo: make class based view
if form.is_valid():
token = form.get_token()
token.save()
token.redeem(user)
token.redeem(user=user)
messages.success(request, _('Access permissions successfully granted.'))
return redirect(request.path_info)
else:

View file

@ -0,0 +1,29 @@
# Generated by Django 4.2.7 on 2023-12-24 13:47
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("mapdata", "0094_hub_import_prepare"),
]
operations = [
migrations.AddField(
model_name="accesspermission",
name="session_token",
field=models.UUIDField(editable=False, null=True),
),
migrations.AlterField(
model_name="accesspermission",
name="user",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
]

View file

@ -122,14 +122,26 @@ class AccessPermissionToken(models.Model):
class RedeemError(Exception):
pass
def redeem(self, user=None):
if (user is None and self.redeemed) or (self.accesspermissions.exists() and not self.unlimited):
def redeem(self, /, user=None, request=None):
if user is None and request is not None:
if request.user.is_authenticated:
user = request.user
grant_to = None
if user:
grant_to = {"user": user}
elif request:
grant_to = {
"session_token": request.session.setdefault("accesspermission_session_token", str(uuid.uuid4()))
}
if (grant_to is None and self.redeemed) or (self.accesspermissions.exists() and not self.unlimited):
raise self.RedeemError('Already redeemed.')
if timezone.now() > self.valid_until + timedelta(minutes=5 if self.redeemed else 0):
raise self.RedeemError('No longer valid.')
if user:
if grant_to:
with transaction.atomic():
if self.author_id and self.unique_key:
AccessPermission.objects.filter(author_id=self.author_id, unique_key=self.unique_key).delete()
@ -140,7 +152,7 @@ class AccessPermissionToken(models.Model):
else {"access_restriction_group_id": int(restriction.pk.removeprefix("g"))}
)
AccessPermission.objects.create(
user=user,
**grant_to,
**to_grant,
author_id=self.author_id,
expire_date=restriction.expire_date,
@ -162,7 +174,8 @@ class AccessPermissionToken(models.Model):
class AccessPermission(models.Model):
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.CASCADE)
session_token = models.UUIDField(null=True, editable=False)
access_restriction = models.ForeignKey(AccessRestriction, on_delete=models.CASCADE, null=True)
access_restriction_group = models.ForeignKey(AccessRestrictionGroup, on_delete=models.CASCADE, null=True)
expire_date = models.DateTimeField(null=True, verbose_name=_('expires'))
@ -184,11 +197,33 @@ class AccessPermission(models.Model):
CheckConstraint(check=(~Q(access_restriction__isnull=True, access_restriction_group__isnull=True) &
~Q(access_restriction__isnull=False, access_restriction_group__isnull=False)),
name="permission_needs_restriction_or_restriction_group"),
CheckConstraint(check=(~Q(user__isnull=True, session_token__isnull=True) &
~Q(user__isnull=False, session_token__isnull=False)),
name="permission_needs_user_or_session"),
)
@staticmethod
def user_access_permission_key(user_id):
def build_access_permission_key(*, session_token: str|None = None, user_id: int|None = None):
if session_token:
if user_id:
raise ValueError
return ('mapdata:session_access_permission:%s' % session_token)
elif user_id:
return 'mapdata:user_access_permission:%d' % user_id
raise ValueError
@staticmethod
def request_access_permission_key(request):
if request.user.is_authenticated:
return AccessPermission.build_access_permission_key(user_id=request.user.pk)
return AccessPermission.build_access_permission_key(
session_token=request.session.get("accesspermission_session_token", "NONE")
)
def access_permission_key(self):
if self.user_id:
return AccessPermission.build_access_permission_key(user_id=self.user_id)
return AccessPermission.build_access_permission_key(session_token=self.session_token)
@classmethod
def queryset_for_user(cls, user, can_grant=None):
@ -199,15 +234,27 @@ class AccessPermission(models.Model):
)
@classmethod
def get_for_request_with_expire_date(cls, request, can_grant=None):
if not request.user.is_authenticated:
return {}
def queryset_for_session(cls, session):
session_token = session.get("accesspermission_session_token", None)
if not session_token:
return AccessPermission.objects.none()
return AccessPermission.objects.filter(session_token=session_token).filter(
Q(expire_date__isnull=True) | Q(expire_date__gt=timezone.now())
)
@classmethod
def get_for_request_with_expire_date(cls, request, can_grant=None):
if request.user.is_authenticated:
if request.user_permissions.grant_all_access:
return {pk: None for pk in AccessRestriction.get_all()}
qs = cls.queryset_for_user(request.user, can_grant)
else:
if can_grant:
return {}
qs = cls.queryset_for_session(request.session)
result = tuple(
cls.queryset_for_user(request.user, can_grant).select_related(
qs.select_related(
'access_restriction_group'
).prefetch_related('access_restriction_group__accessrestrictions')
)
@ -230,15 +277,15 @@ class AccessPermission(models.Model):
@classmethod
def get_for_request(cls, request) -> set[int]:
if not request or not request.user.is_authenticated:
if not request:
return AccessRestriction.get_all_public()
if request.user_permissions.grant_all_access:
if request.user.is_authenticated and request.user_permissions.grant_all_access:
return AccessRestriction.get_all()
cache_key = cls.user_access_permission_key(request.user.pk)
cache_key = cls.request_access_permission_key(request)
access_restriction_ids = cache.get(cache_key, None)
if access_restriction_ids is None:
if access_restriction_ids is None or True:
permissions = cls.get_for_request_with_expire_date(request)
access_restriction_ids = set(permissions.keys())
@ -261,12 +308,12 @@ class AccessPermission(models.Model):
def save(self, *args, **kwargs):
with transaction.atomic():
super().save(*args, **kwargs)
transaction.on_commit(lambda: cache.delete(self.user_access_permission_key(self.user_id)))
transaction.on_commit(lambda: cache.delete(self.access_permission_key()))
def delete(self, *args, **kwargs):
with transaction.atomic():
super().delete(*args, **kwargs)
transaction.on_commit(lambda: cache.delete(self.user_access_permission_key(self.user_id)))
transaction.on_commit(lambda: cache.delete(self.access_permission_key()))
class AccessRestrictionMixin(SerializableMixin, models.Model):

View file

@ -17,7 +17,7 @@ def get_user_data(request):
}
if permissions:
result.update({
'title': _('not logged in'),
'title': _('Login'),
'subtitle': ngettext_lazy('%d area unlocked', '%d areas unlocked', len(permissions)) % len(permissions),
'permissions': tuple(permissions),
})

View file

@ -9,6 +9,7 @@ from django.contrib.auth import login, logout
from django.contrib.auth.decorators import login_required
from django.contrib.auth.forms import AuthenticationForm, PasswordChangeForm, UserCreationForm
from django.contrib.auth.views import redirect_to_login
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist, SuspiciousOperation
from django.core.paginator import Paginator
from django.core.serializers.json import DjangoJSONEncoder
@ -30,7 +31,7 @@ from c3nav.api.models import Secret
from c3nav.control.forms import AccessPermissionForm, SignedPermissionDataError
from c3nav.mapdata.grid import grid
from c3nav.mapdata.models import Location, Source
from c3nav.mapdata.models.access import AccessPermissionToken
from c3nav.mapdata.models.access import AccessPermissionToken, AccessPermission
from c3nav.mapdata.models.locations import LocationRedirect, Position, SpecificLocation, get_position_secret
from c3nav.mapdata.models.report import Report, ReportUpdate
from c3nav.mapdata.utils.locations import (get_location_by_id_for_request, get_location_by_slug_for_request,
@ -64,29 +65,35 @@ def check_location(location: Optional[str], request) -> Optional[SpecificLocatio
def map_index(request, mode=None, slug=None, slug2=None, details=None, options=None, nearby=None, pos=None, embed=None):
# check for access token
access_signed_data = request.GET.get('access')
if access_signed_data:
access_token = request.GET.get('access')
if access_token:
with transaction.atomic():
try:
token = AccessPermissionForm.load_signed_data(access_signed_data)
except SignedPermissionDataError as e:
return HttpResponse(str(e).encode(), content_type='text/plain', status=400)
token = AccessPermissionToken.objects.select_for_update().get(token=access_token, redeemed=False,
valid_until__gte=timezone.now())
except AccessPermissionToken.DoesNotExist:
messages.error(request, _('This token does not exist or was already redeemed.'))
else:
num_restrictions = len(token.restrictions)
with transaction.atomic():
token.save()
if not request.user.is_authenticated:
messages.info(request, _('You need to log in to unlock areas.'))
request.session['redeem_token_on_login'] = str(token.token)
token.redeem()
return redirect_to_login(request.path_info, 'site.login')
token.redeem(request.user)
token.redeem(request=request)
token.save()
if request.user.is_authenticated:
messages.success(request, ngettext_lazy('Area successfully unlocked.',
'Areas successfully unlocked.', num_restrictions))
return redirect('site.index')
else:
messages.success(
request,
ngettext_lazy(
'Area successfully unlocked. '
'If you sign in, it will also be saved to your account.',
'Areas successfully unlocked. '
'If you sign in, they will also be saved to your account.',
num_restrictions
)
)
origin = None
destination = None
@ -204,23 +211,14 @@ def close_response(request):
return redirect(redirect_path)
def redeem_token_after_login(request):
token = request.session.pop('redeem_token_on_login', None)
if not token:
return
try:
token = AccessPermissionToken.objects.get(token=token)
except AccessPermissionToken.DoesNotExist:
return
try:
token.redeem(request.user)
except AccessPermissionToken.RedeemError:
messages.error(request, _('Areas could not be unlocked because the token has expired.'))
return
messages.success(request, token.redeem_success_message)
def migrate_access_permissions_after_login(request):
if not request.user.is_authenticated:
raise ValueError
with transaction.atomic():
session_token = request.session.pop("accesspermission_session_token", None)
if session_token:
AccessPermission.objects.filter(session_token=session_token).update(session_token=None, user=request.user)
transaction.on_commit(lambda: cache.delete(AccessPermission.request_access_permission_key(request)))
@never_cache
@ -232,7 +230,7 @@ def login_view(request):
form = AuthenticationForm(request, data=request.POST)
if form.is_valid():
login(request, form.user_cache)
redeem_token_after_login(request)
migrate_access_permissions_after_login(request)
return close_response(request)
else:
form = AuthenticationForm(request)
@ -270,7 +268,7 @@ def register_view(request):
if form.is_valid():
user = form.save()
login(request, user)
redeem_token_after_login(request)
migrate_access_permissions_after_login(request)
return close_response(request)
else:
form = UserCreationForm()
@ -357,17 +355,20 @@ def access_redeem_view(request, token):
num_restrictions = len(token.restrictions)
if request.method == 'POST':
if not request.user.is_authenticated:
messages.info(request, _('You need to log in to unlock areas.'))
request.session['redeem_token_on_login'] = str(token.token)
token.redeem()
return redirect('site.login')
token.redeem(request.user)
token.redeem(request=request)
token.save()
if request.user.is_authenticated:
messages.success(request, ngettext_lazy('Area successfully unlocked.',
'Areas successfully unlocked.', num_restrictions))
else:
messages.success(
request,
ngettext_lazy(
'Area successfully unlocked. If you sign in, it will also be saved to your account.',
'Areas successfully unlocked. If you sign in, they will also be saved to your account.',
num_restrictions
))
return redirect('site.index')
return render(request, 'site/confirm.html', {