access permissions can now also be given by group
This commit is contained in:
parent
c4a8e1d874
commit
3477354688
6 changed files with 139 additions and 28 deletions
|
@ -3,8 +3,9 @@ from typing import Annotated, Any, Literal, Union
|
||||||
|
|
||||||
from django.utils.functional import Promise
|
from django.utils.functional import Promise
|
||||||
from ninja import Schema
|
from ninja import Schema
|
||||||
from pydantic import Discriminator, model_validator
|
from pydantic import Discriminator
|
||||||
from pydantic import Field as APIField
|
from pydantic import Field as APIField
|
||||||
|
from pydantic import model_validator
|
||||||
from pydantic.functional_validators import ModelWrapValidatorHandler
|
from pydantic.functional_validators import ModelWrapValidatorHandler
|
||||||
from pydantic_core.core_schema import ValidationInfo
|
from pydantic_core.core_schema import ValidationInfo
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import time
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from datetime import timezone as dt_timezone
|
from datetime import timezone as dt_timezone
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
from typing import Sequence
|
||||||
|
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
from django.db.models import Prefetch
|
from django.db.models import Prefetch
|
||||||
|
@ -54,38 +55,52 @@ class AccessPermissionForm(Form):
|
||||||
pk__in=self.author_access_permissions.keys()
|
pk__in=self.author_access_permissions.keys()
|
||||||
)
|
)
|
||||||
|
|
||||||
self.access_restrictions = {
|
self.access_restrictions: dict[int: AccessRestriction] = {
|
||||||
access_restriction.pk: access_restriction
|
access_restriction.pk: access_restriction
|
||||||
for access_restriction in access_restrictions
|
for access_restriction in access_restrictions
|
||||||
}
|
}
|
||||||
access_restrictions_ids = set(self.access_restrictions.keys())
|
access_restrictions_ids = set(self.access_restrictions.keys())
|
||||||
|
|
||||||
self.access_restriction_choices = {
|
self.access_restriction_choices: dict[str, Sequence[int, str]] = {
|
||||||
'all': self.access_restrictions.values(),
|
**{str(pk): (pk, ) for pk, access_restriction in self.access_restrictions.items()}
|
||||||
**{str(pk): (access_restriction, ) for pk, access_restriction in self.access_restrictions.items()}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# get access permission groups
|
# get access permission groups
|
||||||
groups = AccessRestrictionGroup.qs_for_request(request).prefetch_related(
|
groups = AccessRestrictionGroup.qs_for_request(request).prefetch_related(
|
||||||
Prefetch('accessrestrictions', AccessRestriction.objects.only('pk'))
|
Prefetch('accessrestrictions', AccessRestriction.objects.only('pk'))
|
||||||
)
|
)
|
||||||
group_contents = {
|
self.group_contents: dict[int, set[int]] = {
|
||||||
group.pk: set(r.pk for r in group.accessrestrictions.all())
|
group.pk: set(r.pk for r in group.accessrestrictions.all())
|
||||||
for group in groups
|
for group in groups
|
||||||
}
|
}
|
||||||
group_contents = {
|
self.group_contents = {
|
||||||
pk: restrictions for pk, restrictions in group_contents.items()
|
pk: restrictions for pk, restrictions in self.group_contents.items()
|
||||||
if not (restrictions - access_restrictions_ids)
|
if not (restrictions - access_restrictions_ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.titles = {
|
||||||
|
**{r.pk: r.title for r in access_restrictions},
|
||||||
|
**{('g%d' % g.pk): g.title for g in groups},
|
||||||
|
}
|
||||||
|
|
||||||
self.access_restriction_choices.update({
|
self.access_restriction_choices.update({
|
||||||
('g%d' % pk): tuple(
|
('g%d' % pk): (('g%d' % pk),)
|
||||||
self.access_restrictions[restriction] for restriction in restrictions
|
for pk, restrictions in self.group_contents.items()
|
||||||
) for pk, restrictions in group_contents.items()
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
restrictions_not_in_group: set[int] = access_restrictions_ids
|
||||||
|
for restrictions in self.group_contents.values():
|
||||||
|
restrictions_not_in_group -= restrictions
|
||||||
|
|
||||||
|
self.access_restriction_choices.update({
|
||||||
|
"all": tuple(('g%d' % pk) for pk in self.group_contents.keys()) + tuple(restrictions_not_in_group),
|
||||||
|
})
|
||||||
|
|
||||||
|
from pprint import pprint
|
||||||
|
pprint(self.access_restriction_choices)
|
||||||
|
|
||||||
# construct choice field for access permissions
|
# construct choice field for access permissions
|
||||||
choices = [('', _('choose permissions…')),
|
choices = [('', _('choose permissions…')), # noqa
|
||||||
('all', ngettext_lazy('everything possible (%d permission)',
|
('all', ngettext_lazy('everything possible (%d permission)',
|
||||||
'everything possible (%d permissions)',
|
'everything possible (%d permissions)',
|
||||||
len(access_restrictions)) % len(access_restrictions))]
|
len(access_restrictions)) % len(access_restrictions))]
|
||||||
|
@ -147,12 +162,22 @@ class AccessPermissionForm(Form):
|
||||||
default_expire_date = self.expire_date or self.cleaned_data['expires']
|
default_expire_date = self.expire_date or self.cleaned_data['expires']
|
||||||
for restriction in self.cleaned_data['access_restrictions']:
|
for restriction in self.cleaned_data['access_restrictions']:
|
||||||
expire_date = default_expire_date
|
expire_date = default_expire_date
|
||||||
author_expire_date = self.author_access_permissions.get(restriction.pk)
|
|
||||||
|
if isinstance(restriction, int):
|
||||||
|
author_expire_date = self.author_access_permissions.get(restriction)
|
||||||
|
else:
|
||||||
|
author_expire_date = min(
|
||||||
|
(d for d in (self.author_access_permissions.get(i)
|
||||||
|
for i in self.group_contents[int(restriction.removeprefix('g'))])
|
||||||
|
if d is not None),
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
# make sure that each permission is not granted for a longer time than the author has it
|
# make sure that each permission is not granted for a longer time than the author has it
|
||||||
if author_expire_date is not None:
|
if author_expire_date is not None:
|
||||||
expire_date = author_expire_date if expire_date is None else min(expire_date, author_expire_date)
|
expire_date = author_expire_date if expire_date is None else min(expire_date, author_expire_date)
|
||||||
restrictions.append(AccessPermissionTokenItem(pk=restriction.pk, expire_date=expire_date,
|
restrictions.append(AccessPermissionTokenItem(pk=restriction, expire_date=expire_date,
|
||||||
title=restriction.title))
|
title=self.titles[restriction]))
|
||||||
return AccessPermissionToken(author=self.author,
|
return AccessPermissionToken(author=self.author,
|
||||||
can_grant=self.cleaned_data.get('can_grant', '0') == '1',
|
can_grant=self.cleaned_data.get('can_grant', '0') == '1',
|
||||||
restrictions=tuple(restrictions),
|
restrictions=tuple(restrictions),
|
||||||
|
@ -160,7 +185,7 @@ class AccessPermissionForm(Form):
|
||||||
|
|
||||||
def get_signed_data(self, key=None):
|
def get_signed_data(self, key=None):
|
||||||
try:
|
try:
|
||||||
api_secret = self.author.api_secrets.filter(scope_grant_permission=True).valid_only().get().api_secret
|
api_secret = self.author.api_secrets.filter(scope_grant_permissions=True).valid_only().get().api_secret
|
||||||
except Secret.DoesNotExist:
|
except Secret.DoesNotExist:
|
||||||
raise ValueError('Author has no feasible api secret.')
|
raise ValueError('Author has no feasible api secret.')
|
||||||
data = {
|
data = {
|
||||||
|
@ -172,7 +197,7 @@ class AccessPermissionForm(Form):
|
||||||
if key is not None:
|
if key is not None:
|
||||||
data['key'] = key
|
data['key'] = key
|
||||||
data = json.dumps(data, separators=(',', ':'))
|
data = json.dumps(data, separators=(',', ':'))
|
||||||
signature = hmac.new(api_secret, msg=data.encode(), digestmod=hashlib.sha256).digest()
|
signature = hmac.new(api_secret.encode(), msg=data.encode(), digestmod=hashlib.sha256).digest()
|
||||||
return '%s:%s' % (data, binascii.b2a_base64(signature).strip().decode())
|
return '%s:%s' % (data, binascii.b2a_base64(signature).strip().decode())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
|
@ -12,7 +12,7 @@ from c3nav.control.forms import AccessPermissionForm, UserPermissionsForm, UserS
|
||||||
from c3nav.control.models import UserPermissions, UserSpaceAccess
|
from c3nav.control.models import UserPermissions, UserSpaceAccess
|
||||||
from c3nav.control.views.base import ControlPanelMixin, control_panel_view
|
from c3nav.control.views.base import ControlPanelMixin, control_panel_view
|
||||||
from c3nav.mapdata.models import AccessRestriction
|
from c3nav.mapdata.models import AccessRestriction
|
||||||
from c3nav.mapdata.models.access import AccessPermission
|
from c3nav.mapdata.models.access import AccessPermission, AccessRestrictionGroup
|
||||||
|
|
||||||
|
|
||||||
class UserListView(ControlPanelMixin, ListView):
|
class UserListView(ControlPanelMixin, ListView):
|
||||||
|
@ -37,7 +37,9 @@ def user_detail(request, user): # todo: make class based view
|
||||||
'permissions',
|
'permissions',
|
||||||
).prefetch_related(
|
).prefetch_related(
|
||||||
Prefetch('spaceaccesses', UserSpaceAccess.objects.select_related('space')),
|
Prefetch('spaceaccesses', UserSpaceAccess.objects.select_related('space')),
|
||||||
Prefetch('accesspermissions', AccessPermission.objects.select_related('access_restriction', 'author'))
|
Prefetch('accesspermissions', AccessPermission.objects.select_related(
|
||||||
|
'access_restriction', 'access_restriction_group', 'author'
|
||||||
|
))
|
||||||
)
|
)
|
||||||
user = get_object_or_404(qs, pk=user)
|
user = get_object_or_404(qs, pk=user)
|
||||||
|
|
||||||
|
@ -99,6 +101,17 @@ def user_detail(request, user): # todo: make class based view
|
||||||
access_restriction=restriction
|
access_restriction=restriction
|
||||||
).order_by('expire_date')
|
).order_by('expire_date')
|
||||||
})
|
})
|
||||||
|
elif restriction and restriction.startswith("g") and restriction[1:].isdigit():
|
||||||
|
restriction_group = get_object_or_404(AccessRestrictionGroup, pk=int(restriction[1:]))
|
||||||
|
permissions = user.accesspermissions.filter(access_restriction_group=restriction_group).order_by('expire_date')
|
||||||
|
for permission in permissions:
|
||||||
|
permission.expired = permission.expire_date and permission.expire_date >= now
|
||||||
|
ctx.update({
|
||||||
|
'access_restriction': restriction_group,
|
||||||
|
'access_permissions': user.accesspermissions.filter(
|
||||||
|
access_restriction_group=restriction_group
|
||||||
|
).order_by('expire_date')
|
||||||
|
})
|
||||||
else:
|
else:
|
||||||
if request.method == 'POST' and request.POST.get('submit_access_permissions'):
|
if request.method == 'POST' and request.POST.get('submit_access_permissions'):
|
||||||
form = AccessPermissionForm(request=request, data=request.POST)
|
form = AccessPermissionForm(request=request, data=request.POST)
|
||||||
|
@ -113,11 +126,17 @@ def user_detail(request, user): # todo: make class based view
|
||||||
|
|
||||||
access_permissions = {}
|
access_permissions = {}
|
||||||
for permission in user.accesspermissions.all():
|
for permission in user.accesspermissions.all():
|
||||||
access_permissions.setdefault(permission.access_restriction_id, []).append(permission)
|
access_permissions.setdefault(
|
||||||
|
permission.access_restriction_id or ("g%d" % permission.access_restriction_group_id), []
|
||||||
|
).append(permission)
|
||||||
access_permissions = tuple(
|
access_permissions = tuple(
|
||||||
{
|
{
|
||||||
'pk': pk,
|
'pk': pk,
|
||||||
'title': permissions[0].access_restriction.title,
|
'title': (
|
||||||
|
permissions[0].access_restriction.title
|
||||||
|
if permissions[0].access_restriction_id
|
||||||
|
else permissions[0].access_restriction_group.title
|
||||||
|
),
|
||||||
'can_grant': any(item.can_grant for item in permissions),
|
'can_grant': any(item.can_grant for item in permissions),
|
||||||
'expire_date': set(item.expire_date for item in permissions),
|
'expire_date': set(item.expire_date for item in permissions),
|
||||||
} for pk, permissions in access_permissions.items()
|
} for pk, permissions in access_permissions.items()
|
||||||
|
|
|
@ -3,7 +3,7 @@ from typing import Annotated, Literal, Optional, Union
|
||||||
from pydantic import Field as APIField
|
from pydantic import Field as APIField
|
||||||
from pydantic import PositiveInt
|
from pydantic import PositiveInt
|
||||||
|
|
||||||
from c3nav.api.schema import AnyGeometrySchema, GeometrySchema, LineSchema, BaseSchema
|
from c3nav.api.schema import AnyGeometrySchema, BaseSchema, GeometrySchema, LineSchema
|
||||||
from c3nav.api.utils import NonEmptyStr
|
from c3nav.api.utils import NonEmptyStr
|
||||||
|
|
||||||
GeometryStylesSchema = Annotated[
|
GeometryStylesSchema = Annotated[
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
# Generated by Django 4.2.7 on 2023-12-19 17:51
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
import django.db.models.deletion
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
dependencies = [
|
||||||
|
("mapdata", "0091_area_main_point"),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AddField(
|
||||||
|
model_name="accesspermission",
|
||||||
|
name="access_restriction_group",
|
||||||
|
field=models.ForeignKey(
|
||||||
|
null=True,
|
||||||
|
on_delete=django.db.models.deletion.CASCADE,
|
||||||
|
to="mapdata.accessrestrictiongroup",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name="accesspermission",
|
||||||
|
name="access_restriction",
|
||||||
|
field=models.ForeignKey(
|
||||||
|
null=True,
|
||||||
|
on_delete=django.db.models.deletion.CASCADE,
|
||||||
|
to="mapdata.accessrestriction",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AddConstraint(
|
||||||
|
model_name="accesspermission",
|
||||||
|
constraint=models.CheckConstraint(
|
||||||
|
check=models.Q(
|
||||||
|
models.Q(
|
||||||
|
("access_restriction__isnull", True),
|
||||||
|
("access_restriction_group__isnull", True),
|
||||||
|
_negated=True,
|
||||||
|
),
|
||||||
|
models.Q(
|
||||||
|
("access_restriction__isnull", False),
|
||||||
|
("access_restriction_group__isnull", False),
|
||||||
|
_negated=True,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
name="permission_needs_restriction_or_restriction_group",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
]
|
|
@ -7,7 +7,7 @@ from typing import Sequence
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.db import models, transaction
|
from django.db import models, transaction
|
||||||
from django.db.models import Q
|
from django.db.models import CheckConstraint, Q
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from django.utils.translation import ngettext_lazy
|
from django.utils.translation import ngettext_lazy
|
||||||
|
@ -114,9 +114,14 @@ class AccessPermissionToken(models.Model):
|
||||||
if self.author_id and self.unique_key:
|
if self.author_id and self.unique_key:
|
||||||
AccessPermission.objects.filter(author_id=self.author_id, unique_key=self.unique_key).delete()
|
AccessPermission.objects.filter(author_id=self.author_id, unique_key=self.unique_key).delete()
|
||||||
for restriction in self.restrictions:
|
for restriction in self.restrictions:
|
||||||
|
to_grant = (
|
||||||
|
{"access_restriction_id": restriction.pk}
|
||||||
|
if isinstance(restriction.pk, int)
|
||||||
|
else {"access_restriction_group_id": int(restriction.pk.removeprefix("g"))}
|
||||||
|
)
|
||||||
AccessPermission.objects.create(
|
AccessPermission.objects.create(
|
||||||
user=user,
|
user=user,
|
||||||
access_restriction_id=restriction.pk,
|
**to_grant,
|
||||||
author_id=self.author_id,
|
author_id=self.author_id,
|
||||||
expire_date=restriction.expire_date,
|
expire_date=restriction.expire_date,
|
||||||
can_grant=self.can_grant,
|
can_grant=self.can_grant,
|
||||||
|
@ -138,7 +143,8 @@ class AccessPermissionToken(models.Model):
|
||||||
|
|
||||||
class AccessPermission(models.Model):
|
class AccessPermission(models.Model):
|
||||||
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
|
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
|
||||||
access_restriction = models.ForeignKey(AccessRestriction, on_delete=models.CASCADE)
|
access_restriction = models.ForeignKey(AccessRestriction, on_delete=models.CASCADE, null=True)
|
||||||
|
access_restriction_group = models.ForeignKey(AccessRestrictionGroup, on_delete=models.CASCADE, null=True)
|
||||||
expire_date = models.DateTimeField(null=True, verbose_name=_('expires'))
|
expire_date = models.DateTimeField(null=True, verbose_name=_('expires'))
|
||||||
can_grant = models.BooleanField(default=False, verbose_name=_('can grant'))
|
can_grant = models.BooleanField(default=False, verbose_name=_('can grant'))
|
||||||
author = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.SET_NULL,
|
author = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.SET_NULL,
|
||||||
|
@ -154,6 +160,11 @@ class AccessPermission(models.Model):
|
||||||
unique_together = (
|
unique_together = (
|
||||||
('author', 'unique_key')
|
('author', 'unique_key')
|
||||||
)
|
)
|
||||||
|
constraints = (
|
||||||
|
CheckConstraint(check=(~Q(access_restriction__isnull=True, access_restriction_group__isnull=True) &
|
||||||
|
~Q(access_restriction__isnull=False, access_restriction_group__isnull=False)),
|
||||||
|
name="permission_needs_restriction_or_restriction_group"),
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def user_access_permission_key(user_id):
|
def user_access_permission_key(user_id):
|
||||||
|
@ -176,13 +187,19 @@ class AccessPermission(models.Model):
|
||||||
return {pk: None for pk in cls.get_all_access_restrictions()}
|
return {pk: None for pk in cls.get_all_access_restrictions()}
|
||||||
|
|
||||||
result = tuple(
|
result = tuple(
|
||||||
cls.queryset_for_user(request.user, can_grant).values_list('access_restriction_id', 'expire_date')
|
cls.queryset_for_user(request.user, can_grant).select_related(
|
||||||
|
'access_restriction_group'
|
||||||
|
).prefetch_related('access_restriction_group__accessrestrictions')
|
||||||
)
|
)
|
||||||
|
|
||||||
# collect permissions (can be multiple for one restriction)
|
# collect permissions (can be multiple for one restriction)
|
||||||
permissions = {}
|
permissions = {}
|
||||||
for access_restriction_id, expire_date in result:
|
for permission in result:
|
||||||
permissions.setdefault(access_restriction_id, set()).add(expire_date)
|
if permission.access_restriction_id:
|
||||||
|
permissions.setdefault(permission.access_restriction_id, set()).add(permission.expire_date)
|
||||||
|
if permission.access_restriction_group_id:
|
||||||
|
for member in permission.access_restriction_group.accessrestrictions.all():
|
||||||
|
permissions.setdefault(member.pk, set()).add(permission.expire_date)
|
||||||
|
|
||||||
# get latest expire date for each permission
|
# get latest expire date for each permission
|
||||||
permissions = {
|
permissions = {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue