add MeshUplink model to keep better track of uplinks

This commit is contained in:
Laura Klünder 2023-11-06 14:22:35 +01:00
parent d69a9c4be0
commit 61673603ea
6 changed files with 260 additions and 52 deletions

View file

@ -132,7 +132,8 @@
{% endfor %}
{% endif %}
</td>
<td>{% mesh_node msg.uplink_node %}</td>
{% comment %}todo: more details{% endcomment %}
<td>{% mesh_node msg.uplink.node %}</td>
</tr>
{% endfor %}
</table>

View file

@ -12,7 +12,8 @@
<strong>Name:</strong> {% if node.name %}{{ node.name }}{% else %}<em>{% trans '(no name)' %}</em>{% endif %}
</p>
<p>
<strong>Uplink:</strong> {% if node.uplink %}<a href="{% url "control.mesh.node.detail" pk=node.uplink.address %}">{{ node.uplink }}</a><br>{% endif %}
{% comment %}todo: more details{% endcomment %}
<strong>Uplink:</strong> {% if node.uplink %}<a href="{% url "control.mesh.node.detail" pk=node.uplink.node_id %}">{{ node.uplink.node }}</a><br>{% endif %}
<strong>Last signin:</strong>
{{ node.last_signin.date }} {{ node.last_signin.time|date:"H:i:s" }}

View file

@ -66,7 +66,9 @@
{{ timesince }} ago
{% endblocktrans %}
</td>
<td>{% if node.uplink %}<a href="{% url "control.mesh.node.detail" pk=node.uplink.address %}">{{ node.uplink }}</a>{% endif %}</td>
{% comment %}todo: hide uplink if timed out{% endcomment %}
{% comment %}todo: more details{% endcomment %}
<td>{% if node.uplink %}<a href="{% url "control.mesh.node.detail" pk=node.uplink.node_id %}">{{ node.uplink.node }}</a>{% endif %}</td>
</tr>
{% endfor %}
</table>

View file

