new api secret system, multiple api secrets, limited scopes, etc...
This commit is contained in:
parent
4307a7fc4d
commit
43791583e0
22 changed files with 404 additions and 156 deletions
|
@ -8,7 +8,7 @@ from rest_framework.exceptions import ParseError, PermissionDenied
|
|||
from rest_framework.response import Response
|
||||
from rest_framework.viewsets import ViewSet
|
||||
|
||||
from c3nav.api.models import Token
|
||||
from c3nav.api.models import LoginToken
|
||||
from c3nav.api.utils import get_api_post_data
|
||||
|
||||
|
||||
|
@ -39,8 +39,8 @@ class SessionViewSet(ViewSet):
|
|||
|
||||
if 'token' in data:
|
||||
try:
|
||||
token = Token.get_by_token(data['token'])
|
||||
except Token.DoesNotExist:
|
||||
token = LoginToken.get_by_token(data['token'])
|
||||
except LoginToken.DoesNotExist:
|
||||
raise PermissionDenied(_('This token does not exist or is no longer valid.'))
|
||||
user = token.user
|
||||
elif 'username' in data:
|
||||
|
|
|
@ -5,16 +5,14 @@ from rest_framework.exceptions import AuthenticationFailed
|
|||
|
||||
class APISecretAuthentication(TokenAuthentication):
|
||||
def authenticate_credentials(self, key):
|
||||
from c3nav.control.models import UserPermissions
|
||||
|
||||
try:
|
||||
user_perms = UserPermissions.objects.exclude(api_secret='').exclude(api_secret__isnull=True).filter(
|
||||
api_secret=key
|
||||
).get()
|
||||
except UserPermissions.DoesNotExist:
|
||||
from c3nav.api.models import Secret
|
||||
secret = Secret.objects.filter(api_secret=key).select_related('user', 'user__permissions')
|
||||
# todo: auth scopes are ignored here, we need to get rid of this
|
||||
except Secret.DoesNotExist:
|
||||
raise AuthenticationFailed(_('Invalid token.'))
|
||||
|
||||
if not user_perms.user.is_active:
|
||||
if not secret.user.is_active:
|
||||
raise AuthenticationFailed(_('User inactive or deleted.'))
|
||||
|
||||
return (user_perms.user, user_perms)
|
||||
return (secret.user, secret)
|
||||
|
|
107
src/c3nav/api/migrations/0003_rename_token_logintoken_secret.py
Normal file
107
src/c3nav/api/migrations/0003_rename_token_logintoken_secret.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
# Generated by Django 4.2.3 on 2023-11-27 22:32
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
def forwards_func(apps, schema_editor):
|
||||
UserPermissions = apps.get_model("control", "UserPermissions")
|
||||
for permissions in UserPermissions.objects.select_related("user"):
|
||||
if permissions.api_secret:
|
||||
permissions.user.api_secrets.create(
|
||||
name="legacy secret (migrated)",
|
||||
api_secret=permissions.api_secret,
|
||||
readonly=False,
|
||||
scope_grant_permissions=True,
|
||||
scope_editor=True,
|
||||
scope_mesh=True,
|
||||
valid_until=None,
|
||||
)
|
||||
|
||||
|
||||
def backwards_func(apps, schema_editor):
|
||||
UserPermissions = apps.get_model("control", "UserPermissions")
|
||||
for permissions in UserPermissions.objects.select_related("user"):
|
||||
secret = permissions.user.api_secrets.first()
|
||||
if secret:
|
||||
permissions.api_secret = secret.api_secret
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
("api", "0002_django_4_0"),
|
||||
("control", "0010_userpermissions_mesh_control"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RenameModel(
|
||||
old_name="Token",
|
||||
new_name="LoginToken",
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="Secret",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("name", models.CharField(verbose_name="name")),
|
||||
(
|
||||
"created",
|
||||
models.DateTimeField(
|
||||
auto_now_add=True, verbose_name="creation date"
|
||||
),
|
||||
),
|
||||
(
|
||||
"api_secret",
|
||||
models.CharField(max_length=64, unique=True, verbose_name="API secret"),
|
||||
),
|
||||
("readonly", models.BooleanField(verbose_name="readonly")),
|
||||
(
|
||||
"scope_grant_permissions",
|
||||
models.BooleanField(
|
||||
default=False, verbose_name="grant map access permissions"
|
||||
),
|
||||
),
|
||||
(
|
||||
"scope_editor",
|
||||
models.BooleanField(default=False, verbose_name="editor access"),
|
||||
),
|
||||
(
|
||||
"scope_mesh",
|
||||
models.BooleanField(default=False, verbose_name="mesh access"),
|
||||
),
|
||||
(
|
||||
"valid_until",
|
||||
models.DateTimeField(null=True, verbose_name="valid_until"),
|
||||
),
|
||||
(
|
||||
"user",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="api_secrets",
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
migrations.RunPython(forwards_func, backwards_func),
|
||||
migrations.AlterModelOptions(
|
||||
name="secret",
|
||||
options={
|
||||
"verbose_name": "API secret",
|
||||
"verbose_name_plural": "API secrets",
|
||||
},
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name="secret",
|
||||
unique_together={("user", "name")},
|
||||
),
|
||||
]
|
|
@ -6,7 +6,37 @@ from django.utils.crypto import constant_time_compare, get_random_string
|
|||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
|
||||
class Token(models.Model):
|
||||
class SecretQuerySet(models.QuerySet):
|
||||
def get_by_secret(self, secret):
|
||||
self.filter(secret=secret, )
|
||||
|
||||
|
||||
class Secret(models.Model):
|
||||
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="api_secrets")
|
||||
name = models.CharField(_('name'))
|
||||
created = models.DateTimeField(auto_now_add=True, verbose_name=_('creation date'))
|
||||
api_secret = models.CharField(max_length=64, verbose_name=_('API secret'), unique=True)
|
||||
readonly = models.BooleanField(_('readonly'))
|
||||
scope_grant_permissions = models.BooleanField(_('grant map access permissions'), default=False)
|
||||
scope_editor = models.BooleanField(_('editor access'), default=False)
|
||||
scope_mesh = models.BooleanField(_('mesh access'), default=False)
|
||||
valid_until = models.DateTimeField(null=True, verbose_name=_('valid_until'))
|
||||
|
||||
def scopes_display(self):
|
||||
return [
|
||||
field.verbose_name for field in self._meta.get_fields()
|
||||
if field.name.startswith('scope_') and getattr(self, field.name)
|
||||
] + ([_('(readonly)')] if self.readonly else [])
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('API secret')
|
||||
verbose_name_plural = _('API secrets')
|
||||
unique_together = [
|
||||
('user', 'name'),
|
||||
]
|
||||
|
||||
|
||||
class LoginToken(models.Model):
|
||||
"""
|
||||
Token for log in via API
|
||||
"""
|
||||
|
|
|
@ -1,26 +1,43 @@
|
|||
from collections import namedtuple
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
from importlib import import_module
|
||||
|
||||
from django.contrib.auth import get_user as auth_get_user
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.db.models import Q
|
||||
from django.utils import timezone
|
||||
from django.utils.functional import SimpleLazyObject, lazy
|
||||
from ninja.security import HttpBearer
|
||||
|
||||
from c3nav import settings
|
||||
from c3nav.api.exceptions import APIPermissionDenied, APITokenInvalid
|
||||
from c3nav.api.models import Secret
|
||||
from c3nav.api.schema import APIErrorSchema
|
||||
from c3nav.control.middleware import UserPermissionsMiddleware
|
||||
from c3nav.control.models import UserPermissions
|
||||
|
||||
FakeRequest = namedtuple('FakeRequest', ('session', ))
|
||||
|
||||
|
||||
class APIAuthMethod(StrEnum):
|
||||
ANONYMOUS = 'anonymous'
|
||||
SESSION = 'session'
|
||||
SECRET = 'secret'
|
||||
|
||||
|
||||
@dataclass
|
||||
class NewAPIAuth:
|
||||
auth_method: APIAuthMethod
|
||||
auth_readonly: bool
|
||||
|
||||
|
||||
description = """
|
||||
An API token can be acquired in 4 ways:
|
||||
|
||||
* Use `anonymous` for guest access.
|
||||
* Generate a session-bound token using the auth session endpoint.
|
||||
* Create an API token in your user account settings.
|
||||
* Create an API token by signing in through the auth endpoint.
|
||||
* Generate a session-bound temporary token using the auth session endpoint.
|
||||
* Create an API secret in your user account settings.
|
||||
""".strip()
|
||||
|
||||
|
||||
|
@ -28,41 +45,78 @@ class APITokenAuth(HttpBearer):
|
|||
openapi_name = "api token authentication"
|
||||
openapi_description = description
|
||||
|
||||
def __init__(self, logged_in=False, superuser=False):
|
||||
def __init__(self, logged_in=False, superuser=False, permissions: set[str] = None, is_readonly=False):
|
||||
super().__init__()
|
||||
self.logged_in = superuser or logged_in
|
||||
self.superuser = superuser
|
||||
self.permissions = permissions or set()
|
||||
self.is_readonly = is_readonly
|
||||
engine = import_module(settings.SESSION_ENGINE)
|
||||
self.SessionStore = engine.SessionStore
|
||||
|
||||
def _authenticate(self, request, token):
|
||||
def _authenticate(self, request, token) -> NewAPIAuth:
|
||||
request.user = AnonymousUser
|
||||
request.user_permissions = SimpleLazyObject(lambda: UserPermissionsMiddleware.get_user_permissions(request))
|
||||
request.user_space_accesses = lazy(UserPermissionsMiddleware.get_user_space_accesses, dict)(request)
|
||||
|
||||
if token == "anonymous":
|
||||
return AnonymousUser
|
||||
return NewAPIAuth(
|
||||
auth_method=APIAuthMethod.ANONYMOUS,
|
||||
auth_readonly=True,
|
||||
)
|
||||
elif token.startswith("session:"):
|
||||
session = self.SessionStore(token.removeprefix("session:"))
|
||||
# todo: ApiTokenInvalid?
|
||||
print('session is empty:', request.session.is_empty())
|
||||
user = auth_get_user(FakeRequest(session=session))
|
||||
return user
|
||||
if not user.is_authenticated:
|
||||
raise APITokenInvalid
|
||||
request.user = user
|
||||
return NewAPIAuth(
|
||||
auth_method=APIAuthMethod.SESSION,
|
||||
auth_readonly=True,
|
||||
)
|
||||
elif token.startswith("secret:"):
|
||||
try:
|
||||
user_perms = UserPermissions.objects.filter(
|
||||
~Q(api_secret=""),
|
||||
~Q(api_secret__isnull=True),
|
||||
api_secret=token.removeprefix("secret:")
|
||||
).select_related("user").get()
|
||||
except UserPermissions.DoesNotExist:
|
||||
secret = Secret.objects.filter(
|
||||
Q(api_secret=token.removeprefix("secret:")),
|
||||
Q(valid_until__isnull=True) | Q(valid_until__lt=timezone.now()),
|
||||
).select_related("user", "user__permissions").get()
|
||||
except Secret.DoesNotExist:
|
||||
raise APITokenInvalid
|
||||
return user_perms.user
|
||||
|
||||
# get user permissions and restrict them based on scopes
|
||||
user_permissions: UserPermissions = secret.user.permissions
|
||||
if secret.scope_mesh is False:
|
||||
user_permissions.mesh_control = False
|
||||
if secret.scope_editor is False:
|
||||
user_permissions.editor_access = False
|
||||
if secret.scope_grant_permissions is False:
|
||||
user_permissions.grant_permissions = False
|
||||
|
||||
request.user = secret.user
|
||||
request.user_permissions = user_permissions
|
||||
|
||||
return NewAPIAuth(
|
||||
auth_method=APIAuthMethod.SESSION,
|
||||
auth_readonly=True
|
||||
)
|
||||
# todo: implement token (app) auth
|
||||
raise APITokenInvalid
|
||||
|
||||
def authenticate(self, request, token):
|
||||
user = self._authenticate(request, token)
|
||||
if self.logged_in and user.is_anonymous:
|
||||
raise APIPermissionDenied
|
||||
if self.superuser and not user.is_superuser:
|
||||
raise APIPermissionDenied
|
||||
return user
|
||||
auth_result = self._authenticate(request, token)
|
||||
if self.logged_in and not request.user.is_authenticated:
|
||||
raise APIPermissionDenied('You need to be signed in for this request.')
|
||||
if self.superuser and not request.user.is_superuser:
|
||||
raise APIPermissionDenied('You need to have admin rights for this endpoint.')
|
||||
for permission in self.permissions:
|
||||
if not getattr(request.user_permissions, permission):
|
||||
raise APIPermissionDenied('You need to have the "%s" permission for this endpoint.')
|
||||
if request.method == 'GET' and self.is_readonly:
|
||||
raise ValueError('this makes no sense for GET')
|
||||
if request.method != 'GET' and not self.is_readonly:
|
||||
raise APIPermissionDenied('You need a non-readonly API access key for this endpoint.')
|
||||
return auth_result
|
||||
|
||||
|
||||
validate_responses = {422: APIErrorSchema, }
|
||||
|
|
|
@ -158,6 +158,7 @@ class AccessPermissionForm(Form):
|
|||
unique_key=unique_key)
|
||||
|
||||
def get_signed_data(self, key=None):
|
||||
# todo: yep, we stil need to fix this
|
||||
if not self.author.permissions.api_secret:
|
||||
raise ValueError('Author has no api secret.')
|
||||
data = {
|
||||
|
|
|
@ -17,22 +17,24 @@ class UserPermissionsMiddleware:
|
|||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def get_user_permissions(self, request):
|
||||
@staticmethod
|
||||
def get_user_permissions(request):
|
||||
try:
|
||||
return getattr(request, '_user_permissions_cache')
|
||||
except AttributeError:
|
||||
pass
|
||||
result = UserPermissions.get_for_user(request.user)
|
||||
self._user_permissions_cache = result
|
||||
request._user_permissions_cache = result
|
||||
return result
|
||||
|
||||
def get_user_space_accesses(self, request):
|
||||
@staticmethod
|
||||
def get_user_space_accesses(request):
|
||||
try:
|
||||
return getattr(request, '_user_space_accesses_cache')
|
||||
except AttributeError:
|
||||
pass
|
||||
result = UserSpaceAccess.get_for_user(request.user)
|
||||
self._user_space_accesses_cache = result
|
||||
request._user_space_accesses_cache = result
|
||||
return result
|
||||
|
||||
def __call__(self, request):
|
||||
|
|
|
@ -30,36 +30,6 @@
|
|||
</p>
|
||||
{% endif %}
|
||||
|
||||
{% if request.user_permissions.grant_permissions or request.user == user and user.permissions.api_secret %}
|
||||
<h4>{% trans 'API secret' %}</h4>
|
||||
<p>
|
||||
{% if user.permissions.api_secret %}
|
||||
{% if request.user == user %}
|
||||
{% trans 'This user has an API secret.' %}
|
||||
{% else %}
|
||||
{% trans 'You have an API secret.' %}
|
||||
{% endif %}
|
||||
{% trans 'You can not see it, but generate a new one.' %}
|
||||
{% else %}
|
||||
{% trans 'This user has not an API secret.' %}
|
||||
{% trans 'You can create one.' %}
|
||||
{% endif %}
|
||||
</p>
|
||||
<form method="POST">
|
||||
{% csrf_token %}
|
||||
<select name="api_secret" style="width: auto;">
|
||||
<option value="">---</option>
|
||||
{% if user.permissions.api_secret %}
|
||||
<option value="regenerate">{% trans 'Regenerate API secret' %}</option>
|
||||
<option value="delete">{% trans 'Delete API secret' %}</option>
|
||||
{% else %}
|
||||
<option value="generate">{% trans 'Generate API secret' %}</option>
|
||||
{% endif %}
|
||||
</select>
|
||||
<button type="submit">{% trans 'Update API secret' %}</button>
|
||||
</form>
|
||||
{% endif %}
|
||||
|
||||
<a name="access"></a>
|
||||
<h4>{% trans 'Access Permissions' %}</h4>
|
||||
{% if access_restriction %}
|
||||
|
|
|
@ -21,6 +21,7 @@ def grant_access(request): # todo: make class based view
|
|||
if form.is_valid():
|
||||
token = form.get_token()
|
||||
token.save()
|
||||
# todo: this still needs fixing
|
||||
if settings.DEBUG and request.user_permissions.api_secret:
|
||||
signed_data = form.get_signed_data()
|
||||
print('/?'+urlencode({'access': signed_data}))
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
import string
|
||||
|
||||
from django.contrib import messages
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.contrib.auth.models import User
|
||||
|
@ -7,7 +5,6 @@ from django.db import IntegrityError, transaction
|
|||
from django.db.models import Prefetch
|
||||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.utils import timezone
|
||||
from django.utils.crypto import get_random_string
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.views.generic import ListView
|
||||
|
||||
|
@ -60,37 +57,6 @@ def user_detail(request, user): # todo: make class based view
|
|||
messages.error(request, _('You cannot delete this Access Permission.'))
|
||||
return redirect(request.path_info+'?restriction='+str(permission.pk)+'#access')
|
||||
|
||||
api_secret_action = request.POST.get('api_secret')
|
||||
if (api_secret_action and (request.user_permissions.grant_permissions or
|
||||
(request.user == user and user.permissions.api_secret))):
|
||||
|
||||
permissions = user.permissions
|
||||
|
||||
if api_secret_action == 'generate' and permissions.api_secret:
|
||||
messages.error(request, _('This user already has an API secret.'))
|
||||
return redirect(request.path_info)
|
||||
|
||||
if api_secret_action in ('delete', 'regenerate') and not permissions.api_secret:
|
||||
messages.error(request, _('This user does not have an API secret.'))
|
||||
return redirect(request.path_info)
|
||||
|
||||
with transaction.atomic():
|
||||
if api_secret_action in ('generate', 'regenerate'):
|
||||
api_secret = '%d-%s' % (user.pk, get_random_string(62, string.ascii_letters+string.digits))
|
||||
permissions.api_secret = api_secret
|
||||
permissions.save()
|
||||
|
||||
messages.success(request, _('The new API secret is: %s – '
|
||||
'be sure to note it down now, it won\'t be shown again.') % api_secret)
|
||||
|
||||
elif api_secret_action == 'delete':
|
||||
permissions.api_secret = None
|
||||
permissions.save()
|
||||
|
||||
messages.success(request, _('API secret successfully deleted!'))
|
||||
|
||||
return redirect(request.path_info)
|
||||
|
||||
ctx = {
|
||||
'user': user,
|
||||
}
|
||||
|
|
1
src/c3nav/mapdata/utils/cache/stats.py
vendored
1
src/c3nav/mapdata/utils/cache/stats.py
vendored
|
@ -14,6 +14,7 @@ from c3nav.mapdata.utils.locations import CustomLocation, get_location_by_id_for
|
|||
|
||||
|
||||
def increment_cache_key(cache_key):
|
||||
print('increment_cache_key', cache_key)
|
||||
try:
|
||||
cache.incr(cache_key)
|
||||
except ValueError:
|
||||
|
|
|
@ -3,20 +3,20 @@ from dataclasses import dataclass
|
|||
from dataclasses import replace as dataclass_replace
|
||||
from functools import cached_property
|
||||
from itertools import chain
|
||||
from typing import Sequence, Any
|
||||
from typing import Any, Sequence
|
||||
|
||||
from asgiref.sync import async_to_sync
|
||||
from django import forms
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import transaction
|
||||
from django.forms import Form, ChoiceField, BooleanField
|
||||
from django.forms import BooleanField, ChoiceField, Form
|
||||
from django.http import Http404
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from c3nav.mesh.dataformats import BoardConfig, BoardType, LedType, SerialLedType
|
||||
from c3nav.mesh.messages import MESH_BROADCAST_ADDRESS, MESH_ROOT_ADDRESS, MeshMessage, MeshMessageType
|
||||
from c3nav.mesh.models import MeshNode, HardwareDescription, FirmwareBuild, OTAUpdateRecipient, OTARecipientStatus, \
|
||||
OTAUpdate
|
||||
from c3nav.mesh.models import (FirmwareBuild, HardwareDescription, MeshNode, OTARecipientStatus, OTAUpdate,
|
||||
OTAUpdateRecipient)
|
||||
|
||||
|
||||
class MeshMessageForm(forms.Form):
|
||||
|
@ -300,7 +300,6 @@ class RangingForm(forms.Form):
|
|||
self.fields['range_to'].choices = node_choices
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class OTAFormGroup:
|
||||
hardware: HardwareDescription
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
from datetime import datetime
|
||||
from typing import Optional, Annotated
|
||||
from typing import Annotated, Optional
|
||||
|
||||
from django.db import IntegrityError, transaction
|
||||
from ninja import Field as APIField, Query
|
||||
from ninja import Field as APIField
|
||||
from ninja import Query
|
||||
from ninja import Router as APIRouter
|
||||
from ninja import Schema, UploadedFile
|
||||
from ninja.pagination import paginate
|
||||
|
@ -14,7 +15,7 @@ from c3nav.mesh.dataformats import BoardType, ChipType, FirmwareImage
|
|||
from c3nav.mesh.messages import MeshMessageType
|
||||
from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, NodeMessage
|
||||
|
||||
mesh_api_router = APIRouter(tags=["mesh"])
|
||||
mesh_api_router = APIRouter(tags=["mesh"], auth=APITokenAuth(permissions={"mesh_control"}))
|
||||
|
||||
|
||||
class FirmwareBuildSchema(Schema):
|
||||
|
@ -144,7 +145,7 @@ class UploadFirmwareSchema(Schema):
|
|||
|
||||
|
||||
@mesh_api_router.post(
|
||||
'/firmwares/upload', summary="Upload firmware", auth=APITokenAuth(superuser=True),
|
||||
'/firmwares/upload', summary="Upload firmware",
|
||||
description="your OpenAPI viewer might not show it: firmware_data is UploadFirmware as json",
|
||||
response={200: FirmwareSchema, **validate_responses, **auth_permission_responses, **APIConflict.dict()}
|
||||
)
|
||||
|
@ -166,20 +167,10 @@ def firmware_upload(request, firmware_data: UploadFirmwareSchema, binary_files:
|
|||
project_name=firmware_data.project_name,
|
||||
version=firmware_data.version,
|
||||
idf_version=firmware_data.idf_version,
|
||||
uploader=request.auth,
|
||||
uploader=request.user,
|
||||
)
|
||||
|
||||
for build_data in firmware_data.builds:
|
||||
# if bin_file.size > 4 * 1024 * 1024:
|
||||
# raise ValueError # todo: better error
|
||||
|
||||
# h = hashlib.sha256()
|
||||
# h.update(build_data.binary)
|
||||
# sha256_bin_file = h.hexdigest() # todo: verify sha256 correctly
|
||||
#
|
||||
# if sha256_bin_file != build_data.sha256_hash:
|
||||
# raise ValueError
|
||||
|
||||
try:
|
||||
image = FirmwareImage.from_file(binary_files_by_name[build_data.uploaded_filename].open('rb'))
|
||||
except ValueError:
|
||||
|
@ -224,7 +215,7 @@ class NodeMessageSchema(Schema):
|
|||
|
||||
|
||||
@mesh_api_router.get(
|
||||
'/messages/', summary="query recorded mesh messages", auth=APITokenAuth(superuser=True),
|
||||
'/messages/', summary="query recorded mesh messages",
|
||||
response={200: list[NodeMessageSchema], **auth_permission_responses}
|
||||
)
|
||||
@paginate
|
||||
|
|
|
@ -13,7 +13,7 @@ positioning_api_router = APIRouter(tags=["positioning"])
|
|||
|
||||
@positioning_api_router.post('/locate/', summary="locate based on wifi scans",
|
||||
response={200: BoundsSchema, **auth_responses})
|
||||
def locate(request):
|
||||
def locate():
|
||||
# todo: implement
|
||||
return {
|
||||
"bounds": Source.max_bounds(),
|
||||
|
@ -22,7 +22,7 @@ def locate(request):
|
|||
|
||||
@positioning_api_router.get('/locate-test/', summary="get dummy location for debugging",
|
||||
response={200: BoundsSchema, **auth_responses})
|
||||
def locate_test(request):
|
||||
def locate_test():
|
||||
# todo: implement
|
||||
return {
|
||||
"bounds": Source.max_bounds(),
|
||||
|
@ -44,5 +44,5 @@ BeaconsXYZ = dict[
|
|||
|
||||
@positioning_api_router.get('/beacons-xyz/', summary="get calculated x y z for all beacons",
|
||||
response={200: BeaconsXYZ, **auth_responses})
|
||||
def beacons_xyz(request):
|
||||
def beacons_xyz():
|
||||
return RangeLocator.load().get_all_xyz()
|
||||
|
|
|
@ -2,7 +2,7 @@ import pickle
|
|||
import threading
|
||||
from dataclasses import dataclass
|
||||
from pprint import pprint
|
||||
from typing import Self
|
||||
from typing import Annotated, Self
|
||||
|
||||
import numpy as np
|
||||
import scipy
|
||||
|
|
|
@ -49,7 +49,7 @@ with suppress(ImportError):
|
|||
import sentry_sdk
|
||||
from sentry_sdk.integrations.celery import CeleryIntegration
|
||||
from sentry_sdk.integrations.django import DjangoIntegration
|
||||
from sentry_sdk.scrubber import EventScrubber, DEFAULT_DENYLIST
|
||||
from sentry_sdk.scrubber import DEFAULT_DENYLIST, EventScrubber
|
||||
|
||||
sensitive_env_vars = ['C3NAV_DJANGO_SECRET', 'C3NAV_TILE_SECRET', 'C3NAV_DATABASE', 'C3NAV_DATABASE_PASSWORD',
|
||||
'C3NAV_MEMCACHED', 'C3NAV_MEMCACHED_USER', 'C3NAV_MEMCACHED_PASSWORD',
|
||||
|
|
|
@ -1,20 +1,17 @@
|
|||
from dataclasses import dataclass
|
||||
from dataclasses import replace as dataclass_replace
|
||||
from functools import cached_property
|
||||
from itertools import chain
|
||||
import string
|
||||
from datetime import timedelta
|
||||
from operator import attrgetter
|
||||
from typing import Any, Sequence
|
||||
|
||||
from django.db import transaction
|
||||
from django.forms import BooleanField, ChoiceField, Form, ModelChoiceField, ModelForm
|
||||
from django.forms import Form, IntegerField, ModelChoiceField, ModelForm
|
||||
from django.utils import timezone
|
||||
from django.utils.crypto import get_random_string
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from c3nav.api.models import Secret
|
||||
from c3nav.mapdata.forms import I18nModelFormMixin
|
||||
from c3nav.mapdata.models.locations import Position
|
||||
from c3nav.mapdata.models.report import Report, ReportUpdate
|
||||
from c3nav.mesh.messages import MeshMessageType
|
||||
from c3nav.mesh.models import (FirmwareBuild, HardwareDescription, MeshNode, OTARecipientStatus, OTAUpdate,
|
||||
OTAUpdateRecipient)
|
||||
|
||||
|
||||
class ReportIssueForm(I18nModelFormMixin, ModelForm):
|
||||
|
@ -74,3 +71,35 @@ class PositionSetForm(Form):
|
|||
self.fields['position'].queryset = Position.objects.filter(owner=request.user)
|
||||
self.fields['position'].label_from_instance = attrgetter('name')
|
||||
|
||||
|
||||
class APISecretForm(ModelForm):
|
||||
valid_for_days = IntegerField(min_value=0, max_value=90, label=_('valid for (days)'), initial=7)
|
||||
valid_for_hours = IntegerField(min_value=0, max_value=24, label=_('valid for (hours)'), initial=0)
|
||||
|
||||
def __init__(self, *args, request, **kwargs):
|
||||
self.request = request
|
||||
super().__init__(*args, **kwargs)
|
||||
if not self.request.user_permissions.grant_permissions:
|
||||
self.fields.pop('scope_grant_permissions', None)
|
||||
if not self.request.user_permissions.editor_access:
|
||||
self.fields.pop('scope_editor', None)
|
||||
if not self.request.user_permissions.mesh_control:
|
||||
self.fields.pop('scope_mesh', None)
|
||||
|
||||
class Meta:
|
||||
model = Secret
|
||||
fields = ['name', 'readonly', 'scope_grant_permissions', 'scope_editor', 'scope_mesh']
|
||||
# todo: allow suuplying days and hours
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.instance.valid_until = (
|
||||
timezone.now()
|
||||
+ timedelta(days=self.cleaned_data['valid_for_days'])
|
||||
+ timedelta(hours=self.cleaned_data['valid_for_hours'])
|
||||
)
|
||||
self.instance.user = self.request.user
|
||||
self.instance.api_secret = (
|
||||
'%d-%s' % (self.request.user.pk, get_random_string(62, string.ascii_letters + string.digits))
|
||||
)[:64]
|
||||
|
||||
return super().save(*args, **kwargs)
|
||||
|
|
|
@ -13,25 +13,24 @@
|
|||
{% endwith %}
|
||||
</p>
|
||||
|
||||
{% if request.user_permissions.control_panel and not request.mobileclient %}
|
||||
<hr>
|
||||
{% if request.user_permissions.control_panel or request.user_permissions.can_review_reports or request.user_permissions.mesh_control %}
|
||||
<p>
|
||||
{% trans 'You can access the control panel.' %}
|
||||
{% trans 'You can manage:' %}
|
||||
</p>
|
||||
<p>
|
||||
<a class="button" href="{% url 'control.index' %}">{% trans 'c3nav control panel' %}</a>
|
||||
{% if request.user_permissions.control_panel %}
|
||||
<a class="button" href="{% url 'control.index' %}">{% trans 'stuff' %}</a>
|
||||
{% endif %}
|
||||
{% if request.user_permissions.can_review_reports %}
|
||||
<a class="button" href="{% url 'site.report_list' %}">{% trans 'reports' %}</a>
|
||||
{% endif %}
|
||||
{% if request.user_permissions.mesh_control %}
|
||||
<a class="button" href="{% url 'mesh.nodes' %}">{% trans 'mesh' %}</a>
|
||||
{% endif %}
|
||||
</p>
|
||||
{% endif %}
|
||||
|
||||
{% if request.user_permissions.can_review_reports %}
|
||||
<hr>
|
||||
<p>
|
||||
{% trans 'You can review reports' %}
|
||||
</p>
|
||||
<p>
|
||||
<a class="button" target="_blank" href="{% url 'site.report_list' filter='open' %}">{% trans 'Show reports' %}</a>
|
||||
</p>
|
||||
{% elif user_has_reports %}
|
||||
{% if not request.user_permissions.can_review_reports and user_has_reports %}
|
||||
<hr>
|
||||
<p>
|
||||
{% trans 'You have submitted reports.' %}
|
||||
|
@ -42,12 +41,11 @@
|
|||
{% endif %}
|
||||
|
||||
<hr>
|
||||
|
||||
<p>
|
||||
{% trans 'You can create custom positions.' %}
|
||||
<a class="button" target="_blank" href="{% url 'site.position_list' %}">{% trans 'Manage custom positions' %}</a>
|
||||
</p>
|
||||
<p>
|
||||
<a class="button" target="_blank" href="{% url 'site.position_list' %}">{% trans 'Manage your positions' %}</a>
|
||||
<a class="button" target="_blank" href="{% url 'site.api_secret_list' %}">{% trans 'Manage API secrets' %}</a>
|
||||
</p>
|
||||
|
||||
<hr>
|
||||
|
|
16
src/c3nav/site/templates/site/api_secret_create.html
Normal file
16
src/c3nav/site/templates/site/api_secret_create.html
Normal file
|
@ -0,0 +1,16 @@
|
|||
{% extends 'site/base.html' %}
|
||||
{% load i18n %}
|
||||
|
||||
{% block content %}
|
||||
<main class="account">
|
||||
<h2>{% trans 'Create API secret' %}</h2>
|
||||
{% include 'site/fragment_messages.html' %}
|
||||
<p><a href="{% url 'site.api_secret_list' %}">« {% trans 'back to overview' %}</a></p>
|
||||
|
||||
<form method="post" action="{{ request.path_info }}">
|
||||
{% csrf_token %}
|
||||
{{ form.as_p }}
|
||||
<button type="submit">{% trans 'Submit' %}</button>
|
||||
</form>
|
||||
</main>
|
||||
{% endblock %}
|
44
src/c3nav/site/templates/site/api_secret_list.html
Normal file
44
src/c3nav/site/templates/site/api_secret_list.html
Normal file
|
@ -0,0 +1,44 @@
|
|||
{% extends 'site/base.html' %}
|
||||
{% load i18n %}
|
||||
|
||||
{% block content %}
|
||||
<main class="account">
|
||||
{% include 'site/fragment_messages.html' %}
|
||||
<h2>{% trans 'Your API secrets' %}</h2>
|
||||
|
||||
<form method="post" action="{{ request.path_info }}" style="max-width: 100%;">
|
||||
{% csrf_token %}
|
||||
{% if api_secrets %}
|
||||
<table>
|
||||
<tr>
|
||||
<th>{% trans "Name" %}</th>
|
||||
<th>{% trans "Created" %}</th>
|
||||
<th>{% trans "Permissions" %}</th>
|
||||
<th>{% trans "Valid Until" %}</th>
|
||||
<th>{% trans "Delete" %}</th>
|
||||
</tr>
|
||||
{% for secret in api_secrets %}
|
||||
<tr>
|
||||
<td>{{ secret.name }}</td>
|
||||
<td>{{ secret.created }}</td>
|
||||
<td>
|
||||
{% for scope in secret.scopes_display %}
|
||||
{% if forloop.counter0 != 0 %}<br>{% endif %}
|
||||
{{ scope }}
|
||||
{% endfor %}
|
||||
</td>
|
||||
<td>
|
||||
{% if secret.valid_until %}{{ secret.valid_until }}{% endif %}
|
||||
</td>
|
||||
<td>
|
||||
<button type="submit" name="delete" value="{{ secret.pk }}">{% trans 'delete' %}</button>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
{% endif %}
|
||||
</form>
|
||||
|
||||
<a class="button" href="{% url 'site.api_secret_create' %}">{% trans 'Create API secret' %}</a>
|
||||
</main>
|
||||
{% endblock %}
|
|
@ -3,9 +3,10 @@ from itertools import chain
|
|||
from django.urls import path, register_converter
|
||||
|
||||
from c3nav.site.converters import AtPositionConverter, CoordinatesConverter, IsEmbedConverter, LocationConverter
|
||||
from c3nav.site.views import (about_view, access_redeem_view, account_view, change_password_view, choose_language,
|
||||
login_view, logout_view, map_index, position_create, position_detail, position_list,
|
||||
position_set, qr_code, register_view, report_create, report_detail, report_list)
|
||||
from c3nav.site.views import (about_view, access_redeem_view, account_view, api_secret_create, api_secret_list,
|
||||
change_password_view, choose_language, login_view, logout_view, map_index,
|
||||
position_create, position_detail, position_list, position_set, qr_code, register_view,
|
||||
report_create, report_detail, report_list)
|
||||
|
||||
register_converter(LocationConverter, 'loc')
|
||||
register_converter(CoordinatesConverter, 'coords')
|
||||
|
@ -55,4 +56,6 @@ urlpatterns = [
|
|||
path('positions/create/', position_create, name='site.position_create'),
|
||||
path('positions/<int:pk>/', position_detail, name='site.position_detail'),
|
||||
path('positions/set/<coords:coordinates>/', position_set, name='site.position_set'),
|
||||
path('api-secrets/', api_secret_list, name='site.api_secret_list'),
|
||||
path('api-secrets/create/', api_secret_create, name='site.api_secret_create'),
|
||||
]
|
||||
|
|
|
@ -18,12 +18,14 @@ from django.middleware import csrf
|
|||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.html import format_html
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.utils.translation import ngettext_lazy
|
||||
from django.views.decorators.cache import cache_control, never_cache
|
||||
from django.views.decorators.clickjacking import xframe_options_exempt
|
||||
from django.views.decorators.http import etag
|
||||
|
||||
from c3nav.api.models import Secret
|
||||
from c3nav.control.forms import AccessPermissionForm, SignedPermissionDataError
|
||||
from c3nav.mapdata.grid import grid
|
||||
from c3nav.mapdata.models import Location, Source
|
||||
|
@ -35,7 +37,7 @@ from c3nav.mapdata.utils.locations import (get_location_by_id_for_request, get_l
|
|||
from c3nav.mapdata.utils.user import can_access_editor, get_user_data
|
||||
from c3nav.mapdata.views import set_tile_access_cookie
|
||||
from c3nav.routing.models import RouteOptions
|
||||
from c3nav.site.forms import PositionForm, PositionSetForm, ReportUpdateForm
|
||||
from c3nav.site.forms import APISecretForm, PositionForm, PositionSetForm, ReportUpdateForm
|
||||
from c3nav.site.models import Announcement, SiteUpdate
|
||||
|
||||
|
||||
|
@ -570,3 +572,39 @@ def position_set(request, coordinates):
|
|||
'coordinates': coordinates,
|
||||
'form': form,
|
||||
})
|
||||
|
||||
|
||||
@login_required(login_url='site.login')
|
||||
def api_secret_list(request):
|
||||
print(Secret.objects.values_list("api_secret", flat=True))
|
||||
if request.method == 'POST' and request.POST.get('delete', 'nope').isdigit():
|
||||
Secret.objects.filter(user=request.user, pk=int(request.POST['delete'])).delete()
|
||||
messages.success(request, _('API secret deleted.'))
|
||||
return redirect(reverse('site.api_secret_list'))
|
||||
return render(request, 'site/api_secret_list.html', {
|
||||
'api_secrets': Secret.objects.filter(user=request.user).order_by('-created'),
|
||||
'user_data_json': json.dumps(get_user_data(request), cls=DjangoJSONEncoder),
|
||||
})
|
||||
|
||||
|
||||
@login_required(login_url='site.login')
|
||||
def api_secret_create(request):
|
||||
if Secret.objects.filter(user=request.user).count() >= 20:
|
||||
messages.error(request, _('You can\'t create more than 20 API secrets.'))
|
||||
|
||||
if request.method == 'POST':
|
||||
form = APISecretForm(data=request.POST, request=request)
|
||||
if form.is_valid():
|
||||
secret = form.save()
|
||||
messages.success(request, format_html(
|
||||
'{}<br><code>{}</code>',
|
||||
_('API secret created. Save it now, cause it will not be shown again!'),
|
||||
secret.api_secret,
|
||||
))
|
||||
return redirect(reverse('site.api_secret_list'))
|
||||
else:
|
||||
form = APISecretForm(request=request)
|
||||
|
||||
return render(request, 'site/api_secret_create.html', {
|
||||
'form': form,
|
||||
})
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue