make linter happy

This commit is contained in:
Laura Klünder 2023-11-26 17:55:23 +01:00
parent 42280d6c71
commit 8a8b20f22e
16 changed files with 56 additions and 63 deletions

View file

@ -3,14 +3,11 @@ from collections import OrderedDict
from django.conf import settings
from django.core.exceptions import ValidationError
from django.forms import CharField, ModelForm
from django.utils import timezone
from django.utils.text import capfirst, format_lazy
from django.utils.translation import get_language_info
from django.utils.translation import gettext_lazy as _
from c3nav.mapdata.fields import I18nField
from c3nav.mapdata.models.locations import Position
from c3nav.mapdata.utils.locations import get_location_by_id_for_request
class I18nModelFormMixin(ModelForm):

View file

@ -576,10 +576,6 @@ def get_position_secret():
return get_random_string(32, string.ascii_letters+string.digits)
def get_position_api_secret():
return get_random_string(64, string.ascii_letters+string.digits)
class Position(CustomLocationProxyMixin, models.Model):
owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
name = models.CharField(_('name'), max_length=32)

View file

@ -1,7 +1,6 @@
import json
from typing import Optional
from django.core.cache import cache
from django.core.serializers.json import DjangoJSONEncoder
from django.shortcuts import redirect
from django.utils import timezone
@ -14,7 +13,6 @@ from c3nav.api.exceptions import API404, APIPermissionDenied, APIRequestValidati
from c3nav.api.newauth import auth_permission_responses, auth_responses, validate_responses
from c3nav.api.utils import NonEmptyStr
from c3nav.mapdata.models import Source
from c3nav.mapdata.models.access import AccessPermission
from c3nav.mapdata.models.locations import DynamicLocation, LocationRedirect, Position
from c3nav.mapdata.newapi.base import newapi_etag, newapi_stats
from c3nav.mapdata.schemas.filters import BySearchableFilter, RemoveGeometryFilter
@ -260,7 +258,8 @@ def location_by_slug_geometry(request, location_slug: NonEmptyStr):
@map_api_router.get('/positions/{position_id}/',
response={200: AnyPositionStatusSchema, **API404.dict(), **auth_responses},
summary="get coordinates of a moving position",
description="a numeric ID for a dynamic location or a string ID for the position secret can be used")
description="a numeric ID for a dynamic location or a string ID for the position secret "
"can be used")
@newapi_stats('get_position')
def get_position_by_id(request, position_id: AnyPositionID):
# no caching for obvious reasons!

View file

@ -9,8 +9,8 @@ from c3nav.api.schema import GeometrySchema, PointSchema
from c3nav.api.utils import NonEmptyStr
from c3nav.mapdata.schemas.model_base import (AnyLocationID, AnyPositionID, CustomLocationID, DjangoModelSchema,
LabelSettingsSchema, LocationSchema, PositionID, SerializableSchema,
SimpleGeometryPointAndBoundsSchema, SimpleGeometryPointSchema,
SimpleGeometryLocationsSchema, SpecificLocationSchema, TitledSchema,
SimpleGeometryLocationsSchema, SimpleGeometryPointAndBoundsSchema,
SimpleGeometryPointSchema, SpecificLocationSchema, TitledSchema,
WithAccessRestrictionSchema, WithLevelSchema,
WithLineStringGeometrySchema, WithPointGeometrySchema,
WithPolygonGeometrySchema, WithSpaceSchema)
@ -632,7 +632,8 @@ class PositionStatusSchema(Schema):
)
class PositionUnavailableStatusSchema(PositionStatusSchema, SimpleGeometryPointAndBoundsSchema, TrackablePositionSchema):
class PositionUnavailableStatusSchema(PositionStatusSchema, SimpleGeometryPointAndBoundsSchema,
TrackablePositionSchema):
available: Literal[False]

View file