@ -2,12 +2,13 @@ import traceback
from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer, WebsocketConsumer
from django.db import transaction
from django.utils import timezone
from c3nav.mesh import messages
from c3nav.mesh.messages import (MESH_BROADCAST_ADDRESS, MESH_NONE_ADDRESS, MESH_ROOT_ADDRESS, MeshMessage,
MeshMessageType)
from c3nav.mesh.models import MeshNode, NodeMessage
from c3nav.mesh.models import MeshNode, MeshUplink, NodeMessage
from c3nav.mesh.tasks import send_channel_msg
from c3nav.mesh.utils import get_mesh_comm_group
@ -16,15 +17,17 @@ from c3nav.mesh.utils import get_mesh_comm_group
class MeshConsumer(WebsocketConsumer):
def connect(self):
# todo: auth
self.uplink_node = None
self.uplink = None
self.log_text(None, "new mesh websocket connection")
self.dst_nodes = set()
self.open_requests = set()
self.accept()
# todo: ping stuff
def disconnect(self, close_code):
self.log_text(self.uplink_node, "mesh websocket disconnected")
if self.uplink_node is not None:
self.log_text(self.uplink.node, "mesh websocket disconnected")
if self.uplink is not None:
# leave broadcast group
async_to_sync(self.channel_layer.group_discard)(
get_mesh_comm_group(MESH_BROADCAST_ADDRESS), self.channel_name
@ -33,6 +36,15 @@ class MeshConsumer(WebsocketConsumer):
# remove all other destinations
self.remove_dst_nodes(self.dst_nodes)
# set end reason (unless we set it to replaced already)
MeshUplink.objects.filter(
pk=self.uplink.pk,
).exclude(
end_reason=MeshUplink.EndReason.REPLACED
).update(
end_reason=MeshUplink.EndReason.CLOSED
)
def send_msg(self, msg, sender=None, exclude_uplink_address=None):
# print("sending", msg, MeshMessage.encode(msg).hex(' ', 1))
# self.log_text(msg.dst, "sending %s" % msg)
@ -42,7 +54,7 @@ class MeshConsumer(WebsocketConsumer):
"timestamp": timezone.now().strftime("%d.%m.%y %H:%M:%S.%f"),
"channel": self.channel_name,
"sender": sender,
"uplink": self.uplink_node.address if self.uplink_node else None,
"uplink": self.uplink.node.address if self.uplink else None,
"recipient": msg.dst,
# "msg": msg.tojson(), # not doing this part for privacy reasons
})
@ -62,8 +74,8 @@ class MeshConsumer(WebsocketConsumer):
# message not adressed to us, forward it
print('Received message for forwarding:', msg)
if not self.uplink_node:
self.log_text(self.uplink_node, "received message not for us before sign in message, ignoring...")
if not self.uplink:
self.log_text(None, "received message not for us before sign in message, ignoring...")
print('no sign in yet, ignoring')
return
@ -73,7 +85,7 @@ class MeshConsumer(WebsocketConsumer):
self.log_text(MESH_ROOT_ADDRESS, "adding ourselves to trace message before forwarding")
msg.trace.append(MESH_ROOT_ADDRESS)
msg.send(exclude_uplink_address=self.uplink_node.address)
msg.send(exclude_uplink_address=self.uplink.node.address)
# don't handle this message unless it's a broadcast message
if msg.dst != messages.MESH_BROADCAST_ADDRESS:
@ -88,7 +100,28 @@ class MeshConsumer(WebsocketConsumer):
src_node, created = MeshNode.objects.get_or_create(address=msg.src)
if isinstance(msg, messages.MeshSigninMessage):
self.uplink_node = src_node
with transaction.atomic():
# tatabase fumbling, lock the mesh node database row
locked_node = MeshNode.objects.select_for_update().get(address=msg.src)
# close other uplinks in the database (they might add their own close reason in a bit)
locked_node.uplink_sessions.filter(end_reason__isnull=True).update(
end_reason=MeshUplink.EndReason.NEW_TIMEOUT
)
# create our own uplink in the database
self.uplink = MeshUplink.objects.create(
node=locked_node,
last_ping=timezone.now(),
name=self.channel_name,
)
# inform other uplinks to shut down
async_to_sync(self.channel_layer.group_send)(get_mesh_comm_group(msg.src), {
"type": "mesh.uplink_consumer",
"name": self.channel_name,
})
# log message, since we will not log it further down
self.log_received_message(src_node, msg)
@ -104,18 +137,12 @@ class MeshConsumer(WebsocketConsumer):
get_mesh_comm_group(MESH_BROADCAST_ADDRESS), self.channel_name
)
# kick out other consumers talking to the same uplink
async_to_sync(self.channel_layer.group_send)(get_mesh_comm_group(msg.src), {
"type": "mesh.uplink_consumer",
"name": self.channel_name,
})
# add this node as a destination that this uplink handles (duh)
self.add_dst_nodes(nodes=(src_node, ))
return
if self.uplink_node is None:
if self.uplink is None:
print('Expected sign-in message, but got a different one!')
self.close()
return
@ -156,35 +183,38 @@ class MeshConsumer(WebsocketConsumer):
def mesh_uplink_consumer(self, data):
# message handler: if we are not the given uplink, leave this group
if data["name"] != self.channel_name:
self.log_text(self.uplink_node, "shutting down, uplink now served by new consumer")
self.log_text(self.uplink.node, "shutting down, uplink now served by new consumer")
MeshUplink.objects.filter(pk=self.uplink.pk,).update(
end_reason=MeshUplink.EndReason.REPLACED
)
self.close()
def mesh_dst_node_uplink(self, data):
# message handler: if we are not the given uplink, leave this group
if data["uplink"] != self.uplink_node.address:
if data["uplink"] != self.uplink.node.address:
self.log_text(data["address"], "node now served by new consumer")
self.remove_dst_nodes((data["address"], ))
def mesh_send(self, data):
if self.uplink_node.address == data["exclude_uplink_address"]:
if self.uplink.node.address == data["exclude_uplink_address"]:
if data["msg"]["dst"] == MESH_BROADCAST_ADDRESS:
self.log_text(
self.uplink_node.address, "not forwarding this broadcast message via us since it came from here"
self.uplink.node.address, "not forwarding this broadcast message via us since it came from here"
)
else:
self.log_text(
self.uplink_node.address, "we're the route for this message but it came from here so... no"
self.uplink.node.address, "we're the route for this message but it came from here so... no"
)
return
self.send_msg(MeshMessage.fromjson(data["msg"]), data["sender"])
def mesh_send_route_response(self, data):
self.log_text(self.uplink_node.address, "we're the uplink for this address, sending route response...")
self.log_text(self.uplink.node.address, "we're the uplink for this address, sending route response...")
messages.MeshRouteResponseMessage(
src=MESH_ROOT_ADDRESS,
dst=data["dst"],
request_id=data["request_id"],
route=self.uplink_node.address,
route=self.uplink.node.address,
).send()
async_to_sync(self.channel_layer.send)(data["channel"], {
"type": "mesh.route_response_sent",
@ -213,11 +243,11 @@ class MeshConsumer(WebsocketConsumer):
"type": "mesh.msg_received",
"timestamp": timezone.now().strftime("%d.%m.%y %H:%M:%S.%f"),
"channel": self.channel_name,
"uplink": self.uplink_node.address if self.uplink_node else None,
"uplink": self.uplink.node.address if self.uplink else None,
"msg": as_json,
})
NodeMessage.objects.create(
uplink_node=self.uplink_node,
uplink=self.uplink,
src_node=src_node,
message_type=msg.msg_type.name,
data=as_json,
@ -229,11 +259,11 @@ class MeshConsumer(WebsocketConsumer):
"type": "mesh.log_entry",
"timestamp": timezone.now().strftime("%d.%m.%y %H:%M:%S.%f"),
"channel": self.channel_name,
"uplink": self.uplink_node.address if self.uplink_node else None,
"uplink": self.uplink.node.address if self.uplink else None,
"node": address,
"text": text,
})
print("MESH %s: [%s] %s" % (self.uplink_node, address, text))
print("MESH %s: [%s] %s" % (self.uplink.node, address, text))
def add_dst_nodes(self, nodes=None, addresses=None):
nodes = list(nodes) if nodes else []
@ -266,7 +296,7 @@ class MeshConsumer(WebsocketConsumer):
async_to_sync(self.channel_layer.group_send)(group, {
"type": "mesh.dst_node_uplink",
"node": address,
"uplink": self.uplink_node.address
"uplink": self.uplink.node.address
})
# tell the node to dump its current information
@ -279,7 +309,7 @@ class MeshConsumer(WebsocketConsumer):
# add the stuff to the db as well
MeshNode.objects.filter(address__in=addresses).update(
uplink_id=self.uplink_node.address,
uplink=self.uplink,
last_signin=timezone.now(),
)
@ -297,9 +327,7 @@ class MeshConsumer(WebsocketConsumer):
# add the stuff to the db as well
# todo: shouldn't do this because of race condition?
MeshNode.objects.filter(address__in=addresses, uplink_id=self.uplink_node.address).update(
uplink_id=None,
)
MeshNode.objects.filter(address__in=addresses, uplink=self.uplink).update(uplink=None)
class MeshUIConsumer(JsonWebsocketConsumer):

