2023-11-11 03:01:15 +01:00
|
|
|
from datetime import datetime
|
2023-11-30 22:02:40 +01:00
|
|
|
from typing import Annotated, Optional
|
2023-11-11 03:01:15 +01:00
|
|
|
|
2023-11-17 16:42:27 +01:00
|
|
|
from django.db import IntegrityError, transaction
|
2023-11-30 22:02:40 +01:00
|
|
|
from ninja import Field as APIField
|
|
|
|
from ninja import Query
|
2023-11-11 13:30:12 +01:00
|
|
|
from ninja import Router as APIRouter
|
2023-11-15 14:54:21 +01:00
|
|
|
from ninja import Schema, UploadedFile
|
2023-11-11 03:01:15 +01:00
|
|
|
from ninja.pagination import paginate
|
2023-11-18 21:29:35 +01:00
|
|
|
from pydantic import PositiveInt, field_validator
|
2023-11-11 03:01:15 +01:00
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
from c3nav.api.exceptions import API404, APIConflict, APIRequestValidationFailed
|
2023-11-19 00:12:10 +01:00
|
|
|
from c3nav.api.newauth import APITokenAuth, auth_permission_responses, auth_responses, validate_responses
|
2023-11-18 21:29:35 +01:00
|
|
|
from c3nav.mesh.dataformats import BoardType, ChipType, FirmwareImage
|
2023-11-27 22:09:56 +01:00
|
|
|
from c3nav.mesh.messages import MeshMessageType
|
|
|
|
from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, NodeMessage
|
2023-11-11 03:01:15 +01:00
|
|
|
|
2023-11-30 22:02:40 +01:00
|
|
|
mesh_api_router = APIRouter(tags=["mesh"], auth=APITokenAuth(permissions={"mesh_control"}))
|
2023-11-11 03:01:15 +01:00
|
|
|
|
|
|
|
|
|
|
|
class FirmwareBuildSchema(Schema):
|
2023-11-19 00:12:10 +01:00
|
|
|
"""
|
|
|
|
A build belonging to a firmware version.
|
|
|
|
"""
|
2023-11-18 21:29:35 +01:00
|
|
|
id: PositiveInt
|
2023-11-19 00:12:10 +01:00
|
|
|
variant: str = APIField(
|
|
|
|
description="a variant identifier for this build, unique for this firmware version",
|
|
|
|
example="c3uart"
|
|
|
|
)
|
|
|
|
chip: ChipType = APIField(
|
|
|
|
description="the chip that this build was built for",
|
|
|
|
example=ChipType.ESP32_C3.name,
|
|
|
|
)
|
|
|
|
sha256_hash: str = APIField(
|
|
|
|
description="SHE256 hash of the underlying ELF file",
|
|
|
|
pattern=r"^[0-9a-f]{64}$",
|
|
|
|
)
|
|
|
|
url: str = APIField(
|
|
|
|
alias="binary",
|
|
|
|
example="/media/firmware/012345/firmware.bin",
|
|
|
|
description="download URL for the build binary",
|
2023-11-19 15:34:08 +01:00
|
|
|
) # todo: downlaod differently?
|
2023-11-30 23:04:03 +01:00
|
|
|
boards: list[BoardType] = APIField(
|
2023-11-19 00:12:10 +01:00
|
|
|
description="set of boards that this build is compatible with",
|
|
|
|
example={BoardType.C3NAV_LOCATION_PCB_REV_0_2.name, }
|
|
|
|
)
|
2023-11-11 03:01:15 +01:00
|
|
|
|
2023-11-30 22:45:36 +01:00
|
|
|
@staticmethod
|
|
|
|
def resolve_boards(obj):
|
|
|
|
return list(obj.boards)
|
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
class Config(Schema.Config):
|
|
|
|
pass
|
2023-11-11 03:01:15 +01:00
|
|
|
|
|
|
|
|
|
|
|
class FirmwareSchema(Schema):
|
2023-11-19 00:12:10 +01:00
|
|
|
"""
|
|
|
|
A firmware version, usually with multiple build variants.
|
|
|
|
"""
|
2023-11-18 21:29:35 +01:00
|
|
|
id: PositiveInt
|
2023-11-11 03:01:15 +01:00
|
|
|
project_name: str = APIField(..., example="c3nav_positioning")
|
|
|
|
version: str = APIField(..., example="499837d-dirty")
|
|
|
|
idf_version: str = APIField(..., example="v5.1-476-g3187b8b326")
|
|
|
|
created: datetime
|
2023-11-19 00:12:10 +01:00
|
|
|
builds: list[FirmwareBuildSchema] = APIField(min_items=1)
|
2023-11-11 03:01:15 +01:00
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@field_validator('builds')
|
2023-11-15 14:54:21 +01:00
|
|
|
def builds_variants_must_be_unique(cls, builds):
|
|
|
|
if len(set(build.variant for build in builds)) != len(builds):
|
|
|
|
raise ValueError("builds must have unique variant identifiers")
|
|
|
|
return builds
|
|
|
|
|
2023-11-11 03:01:15 +01:00
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@mesh_api_router.get('/firmwares/', summary="List available firmwares",
|
|
|
|
response={200: list[FirmwareSchema], **validate_responses, **auth_responses})
|
2023-11-11 03:01:15 +01:00
|
|
|
@paginate
|
|
|
|
def firmware_list(request):
|
|
|
|
return FirmwareVersion.objects.all()
|
|
|
|
|
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@mesh_api_router.get('/firmwares/{firmware_id}/', summary="Get specific firmware",
|
|
|
|
response={200: FirmwareSchema, **API404.dict(), **auth_responses})
|
2023-11-23 23:24:15 +01:00
|
|
|
def firmware_by_id(request, firmware_id: int):
|
2023-11-11 03:01:15 +01:00
|
|
|
try:
|
|
|
|
return FirmwareVersion.objects.get(id=firmware_id)
|
|
|
|
except FirmwareVersion.DoesNotExist:
|
2023-11-17 18:56:47 +01:00
|
|
|
raise API404("Firmware not found")
|
|
|
|
|
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@mesh_api_router.get('/firmwares/{firmware_id}/{variant}/image_data',
|
|
|
|
summary="Get header data of firmware build image",
|
2023-11-19 00:12:10 +01:00
|
|
|
response={200: FirmwareImage.schema, **API404.dict(), **auth_responses},
|
2023-11-24 01:26:14 +01:00
|
|
|
openapi_extra={"externalDocs": {
|
|
|
|
'description': "esp-idf documentation",
|
|
|
|
'url': "https://docs.espressif.com/projects/esptool/en/latest/esp32s3/"
|
|
|
|
"advanced-topics/firmware-image-format.html"
|
|
|
|
}})
|
2023-11-17 18:56:47 +01:00
|
|
|
def firmware_build_image(request, firmware_id: int, variant: str):
|
|
|
|
try:
|
|
|
|
build = FirmwareBuild.objects.get(version_id=firmware_id, variant=variant)
|
|
|
|
return FirmwareImage.tojson(build.firmware_image)
|
|
|
|
except FirmwareVersion.DoesNotExist:
|
|
|
|
raise API404("Firmware or firmware build not found")
|
|
|
|
|
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@mesh_api_router.get('/firmwares/{firmware_id}/{variant}/project_description',
|
|
|
|
summary="Get project description of firmware build",
|
2023-11-19 00:12:10 +01:00
|
|
|
response={200: dict, **API404.dict(), **auth_responses},
|
2023-11-24 01:26:14 +01:00
|
|
|
openapi_extra={"externalDocs": {
|
|
|
|
'description': 'esp-idf docs',
|
|
|
|
'url': "https://docs.espressif.com/projects/esp-idf/en/latest/esp32/"
|
|
|
|
"api-guides/build-system.html#build-system-metadata"
|
|
|
|
}})
|
2023-11-17 18:56:47 +01:00
|
|
|
def firmware_project_description(request, firmware_id: int, variant: str):
|
|
|
|
try:
|
|
|
|
return FirmwareBuild.objects.get(version_id=firmware_id, variant=variant).firmware_description
|
|
|
|
except FirmwareVersion.DoesNotExist:
|
|
|
|
raise API404("Firmware or firmware build not found")
|
2023-11-15 14:54:21 +01:00
|
|
|
|
|
|
|
|
|
|
|
class UploadFirmwareBuildSchema(Schema):
|
2023-11-19 00:12:10 +01:00
|
|
|
"""
|
|
|
|
A firmware build to upload, with at least one build variant
|
|
|
|
"""
|
2023-11-15 14:54:21 +01:00
|
|
|
variant: str = APIField(..., example="c3uart")
|
|
|
|
boards: list[BoardType] = APIField(..., example=[BoardType.C3NAV_LOCATION_PCB_REV_0_2.name, ])
|
2023-11-17 16:42:27 +01:00
|
|
|
project_description: dict = APIField(..., title='project_description.json contents')
|
|
|
|
uploaded_filename: str = APIField(..., example="firmware.bin")
|
2023-11-15 14:54:21 +01:00
|
|
|
|
|
|
|
|
|
|
|
class UploadFirmwareSchema(Schema):
|
2023-11-19 00:12:10 +01:00
|
|
|
"""
|
|
|
|
A firmware version to upload, with at least one build variant
|
|
|
|
"""
|
2023-11-15 14:54:21 +01:00
|
|
|
project_name: str = APIField(..., example="c3nav_positioning")
|
|
|
|
version: str = APIField(..., example="499837d-dirty")
|
|
|
|
idf_version: str = APIField(..., example="v5.1-476-g3187b8b326")
|
2023-11-19 00:12:10 +01:00
|
|
|
builds: list[UploadFirmwareBuildSchema] = APIField(min_items=1)
|
2023-11-15 14:54:21 +01:00
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@field_validator('builds')
|
2023-11-15 14:54:21 +01:00
|
|
|
def builds_variants_must_be_unique(cls, builds):
|
|
|
|
if len(set(build.variant for build in builds)) != len(builds):
|
|
|
|
raise ValueError("builds must have unique variant identifiers")
|
|
|
|
return builds
|
|
|
|
|
|
|
|
|
2023-11-18 21:29:35 +01:00
|
|
|
@mesh_api_router.post(
|
2023-11-30 22:02:40 +01:00
|
|
|
'/firmwares/upload', summary="Upload firmware",
|
2023-11-19 14:56:00 +01:00
|
|
|
description="your OpenAPI viewer might not show it: firmware_data is UploadFirmware as json",
|
2023-11-18 21:29:35 +01:00
|
|
|
response={200: FirmwareSchema, **validate_responses, **auth_permission_responses, **APIConflict.dict()}
|
|
|
|
)
|
2023-11-17 16:42:27 +01:00
|
|
|
def firmware_upload(request, firmware_data: UploadFirmwareSchema, binary_files: list[UploadedFile]):
|
|
|
|
binary_files_by_name = {binary_file.name: binary_file for binary_file in binary_files}
|
|
|
|
if len([binary_file.name for binary_file in binary_files]) != len(binary_files_by_name):
|
|
|
|
raise APIRequestValidationFailed("Filenames of uploaded binary files must be unique.")
|
|
|
|
|
|
|
|
build_filenames = [build_data.uploaded_filename for build_data in firmware_data.builds]
|
|
|
|
if len(build_filenames) != len(set(build_filenames)):
|
|
|
|
raise APIRequestValidationFailed("Builds need to refer to different unique binary file names.")
|
|
|
|
|
|
|
|
if set(binary_files_by_name) != set(build_filenames):
|
|
|
|
raise APIRequestValidationFailed("All uploaded binary files need to be refered to by one build.")
|
|
|
|
|
|
|
|
try:
|
|
|
|
with transaction.atomic():
|
|
|
|
version = FirmwareVersion.objects.create(
|
|
|
|
project_name=firmware_data.project_name,
|
|
|
|
version=firmware_data.version,
|
|
|
|
idf_version=firmware_data.idf_version,
|
2023-11-30 22:02:40 +01:00
|
|
|
uploader=request.user,
|
2023-11-17 16:42:27 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
for build_data in firmware_data.builds:
|
2023-11-17 19:04:43 +01:00
|
|
|
try:
|
|
|
|
image = FirmwareImage.from_file(binary_files_by_name[build_data.uploaded_filename].open('rb'))
|
|
|
|
except ValueError:
|
|
|
|
raise APIRequestValidationFailed(f"Can't parse binary image {build_data.uploaded_filename}")
|
|
|
|
|
2023-11-17 16:42:27 +01:00
|
|
|
build = version.builds.create(
|
|
|
|
variant=build_data.variant,
|
2023-11-26 00:36:12 +01:00
|
|
|
chip=image.ext_header.chip,
|
2023-11-17 19:04:43 +01:00
|
|
|
sha256_hash=image.app_desc.app_elf_sha256,
|
2023-11-17 16:42:27 +01:00
|
|
|
project_description=build_data.project_description,
|
|
|
|
binary=binary_files_by_name[build_data.uploaded_filename],
|
|
|
|
)
|
|
|
|
|
|
|
|
for board in build_data.boards:
|
2023-11-17 19:04:43 +01:00
|
|
|
build.firmwarebuildboard_set.create(board=board.name)
|
2023-11-17 16:42:27 +01:00
|
|
|
except IntegrityError:
|
|
|
|
raise APIConflict('Firmware version already exists.')
|
|
|
|
|
|
|
|
return version
|
2023-11-27 22:09:56 +01:00
|
|
|
|
|
|
|
|
|
|
|
NodeAddress = Annotated[str, APIField(pattern=r"^[a-z0-9]{2}(:[a-z0-9]{2}){5}$")]
|
|
|
|
|
|
|
|
|
|
|
|
class MessagesFilter(Schema):
|
|
|
|
src_node: Optional[NodeAddress] = None
|
|
|
|
msg_type: Optional[MeshMessageType] = None
|
|
|
|
time_from: Optional[datetime] = None
|
|
|
|
time_until: Optional[datetime] = None
|
|
|
|
|
|
|
|
|
|
|
|
class NodeMessageSchema(Schema):
|
|
|
|
id: int
|
|
|
|
src_node: NodeAddress
|
|
|
|
message_type: MeshMessageType
|
|
|
|
datetime: datetime
|
|
|
|
data: dict
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def resolve_src_node(obj):
|
|
|
|
return obj.src_node.address
|
|
|
|
|
|
|
|
|
|
|
|
@mesh_api_router.get(
|
2023-11-30 22:02:40 +01:00
|
|
|
'/messages/', summary="query recorded mesh messages",
|
2023-11-27 22:09:56 +01:00
|
|
|
response={200: list[NodeMessageSchema], **auth_permission_responses}
|
|
|
|
)
|
|
|
|
@paginate
|
|
|
|
def messages_list(request, filters: Query[MessagesFilter]):
|
|
|
|
qs = NodeMessage.objects.all()
|
|
|
|
if filters.src_node:
|
|
|
|
qs = qs.filter(src_node__address=filters.src_node)
|
|
|
|
if filters.msg_type:
|
|
|
|
qs = qs.filter(message_type=filters.msg_type.name)
|
|
|
|
if filters.time_from:
|
|
|
|
qs = qs.filter(datetime__gte=filters.time_from)
|
|
|
|
if filters.time_until:
|
|
|
|
qs = qs.filter(datetime__lte=filters.time_until)
|
2023-11-27 22:33:47 +01:00
|
|
|
return qs.order_by('-datetime')
|