From 61673603eaa1ab4e26f29d460c40a5469f4cc85a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laura=20Kl=C3=BCnder?= Date: Mon, 6 Nov 2023 14:22:35 +0100 Subject: [PATCH] add MeshUplink model to keep better track of uplinks --- .../templates/control/mesh_messages.html | 3 +- .../templates/control/mesh_node_detail.html | 3 +- .../control/templates/control/mesh_nodes.html | 4 +- src/c3nav/mesh/consumers.py | 118 ++++++++------ src/c3nav/mesh/migrations/0009_meshuplink.py | 149 ++++++++++++++++++ src/c3nav/mesh/models.py | 35 +++- 6 files changed, 260 insertions(+), 52 deletions(-) create mode 100644 src/c3nav/mesh/migrations/0009_meshuplink.py diff --git a/src/c3nav/control/templates/control/mesh_messages.html b/src/c3nav/control/templates/control/mesh_messages.html index 66ac4e32..50f79254 100644 --- a/src/c3nav/control/templates/control/mesh_messages.html +++ b/src/c3nav/control/templates/control/mesh_messages.html @@ -132,7 +132,8 @@ {% endfor %} {% endif %} - {% mesh_node msg.uplink_node %} + {% comment %}todo: more details{% endcomment %} + {% mesh_node msg.uplink.node %} {% endfor %} diff --git a/src/c3nav/control/templates/control/mesh_node_detail.html b/src/c3nav/control/templates/control/mesh_node_detail.html index 214f7565..ca3f6cc4 100644 --- a/src/c3nav/control/templates/control/mesh_node_detail.html +++ b/src/c3nav/control/templates/control/mesh_node_detail.html @@ -12,7 +12,8 @@ Name: {% if node.name %}{{ node.name }}{% else %}{% trans '(no name)' %}{% endif %}

