goodbye django rest framework

This commit is contained in:
Laura Klünder 2023-12-03 21:47:43 +01:00
parent 0e65efcbbc
commit caf23d053c
16 changed files with 48 additions and 1733 deletions

View file

@ -1,35 +0,0 @@
from functools import wraps
from rest_framework.generics import GenericAPIView
from rest_framework.renderers import JSONRenderer
from c3nav.mapdata.utils.json import json_encoder_reindent
orig_render = JSONRenderer.render
@wraps(JSONRenderer.render)
def nicer_renderer(self, data, accepted_media_type=None, renderer_context=None):
if self.get_indent(accepted_media_type, renderer_context) is None:
return orig_render(self, data, accepted_media_type, renderer_context)
shorten_limit = 50
if isinstance(data, (list, tuple)):
shorten_limit = 5 if any(('geometry' in item) for item in data[:50]) else 50
shorten = isinstance(data, (list, tuple)) and len(data) > shorten_limit
if shorten:
remaining_len = len(data)-shorten_limit
data = data[:shorten_limit]
result = json_encoder_reindent(lambda d: orig_render(self, d, accepted_media_type, renderer_context), data)
if shorten:
result = (result[:-2] +
('\n ...%d more elements (truncated for HTML preview)...' % remaining_len).encode() +
result[-2:])
return result
# Monkey patch for nicer indentation in the django rest framework
JSONRenderer.render = nicer_renderer
# Fuck serializers!
del GenericAPIView.get_serializer

View file

@ -1,91 +0,0 @@
from django.contrib.auth import login, logout
from django.contrib.auth.forms import AuthenticationForm
from django.middleware import csrf
from django.utils.translation import gettext_lazy as _
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import action
from rest_framework.exceptions import ParseError, PermissionDenied
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from c3nav.api.models import LoginToken
from c3nav.api.utils import get_api_post_data
class SessionViewSet(ViewSet):
"""
Session for Login, Logout, etc
Don't forget to set X-Csrftoken for POST requests!
/login POST with fields token or username and password to log in
/get_token POST with fields username and password to get a login token
/logout - POST to log out
"""
def list(self, request, *args, **kwargs):
return Response({
'is_authenticated': request.user.is_authenticated,
'csrf_token': csrf.get_token(request),
})
@action(detail=False, methods=['post'])
def login(self, request, *args, **kwargs):
# django-rest-framework doesn't do this for logged out requests
SessionAuthentication().enforce_csrf(request)
if request.user.is_authenticated:
raise ParseError(_('Log out first.'))
data = get_api_post_data(request)
if 'token' in data:
try:
token = LoginToken.get_by_token(data['token'])
except LoginToken.DoesNotExist:
raise PermissionDenied(_('This token does not exist or is no longer valid.'))
user = token.user
elif 'username' in data:
form = AuthenticationForm(request, data=data)
if not form.is_valid():
raise ParseError(form.errors)
user = form.user_cache
else:
raise ParseError(_('You need to send a token or username and password.'))
login(request, user)
return Response({
'detail': _('Login successful.'),
'csrf_token': csrf.get_token(request),
})
@action(detail=False, methods=['post'])
def get_token(self, request, *args, **kwargs):
# django-rest-framework doesn't do this for logged out requests
SessionAuthentication().enforce_csrf(request)
data = get_api_post_data(request)
form = AuthenticationForm(request, data=data)
if not form.is_valid():
raise ParseError(form.errors)
token = form.user_cache.login_tokens.create()
return Response({
'token': token.get_token(),
})
@action(detail=False, methods=['post'])
def logout(self, request, *args, **kwargs):
# django-rest-framework doesn't do this for logged out requests
SessionAuthentication().enforce_csrf(request)
if not request.user.is_authenticated:
return ParseError(_('Not logged in.'))
logout(request)
return Response({
'detail': _('Logout successful.'),
'csrf_token': csrf.get_token(request),
})

View file

@ -1,18 +0,0 @@
from django.utils.translation import gettext_lazy as _
from rest_framework.authentication import TokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
class APISecretAuthentication(TokenAuthentication):
def authenticate_credentials(self, key):
try:
from c3nav.api.models import Secret
secret = Secret.objects.filter(api_secret=key).select_related('user', 'user__permissions')
# todo: auth scopes are ignored here, we need to get rid of this
except Secret.DoesNotExist:
raise AuthenticationFailed(_('Invalid token.'))
if not secret.user.is_active:
raise AuthenticationFailed(_('User inactive or deleted.'))
return (secret.user, secret)

View file

@ -1,28 +0,0 @@
{% extends "rest_framework/base.html" %}
{% load static %}
{% load compress %}
{% block title %}{% if name %}{{ name }} {% endif %}c3nav API{% endblock %}
{% block bootstrap_navbar_variant %}{% endblock %}
{% block meta %}
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="robots" content="NONE,NOARCHIVE" />
{% if favicon %}
<link href="{% static favicon %}" rel="icon">
{% endif %}
{% endblock %}
{% block style %}
{% compress css %}
<link href="{% static 'bootstrap/css/bootstrap.css' %}" rel="stylesheet">
<link href="{% static 'rest_framework/css/prettify.css' %}" rel="stylesheet">
<link href="{% static 'rest_framework/css/default.css' %}" rel="stylesheet">
<link href="{% static 'api/css/style.css' %}" rel="stylesheet">
{% endcompress %}
{% endblock %}
{% block branding %}
<span class="navbar-brand">c3nav API</span>
{% endblock %}

View file

@ -1,30 +1,13 @@
import inspect
import re
from collections import OrderedDict
from django.urls import path
from django.views.generic.base import RedirectView
from django.urls import include, path, re_path
from django.utils.functional import cached_property
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework.routers import SimpleRouter
from c3nav.api.api import SessionViewSet
from c3nav.api.newapi import auth_api_router
from c3nav.api.ninja import ninja_api
from c3nav.editor.api import ChangeSetViewSet, EditorViewSet
from c3nav.editor.newapi.endpoints import editor_api_router
from c3nav.mapdata.api import (AccessRestrictionGroupViewSet, AccessRestrictionViewSet, AreaViewSet, BuildingViewSet,
ColumnViewSet, CrossDescriptionViewSet, DoorViewSet, DynamicLocationPositionViewSet,
HoleViewSet, LeaveDescriptionViewSet, LevelViewSet, LineObstacleViewSet,
LocationBySlugViewSet, LocationGroupCategoryViewSet, LocationGroupViewSet,
LocationViewSet, MapViewSet, ObstacleViewSet, POIViewSet, RampViewSet, SourceViewSet,
SpaceViewSet, StairViewSet, UpdatesViewSet)
from c3nav.mapdata.newapi.map import map_api_router
from c3nav.mapdata.newapi.mapdata import mapdata_api_router
from c3nav.mapdata.newapi.updates import updates_api_router
from c3nav.mapdata.utils.user import can_access_editor
from c3nav.mesh.newapi import mesh_api_router
from c3nav.routing.api import RoutingViewSet
from c3nav.routing.newapi.positioning import positioning_api_router
from c3nav.routing.newapi.routing import routing_api_router
@ -41,89 +24,8 @@ ninja_api.add_router("/editor/", editor_api_router)
ninja_api.add_router("/mesh/", mesh_api_router)
"""
legacy API
"""
router = SimpleRouter()
router.register(r'map', MapViewSet, basename='map')
router.register(r'levels', LevelViewSet)
router.register(r'buildings', BuildingViewSet)
router.register(r'spaces', SpaceViewSet)
router.register(r'doors', DoorViewSet)
router.register(r'holes', HoleViewSet)
router.register(r'areas', AreaViewSet)
router.register(r'stairs', StairViewSet)
router.register(r'ramps', RampViewSet)
router.register(r'obstacles', ObstacleViewSet)
router.register(r'lineobstacles', LineObstacleViewSet)
router.register(r'columns', ColumnViewSet)
router.register(r'pois', POIViewSet)
router.register(r'leavedescriptions', LeaveDescriptionViewSet)
router.register(r'crossdescriptions', CrossDescriptionViewSet)
router.register(r'sources', SourceViewSet)
router.register(r'accessrestrictions', AccessRestrictionViewSet)
router.register(r'accessrestrictiongroups', AccessRestrictionGroupViewSet)
router.register(r'locations', LocationViewSet)
router.register(r'locations/by_slug', LocationBySlugViewSet, basename='location-by-slug')
router.register(r'locations/dynamic', DynamicLocationPositionViewSet, basename='dynamic-location')
router.register(r'locationgroupcategories', LocationGroupCategoryViewSet)
router.register(r'locationgroups', LocationGroupViewSet)
router.register(r'updates', UpdatesViewSet, basename='updates')
router.register(r'routing', RoutingViewSet, basename='routing')
router.register(r'editor', EditorViewSet, basename='editor')
router.register(r'changesets', ChangeSetViewSet)
router.register(r'session', SessionViewSet, basename='session')
class APIRoot(GenericAPIView):
"""
Welcome to the c3nav RESTful API.
The HTML preview is only shown because your Browser sent text/html in its Accept header.
If you want to use this API on a large scale, please use a client that supports E-Tags.
For more information on a specific API endpoint, access it with a browser.
This is the old API which is slowly being phased out in favor of the new API at /api/v2/.
"""
def _format_pattern(self, pattern):
return re.sub(r'\(\?P<([^>]*[^>_])_?>[^)]+\)', r'{\1}', pattern)[1:-1]
@cached_property
def urls(self):
include_editor = can_access_editor(self.request)
urls: dict[str, dict[str, str] | str] = OrderedDict()
for urlpattern in router.urls:
if not include_editor and inspect.getmodule(urlpattern.callback).__name__.startswith('c3nav.editor.'):
continue
name = urlpattern.name
url = self._format_pattern(str(urlpattern.pattern)).replace('{pk}', '{id}')
base = url.split('/', 1)[0]
if base == 'editor':
if name == 'editor-list':
continue
if name == 'editor-detail':
name = 'editor-api'
elif base == 'session':
if name == 'session-list':
name = 'session-info'
if '-' in name:
urls.setdefault(base, OrderedDict())[name.split('-', 1)[1]] = url
else:
urls[name] = url
return urls
def get(self, request):
return Response(self.urls)
urlpatterns = [
# todo: does this work? can it be better?
re_path(r'^$', APIRoot.as_view()),
path('', include(router.urls)),
path('v2/', ninja_api.urls),
path('', RedirectView.as_view(pattern_name="api-v2:openapi-view")),
]