@ -195,7 +195,7 @@ class ChipRevFormat(SimpleFormat):
def __init__(self):
super().__init__('H')
def decode(self, data: bytes) -> tuple[tuple[int,int], bytes]:
def decode(self, data: bytes) -> tuple[tuple[int, int], bytes]:
value, data = super().decode(data)
return (value // 100, value % 100), data

View file

@ -115,7 +115,7 @@ class MeshConsumer(AsyncWebsocketConsumer):
traceback.print_exc()
return
#print(msg)
# print(msg)
if msg.dst != messages.MESH_ROOT_ADDRESS and msg.dst != messages.MESH_PARENT_ADDRESS:
# message not adressed to us, forward it
@ -370,7 +370,6 @@ class MeshConsumer(AsyncWebsocketConsumer):
else:
node_state.ota_recipient = None
@database_sync_to_async
def get_nodes_with_ota(self, addresses) -> dict[str, Optional[OTAUpdateRecipient]]:
return {node.address: node.current_ota
@ -429,11 +428,13 @@ class MeshConsumer(AsyncWebsocketConsumer):
traceback.print_exc()
await asyncio.sleep(1)
async def ota_set_chunks(self, update: OTAUpdate, chunks: Optional[set[int]] = None, min_chunk: int=0):
async def ota_set_chunks(self, update: OTAUpdate, chunks: Optional[set[int]] = None, min_chunk: int = 0):
async with self.ota_chunks_available_condition:
num_chunks = (update.build.binary.size-1)//OTA_CHUNK_SIZE+1
print('queueing chunks for update', update.id, 'num_chunks=%d' % num_chunks, "chunks:", chunks)
chunks = set(range(min_chunk, num_chunks*0+10)) if chunks is None else {chunk for chunk in chunks if chunk < num_chunks}
chunks = (set(range(min_chunk, num_chunks*0+10))
if chunks is None
else {chunk for chunk in chunks if chunk < num_chunks})
self.ota_chunks.setdefault(update.id, set()).update(chunks)
self.ota_chunks_available_condition.notify_all()
@ -522,7 +523,6 @@ class MeshConsumer(AsyncWebsocketConsumer):
"uplink": self.channel_name
})
async def remove_dst_nodes(self, addresses):
for address in tuple(addresses):
await self.log_text(address, "destination removed")
@ -559,12 +559,12 @@ class MeshUIConsumer(AsyncJsonWebsocketConsumer):
if content.get("subscribe", None) == "log":
await self.channel_layer.group_add("mesh_log", self.channel_name)
# disabled because security
#if content.get("subscribe", None) == "msg_sent":
# await self.channel_layer.group_add("mesh_msg_sent", self.channel_name)
# self.msg_sent_filter = dict(content.get("filter", {}))
#if content.get("subscribe", None) == "msg_received":
# await self.channel_layer.group_add("mesh_msg_sent", self.channel_name)
# self.msg_received_filter = dict(content.get("filter", {}))
# if content.get("subscribe", None) == "msg_sent":
# await self.channel_layer.group_add("mesh_msg_sent", self.channel_name)
# self.msg_sent_filter = dict(content.get("filter", {}))
# if content.get("subscribe", None) == "msg_received":
# await self.channel_layer.group_add("mesh_msg_sent", self.channel_name)
# self.msg_received_filter = dict(content.get("filter", {}))
if content.get("subscribe", None) == "ranging":
await self.channel_layer.group_add("mesh_msg_received", self.channel_name)
self.msg_received_filter = {"msg_type": MeshMessageType.LOCATE_RANGE_RESULTS.name}

View file

@ -161,7 +161,6 @@ class MeshLayerAnnounceMessage(MeshMessage, msg_type=MeshMessageType.MESH_LAYER_
})
@dataclass
class MeshAddDestinationsMessage(MeshMessage, msg_type=MeshMessageType.MESH_ADD_DESTINATIONS):
""" downstream node announces served destination """

View file

@ -421,6 +421,7 @@ class FirmwareBuild(models.Model):
unique_together = [
('version', 'variant'),
]
@property
def boards(self):
return {BoardType[board.board] for board in self.firmwarebuildboard_set.all()

View file

@ -2,7 +2,7 @@ from django.urls import path
from c3nav.mesh.consumers import MeshConsumer, MeshUIConsumer
from c3nav.mesh.views.firmware import (FirmwareBuildDetailView, FirmwareDetailView, FirmwaresCurrentListView,
FirmwaresListView, OTAListView, OTADetailView)
FirmwaresListView, OTADetailView, OTAListView)
from c3nav.mesh.views.messages import MeshMessageListView, MeshMessageSendingView, MeshMessageSendView
from c3nav.mesh.views.misc import MeshLogView, MeshRangingView
from c3nav.mesh.views.nodes import NodeDetailView, NodeEditView, NodeListView

View file