- Uplink: {% if node.uplink %}{{ node.uplink }}
{% endif %} + {% comment %}todo: more details{% endcomment %} + Uplink: {% if node.uplink %}{{ node.uplink.node }}
{% endif %} Last signin: {{ node.last_signin.date }} {{ node.last_signin.time|date:"H:i:s" }} diff --git a/src/c3nav/control/templates/control/mesh_nodes.html b/src/c3nav/control/templates/control/mesh_nodes.html index fcf3c96f..dac665f9 100644 --- a/src/c3nav/control/templates/control/mesh_nodes.html +++ b/src/c3nav/control/templates/control/mesh_nodes.html @@ -66,7 +66,9 @@ {{ timesince }} ago {% endblocktrans %} - {% if node.uplink %}{{ node.uplink }}{% endif %} + {% comment %}todo: hide uplink if timed out{% endcomment %} + {% comment %}todo: more details{% endcomment %} + {% if node.uplink %}{{ node.uplink.node }}{% endif %} {% endfor %} diff --git a/src/c3nav/mesh/consumers.py b/src/c3nav/mesh/consumers.py index e4295eb3..702628bc 100644 --- a/src/c3nav/mesh/consumers.py +++ b/src/c3nav/mesh/consumers.py @@ -2,12 +2,13 @@ import traceback from asgiref.sync import async_to_sync from channels.generic.websocket import JsonWebsocketConsumer, WebsocketConsumer +from django.db import transaction from django.utils import timezone from c3nav.mesh import messages from c3nav.mesh.messages import (MESH_BROADCAST_ADDRESS, MESH_NONE_ADDRESS, MESH_ROOT_ADDRESS, MeshMessage, MeshMessageType) -from c3nav.mesh.models import MeshNode, NodeMessage +from c3nav.mesh.models import MeshNode, MeshUplink, NodeMessage from c3nav.mesh.tasks import send_channel_msg from c3nav.mesh.utils import get_mesh_comm_group @@ -16,15 +17,17 @@ from c3nav.mesh.utils import get_mesh_comm_group class MeshConsumer(WebsocketConsumer): def connect(self): # todo: auth - self.uplink_node = None + self.uplink = None self.log_text(None, "new mesh websocket connection") self.dst_nodes = set() self.open_requests = set() self.accept() + # todo: ping stuff + def disconnect(self, close_code): - self.log_text(self.uplink_node, "mesh websocket disconnected") - if self.uplink_node is not None: + self.log_text(self.uplink.node, "mesh websocket disconnected") + if self.uplink is not None: # leave broadcast group async_to_sync(self.channel_layer.group_discard)( get_mesh_comm_group(MESH_BROADCAST_ADDRESS), self.channel_name @@ -33,6 +36,15 @@ class MeshConsumer(WebsocketConsumer): # remove all other destinations self.remove_dst_nodes(self.dst_nodes) + # set end reason (unless we set it to replaced already) + MeshUplink.objects.filter( + pk=self.uplink.pk, + ).exclude( + end_reason=MeshUplink.EndReason.REPLACED + ).update( + end_reason=MeshUplink.EndReason.CLOSED + ) + def send_msg(self, msg, sender=None, exclude_uplink_address=None): # print("sending", msg, MeshMessage.encode(msg).hex(' ', 1)) # self.log_text(msg.dst, "sending %s" % msg) @@ -42,7 +54,7 @@ class MeshConsumer(WebsocketConsumer): "timestamp": timezone.now().strftime("%d.%m.%y %H:%M:%S.%f"), "channel": self.channel_name, "sender": sender, - "uplink": self.uplink_node.address if self.uplink_node else None, + "uplink": self.uplink.node.address if self.uplink else None, "recipient": msg.dst, # "msg": msg.tojson(), # not doing this part for privacy reasons }) @@ -62,8 +74,8 @@ class MeshConsumer(WebsocketConsumer): # message not adressed to us, forward it print('Received message for forwarding:', msg) - if not self.uplink_node: - self.log_text(self.uplink_node, "received message not for us before sign in message, ignoring...") + if not self.uplink: + self.log_text(None, "received message not for us before sign in message, ignoring...") print('no sign in yet, ignoring') return @@ -73,7 +85,7 @@ class MeshConsumer(WebsocketConsumer): self.log_text(MESH_ROOT_ADDRESS, "adding ourselves to trace message before forwarding") msg.trace.append(MESH_ROOT_ADDRESS) - msg.send(exclude_uplink_address=self.uplink_node.address) + msg.send(exclude_uplink_address=self.uplink.node.address) # don't handle this message unless it's a broadcast message if msg.dst != messages.MESH_BROADCAST_ADDRESS: @@ -88,34 +100,49 @@ class MeshConsumer(WebsocketConsumer): src_node, created = MeshNode.objects.get_or_create(address=msg.src) if isinstance(msg, messages.MeshSigninMessage): - self.uplink_node = src_node - # log message, since we will not log it further down - self.log_received_message(src_node, msg) + with transaction.atomic(): + # tatabase fumbling, lock the mesh node database row + locked_node = MeshNode.objects.select_for_update().get(address=msg.src) - # inform signed in uplink node about its layer - self.send_msg(messages.MeshLayerAnnounceMessage( - src=messages.MESH_ROOT_ADDRESS, - dst=msg.src, - layer=messages.NO_LAYER - )) + # close other uplinks in the database (they might add their own close reason in a bit) + locked_node.uplink_sessions.filter(end_reason__isnull=True).update( + end_reason=MeshUplink.EndReason.NEW_TIMEOUT + ) - # add signed in uplink node to broadcast group - async_to_sync(self.channel_layer.group_add)( - get_mesh_comm_group(MESH_BROADCAST_ADDRESS), self.channel_name - ) + # create our own uplink in the database + self.uplink = MeshUplink.objects.create( + node=locked_node, + last_ping=timezone.now(), + name=self.channel_name, + ) - # kick out other consumers talking to the same uplink - async_to_sync(self.channel_layer.group_send)(get_mesh_comm_group(msg.src), { - "type": "mesh.uplink_consumer", - "name": self.channel_name, - }) + # inform other uplinks to shut down + async_to_sync(self.channel_layer.group_send)(get_mesh_comm_group(msg.src), { + "type": "mesh.uplink_consumer", + "name": self.channel_name, + }) - # add this node as a destination that this uplink handles (duh) - self.add_dst_nodes(nodes=(src_node, )) + # log message, since we will not log it further down + self.log_received_message(src_node, msg) + + # inform signed in uplink node about its layer + self.send_msg(messages.MeshLayerAnnounceMessage( + src=messages.MESH_ROOT_ADDRESS, + dst=msg.src, + layer=messages.NO_LAYER + )) + + # add signed in uplink node to broadcast group + async_to_sync(self.channel_layer.group_add)( + get_mesh_comm_group(MESH_BROADCAST_ADDRESS), self.channel_name + ) + + # add this node as a destination that this uplink handles (duh) + self.add_dst_nodes(nodes=(src_node, )) return - if self.uplink_node is None: + if self.uplink is None: print('Expected sign-in message, but got a different one!') self.close() return @@ -156,35 +183,38 @@ class MeshConsumer(WebsocketConsumer): def mesh_uplink_consumer(self, data): # message handler: if we are not the given uplink, leave this group if data["name"] != self.channel_name: - self.log_text(self.uplink_node, "shutting down, uplink now served by new consumer") + self.log_text(self.uplink.node, "shutting down, uplink now served by new consumer") + MeshUplink.objects.filter(pk=self.uplink.pk,).update( + end_reason=MeshUplink.EndReason.REPLACED + ) self.close() def mesh_dst_node_uplink(self, data): # message handler: if we are not the given uplink, leave this group - if data["uplink"] != self.uplink_node.address: + if data["uplink"] != self.uplink.node.address: self.log_text(data["address"], "node now served by new consumer") self.remove_dst_nodes((data["address"], )) def mesh_send(self, data): - if self.uplink_node.address == data["exclude_uplink_address"]: + if self.uplink.node.address == data["exclude_uplink_address"]: if data["msg"]["dst"] == MESH_BROADCAST_ADDRESS: self.log_text( - self.uplink_node.address, "not forwarding this broadcast message via us since it came from here" + self.uplink.node.address, "not forwarding this broadcast message via us since it came from here" ) else: self.log_text( - self.uplink_node.address, "we're the route for this message but it came from here so... no" + self.uplink.node.address, "we're the route for this message but it came from here so... no" ) return self.send_msg(MeshMessage.fromjson(data["msg"]), data["sender"]) def mesh_send_route_response(self, data): - self.log_text(self.uplink_node.address, "we're the uplink for this address, sending route response...") + self.log_text(self.uplink.node.address, "we're the uplink for this address, sending route response...") messages.MeshRouteResponseMessage( src=MESH_ROOT_ADDRESS, dst=data["dst"], request_id=data["request_id"], - route=self.uplink_node.address, + route=self.uplink.node.address, ).send() async_to_sync(self.channel_layer.send)(data["channel"], { "type": "mesh.route_response_sent", @@ -213,11 +243,11 @@ class MeshConsumer(WebsocketConsumer): "type": "mesh.msg_received", "timestamp": timezone.now().strftime("%d.%m.%y %H:%M:%S.%f"), "channel": self.channel_name, - "uplink": self.uplink_node.address if self.uplink_node else None, + "uplink": self.uplink.node.address if self.uplink else None, "msg": as_json, }) NodeMessage.objects.create( - uplink_node=self.uplink_node, + uplink=self.uplink, src_node=src_node, message_type=msg.msg_type.name, data=as_json, @@ -229,11 +259,11 @@ class MeshConsumer(WebsocketConsumer): "type": "mesh.log_entry", "timestamp": timezone.now().strftime("%d.%m.%y %H:%M:%S.%f"), "channel": self.channel_name, - "uplink": self.uplink_node.address if self.uplink_node else None, + "uplink": self.uplink.node.address if self.uplink else None, "node": address, "text": text, }) - print("MESH %s: [%s] %s" % (self.uplink_node, address, text)) + print("MESH %s: [%s] %s" % (self.uplink.node, address, text)) def add_dst_nodes(self, nodes=None, addresses=None): nodes = list(nodes) if nodes else [] @@ -266,7 +296,7 @@ class MeshConsumer(WebsocketConsumer): async_to_sync(self.channel_layer.group_send)(group, { "type": "mesh.dst_node_uplink", "node": address, - "uplink": self.uplink_node.address + "uplink": self.uplink.node.address }) # tell the node to dump its current information @@ -279,7 +309,7 @@ class MeshConsumer(WebsocketConsumer): # add the stuff to the db as well MeshNode.objects.filter(address__in=addresses).update( - uplink_id=self.uplink_node.address, + uplink=self.uplink, last_signin=timezone.now(), ) @@ -297,9 +327,7 @@ class MeshConsumer(WebsocketConsumer): # add the stuff to the db as well # todo: shouldn't do this because of race condition? - MeshNode.objects.filter(address__in=addresses, uplink_id=self.uplink_node.address).update( - uplink_id=None, - ) + MeshNode.objects.filter(address__in=addresses, uplink=self.uplink).update(uplink=None) class MeshUIConsumer(JsonWebsocketConsumer): diff --git a/src/c3nav/mesh/migrations/0009_meshuplink.py b/src/c3nav/mesh/migrations/0009_meshuplink.py new file mode 100644 index 00000000..bccc9379 --- /dev/null +++ b/src/c3nav/mesh/migrations/0009_meshuplink.py @@ -0,0 +1,149 @@ +# Generated by Django 4.2.1 on 2023-11-06 12:37 + +from django.db import migrations, models +import django.db.models.deletion +from django.db.models import Max, Min + + +def forwards_func(apps, schema_editor): + MeshNode = apps.get_model("mesh", "MeshNode") + MeshUplink = apps.get_model("mesh", "MeshUplink") + for node in MeshNode.objects.annotate(first_relayed=Min('relayed_messages__datetime'), + last_relayed=Max('relayed_messages__datetime')): + if node.first_relayed is None: + continue + uplink = MeshUplink.objects.create( + name="migration_generated", + started=node.first_relayed, + node=node, + last_ping=node.last_relayed, + ) + node.routed_nodes.update(uplink=uplink) + node.relayed_messages.update(uplink=uplink) + + +def backwards_func(apps, schema_editor): + MeshUplink = apps.get_model("mesh", "MeshUplink") + for uplink in MeshUplink.objects.select_related('node'): + uplink.routed_nodes.update(uplink_node=uplink.node) + uplink.relayed_messages.update(uplink_node=uplink.node) + + + +class Migration(migrations.Migration): + dependencies = [ + ("mesh", "0008_firmwarebuild_firmwarebuildboard_firmwareversion_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="nodemessage", + name="uplink_node", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="relayed_messages", + to="mesh.meshnode", + verbose_name="uplink node", + ), + ), + migrations.CreateModel( + name="MeshUplink", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("name", models.CharField(max_length=128, verbose_name="channel name")), + ( + "started", + models.DateTimeField(auto_now_add=True, verbose_name="started"), + ), + ( + "last_ping", + models.DateTimeField(verbose_name="last ping from consumer"), + ), + ( + "end_reason", + models.CharField( + choices=[ + ("closed", "closed"), + ("replaced", "replaced"), + ("new_timeout", "new (timeout)"), + ], + max_length=16, + null=True, + verbose_name="end reason", + ), + ), + ( + "node", + models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="uplink_sessions", + to="mesh.meshnode", + verbose_name="node", + ), + ), + ], + ), + migrations.AddField( + model_name="nodemessage", + name="uplink", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="relayed_messages", + to="mesh.meshuplink", + verbose_name="uplink", + ), + ), + migrations.AddConstraint( + model_name="meshuplink", + constraint=models.UniqueConstraint( + condition=models.Q(("end_reason__isnull", True)), + fields=("node",), + name="only_one_active_uplink", + ), + ), + migrations.RenameField( + model_name="meshnode", + old_name="uplink", + new_name="uplink_node", + ), + migrations.AddField( + model_name="meshnode", + name="uplink", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="routed_nodes", + to="mesh.meshuplink", + verbose_name="uplink", + ), + ), + migrations.RunPython(forwards_func, backwards_func), + migrations.RemoveField( + model_name="meshnode", + name="uplink_node", + ), + migrations.RemoveField( + model_name="nodemessage", + name="uplink_node", + ), + migrations.AlterField( + model_name="nodemessage", + name="uplink", + field=models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="relayed_messages", + to="mesh.meshuplink", + verbose_name="uplink", + ), + ), + ] diff --git a/src/c3nav/mesh/models.py b/src/c3nav/mesh/models.py index bec34bc6..cd9f6ffb 100644 --- a/src/c3nav/mesh/models.py +++ b/src/c3nav/mesh/models.py @@ -5,6 +5,7 @@ from typing import Any, Mapping, Self from django.contrib.auth import get_user_model from django.db import NotSupportedError, models +from django.db.models import Q, UniqueConstraint from django.utils.text import slugify from django.utils.translation import gettext_lazy as _ @@ -40,7 +41,7 @@ class MeshNodeQuerySet(models.QuerySet): for message in NodeMessage.objects.order_by('message_type', 'src_node', '-datetime', '-pk').filter( message_type__in=(t.name for t in self._prefetch_last_messages), src_node__in=nodes.keys(), - ).prefetch_related("uplink_node").distinct('message_type', 'src_node'): + ).prefetch_related("uplink").distinct('message_type', 'src_node'): nodes[message.src_node_id].last_messages[message.message_type] = message for node in nodes.values(): node.last_messages["any"] = max(node.last_messages.values(), key=attrgetter("datetime")) @@ -85,10 +86,13 @@ class LastMessagesByTypeLookup(UserDict): class MeshNode(models.Model): + """ + A nesh node. Any node. + """ address = models.CharField(_('mac address'), max_length=17, primary_key=True) name = models.CharField(_('name'), max_length=32, null=True, blank=True) first_seen = models.DateTimeField(_('first seen'), auto_now_add=True) - uplink = models.ForeignKey('MeshNode', models.PROTECT, null=True, + uplink = models.ForeignKey('MeshUplink', models.PROTECT, null=True, related_name='routed_nodes', verbose_name=_('uplink')) last_signin = models.DateTimeField(_('last signin'), null=True) objects = models.Manager.from_queryset(MeshNodeQuerySet)() @@ -103,12 +107,35 @@ class MeshNode(models.Model): return LastMessagesByTypeLookup(self) +class MeshUplink(models.Model): + """ + An uplink session, a direct connection to a node + """ + + class EndReason(models.TextChoices): + CLOSED = "closed", _("closed") + REPLACED = "replaced", _("replaced") + NEW_TIMEOUT = "new_timeout", _("new (timeout)") + + name = models.CharField(_('channel name'), max_length=128) + started = models.DateTimeField(_('started'), auto_now_add=True) + node = models.ForeignKey('MeshNode', models.PROTECT, related_name='uplink_sessions', + verbose_name=_('node')) + last_ping = models.DateTimeField(_('last ping from consumer')) + end_reason = models.CharField(_('end reason'), choices=EndReason.choices, null=True, max_length=16) + + class Meta: + constraints = ( + UniqueConstraint(fields=["node"], condition=Q(end_reason__isnull=True), name='only_one_active_uplink'), + ) + + class NodeMessage(models.Model): MESSAGE_TYPES = [(msgtype.name, msgtype.pretty_name) for msgtype in MeshMessageType] src_node = models.ForeignKey('MeshNode', models.PROTECT, related_name='received_messages', verbose_name=_('node')) - uplink_node = models.ForeignKey('MeshNode', models.PROTECT, - related_name='relayed_messages', verbose_name=_('uplink node')) + uplink = models.ForeignKey('MeshUplink', models.PROTECT, related_name='relayed_messages', + verbose_name=_('uplink')) datetime = models.DateTimeField(_('datetime'), db_index=True, auto_now_add=True) message_type = models.CharField(_('message type'), max_length=24, db_index=True, choices=MESSAGE_TYPES) data = models.JSONField(_('message data'))