tentative generic mapdata api functions, still with quite a few todos
This commit is contained in:
parent
b2aa76ba2d
commit
0cf5a2aadf
4 changed files with 138 additions and 22 deletions
|
@ -1,8 +1,8 @@
|
|||
from typing import Annotated, Any, Type
|
||||
|
||||
import annotated_types
|
||||
from pydantic import AfterValidator, GetCoreSchemaHandler, GetJsonSchemaHandler, PlainSerializer
|
||||
from pydantic.json_schema import JsonSchemaValue, WithJsonSchema
|
||||
from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
|
||||
from pydantic.json_schema import JsonSchemaValue
|
||||
from pydantic_core import CoreSchema, core_schema
|
||||
from rest_framework.exceptions import ParseError
|
||||
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
from typing import Optional, Sequence, Type
|
||||
|
||||
from django.db.models import Model
|
||||
from ninja import Query
|
||||
from ninja import Router as APIRouter
|
||||
from ninja.pagination import paginate
|
||||
|
||||
from c3nav.api.exceptions import API404
|
||||
from c3nav.api.newauth import auth_responses
|
||||
from c3nav.mapdata.api import optimize_query
|
||||
from c3nav.mapdata.models import Level, Source
|
||||
from c3nav.mapdata.schemas.filters import LevelFilters
|
||||
from c3nav.mapdata.models.access import AccessPermission
|
||||
from c3nav.mapdata.schemas.filters import FilterSchema, GroupFilter, OnTopOfFilter
|
||||
from c3nav.mapdata.schemas.models import LevelSchema
|
||||
from c3nav.mapdata.schemas.responses import BoundsSchema
|
||||
|
||||
|
@ -20,20 +25,68 @@ def bounds(request):
|
|||
}
|
||||
|
||||
|
||||
def mapdata_list_endpoint(request,
|
||||
model: Type[Model],
|
||||
filters: Optional[FilterSchema] = None,
|
||||
order_by: Sequence[str] = ('pk',)):
|
||||
# todo: request permissions based on api key
|
||||
|
||||
# todo: pagination cache?
|
||||
|
||||
# generate cache_key
|
||||
# todo: don't ignore request language
|
||||
cache_key = 'mapdata:api:%s:%s' % (model.__name__, AccessPermission.cache_key_for_request(request))
|
||||
if filters:
|
||||
for name in filters.model_fields_set: # noqa
|
||||
value = getattr(filters, name)
|
||||
if value is None:
|
||||
continue
|
||||
cache_key += ';%s,%s' % (name, value)
|
||||
|
||||
# todo: we have the cache key, this would be a great time for a shortcut
|
||||
|
||||
# validate filters
|
||||
if filters:
|
||||
filters.validate(request)
|
||||
|
||||
# get the queryset and filter it
|
||||
qs = optimize_query(
|
||||
model.qs_for_request(request) if hasattr(model, 'qs_for_request') else model.objects.all()
|
||||
)
|
||||
if filters:
|
||||
qs = filters.filter_qs(qs)
|
||||
|
||||
# order_by
|
||||
qs = qs.order_by(*order_by)
|
||||
|
||||
# todo: can access geometry… using defer?
|
||||
|
||||
return qs
|
||||
|
||||
|
||||
def mapdata_retrieve_endpoint(request, model: Type[Model], **lookups):
|
||||
try:
|
||||
return optimize_query(
|
||||
model.qs_for_request(request) if hasattr(model, 'qs_for_request') else model.objects.all()
|
||||
).get(**lookups)
|
||||
except model.DoesNotExist:
|
||||
raise API404("%s not found" % model.__name__.lower())
|
||||
|
||||
|
||||
class LevelFilters(GroupFilter, OnTopOfFilter):
|
||||
pass
|
||||
|
||||
|
||||
@map_api_router.get('/levels/', response=list[LevelSchema],
|
||||
summary="List available levels")
|
||||
summary="Get level list")
|
||||
@paginate
|
||||
def levels_list(request, filters: Query[LevelFilters]):
|
||||
# todo: access, caching, filtering, etc
|
||||
return Level.objects.all()
|
||||
# todo cache?
|
||||
return mapdata_list_endpoint(request, model=Level, filters=filters)
|
||||
|
||||
|
||||
@map_api_router.get('/levels/{level_id}/', response=LevelSchema,
|
||||
summary="List available levels")
|
||||
summary="Get level by ID")
|
||||
def level_detail(request, level_id: int):
|
||||
# todo: access, caching, filtering, etc
|
||||
try:
|
||||
level = Level.objects.get(id=level_id)
|
||||
except Level.DoesNotExist:
|
||||
raise API404("level not found")
|
||||
return level
|
||||
return mapdata_retrieve_endpoint(request, Level, pk=level_id)
|
||||
|
|
|
@ -1,16 +1,82 @@
|
|||
from typing import Literal, Optional
|
||||
from typing import Literal, Optional, Type
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.db.models import Model, QuerySet
|
||||
from ninja import Schema
|
||||
from pydantic import Field as APIField
|
||||
|
||||
from c3nav.api.exceptions import APIRequestValidationFailed
|
||||
from c3nav.mapdata.models import Level, LocationGroup, MapUpdate
|
||||
from c3nav.mapdata.models.access import AccessPermission
|
||||
|
||||
class LevelFilters(Schema):
|
||||
|
||||
def get_keys_for_model(request, model: Type[Model], key: str) -> set:
|
||||
# get all accessible keys for this model for this request
|
||||
if hasattr(model, 'qs_for_request'):
|
||||
cache_key = 'mapdata:api:keys:%s:%s:%s' % (model.__name__, key,
|
||||
AccessPermission.cache_key_for_request(request))
|
||||
qs = model.qs_for_request(request)
|
||||
else:
|
||||
cache_key = 'mapdata:api:keys:%s:%s:%s' % (model.__name__, key,
|
||||
MapUpdate.current_cache_key())
|
||||
qs = model.objects.all()
|
||||
|
||||
result = cache.get(cache_key, None)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
result = set(qs.values_list(key, flat=True))
|
||||
cache.set(cache_key, result, 300)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def assert_valid_value(request, model: Type[Model], key: str, values: set):
|
||||
keys = get_keys_for_model(request, model, key)
|
||||
remainder = values-keys
|
||||
if remainder:
|
||||
raise APIRequestValidationFailed("Unknown %s: %r" % (model.__name__, remainder))
|
||||
|
||||
|
||||
class FilterSchema(Schema):
|
||||
def filter_qs(self, qs: QuerySet) -> QuerySet:
|
||||
return qs
|
||||
|
||||
def validate(self, request):
|
||||
pass
|
||||
|
||||
|
||||
class GroupFilter(FilterSchema):
|
||||
group: Optional[int] = APIField(
|
||||
None,
|
||||
name="filter by location group"
|
||||
)
|
||||
|
||||
def validate(self, request):
|
||||
super().validate(request)
|
||||
if self.group is not None:
|
||||
assert_valid_value(request, LocationGroup, "pk", {self.group})
|
||||
|
||||
def filter_qs(self, qs: QuerySet) -> QuerySet:
|
||||
qs = super().filter_qs(qs)
|
||||
if self.group is not None:
|
||||
qs = qs.filter(groups=self.group)
|
||||
return super().filter_qs(qs)
|
||||
|
||||
|
||||
class OnTopOfFilter(FilterSchema):
|
||||
on_top_of: Optional[Literal["null"] | int] = APIField(
|
||||
None,
|
||||
name='filter by on top of level ID (or "null")',
|
||||
description='if set, only levels on top of this level (or "null" for no level) will be shown'
|
||||
)
|
||||
group: Optional[int] = APIField(
|
||||
None,
|
||||
name="filter by location group"
|
||||
)
|
||||
|
||||
def validate(self, request):
|
||||
super().validate(request)
|
||||
if self.group is not None and self.group != "null":
|
||||
assert_valid_value(request, Level, "pk", {self.on_top_of})
|
||||
|
||||
def filter_qs(self, qs: QuerySet) -> QuerySet:
|
||||
if self.on_top_of is not None:
|
||||
qs = qs.filter(on_top_of_id=None if self.on_top_of == "null" else self.on_top_of)
|
||||
return super().filter_qs(qs)
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
from functools import cached_property
|
||||
from typing import Annotated, Any, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
import annotated_types
|
||||
from ninja import Schema
|
||||
from pydantic import Field as APIField
|
||||
from pydantic import PositiveInt, model_validator
|
||||
from pydantic.functional_validators import ModelWrapValidatorHandler
|
||||
from pydantic.utils import GetterDict
|
||||
from pydantic_core.core_schema import ValidationInfo
|
||||
|
||||
from c3nav.api.utils import NonEmptyStr
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue