add view to show currently installed firmwares

This commit is contained in:
Laura Klünder 2023-11-06 18:26:19 +01:00
parent e01e718356
commit c872b97fa3
6 changed files with 188 additions and 14 deletions

View file

@ -1,7 +1,9 @@
from collections import UserDict
from collections import UserDict, namedtuple
from dataclasses import dataclass
from datetime import datetime
from functools import cached_property
from operator import attrgetter
from typing import Any, Mapping, Self
from typing import Any, Mapping, Optional, Self
from django.contrib.auth import get_user_model
from django.db import NotSupportedError, models
@ -10,20 +12,44 @@ from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from c3nav.mesh.dataformats import BoardType
from c3nav.mesh.messages import ChipType
from c3nav.mesh.messages import ChipType, ConfigFirmwareMessage, ConfigHardwareMessage
from c3nav.mesh.messages import MeshMessage as MeshMessage
from c3nav.mesh.messages import MeshMessageType
FirmwareLookup = namedtuple('FirmwareLookup', ('sha256_hash', 'chip', 'project_name', 'version', 'idf_version'))
@dataclass
class FirmwareDescription:
chip: ChipType
project_name: str
version: str
idf_version: str
sha256_hash: str
build: Optional["FirmwareBuild"] = None
created: datetime | None = None
def get_lookup(self) -> FirmwareLookup:
return FirmwareLookup(
chip=self.chip,
project_name=self.project_name,
version=self.version,
idf_version=self.idf_version,
sha256_hash=self.sha256_hash,
)
class MeshNodeQuerySet(models.QuerySet):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._prefetch_last_messages = set()
self._prefetch_last_messages_done = False
self._prefetch_firmwares = False
def _clone(self):
clone = super()._clone()
clone._prefetch_last_messages = self._prefetch_last_messages
clone._prefetch_firmwares = self._prefetch_firmwares
return clone
def prefetch_last_messages(self, *types: MeshMessageType):
@ -33,10 +59,16 @@ class MeshNodeQuerySet(models.QuerySet):
)
return clone
def prefetch_firmwares(self, *types: MeshMessageType):
clone = self.prefetch_last_messages(MeshMessageType.CONFIG_FIRMWARE,
MeshMessageType.CONFIG_HARDWARE)
clone._prefetch_firmwares = True
return clone
def _fetch_all(self):
super()._fetch_all()
if self._prefetch_last_messages and not self._prefetch_last_messages_done:
nodes = {node.pk: node for node in self._result_cache}
nodes: dict[str, MeshNode] = {node.pk: node for node in self._result_cache}
try:
for message in NodeMessage.objects.order_by('message_type', 'src_node', '-datetime', '-pk').filter(
message_type__in=(t.name for t in self._prefetch_last_messages),
@ -45,9 +77,52 @@ class MeshNodeQuerySet(models.QuerySet):
nodes[message.src_node_id].last_messages[message.message_type] = message
for node in nodes.values():
node.last_messages["any"] = max(node.last_messages.values(), key=attrgetter("datetime"))
self._prefetch_last_messages_done = True
except NotSupportedError:
pass
if self._prefetch_firmwares:
# fetch matching firmware builds
firmwares = {
fw_desc.get_lookup(): fw_desc for fw_desc in
(build.get_firmware_description() for build in FirmwareBuild.objects.filter(
sha256_hash__in=set(
node.last_messages[MeshMessageType.CONFIG_FIRMWARE].parsed.app_desc.app_elf_sha256
for node in self._result_cache
)
))
}
# assign firmware descriptions
for node in nodes.values():
firmware_desc = node.get_firmware_description()
node.firmware_desc = firmwares.get(firmware_desc.get_lookup(), firmware_desc)
# get date of first appearance
nodes_to_complete = tuple(
node for node in nodes.values()
if node.firmware_desc.build is None
)
try:
created_lookup = {
msg.parsed.app_desc.app_elf_sha256: msg.datetime
for msg in NodeMessage.objects.filter(
message_type=MeshMessageType.CONFIG_FIRMWARE.name,
data__app_elf_sha256__in=(node.firmware_desc.sha256_hash for node in nodes_to_complete)
).order_by('data__app_elf_sha256', 'datetime').distinct('data__app_elf_sha256')
}
print(created_lookup)
except NotSupportedError:
created_lookup = {
app_elf_sha256: NodeMessage.objects.filter(
message_type=MeshMessageType.CONFIG_FIRMWARE.name,
data__app_elf_sha256=app_elf_sha256
).order_by('datetime').first()
for app_elf_sha256 in {node.firmware_desc.sha256_hash for node in nodes_to_complete}
}
for node in nodes_to_complete:
node.firmware_desc.created = created_lookup[node.firmware_desc.sha256_hash]
class LastMessagesByTypeLookup(UserDict):
def __init__(self, node):
@ -103,9 +178,20 @@ class MeshNode(models.Model):
return self.address
@cached_property
def last_messages(self) -> Mapping[Any, Self]:
def last_messages(self) -> Mapping[Any, "NodeMessage"]:
return LastMessagesByTypeLookup(self)
def get_firmware_description(self) -> FirmwareDescription:
firmware_msg: ConfigFirmwareMessage = self.last_messages[MeshMessageType.CONFIG_FIRMWARE].parsed
hardware_msg: ConfigHardwareMessage = self.last_messages[MeshMessageType.CONFIG_HARDWARE].parsed
return FirmwareDescription(
chip=hardware_msg.chip,
project_name=firmware_msg.app_desc.project_name,
version=firmware_msg.app_desc.version,
idf_version=firmware_msg.app_desc.idf_version,
sha256_hash=firmware_msg.app_desc.app_elf_sha256,
)
class MeshUplink(models.Model):
"""
@ -144,7 +230,7 @@ class NodeMessage(models.Model):
return '(#%d) %s at %s' % (self.pk, self.get_message_type_display(), self.datetime)
@cached_property
def parsed(self) -> dict:
def parsed(self) -> Self:
return MeshMessage.fromjson(self.data)
@ -202,6 +288,17 @@ class FirmwareBuild(models.Model):
'boards': self.boards,
}
def get_firmware_description(self) -> FirmwareDescription:
return FirmwareDescription(
chip=ChipType(self.chip),
project_name=self.version.project_name,
version=self.version.version,
idf_version=self.version.idf_version,
sha256_hash=self.sha256_hash,
created=self.version.created,
build=self,
)
class FirmwareBuildBoard(models.Model):
BOARDS = [(boardtype.name, boardtype.pretty_name) for boardtype in BoardType]