View file

@ -0,0 +1,149 @@
# Generated by Django 4.2.1 on 2023-11-06 12:37
from django.db import migrations, models
import django.db.models.deletion
from django.db.models import Max, Min
def forwards_func(apps, schema_editor):
MeshNode = apps.get_model("mesh", "MeshNode")
MeshUplink = apps.get_model("mesh", "MeshUplink")
for node in MeshNode.objects.annotate(first_relayed=Min('relayed_messages__datetime'),
last_relayed=Max('relayed_messages__datetime')):
if node.first_relayed is None:
continue
uplink = MeshUplink.objects.create(
name="migration_generated",
started=node.first_relayed,
node=node,
last_ping=node.last_relayed,
)
node.routed_nodes.update(uplink=uplink)
node.relayed_messages.update(uplink=uplink)
def backwards_func(apps, schema_editor):
MeshUplink = apps.get_model("mesh", "MeshUplink")
for uplink in MeshUplink.objects.select_related('node'):
uplink.routed_nodes.update(uplink_node=uplink.node)
uplink.relayed_messages.update(uplink_node=uplink.node)
class Migration(migrations.Migration):
dependencies = [
("mesh", "0008_firmwarebuild_firmwarebuildboard_firmwareversion_and_more"),
]
operations = [
migrations.AlterField(
model_name="nodemessage",
name="uplink_node",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="relayed_messages",
to="mesh.meshnode",
verbose_name="uplink node",
),
),
migrations.CreateModel(
name="MeshUplink",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("name", models.CharField(max_length=128, verbose_name="channel name")),
(
"started",
models.DateTimeField(auto_now_add=True, verbose_name="started"),
),
(
"last_ping",
models.DateTimeField(verbose_name="last ping from consumer"),
),
(
"end_reason",
models.CharField(
choices=[
("closed", "closed"),
("replaced", "replaced"),
("new_timeout", "new (timeout)"),
],
max_length=16,
null=True,
verbose_name="end reason",
),
),
(
"node",
models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name="uplink_sessions",
to="mesh.meshnode",
verbose_name="node",
),
),
],
),
migrations.AddField(
model_name="nodemessage",
name="uplink",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="relayed_messages",
to="mesh.meshuplink",
verbose_name="uplink",
),
),
migrations.AddConstraint(
model_name="meshuplink",
constraint=models.UniqueConstraint(
condition=models.Q(("end_reason__isnull", True)),
fields=("node",),
name="only_one_active_uplink",
),
),
migrations.RenameField(
model_name="meshnode",
old_name="uplink",
new_name="uplink_node",
),
migrations.AddField(
model_name="meshnode",
name="uplink",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name="routed_nodes",
to="mesh.meshuplink",
verbose_name="uplink",
),
),
migrations.RunPython(forwards_func, backwards_func),
migrations.RemoveField(
model_name="meshnode",
name="uplink_node",
),
migrations.RemoveField(
model_name="nodemessage",
name="uplink_node",
),
migrations.AlterField(
model_name="nodemessage",
name="uplink",
field=models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name="relayed_messages",
to="mesh.meshuplink",
verbose_name="uplink",
),
),
]

