From 375aa9f12ce53d23bbf7a1c8dd4b11cf5a738e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laura=20Kl=C3=BCnder?= Date: Thu, 30 Nov 2023 21:31:51 +0100 Subject: [PATCH] =?UTF-8?q?move=20mesh=20forms=20from=20site=20to=20mesh?= =?UTF-8?q?=20module=E2=80=A6=20wtf?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/c3nav/mesh/forms.py | 105 ++++++++++++++++++++++++++++++- src/c3nav/mesh/views/firmware.py | 2 +- src/c3nav/site/forms.py | 93 --------------------------- 3 files changed, 105 insertions(+), 95 deletions(-) diff --git a/src/c3nav/mesh/forms.py b/src/c3nav/mesh/forms.py index 89e9ad28..3cc8896d 100644 --- a/src/c3nav/mesh/forms.py +++ b/src/c3nav/mesh/forms.py @@ -1,14 +1,22 @@ import time +from dataclasses import dataclass +from dataclasses import replace as dataclass_replace +from functools import cached_property +from itertools import chain +from typing import Sequence, Any from asgiref.sync import async_to_sync from django import forms from django.core.exceptions import ValidationError +from django.db import transaction +from django.forms import Form, ChoiceField, BooleanField from django.http import Http404 from django.utils.translation import gettext_lazy as _ from c3nav.mesh.dataformats import BoardConfig, BoardType, LedType, SerialLedType from c3nav.mesh.messages import MESH_BROADCAST_ADDRESS, MESH_ROOT_ADDRESS, MeshMessage, MeshMessageType -from c3nav.mesh.models import MeshNode +from c3nav.mesh.models import MeshNode, HardwareDescription, FirmwareBuild, OTAUpdateRecipient, OTARecipientStatus, \ + OTAUpdate class MeshMessageForm(forms.Form): @@ -284,3 +292,98 @@ class RangingForm(forms.Form): node_choices = tuple((node.address, str(node)) for node in MeshNode.objects.all()) self.fields['range_from'].choices = node_choices self.fields['range_to'].choices = node_choices + + + +@dataclass +class OTAFormGroup: + hardware: HardwareDescription + builds: Sequence[FirmwareBuild] + fields: dict[str, tuple[MeshNode, Any]] + + @cached_property + def builds_by_id(self) -> dict[int, FirmwareBuild]: + return {build.pk: build for build in self.builds} + + +class OTACreateForm(Form): + def __init__(self, builds: Sequence[FirmwareBuild], *args, **kwargs): + super().__init__(*args, **kwargs) + + nodes: Sequence[MeshNode] = MeshNode.objects.prefetch_last_messages( + MeshMessageType.CONFIG_BOARD + ).prefetch_firmwares().prefetch_ota() + + builds_by_hardware = {} + for build in builds: + for hardware_desc in build.hardware_descriptions: + builds_by_hardware.setdefault(hardware_desc, []).append(build) + + nodes_by_hardware = {} + for node in nodes: + nodes_by_hardware.setdefault(node.hardware_description, []).append(node) + + self._groups: list[OTAFormGroup] = [] + for hardware, hw_nodes in sorted(nodes_by_hardware.items(), key=lambda k: len(k[1]), reverse=True): + try: + hw_builds = builds_by_hardware[hardware] + except KeyError: + continue + choices = [ + ('', '---'), + *((build.pk, build.variant) for build in hw_builds) + ] + + group = OTAFormGroup( + hardware=hardware, + builds=hw_builds, + fields={ + f'build_{node.pk}': (node, ( + ChoiceField(choices=choices, required=False) + if len(hw_builds) > 1 + else BooleanField(required=False) + )) for node in hw_nodes + } + ) + for name, (node, hw_field) in group.fields.items(): + self.fields[name] = hw_field + self._groups.append(group) + + @property + def groups(self) -> list[OTAFormGroup]: + return [ + dataclass_replace(group, fields={ + name: (node, self[name]) + for name, (node, hw_field) in group.fields.items() + }) + for group in self._groups + ] + + @property + def selected_builds(self): + build_nodes = {} + for group in self._groups: + for name, (node, hw_field) in group.fields.items(): + value = self.cleaned_data.get(name, None) + if not value: + continue + if len(group.builds) == 1: + build_nodes.setdefault(group.builds[0], []).append(node) + else: + build_nodes.setdefault(group.builds[0], []).append(group.builds_by_id[int(value)]) + return build_nodes + + def save(self) -> list[OTAUpdate]: + updates = [] + with transaction.atomic(): + replaced_recipients = OTAUpdateRecipient.objects.filter( + node__in=chain(*self.selected_builds.values()), + status=OTARecipientStatus.RUNNING, + ).select_for_update() + replaced_recipients.update(status=OTARecipientStatus.REPLACED) + for build, nodes in self.selected_builds.items(): + update = OTAUpdate.objects.create(build=build) + for node in nodes: + update.recipients.create(node=node) + updates.append(update) + return updates diff --git a/src/c3nav/mesh/views/firmware.py b/src/c3nav/mesh/views/firmware.py index c1c417b1..591b2824 100644 --- a/src/c3nav/mesh/views/firmware.py +++ b/src/c3nav/mesh/views/firmware.py @@ -2,9 +2,9 @@ from django.contrib.messages.views import SuccessMessageMixin from django.views.generic import DetailView, ListView, TemplateView from django.views.generic.edit import FormMixin +from c3nav.mesh.forms import OTACreateForm from c3nav.mesh.models import FirmwareBuild, FirmwareVersion, MeshNode, OTARecipientStatus, OTAUpdate from c3nav.mesh.views.base import MeshControlMixin -from c3nav.site.forms import OTACreateForm class FirmwaresListView(MeshControlMixin, ListView): diff --git a/src/c3nav/site/forms.py b/src/c3nav/site/forms.py index 091578e4..605790f8 100644 --- a/src/c3nav/site/forms.py +++ b/src/c3nav/site/forms.py @@ -74,96 +74,3 @@ class PositionSetForm(Form): self.fields['position'].queryset = Position.objects.filter(owner=request.user) self.fields['position'].label_from_instance = attrgetter('name') - -@dataclass -class OTAFormGroup: - hardware: HardwareDescription - builds: Sequence[FirmwareBuild] - fields: dict[str, tuple[MeshNode, Any]] - - @cached_property - def builds_by_id(self) -> dict[int, FirmwareBuild]: - return {build.pk: build for build in self.builds} - - -class OTACreateForm(Form): - def __init__(self, builds: Sequence[FirmwareBuild], *args, **kwargs): - super().__init__(*args, **kwargs) - - nodes: Sequence[MeshNode] = MeshNode.objects.prefetch_last_messages( - MeshMessageType.CONFIG_BOARD - ).prefetch_firmwares().prefetch_ota() - - builds_by_hardware = {} - for build in builds: - for hardware_desc in build.hardware_descriptions: - builds_by_hardware.setdefault(hardware_desc, []).append(build) - - nodes_by_hardware = {} - for node in nodes: - nodes_by_hardware.setdefault(node.hardware_description, []).append(node) - - self._groups: list[OTAFormGroup] = [] - for hardware, hw_nodes in sorted(nodes_by_hardware.items(), key=lambda k: len(k[1]), reverse=True): - try: - hw_builds = builds_by_hardware[hardware] - except KeyError: - continue - choices = [ - ('', '---'), - *((build.pk, build.variant) for build in hw_builds) - ] - - group = OTAFormGroup( - hardware=hardware, - builds=hw_builds, - fields={ - f'build_{node.pk}': (node, ( - ChoiceField(choices=choices, required=False) - if len(hw_builds) > 1 - else BooleanField(required=False) - )) for node in hw_nodes - } - ) - for name, (node, hw_field) in group.fields.items(): - self.fields[name] = hw_field - self._groups.append(group) - - @property - def groups(self) -> list[OTAFormGroup]: - return [ - dataclass_replace(group, fields={ - name: (node, self[name]) - for name, (node, hw_field) in group.fields.items() - }) - for group in self._groups - ] - - @property - def selected_builds(self): - build_nodes = {} - for group in self._groups: - for name, (node, hw_field) in group.fields.items(): - value = self.cleaned_data.get(name, None) - if not value: - continue - if len(group.builds) == 1: - build_nodes.setdefault(group.builds[0], []).append(node) - else: - build_nodes.setdefault(group.builds[0], []).append(group.builds_by_id[int(value)]) - return build_nodes - - def save(self) -> list[OTAUpdate]: - updates = [] - with transaction.atomic(): - replaced_recipients = OTAUpdateRecipient.objects.filter( - node__in=chain(*self.selected_builds.values()), - status=OTARecipientStatus.RUNNING, - ).select_for_update() - replaced_recipients.update(status=OTARecipientStatus.REPLACED) - for build, nodes in self.selected_builds.items(): - update = OTAUpdate.objects.create(build=build) - for node in nodes: - update.recipients.create(node=node) - updates.append(update) - return updates