View file

@ -1,683 +0,0 @@
from functools import wraps
from itertools import chain
from django.db.models import Prefetch, Q
from django.urls import Resolver404, resolve
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, ParseError, PermissionDenied, ValidationError
from rest_framework.generics import get_object_or_404
from rest_framework.response import Response
from rest_framework.viewsets import ReadOnlyModelViewSet, ViewSet
from shapely import prepared
from shapely.ops import unary_union
from c3nav.api.utils import get_api_post_data
from c3nav.editor.forms import ChangeSetForm, RejectForm
from c3nav.editor.models import ChangeSet
from c3nav.editor.utils import LevelChildEditUtils, SpaceChildEditUtils
from c3nav.editor.views.base import editor_etag_func
from c3nav.mapdata.api import api_etag
from c3nav.mapdata.models import Area, MapUpdate, Source
from c3nav.mapdata.models.geometry.space import POI
from c3nav.mapdata.utils.geometry import unwrap_geom
from c3nav.mapdata.utils.user import can_access_editor
class EditorViewSetMixin(ViewSet):
def initial(self, request, *args, **kwargs):
if not can_access_editor(request):
raise PermissionDenied
return super().initial(request, *args, **kwargs)
def api_etag_with_update_cache_key(**outkwargs):
outkwargs.setdefault('cache_kwargs', {})['update_cache_key_match'] = bool
def wrapper(func):
func = api_etag(**outkwargs)(func)
@wraps(func)
def wrapped_func(self, request, *args, **kwargs):
try:
changeset = request.changeset
except AttributeError:
changeset = ChangeSet.get_for_request(request)
request.changeset = changeset
update_cache_key = request.changeset.raw_cache_key_without_changes
update_cache_key_match = request.GET.get('update_cache_key') == update_cache_key
return func(self, request, *args,
update_cache_key=update_cache_key, update_cache_key_match=update_cache_key_match,
**kwargs)
return wrapped_func
return wrapper
class EditorViewSet(EditorViewSetMixin, ViewSet):
"""
Editor API
/geometries/ returns a list of geojson features, you have to specify ?level=<id> or ?space=<id>
/geometrystyles/ returns styling information for all geometry types
/bounds/ returns the maximum bounds of the map
/{path}/ insert an editor path to get an API represantation of it. POST requests on forms are possible as well
"""
lookup_field = 'path'
lookup_value_regex = r'.+'
@staticmethod
def space_sorting_func(space):
groups = tuple(space.groups.all())
if not groups:
return (0, 0, 0)
return (1, groups[0].category.priority, groups[0].hierarchy, groups[0].priority)
@staticmethod
def _get_level_geometries(level):
buildings = level.buildings.all()
buildings_geom = unary_union([unwrap_geom(building.geometry) for building in buildings])
spaces = {space.pk: space for space in level.spaces.all()}
holes_geom = []
for space in spaces.values():
if space.outside:
space.geometry = space.geometry.difference(buildings_geom)
columns = [column.geometry for column in space.columns.all()]
if columns:
columns_geom = unary_union([unwrap_geom(column.geometry) for column in space.columns.all()])
space.geometry = space.geometry.difference(columns_geom)
holes = [unwrap_geom(hole.geometry) for hole in space.holes.all()]
if holes:
space_holes_geom = unary_union(holes)
holes_geom.append(space_holes_geom.intersection(unwrap_geom(space.geometry)))
space.geometry = space.geometry.difference(space_holes_geom)
for building in buildings:
building.original_geometry = building.geometry
if holes_geom:
holes_geom = unary_union(holes_geom)
holes_geom_prep = prepared.prep(holes_geom)
for obj in buildings:
if holes_geom_prep.intersects(unwrap_geom(obj.geometry)):
obj.geometry = obj.geometry.difference(holes_geom)
results = []
results.extend(buildings)
for door in level.doors.all():
results.append(door)
results.extend(sorted(spaces.values(), key=EditorViewSet.space_sorting_func))
return results
@staticmethod
def _get_levels_pk(request, level):
# noinspection PyPep8Naming
Level = request.changeset.wrap_model('Level')
levels_under = ()
levels_on_top = ()
lower_level = level.lower(Level).first()
primary_levels = (level,) + ((lower_level,) if lower_level else ())
secondary_levels = Level.objects.filter(on_top_of__in=primary_levels).values_list('pk', 'on_top_of')
if lower_level:
levels_under = tuple(pk for pk, on_top_of in secondary_levels if on_top_of == lower_level.pk)
if True:
levels_on_top = tuple(pk for pk, on_top_of in secondary_levels if on_top_of == level.pk)
levels = chain([level.pk], levels_under, levels_on_top)
return levels, levels_on_top, levels_under
@staticmethod
def area_sorting_func(area):
groups = tuple(area.groups.all())
if not groups:
return (0, 0, 0)
return (1, groups[0].category.priority, groups[0].hierarchy, groups[0].priority)
# noinspection PyPep8Naming
@action(detail=False, methods=['get'])
@api_etag_with_update_cache_key(etag_func=editor_etag_func, cache_parameters={'level': str, 'space': str})
def geometries(self, request, update_cache_key, update_cache_key_match, *args, **kwargs):
Level = request.changeset.wrap_model('Level')
Space = request.changeset.wrap_model('Space')
Column = request.changeset.wrap_model('Column')
Hole = request.changeset.wrap_model('Hole')
AltitudeMarker = request.changeset.wrap_model('AltitudeMarker')
Building = request.changeset.wrap_model('Building')
Door = request.changeset.wrap_model('Door')
LocationGroup = request.changeset.wrap_model('LocationGroup')
WifiMeasurement = request.changeset.wrap_model('WifiMeasurement')
RangingBeacon = request.changeset.wrap_model('RangingBeacon')
level = request.GET.get('level')
space = request.GET.get('space')
if level is not None:
if space is not None:
raise ValidationError('Only level or space can be specified.')
level = get_object_or_404(Level.objects.filter(Level.q_for_request(request)), pk=level)
edit_utils = LevelChildEditUtils(level, request)
if not edit_utils.can_access_child_base_mapdata:
raise PermissionDenied
levels, levels_on_top, levels_under = self._get_levels_pk(request, level)
# don't prefetch groups for now as changesets do not yet work with m2m-prefetches
levels = Level.objects.filter(pk__in=levels).filter(Level.q_for_request(request))
# graphnodes_qs = request.changeset.wrap_model('GraphNode').objects.all()
levels = levels.prefetch_related(
Prefetch('spaces', Space.objects.filter(Space.q_for_request(request)).only(
'geometry', 'level', 'outside'
)),
Prefetch('doors', Door.objects.filter(Door.q_for_request(request)).only('geometry', 'level')),
Prefetch('spaces__columns', Column.objects.filter(
Q(access_restriction__isnull=True) | ~Column.q_for_request(request)
).only('geometry', 'space')),
Prefetch('spaces__groups', LocationGroup.objects.only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_spaces'
)),
Prefetch('buildings', Building.objects.only('geometry', 'level')),
Prefetch('spaces__holes', Hole.objects.only('geometry', 'space')),
Prefetch('spaces__altitudemarkers', AltitudeMarker.objects.only('geometry', 'space')),
Prefetch('spaces__wifi_measurements', WifiMeasurement.objects.only('geometry', 'space')),
Prefetch('spaces__ranging_beacons', RangingBeacon.objects.only('geometry', 'space')),
# Prefetch('spaces__graphnodes', graphnodes_qs)
)
levels = {s.pk: s for s in levels}
level = levels[level.pk]
levels_under = [levels[pk] for pk in levels_under]
levels_on_top = [levels[pk] for pk in levels_on_top]
# todo: permissions
# graphnodes = tuple(chain(*(space.graphnodes.all()
# for space in chain(*(level.spaces.all() for level in levels.values())))))
# graphnodes_lookup = {node.pk: node for node in graphnodes}
# graphedges = request.changeset.wrap_model('GraphEdge').objects.all()
# graphedges = graphedges.filter(Q(from_node__in=graphnodes) | Q(to_node__in=graphnodes))
# graphedges = graphedges.select_related('waytype')
# this is faster because we only deserialize graphnode geometries once
# missing_graphnodes = graphnodes_qs.filter(pk__in=set(chain(*((edge.from_node_id, edge.to_node_id)
# for edge in graphedges))))
# graphnodes_lookup.update({node.pk: node for node in missing_graphnodes})
# for edge in graphedges:
# edge._from_node_cache = graphnodes_lookup[edge.from_node_id]
# edge._to_node_cache = graphnodes_lookup[edge.to_node_id]
# graphedges = [edge for edge in graphedges if edge.from_node.space_id != edge.to_node.space_id]
results = chain(
*(self._get_level_geometries(level) for level in levels_under),
self._get_level_geometries(level),
*(self._get_level_geometries(level) for level in levels_on_top),
*(space.altitudemarkers.all() for space in level.spaces.all()),
*(space.wifi_measurements.all() for space in level.spaces.all()),
*(space.ranging_beacons.all() for space in level.spaces.all()),
# graphedges,
# graphnodes,
)
elif space is not None:
space_q_for_request = Space.q_for_request(request)
qs = Space.objects.filter(space_q_for_request)
space = get_object_or_404(qs.select_related('level', 'level__on_top_of'), pk=space)
level = space.level
edit_utils = SpaceChildEditUtils(space, request)
if not edit_utils.can_access_child_base_mapdata:
raise PermissionDenied
if request.user_permissions.can_access_base_mapdata:
doors = [door for door in level.doors.filter(Door.q_for_request(request)).all()
if door.geometry.wrapped_geom.intersects(space.geometry.wrapped_geom)]
doors_space_geom = unary_union(
[unwrap_geom(door.geometry) for door in doors] +
[unwrap_geom(space.geometry)]
)
levels, levels_on_top, levels_under = self._get_levels_pk(request, level.primary_level)
if level.on_top_of_id is not None:
levels = chain([level.pk], levels_on_top)
other_spaces = Space.objects.filter(space_q_for_request, level__pk__in=levels).only(
'geometry', 'level'
).prefetch_related(
Prefetch('groups', LocationGroup.objects.only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_spaces'
).filter(color__isnull=False))
)
space = next(s for s in other_spaces if s.pk == space.pk)
other_spaces = [s for s in other_spaces
if s.geometry.intersects(doors_space_geom) and s.pk != space.pk]
all_other_spaces = other_spaces
if level.on_top_of_id is None:
other_spaces_lower = [s for s in other_spaces if s.level_id in levels_under]
other_spaces_upper = [s for s in other_spaces if s.level_id in levels_on_top]
else:
other_spaces_lower = [s for s in other_spaces if s.level_id == level.on_top_of_id]
other_spaces_upper = []
other_spaces = [s for s in other_spaces if s.level_id == level.pk]
space.bounds = True
# deactivated for performance reasons
buildings = level.buildings.all()
# buildings_geom = unary_union([building.geometry for building in buildings])
# for other_space in other_spaces:
# if other_space.outside:
# other_space.geometry = other_space.geometry.difference(buildings_geom)
for other_space in chain(other_spaces, other_spaces_lower, other_spaces_upper):
other_space.opacity = 0.4
other_space.color = '#ffffff'
for building in buildings:
building.opacity = 0.5
else:
buildings = []
doors = []
other_spaces = []
other_spaces_lower = []
other_spaces_upper = []
all_other_spaces = []
# todo: permissions
if request.user_permissions.can_access_base_mapdata:
graphnodes = request.changeset.wrap_model('GraphNode').objects.all()
graphnodes = graphnodes.filter((Q(space__in=all_other_spaces)) | Q(space__pk=space.pk))
space_graphnodes = tuple(node for node in graphnodes if node.space_id == space.pk)
graphedges = request.changeset.wrap_model('GraphEdge').objects.all()
space_graphnodes_ids = tuple(node.pk for node in space_graphnodes)
graphedges = graphedges.filter(Q(from_node__pk__in=space_graphnodes_ids) |
Q(to_node__pk__in=space_graphnodes_ids))
graphedges = graphedges.select_related('from_node', 'to_node', 'waytype').only(
'from_node__geometry', 'to_node__geometry', 'waytype__color'
)
else:
graphnodes = []
graphedges = []
areas = space.areas.filter(Area.q_for_request(request)).only(
'geometry', 'space'
).prefetch_related(
Prefetch('groups', LocationGroup.objects.order_by(
'-category__priority', '-hierarchy', '-priority'
).only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_areas'
))
)
for area in areas:
area.opacity = 0.5
areas = sorted(areas, key=self.area_sorting_func)
results = chain(
buildings,
other_spaces_lower,
doors,
other_spaces,
[space],
areas,
space.holes.all().only('geometry', 'space'),
space.stairs.all().only('geometry', 'space'),
space.ramps.all().only('geometry', 'space'),
space.obstacles.all().only('geometry', 'space', 'color'),
space.lineobstacles.all().only('geometry', 'width', 'space', 'color'),
space.columns.all().only('geometry', 'space'),
space.altitudemarkers.all().only('geometry', 'space'),
space.wifi_measurements.all().only('geometry', 'space'),
space.ranging_beacons.all().only('geometry', 'space'),
space.pois.filter(POI.q_for_request(request)).only('geometry', 'space').prefetch_related(
Prefetch('groups', LocationGroup.objects.only(
'color', 'category', 'priority', 'hierarchy', 'category__priority', 'category__allow_pois'
).filter(color__isnull=False))
),
other_spaces_upper,
graphedges,
graphnodes
)
else:
raise ValidationError('No level or space specified.')
return Response(list(chain(
[('update_cache_key', update_cache_key)],
(self.conditional_geojson(obj, update_cache_key_match) for obj in results)
)))
def conditional_geojson(self, obj, update_cache_key_match):
if update_cache_key_match and not obj._affected_by_changeset:
return obj.get_geojson_key()
result = obj.to_geojson(instance=obj)
result['properties']['changed'] = obj._affected_by_changeset
return result
@action(detail=False, methods=['get'])
@api_etag(etag_func=MapUpdate.current_cache_key, cache_parameters={})
def geometrystyles(self, request, *args, **kwargs):
return Response({
'building': '#aaaaaa',
'space': '#eeeeee',
'hole': 'rgba(255, 0, 0, 0.3)',
'door': '#ffffff',
'area': '#55aaff',
'stair': '#a000a0',
'ramp': 'rgba(160, 0, 160, 0.2)',
'obstacle': '#999999',
'lineobstacle': '#999999',
'column': 'rgba(0, 0, 50, 0.3)',
'poi': '#4488cc',
'shadow': '#000000',
'graphnode': '#009900',
'graphedge': '#00CC00',
'altitudemarker': '#0000FF',
'wifimeasurement': '#DDDD00',
'rangingbeacon': '#CC00CC',
})
@action(detail=False, methods=['get'])
@api_etag(etag_func=editor_etag_func, cache_parameters={})
def bounds(self, request, *args, **kwargs):
return Response({
'bounds': Source.max_bounds(),
})
def __getattr__(self, name):
# allow POST and DELETE methods for the editor API
if getattr(self, 'get', None).__name__ in ('list', 'retrieve'):
if name == 'post' and (self.resolved.url_name.endswith('.create') or
self.resolved.url_name.endswith('.edit')):
return self.post_or_delete
if name == 'delete' and self.resolved.url_name.endswith('.edit'):
return self.post_or_delete
raise AttributeError
def post_or_delete(self, request, *args, **kwargs):
# django-rest-framework doesn't automatically do this for logged out requests
SessionAuthentication().enforce_csrf(request)
return self.retrieve(request, *args, **kwargs)
def list(self, request, *args, **kwargs):
return self.retrieve(request, *args, **kwargs)
@cached_property
def resolved(self):
resolved = None
path = self.kwargs.get('path', '')
if path:
try:
resolved = resolve('/editor/'+path+'/')
except Resolver404:
pass
if not resolved:
try:
resolved = resolve('/editor/'+path)
except Resolver404:
pass
self.request.sub_resolver_match = resolved
return resolved
def retrieve(self, request, *args, **kwargs):
resolved = self.resolved
if not resolved:
raise NotFound(_('No matching editor view endpoint found.'))
if not getattr(resolved.func, 'api_hybrid', False):
raise NotFound(_('Matching editor view point does not provide an API.'))
get_api_post_data(request)
response = resolved.func(request, api=True, *resolved.args, **resolved.kwargs)
return response
class ChangeSetViewSet(EditorViewSetMixin, ReadOnlyModelViewSet):
"""
List and manipulate changesets. All lists are ordered by last update descending. Use ?offset= to specify an offset.
Don't forget to set X-Csrftoken for POST requests!
/ lists all changesets this user can see.
/user/ lists changesets by this user
/reviewing/ lists changesets this user is currently reviewing.
/pending_review/ lists changesets this user can review.
/current/ returns the current changeset.
/direct_editing/ POST to activate direct editing (if available).
/deactive/ POST to deactivate current changeset or deactivate direct editing
/{id}/changes/ list all changes of a given changeset.
/{id}/activate/ POST to activate given changeset.
/{id}/edit/ POST to edit given changeset (provide title and description in POST data).
/{id}/restore_object/ POST to restore an object deleted by this changeset (provide change id as id in POST data).
/{id}/delete/ POST to delete given changeset.
/{id}/propose/ POST to propose given changeset.
/{id}/unpropose/ POST to unpropose given changeset.
/{id}/review/ POST to review given changeset.
/{id}/reject/ POST to reject given changeset (provide reject=1 in POST data for final rejection).
/{id}/unreject/ POST to unreject given changeset.
/{id}/apply/ POST to accept and apply given changeset.
"""
queryset = ChangeSet.objects.all()
def get_queryset(self):
return ChangeSet.qs_for_request(self.request).select_related('last_update', 'last_state_update', 'last_change')
def _list(self, request, qs):
offset = 0
if 'offset' in request.GET:
if not request.GET['offset'].isdigit():
raise ParseError('offset has to be a positive integer.')
offset = int(request.GET['offset'])
return Response([obj.serialize() for obj in qs.order_by('-last_update')[offset:offset+20]])
def list(self, request, *args, **kwargs):
return self._list(request, self.get_queryset())
@action(detail=False, methods=['get'])
def user(self, request, *args, **kwargs):
return self._list(request, self.get_queryset().filter(author=request.user))
@action(detail=False, methods=['get'])
def reviewing(self, request, *args, **kwargs):
return self._list(request, self.get_queryset().filter(
assigned_to=request.user, state='review'
))
@action(detail=False, methods=['get'])
def pending_review(self, request, *args, **kwargs):
return self._list(request, self.get_queryset().filter(
state__in=('proposed', 'reproposed'),
))
def retrieve(self, request, *args, **kwargs):
return Response(self.get_object().serialize())
@action(detail=False, methods=['get'])
def current(self, request, *args, **kwargs):
changeset = ChangeSet.get_for_request(request)
return Response({
'direct_editing': changeset.direct_editing,
'changeset': changeset.serialize() if changeset.pk else None,
})
@action(detail=False, methods=['post'])
def direct_editing(self, request, *args, **kwargs):
# django-rest-framework doesn't automatically do this for logged out requests
SessionAuthentication().enforce_csrf(request)
if not ChangeSet.can_direct_edit(request):
raise PermissionDenied(_('You don\'t have the permission to activate direct editing.'))
changeset = ChangeSet.get_for_request(request)
if changeset.pk is not None:
raise PermissionDenied(_('You cannot activate direct editing if you have an active changeset.'))
request.session['direct_editing'] = True
return Response({
'success': True,
})
@action(detail=False, methods=['post'])
def deactivate(self, request, *args, **kwargs):
# django-rest-framework doesn't automatically do this for logged out requests
SessionAuthentication().enforce_csrf(request)
request.session.pop('changeset', None)
request.session['direct_editing'] = False
return Response({
'success': True,
})
@action(detail=True, methods=['get'])
def changes(self, request, *args, **kwargs):
changeset = self.get_object()
changeset.fill_changes_cache()
return Response([obj.serialize() for obj in changeset.iter_changed_objects()])
@action(detail=True, methods=['post'])
def activate(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_activate(request):
raise PermissionDenied(_('You can not activate this change set.'))
changeset.activate(request)
return Response({'success': True})
@action(detail=True, methods=['post'])
def edit(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_edit(request):
raise PermissionDenied(_('You cannot edit this change set.'))
form = ChangeSetForm(instance=changeset, data=get_api_post_data(request))
if not form.is_valid():
raise ParseError(form.errors)
changeset = form.instance
update = changeset.updates.create(user=request.user,
title=changeset.title, description=changeset.description)
changeset.last_update = update
changeset.save()
return Response({'success': True})
@action(detail=True, methods=['post'])
def restore_object(self, request, *args, **kwargs):
data = get_api_post_data(request)
if 'id' not in data:
raise ParseError('Missing id.')
restore_id = data['id']
if isinstance(restore_id, str) and restore_id.isdigit():
restore_id = int(restore_id)
if not isinstance(restore_id, int):
raise ParseError('id needs to be an integer.')
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_edit(request):
raise PermissionDenied(_('You can not edit changes on this change set.'))
try:
changed_object = changeset.changed_objects_set.get(pk=restore_id)
except Exception:
raise NotFound('could not find object.')
try:
changed_object.restore()
except PermissionError:
raise PermissionDenied(_('You cannot restore this object, because it depends on '
'a deleted object or it would violate a unique contraint.'))
return Response({'success': True})
@action(detail=True, methods=['post'])
def propose(self, request, *args, **kwargs):
if not request.user.is_authenticated:
raise PermissionDenied(_('You need to log in to propose changes.'))
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.title or not changeset.description:
raise PermissionDenied(_('You need to add a title an a description to propose this change set.'))
if not changeset.can_propose(request):
raise PermissionDenied(_('You cannot propose this change set.'))
changeset.propose(request.user)
return Response({'success': True})
@action(detail=True, methods=['post'])
def unpropose(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_unpropose(request):
raise PermissionDenied(_('You cannot unpropose this change set.'))
changeset.unpropose(request.user)
return Response({'success': True})
@action(detail=True, methods=['post'])
def review(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_start_review(request):
raise PermissionDenied(_('You cannot review these changes.'))
changeset.start_review(request.user)
return Response({'success': True})
@action(detail=True, methods=['post'])
def reject(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not not changeset.can_end_review(request):
raise PermissionDenied(_('You cannot reject these changes.'))
form = RejectForm(get_api_post_data(request))
if not form.is_valid():
raise ParseError(form.errors)
changeset.reject(request.user, form.cleaned_data['comment'], form.cleaned_data['final'])
return Response({'success': True})
@action(detail=True, methods=['post'])
def unreject(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_unreject(request):
raise PermissionDenied(_('You cannot unreject these changes.'))
changeset.unreject(request.user)
return Response({'success': True})
@action(detail=True, methods=['post'])
def apply(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_end_review(request):
raise PermissionDenied(_('You cannot accept and apply these changes.'))
changeset.apply(request.user)
return Response({'success': True})
@action(detail=True, methods=['post'])
def delete(self, request, *args, **kwargs):
changeset = self.get_object()
with changeset.lock_to_edit(request) as changeset:
if not changeset.can_delete(request):
raise PermissionDenied(_('You cannot delete this change set.'))
changeset.delete()
return Response({'success': True})

View file

@ -14,7 +14,6 @@ from django.shortcuts import redirect, render
from django.utils.cache import patch_vary_headers
from django.utils.translation import get_language
from django.utils.translation import gettext_lazy as _
from rest_framework.response import Response as APIResponse
from c3nav.editor.models import ChangeSet
from c3nav.editor.wrappers import QuerySetWrapper
@ -269,7 +268,8 @@ def call_api_hybrid_view_for_api(func, request, *args, **kwargs):
result['messages'] = messages
result.move_to_end('messages', last=False)
api_response = APIResponse(result, status=response.status_code)
# todo: fix this
# api_response = APIResponse(result, status=response.status_code)
if request.method == 'GET':
response.add_headers(api_response)
return api_response

View file

@ -1,550 +0,0 @@
import mimetypes
import os
from collections import namedtuple
from functools import wraps
from urllib.parse import urlparse
from django.core.cache import cache
from django.db.models import Prefetch
from django.http import Http404, HttpResponse
from django.shortcuts import redirect
from django.utils.cache import get_conditional_response
from django.utils.http import http_date, quote_etag, urlsafe_base64_encode
from django.utils.translation import get_language
from django.utils.translation import gettext_lazy as _
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, ValidationError
from rest_framework.generics import get_object_or_404
from rest_framework.mixins import RetrieveModelMixin, UpdateModelMixin
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet, ReadOnlyModelViewSet, ViewSet
from c3nav.mapdata.models import AccessRestriction, Building, Door, Hole, LocationGroup, MapUpdate, Source, Space
from c3nav.mapdata.models.access import AccessPermission, AccessRestrictionGroup
from c3nav.mapdata.models.geometry.base import GeometryMixin
from c3nav.mapdata.models.geometry.level import LevelGeometryMixin
from c3nav.mapdata.models.geometry.space import (POI, Area, Column, CrossDescription, LeaveDescription, LineObstacle,
Obstacle, Ramp, SpaceGeometryMixin, Stair)
from c3nav.mapdata.models.level import Level
from c3nav.mapdata.models.locations import (DynamicLocation, Location, LocationGroupCategory, LocationRedirect,
LocationSlug, Position, SpecificLocation)
from c3nav.mapdata.utils.cache.local import LocalCacheProxy
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.mapdata.utils.locations import (get_location_by_id_for_request, get_location_by_slug_for_request,
searchable_locations_for_request, visible_locations_for_request)
from c3nav.mapdata.utils.models import get_submodels
from c3nav.mapdata.utils.user import can_access_editor, get_user_data
from c3nav.mapdata.views import set_tile_access_cookie
request_cache = LocalCacheProxy(maxsize=64)
def optimize_query(qs):
if issubclass(qs.model, SpecificLocation):
base_qs = LocationGroup.objects.select_related('category')
qs = qs.prefetch_related(Prefetch('groups', queryset=base_qs))
if issubclass(qs.model, AccessRestriction):
qs = qs.prefetch_related('groups')
return qs
def api_stats_clean_location_value(value):
if isinstance(value, str) and value.startswith('c:'):
value = value.split(':')
value = 'c:%s:%d:%d' % (value[1], int(float(value[2]) / 3) * 3, int(float(value[3]) / 3) * 3)
return (value, 'c:anywhere')
return (value, )
def api_stats(view_name):
def wrapper(func):
@wraps(func)
def wrapped_func(self, request, *args, **kwargs):
response = func(self, request, *args, **kwargs)
if response.status_code < 400 and kwargs:
name, value = next(iter(kwargs.items()))
for value in api_stats_clean_location_value(value):
increment_cache_key('apistats__%s__%s__%s' % (view_name, name, value))
return response
return wrapped_func
return wrapper
def api_etag(permissions=True, etag_func=AccessPermission.etag_func,
cache_parameters=None, cache_kwargs=None, base_mapdata_check=False):
def wrapper(func):
@wraps(func)
def wrapped_func(self, request, *args, **kwargs):
response_format = self.perform_content_negotiation(request)[0].format
etag_user = (':'+str(request.user.pk or 0)) if response_format == 'api' else ''
raw_etag = '%s%s:%s:%s' % (response_format, etag_user, get_language(),
(etag_func(request) if permissions else MapUpdate.current_cache_key()))
if base_mapdata_check and self.base_mapdata:
raw_etag += ':%d' % request.user_permissions.can_access_base_mapdata
etag = quote_etag(raw_etag)
response = get_conditional_response(request, etag=etag)
if response is None:
cache_key = 'mapdata:api:'+request.path_info[5:].replace('/', '-').strip('-')+':'+raw_etag
if cache_parameters is not None:
for param, type_ in cache_parameters.items():
value = int(param in request.GET) if type_ == bool else type_(request.GET.get(param))
cache_key += ':'+urlsafe_base64_encode(str(value).encode())
if cache_kwargs is not None:
for name, type_ in cache_kwargs.items():
value = type_(kwargs[name])
if type_ == bool:
value = int(value)
cache_key += ':'+urlsafe_base64_encode(str(value).encode())
data = request_cache.get(cache_key)
if data is not None:
response = Response(data)
if response is None:
with GeometryMixin.dont_keep_originals():
response = func(self, request, *args, **kwargs)
if cache_parameters is not None and response.status_code == 200:
request_cache.set(cache_key, response.data, 900)
if response.status_code == 200:
response['ETag'] = etag
response['Cache-Control'] = 'no-cache'
return response
return wrapped_func
return wrapper
class MapViewSet(ViewSet):
"""
Map API
/bounds/ returns the maximum bounds of the map
"""
@action(detail=False, methods=['get'])
@api_etag(permissions=False, cache_parameters={})
def bounds(self, request, *args, **kwargs):
return Response({
'bounds': Source.max_bounds(),
})
class MapdataViewSet(ReadOnlyModelViewSet):
base_mapdata = False
order_by = ('id', )
def get_queryset(self):
qs = super().get_queryset()
if hasattr(qs.model, 'qs_for_request'):
return qs.model.qs_for_request(self.request)
return qs
@staticmethod
def can_access_geometry(request, obj):
if isinstance(obj, Space):
return obj.base_mapdata_accessible or request.user_permissions.can_access_base_mapdata
elif isinstance(obj, (Building, Door)):
return request.user_permissions.can_access_base_mapdata
return True
qs_filter = namedtuple('qs_filter', ('field', 'model', 'key', 'value'))
def _get_keys_for_model(self, request, model, key):
if hasattr(model, 'qs_for_request'):
cache_key = 'mapdata:api:keys:%s:%s:%s' % (model.__name__, key,
AccessPermission.cache_key_for_request(request))
qs = model.qs_for_request(request)
else:
cache_key = 'mapdata:api:keys:%s:%s:%s' % (model.__name__, key,
MapUpdate.current_cache_key())
qs = model.objects.all()
result = cache.get(cache_key, None)
if result is not None:
return result
result = set(qs.values_list(key, flat=True))
cache.set(cache_key, result, 300)
return result
def _get_list(self, request):
qs = optimize_query(self.get_queryset())
filters = []
if issubclass(qs.model, LevelGeometryMixin) and 'level' in request.GET:
filters.append(self.qs_filter(field='level', model=Level, key='pk', value=request.GET['level']))
if issubclass(qs.model, SpaceGeometryMixin) and 'space' in request.GET:
filters.append(self.qs_filter(field='space', model=Space, key='pk', value=request.GET['space']))
if issubclass(qs.model, LocationGroup) and 'category' in request.GET:
filters.append(self.qs_filter(field='category', model=LocationGroupCategory,
key='pk' if request.GET['category'].isdigit() else 'name',
value=request.GET['category']))
if issubclass(qs.model, SpecificLocation) and 'group' in request.GET:
filters.append(self.qs_filter(field='groups', model=LocationGroup, key='pk', value=request.GET['group']))
if qs.model == Level and 'on_top_of' in request.GET:
value = None if request.GET['on_top_of'] == 'null' else request.GET['on_top_of']
filters.append(self.qs_filter(field='on_top_of', model=Level, key='pk', value=value))
cache_key = 'mapdata:api:%s:%s' % (qs.model.__name__, AccessPermission.cache_key_for_request(request))
for qs_filter in filters:
cache_key += ';%s,%s' % (qs_filter.field, qs_filter.value)
results = cache.get(cache_key, None)
if results is not None:
return results
for qs_filter in filters:
if qs_filter.key == 'pk' and not qs_filter.value.isdigit():
raise ValidationError(detail={
'detail': _('%(field)s is not an integer.') % {'field': qs_filter.field}
})
for qs_filter in filters:
if qs_filter.value is not None:
keys = self._get_keys_for_model(request, qs_filter.model, qs_filter.key)
value = int(qs_filter.value) if qs_filter.key == 'pk' else qs_filter.value
if value not in keys:
raise NotFound(detail=_('%(model)s not found.') % {'model': qs_filter.model._meta.verbose_name})
results = tuple(qs.order_by(*self.order_by))
cache.set(cache_key, results, 300)
return results
@api_etag(base_mapdata_check=True)
def list(self, request, *args, **kwargs):
geometry = 'geometry' in request.GET
results = self._get_list(request)
if results:
geometry = geometry and self.can_access_geometry(request, results[0])
return Response([obj.serialize(geometry=geometry) for obj in results])
@api_etag(base_mapdata_check=True)
def retrieve(self, request, *args, **kwargs):
obj = self.get_object()
return Response(obj.serialize(geometry=self.can_access_geometry(request, obj)))
@staticmethod
def list_types(models_list, **kwargs):
return Response([
model.serialize_type(**kwargs) for model in models_list
])
class LevelViewSet(MapdataViewSet):
"""
Add ?on_top_of=<null or id> to filter by on_top_of, add ?group=<id> to filter by group.
A Level is a Location so if it is visible, you can use its ID in the Location API as well.
"""
queryset = Level.objects.all()
@action(detail=False, methods=['get'])
@api_etag(permissions=False, cache_parameters={})
def geometrytypes(self, request):
return self.list_types(get_submodels(LevelGeometryMixin))
class BuildingViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries if available, add ?level=<id> to filter by level. """
queryset = Building.objects.all()
base_mapdata = True
class SpaceViewSet(MapdataViewSet):
"""
Add ?geometry=1 to get geometries if available, ?level=<id> to filter by level, ?group=<id> to filter by group.
A Space is a Location so if it is visible, you can use its ID in the Location API as well.
"""
queryset = Space.objects.all()
base_mapdata = True
@action(detail=False, methods=['get'])
@api_etag(permissions=False, cache_parameters={})
def geometrytypes(self, request):
return self.list_types(get_submodels(SpaceGeometryMixin))
class DoorViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries if available, add ?level=<id> to filter by level. """
queryset = Door.objects.all()
base_mapdata = True
class HoleViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries, add ?space=<id> to filter by space. """
queryset = Hole.objects.all()
class AreaViewSet(MapdataViewSet):
"""
Add ?geometry=1 to get geometries, add ?space=<id> to filter by space, add ?group=<id> to filter by group.
An Area is a Location so if it is visible, you can use its ID in the Location API as well.
"""
queryset = Area.objects.all()
class StairViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries, add ?space=<id> to filter by space. """
queryset = Stair.objects.all()
class RampViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries, add ?space=<id> to filter by space. """
queryset = Ramp.objects.all()
class ObstacleViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries, add ?space=<id> to filter by space. """
queryset = Obstacle.objects.all()
class LineObstacleViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries, add ?space=<id> to filter by space. """
queryset = LineObstacle.objects.all()
class ColumnViewSet(MapdataViewSet):
""" Add ?geometry=1 to get geometries, add ?space=<id> to filter by space. """
queryset = Column.objects.all()
class POIViewSet(MapdataViewSet):
"""
Add ?geometry=1 to get geometries, add ?space=<id> to filter by space, add ?group=<id> to filter by group.
A POI is a Location so if it is visible, you can use its ID in the Location API as well.
"""
queryset = POI.objects.all()
class LeaveDescriptionViewSet(MapdataViewSet):
queryset = LeaveDescription.objects.all()
class CrossDescriptionViewSet(MapdataViewSet):
queryset = CrossDescription.objects.all()
class LocationGroupCategoryViewSet(MapdataViewSet):
queryset = LocationGroupCategory.objects.all()
class LocationGroupViewSet(MapdataViewSet):
"""
Add ?category=<id or name> to filter by category.
A Location Group is a Location so if it is visible, you can use its ID in the Location API as well.
"""
queryset = LocationGroup.objects.all()
class LocationViewSetBase(RetrieveModelMixin, GenericViewSet):
queryset = LocationSlug.objects.all()
base_mapdata = True
def get_object(self) -> LocationSlug:
raise NotImplementedError
@api_stats('location_retrieve')
@api_etag(cache_parameters={'show_redirects': bool, 'detailed': bool, 'geometry': bool}, base_mapdata_check=True)
def retrieve(self, request, key=None, *args, **kwargs):
show_redirects = 'show_redirects' in request.GET
detailed = 'detailed' in request.GET
geometry = 'geometry' in request.GET
location = self.get_object()
if location is None:
raise NotFound
if isinstance(location, LocationRedirect):
if not show_redirects:
return redirect('../' + str(location.target.slug)) # todo: why does redirect/reverse not work here?
return Response(location.serialize(include_type=True, detailed=detailed,
geometry=geometry and MapdataViewSet.can_access_geometry(request, location),
simple_geometry=True))
@action(detail=True, methods=['get'])
@api_stats('location_details')
@api_etag(base_mapdata_check=True)
def details(self, request, **kwargs):
location = self.get_object()
if location is None:
raise NotFound
if isinstance(location, LocationRedirect):
return redirect('../' + str(location.target.pk) + '/details/')
return Response(location.details_display(
detailed_geometry=MapdataViewSet.can_access_geometry(request, location),
editor_url=can_access_editor(request)
))
@action(detail=True, methods=['get'])
@api_stats('location_geometry')
@api_etag(base_mapdata_check=True)
def geometry(self, request, **kwargs):
location = self.get_object()
if location is None:
raise NotFound
if isinstance(location, LocationRedirect):
return redirect('../' + str(location.target.pk) + '/geometry/')
return Response({
'id': location.pk,
'level': getattr(location, 'level_id', None),
'geometry': location.get_geometry(
detailed_geometry=MapdataViewSet.can_access_geometry(request, location)
)
})
class LocationViewSet(LocationViewSetBase):
"""
Locations are Levels, Spaces, Areas, POIs and Location Groups (see /locations/types/). They have a shared ID pool.
This API endpoint only accesses locations that have can_search or can_describe set to true.
If you want to access all of them, use the API endpoints for the Location Types.
Additionally, you can access Custom Locations (Coordinates) by using c:<level.short_label>:x:y as an id or slug.
add ?searchable to only show locations with can_search set to true ordered by relevance
add ?detailed to show all attributes
add ?geometry to show geometries if available
/{id}/ add ?show_redirect=1 to suppress redirects and show them as JSON.
also possible: /by_slug/{slug}/
"""
queryset = LocationSlug.objects.all()
lookup_value_regex = r'[^/]+'
def get_object(self):
return get_location_by_id_for_request(self.kwargs['pk'], self.request)
@api_etag(cache_parameters={'searchable': bool, 'detailed': bool, 'geometry': bool}, base_mapdata_check=True)
def list(self, request, *args, **kwargs):
searchable = 'searchable' in request.GET
detailed = 'detailed' in request.GET
geometry = 'geometry' in request.GET
cache_key = 'mapdata:api:location:list:%d:%s:%d' % (
searchable + detailed*2 + geometry*4,
AccessPermission.cache_key_for_request(request),
request.user_permissions.can_access_base_mapdata
)
result = cache.get(cache_key, None)
if result is None:
if searchable:
locations = searchable_locations_for_request(request)
else:
locations = visible_locations_for_request(request).values()
result = tuple(obj.serialize(include_type=True, detailed=detailed, search=searchable,
geometry=geometry and MapdataViewSet.can_access_geometry(request, obj),
simple_geometry=True)
for obj in locations)
cache.set(cache_key, result, 300)
return Response(result)
@action(detail=False, methods=['get'])
@api_etag(permissions=False)
def types(self, request):
return MapdataViewSet.list_types(get_submodels(Location), geomtype=False)
class LocationBySlugViewSet(LocationViewSetBase):
queryset = LocationSlug.objects.all()
lookup_field = 'slug'
lookup_value_regex = r'[^/]+'
def get_object(self):
return get_location_by_slug_for_request(self.kwargs['slug'], self.request)
class DynamicLocationPositionViewSet(UpdateModelMixin, RetrieveModelMixin, GenericViewSet):
queryset = LocationSlug.objects.all()
lookup_field = 'slug'
lookup_value_regex = r'[^/]+'
def get_object(self):
slug = self.kwargs['slug']
if slug.startswith('p:'):
return get_object_or_404(Position, secret=slug[2:])
if slug.isdigit():
location = get_location_by_id_for_request(slug, self.request)
if isinstance(location, DynamicLocation):
return location
raise Http404
@api_stats('dynamic_location_retrieve')
def retrieve(self, request, key=None, *args, **kwargs):
obj = self.get_object()
return Response(obj.serialize_position())
class SourceViewSet(MapdataViewSet):
queryset = Source.objects.all()
order_by = ('name',)
@action(detail=True, methods=['get'])
def image(self, request, pk=None):
return self._image(request, pk=pk)
def _image(self, request, pk=None):
source = self.get_object()
last_modified = int(os.path.getmtime(source.filepath))
response = get_conditional_response(request, last_modified=last_modified)
if response is None:
response = HttpResponse(open(source.filepath, 'rb'), content_type=mimetypes.guess_type(source.name)[0])
response['Last-Modified'] = http_date(last_modified)
return response
class AccessRestrictionViewSet(MapdataViewSet):
queryset = AccessRestriction.objects.all()
class AccessRestrictionGroupViewSet(MapdataViewSet):
queryset = AccessRestrictionGroup.objects.all()
class UpdatesViewSet(GenericViewSet):
"""
Get information about recent updates.
Get display information about the current user.
Set the tile access cookie.
The tile access cookie is only valid for 1 minute, so if you are displaying a map, call this endpoint repeatedly.
"""
@action(detail=False, methods=['get'])
def fetch(self, request, key=None):
cross_origin = request.META.get('HTTP_ORIGIN')
if cross_origin is not None:
try:
if request.META['HTTP_HOST'] == urlparse(cross_origin).hostname:
cross_origin = None
except ValueError:
pass
increment_cache_key('api_updates_fetch_requests%s' % ('_cross_origin' if cross_origin is not None else ''))
from c3nav.site.models import SiteUpdate
result = {
'last_site_update': SiteUpdate.last_update(),
'last_map_update': MapUpdate.current_processed_cache_key(),
}
if cross_origin is None:
result.update({
'user': get_user_data(request),
})
response = Response(result)
if cross_origin is not None:
response['Access-Control-Allow-Origin'] = cross_origin
response['Access-Control-Allow-Credentials'] = 'true'
set_tile_access_cookie(request, response)
return response

View file

@ -2,15 +2,16 @@ import json
from functools import wraps
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models import Prefetch
from django.utils.cache import get_conditional_response
from django.utils.http import quote_etag
from django.utils.translation import get_language
from ninja.decorators import decorate_view
from c3nav.mapdata.api import api_stats_clean_location_value
from c3nav.mapdata.models import MapUpdate
from c3nav.mapdata.models import AccessRestriction, LocationGroup, MapUpdate
from c3nav.mapdata.models.access import AccessPermission
from c3nav.mapdata.models.geometry.base import GeometryMixin
from c3nav.mapdata.models.locations import SpecificLocation
from c3nav.mapdata.utils.cache.local import LocalCacheProxy
from c3nav.mapdata.utils.cache.stats import increment_cache_key
@ -90,3 +91,34 @@ def newapi_stats(stat_name):
return response
return wrapped_func
return decorate_view(wrapper)
def optimize_query(qs):
if issubclass(qs.model, SpecificLocation):
base_qs = LocationGroup.objects.select_related('category')
qs = qs.prefetch_related(Prefetch('groups', queryset=base_qs))
if issubclass(qs.model, AccessRestriction):
qs = qs.prefetch_related('groups')
return qs
def api_stats_clean_location_value(value):
if isinstance(value, str) and value.startswith('c:'):
value = value.split(':')
value = 'c:%s:%d:%d' % (value[1], int(float(value[2]) / 3) * 3, int(float(value[3]) / 3) * 3)
return (value, 'c:anywhere')
return (value, )
def api_stats(view_name):
def wrapper(func):
@wraps(func)
def wrapped_func(self, request, *args, **kwargs):
response = func(self, request, *args, **kwargs)
if response.status_code < 400 and kwargs:
name, value = next(iter(kwargs.items()))
for value in api_stats_clean_location_value(value):
increment_cache_key('apistats__%s__%s__%s' % (view_name, name, value))
return response
return wrapped_func
return wrapper

View file

@ -60,10 +60,10 @@ def _location_list(request, detailed: bool, filters: LocationListFilters):
else:
locations = visible_locations_for_request(request).values()
result = tuple(obj.serialize(detailed=detailed, search=filters.searchable,
geometry=filters.geometry and can_access_geometry(request),
simple_geometry=True)
for obj in locations)
result = [obj.serialize(detailed=detailed, search=filters.searchable,
geometry=filters.geometry and can_access_geometry(request),
simple_geometry=True)
for obj in locations]
return result

View file

@ -6,14 +6,13 @@ from ninja import Router as APIRouter
from c3nav.api.exceptions import API404
from c3nav.api.newauth import auth_responses, validate_responses
from c3nav.mapdata.api import optimize_query
from c3nav.mapdata.models import (Area, Building, Door, Hole, Level, LocationGroup, LocationGroupCategory, Source,
Space, Stair)
from c3nav.mapdata.models.access import AccessRestriction, AccessRestrictionGroup
from c3nav.mapdata.models.geometry.space import (POI, Column, CrossDescription, LeaveDescription, LineObstacle,
Obstacle, Ramp)
from c3nav.mapdata.models.locations import DynamicLocation
from c3nav.mapdata.newapi.base import newapi_etag
from c3nav.mapdata.newapi.base import newapi_etag, optimize_query
from c3nav.mapdata.schemas.filters import (ByCategoryFilter, ByGroupFilter, ByOnTopOfFilter, FilterSchema,
LevelGeometryFilter, SpaceGeometryFilter)
from c3nav.mapdata.schemas.models import (AccessRestrictionGroupSchema, AccessRestrictionSchema, AreaSchema,

View file

@ -1,200 +0,0 @@
from django.core.exceptions import ValidationError
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from c3nav.mapdata.api import api_stats_clean_location_value
from c3nav.mapdata.models.access import AccessPermission
from c3nav.mapdata.models.locations import Position
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.mapdata.utils.locations import visible_locations_for_request
from c3nav.routing.exceptions import LocationUnreachable, NoRouteFound, NotYetRoutable
from c3nav.routing.forms import RouteForm
from c3nav.routing.locator import Locator
from c3nav.routing.models import RouteOptions
from c3nav.routing.rangelocator import RangeLocator
from c3nav.routing.router import Router
class RoutingViewSet(ViewSet):
"""
/route/ Get routes.
/options/ Get or set route options.
/locate/ Wifi locate.
How to use the /locate/ endpoint:
POST visible wifi stations as JSON data like this:
{
"stations": [
{
"bssid": "11:22:33:44:55:66",
"ssid": "36C3",
"level": -55,
"frequency": 5500
},
...
]
}
"""
@action(detail=False, methods=['get', 'post'])
def route(self, request, *args, **kwargs):
params = request.POST if request.method == 'POST' else request.GET
form = RouteForm(params, request=request)
if not form.is_valid():
return Response({
'errors': form.errors,
}, status=400)
options = RouteOptions.get_for_request(request)
try:
options.update(params, ignore_unknown=True)
except ValidationError as e:
return Response({
'errors': (str(e), ),
}, status=400)
try:
route = Router.load().get_route(origin=form.cleaned_data['origin'],
destination=form.cleaned_data['destination'],
permissions=AccessPermission.get_for_request(request),
options=options)
except NotYetRoutable:
return Response({
'error': _('Not yet routable, try again shortly.'),
})
except LocationUnreachable:
return Response({
'error': _('Unreachable location.'),
})
except NoRouteFound:
return Response({
'error': _('No route found.'),
})
origin_values = api_stats_clean_location_value(form.cleaned_data['origin'].pk)
destination_values = api_stats_clean_location_value(form.cleaned_data['destination'].pk)
increment_cache_key('apistats__route')
for origin_value in origin_values:
for destination_value in destination_values:
increment_cache_key('apistats__route_tuple_%s_%s' % (origin_value, destination_value))
for value in origin_values:
increment_cache_key('apistats__route_origin_%s' % value)
for value in destination_values:
increment_cache_key('apistats__route_destination_%s' % value)
return Response({
'request': {
'origin': self.get_request_pk(form.cleaned_data['origin']),
'destination': self.get_request_pk(form.cleaned_data['destination']),
},
'options': options.serialize(),
'report_issue_url': reverse('site.report_create', kwargs={
'origin': request.POST['origin'],
'destination': request.POST['destination'],
'options': options.serialize_string()
}),
'result': route.serialize(locations=visible_locations_for_request(request)),
})
def get_request_pk(self, location):
return location.slug if isinstance(location, Position) else location.pk
@action(detail=False, methods=['get', 'post'])
def options(self, request, *args, **kwargs):
options = RouteOptions.get_for_request(request)
if request.method == 'POST':
try:
options.update(request.POST, ignore_unknown=True)
except ValidationError as e:
return Response({
'errors': (str(e),),
}, status=400)
options.save()
return Response(options.serialize())
@action(detail=False, methods=('POST', ))
def locate(self, request, *args, **kwargs):
if isinstance(request.data, list):
stations_data = request.data
data = {}
else:
data = request.data
if 'stations' not in data:
return Response({
'errors': (_('stations is missing.'),),
}, status=400)
stations_data = data['stations']
try:
location = Locator.load().locate(stations_data, permissions=AccessPermission.get_for_request(request))
if location is not None:
increment_cache_key('apistats__locate__%s' % location.pk)
except ValidationError:
return Response({
'errors': (_('Invalid scan data.'),),
}, status=400)
# if 'set_position' in data and location:
# set_position = data['set_position']
# if not set_position.startswith('p:'):
# return Response({
# 'errors': (_('Invalid set_position.'),),
# }, status=400)
#
# try:
# position = Position.objects.get(secret=set_position[2:])
# except Position.DoesNotExist:
# return Response({
# 'errors': (_('Invalid set_position.'),),
# }, status=400)
#
# form_data = {
# **data,
# 'coordinates_id': None if location is None else location.pk,
# }
#
# # todo: migrate
# # form = PositionAPIUpdateForm(instance=position, data=form_data, request=request)
# #
# # if not form.is_valid():
# # return Response({
# # 'errors': form.errors,
# # }, status=400)
# #
# # form.save()
return Response({'location': None if location is None else location.serialize(simple_geometry=True)})
@action(detail=False)
def locate_test(self, request):
from c3nav.mesh.messages import MeshMessageType
from c3nav.mesh.models import MeshNode
try:
node = MeshNode.objects.prefetch_last_messages(MeshMessageType.LOCATE_RANGE_RESULTS).get(
address="d4:f9:8d:2d:0d:f1"
)
except MeshNode.DoesNotExist:
return Response({
"location": None
})
msg = node.last_messages[MeshMessageType.LOCATE_RANGE_RESULTS]
locator = RangeLocator.load()
location = locator.locate(
{
r.peer: r.distance
for r in msg.parsed.ranges
if r.distance != 0xFFFF
},
None
)
return Response({
"ranges": msg.parsed.tojson(msg.parsed)["ranges"],
"datetime": msg.datetime,
"location": location.serialize(simple_geometry=True) if location else None
})

View file

@ -10,11 +10,11 @@ from ninja import Schema
from pydantic import PositiveInt
from c3nav.api.exceptions import APIRequestValidationFailed
from c3nav.api.newauth import auth_responses, validate_responses, APITokenAuth
from c3nav.api.newauth import APITokenAuth, auth_responses, validate_responses
from c3nav.api.utils import NonEmptyStr
from c3nav.mapdata.api import api_stats_clean_location_value
from c3nav.mapdata.models.access import AccessPermission
from c3nav.mapdata.models.locations import Position
from c3nav.mapdata.newapi.base import api_stats_clean_location_value
from c3nav.mapdata.schemas.model_base import AnyLocationID, Coordinates3D
from c3nav.mapdata.utils.cache.stats import increment_cache_key
from c3nav.mapdata.utils.locations import visible_locations_for_request

View file

@ -302,7 +302,6 @@ INSTALLED_APPS = [
'bootstrap3',
'ninja',
'c3nav.api',
'rest_framework',
'c3nav.mapdata',
'c3nav.routing',
'c3nav.site',
@ -372,16 +371,6 @@ USE_I18N = True
USE_L10N = True
USE_TZ = True
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.SessionAuthentication',
'c3nav.api.auth.APISecretAuthentication',
),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.AllowAny',
)
}
NINJA_PAGINATION_CLASS = "ninja.pagination.LimitOffsetPagination"
LOCALE_PATHS = (

View file

@ -15,8 +15,7 @@ import c3nav.site.urls
urlpatterns = [
path('editor/', include(c3nav.editor.urls)),
path('api/v2/', c3nav.api.urls.ninja_api.urls),
path('api/', include((c3nav.api.urls, 'api'), namespace='api')),
path('api/', include(c3nav.api.urls)),
path('map/', include(c3nav.mapdata.urls)),
path('admin/', admin.site.urls),
path('control/', include(c3nav.control.urls)),

View file

@ -2,7 +2,6 @@ Django==4.2.7
django-bootstrap3==23.4
django-compressor==4.4
csscompressor==0.9.5
djangorestframework==3.14.0
django-ninja==1.0.1
django-filter==23.4
django-environ==0.11.2