View file

@ -5,6 +5,7 @@ from typing import Any, Mapping, Self
from django.contrib.auth import get_user_model
from django.db import NotSupportedError, models
from django.db.models import Q, UniqueConstraint
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
@ -40,7 +41,7 @@ class MeshNodeQuerySet(models.QuerySet):
for message in NodeMessage.objects.order_by('message_type', 'src_node', '-datetime', '-pk').filter(
message_type__in=(t.name for t in self._prefetch_last_messages),
src_node__in=nodes.keys(),
).prefetch_related("uplink_node").distinct('message_type', 'src_node'):
).prefetch_related("uplink").distinct('message_type', 'src_node'):
nodes[message.src_node_id].last_messages[message.message_type] = message
for node in nodes.values():
node.last_messages["any"] = max(node.last_messages.values(), key=attrgetter("datetime"))
@ -85,10 +86,13 @@ class LastMessagesByTypeLookup(UserDict):
class MeshNode(models.Model):
"""
A nesh node. Any node.
"""
address = models.CharField(_('mac address'), max_length=17, primary_key=True)
name = models.CharField(_('name'), max_length=32, null=True, blank=True)
first_seen = models.DateTimeField(_('first seen'), auto_now_add=True)
uplink = models.ForeignKey('MeshNode', models.PROTECT, null=True,
uplink = models.ForeignKey('MeshUplink', models.PROTECT, null=True,
related_name='routed_nodes', verbose_name=_('uplink'))
last_signin = models.DateTimeField(_('last signin'), null=True)
objects = models.Manager.from_queryset(MeshNodeQuerySet)()
@ -103,12 +107,35 @@ class MeshNode(models.Model):
return LastMessagesByTypeLookup(self)
class MeshUplink(models.Model):
"""
An uplink session, a direct connection to a node
"""
class EndReason(models.TextChoices):
CLOSED = "closed", _("closed")
REPLACED = "replaced", _("replaced")
NEW_TIMEOUT = "new_timeout", _("new (timeout)")
name = models.CharField(_('channel name'), max_length=128)
started = models.DateTimeField(_('started'), auto_now_add=True)
node = models.ForeignKey('MeshNode', models.PROTECT, related_name='uplink_sessions',
verbose_name=_('node'))
last_ping = models.DateTimeField(_('last ping from consumer'))
end_reason = models.CharField(_('end reason'), choices=EndReason.choices, null=True, max_length=16)
class Meta:
constraints = (
UniqueConstraint(fields=["node"], condition=Q(end_reason__isnull=True), name='only_one_active_uplink'),
)
class NodeMessage(models.Model):
MESSAGE_TYPES = [(msgtype.name, msgtype.pretty_name) for msgtype in MeshMessageType]
src_node = models.ForeignKey('MeshNode', models.PROTECT,
related_name='received_messages', verbose_name=_('node'))
uplink_node = models.ForeignKey('MeshNode', models.PROTECT,
related_name='relayed_messages', verbose_name=_('uplink node'))
uplink = models.ForeignKey('MeshUplink', models.PROTECT, related_name='relayed_messages',
verbose_name=_('uplink'))
datetime = models.DateTimeField(_('datetime'), db_index=True, auto_now_add=True)
message_type = models.CharField(_('message type'), max_length=24, db_index=True, choices=MESSAGE_TYPES)
data = models.JSONField(_('message data'))