@ -2,7 +2,7 @@ from django.contrib.messages.views import SuccessMessageMixin
from django.views.generic import DetailView, ListView, TemplateView
from django.views.generic.edit import FormMixin
from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, MeshNode, OTAUpdate, OTARecipientStatus
from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, MeshNode, OTARecipientStatus, OTAUpdate
from c3nav.mesh.views.base import MeshControlMixin
from c3nav.site.forms import OTACreateForm

View file

@ -1,5 +1,4 @@
from django.contrib.messages.views import SuccessMessageMixin
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView, ListView, UpdateView

View file

@ -139,34 +139,34 @@ class RoutingViewSet(ViewSet):
'errors': (_('Invalid scan data.'),),
}, status=400)
if 'set_position' in data and location:
set_position = data['set_position']
if not set_position.startswith('p:'):
return Response({
'errors': (_('Invalid set_position.'),),
}, status=400)
try:
position = Position.objects.get(secret=set_position[2:])
except Position.DoesNotExist:
return Response({
'errors': (_('Invalid set_position.'),),
}, status=400)
form_data = {
**data,
'coordinates_id': None if location is None else location.pk,
}
# todo: migrate
#form = PositionAPIUpdateForm(instance=position, data=form_data, request=request)
if not form.is_valid():
return Response({
'errors': form.errors,
}, status=400)
form.save()
# if 'set_position' in data and location:
# set_position = data['set_position']
# if not set_position.startswith('p:'):
# return Response({
# 'errors': (_('Invalid set_position.'),),
# }, status=400)
#
# try:
# position = Position.objects.get(secret=set_position[2:])
# except Position.DoesNotExist:
# return Response({
# 'errors': (_('Invalid set_position.'),),
# }, status=400)
#
# form_data = {
# **data,
# 'coordinates_id': None if location is None else location.pk,
# }
#
# # todo: migrate
# # form = PositionAPIUpdateForm(instance=position, data=form_data, request=request)
# #
# # if not form.is_valid():
# # return Response({
# # 'errors': form.errors,
# # }, status=400)
# #
# # form.save()
return Response({'location': None if location is None else location.serialize(simple_geometry=True)})

View file

@ -15,6 +15,7 @@ def locate(request):
"bounds": Source.max_bounds(),
}
@positioning_api_router.get('/locate-test/', summary="get dummy location for debugging",
response={200: BoundsSchema, **auth_responses})
def locate_test(request):

View file

@ -15,8 +15,9 @@ def get_route(request):
"bounds": Source.max_bounds(),
}
@routing_api_router.get('/options/', summary="get current route options",
response={200: BoundsSchema, **auth_responses})
response={200: BoundsSchema, **auth_responses})
def get_route_options(request):
# todo: implement
return {
@ -34,7 +35,7 @@ def set_route_options(request):
@routing_api_router.get('/options/form/', summary="get current route options with form definitions (like old API)",
response={200: BoundsSchema, **auth_responses})
response={200: BoundsSchema, **auth_responses})
def get_route_options_form(request):
# todo: implement
return {

View file

@ -13,8 +13,8 @@ from c3nav.mapdata.forms import I18nModelFormMixin
from c3nav.mapdata.models.locations import Position
from c3nav.mapdata.models.report import Report, ReportUpdate
from c3nav.mesh.messages import MeshMessageType
from c3nav.mesh.models import FirmwareBuild, HardwareDescription, MeshNode, OTAUpdate, OTAUpdateRecipient, \
OTARecipientStatus
from c3nav.mesh.models import (FirmwareBuild, HardwareDescription, MeshNode, OTARecipientStatus, OTAUpdate,
OTAUpdateRecipient)
class ReportIssueForm(I18nModelFormMixin, ModelForm):

View file

@ -28,8 +28,7 @@ from c3nav.control.forms import AccessPermissionForm, SignedPermissionDataError
from c3nav.mapdata.grid import grid
from c3nav.mapdata.models import Location, Source
from c3nav.mapdata.models.access import AccessPermissionToken
from c3nav.mapdata.models.locations import (LocationRedirect, Position, SpecificLocation, get_position_api_secret,
get_position_secret)
from c3nav.mapdata.models.locations import LocationRedirect, Position, SpecificLocation, get_position_secret
from c3nav.mapdata.models.report import Report, ReportUpdate
from c3nav.mapdata.utils.locations import (get_location_by_id_for_request, get_location_by_slug_for_request,
levels_by_short_label_for_request)