From 59ebdd74bb2a03e07ba24a0cfcb21aa3a3251450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laura=20Kl=C3=BCnder?= Date: Fri, 6 Oct 2023 02:15:51 +0200 Subject: [PATCH] some more message parsing fixes and routing implementation --- src/c3nav/mesh/consumers.py | 70 ++++++++++++++++++++++++++++++++--- src/c3nav/mesh/dataformats.py | 6 +-- src/c3nav/mesh/messages.py | 3 +- 3 files changed, 69 insertions(+), 10 deletions(-) diff --git a/src/c3nav/mesh/consumers.py b/src/c3nav/mesh/consumers.py index f14e8fd7..c5a170ff 100644 --- a/src/c3nav/mesh/consumers.py +++ b/src/c3nav/mesh/consumers.py @@ -6,7 +6,7 @@ from django.utils import timezone from c3nav.mesh.utils import get_mesh_comm_group from c3nav.mesh import messages -from c3nav.mesh.messages import MeshMessage, MESH_BROADCAST_ADDRESS, MeshMessageType +from c3nav.mesh.messages import MeshMessage, MESH_BROADCAST_ADDRESS, MeshMessageType, MESH_ROOT_ADDRESS from c3nav.mesh.models import MeshNode, NodeMessage @@ -30,9 +30,9 @@ class MeshConsumer(WebsocketConsumer): # remove all other destinations self.remove_dst_nodes(self.dst_nodes) - def send_msg(self, msg, sender=None): - # print("sending", msg) - # self.log_text(msg.dst, "sending %s" % msg) + def send_msg(self, msg, sender=None, exclude_uplink_address=None): + #print("sending", msg, MeshMessage.encode(msg).hex(' ', 1)) + #self.log_text(msg.dst, "sending %s" % msg) self.send(bytes_data=MeshMessage.encode(msg)) async_to_sync(self.channel_layer.group_send)("mesh_msg_sent", { "type": "mesh.msg_sent", @@ -54,9 +54,29 @@ class MeshConsumer(WebsocketConsumer): return if msg.dst != messages.MESH_ROOT_ADDRESS and msg.dst != messages.MESH_PARENT_ADDRESS: + # message not adressed to us, forward it print('Received message for forwarding:', msg) - # todo: this message isn't for us, forward it - return + + if not self.uplink_node: + self.log_text(self.uplink_node, "received message not for us before sign in message, ignoring...") + print('no sign in yet, ignoring') + return + + # trace messages collect node adresses before forwarding + if isinstance(msg, messages.MeshRouteTraceMessage): + print('adding ourselves to trace message before forwarding') + self.log_text(MESH_ROOT_ADDRESS, "adding ourselves to trace message before forwarding") + msg.trace.append(MESH_ROOT_ADDRESS) + + msg.send(exclude_uplink_address=self.uplink_node.address) + + # don't handle this message unless it's a broadcast message + if msg.dst != messages.MESH_BROADCAST_ADDRESS: + # don't handle this message unless it's a broadcast message + self.log_text(MESH_ROOT_ADDRESS, "received non-broadcast message not for us, forwarding...") + return + print('it\'s a broadcast so it\'s also for us') + self.log_text(MESH_ROOT_ADDRESS, "received broadcast message, forwarding and handling...") #print('Received message:', msg) @@ -103,6 +123,24 @@ class MeshConsumer(WebsocketConsumer): if isinstance(msg, messages.MeshRemoveDestinationsMessage): self.remove_dst_nodes(addresses=msg.addresses) + if isinstance(msg, messages.MeshRouteRequestMessage): + if msg.address == MESH_ROOT_ADDRESS: + self.log_text(MESH_ROOT_ADDRESS, "route request about us, start a trace") + messages.MeshRouteTraceMessage( + src=MESH_ROOT_ADDRESS, + dst=msg.src, + request_id=msg.request_id, + trace=[MESH_ROOT_ADDRESS], + ).send() + else: + # todo: find a way to send a "no route" message if there is no route + self.log_text(MESH_ROOT_ADDRESS, "requesting route response responsible uplink") + async_to_sync(self.channel_layer.group_send)(get_mesh_comm_group(msg.address), { + "type": "mesh.send_route_response", + "request_id": msg.request_id, + "dst": msg.src, + }) + def mesh_uplink_consumer(self, data): # message handler: if we are not the given uplink, leave this group if data["name"] != self.channel_name: @@ -116,8 +154,27 @@ class MeshConsumer(WebsocketConsumer): self.remove_dst_nodes((data["address"], )) def mesh_send(self, data): + if self.uplink_node.address == data["exclude_uplink_address"]: + if data["msg"]["dst"] == MESH_BROADCAST_ADDRESS: + self.log_text( + self.uplink_node.address, "not forwarding this broadcast message via us since it came from here" + ) + else: + self.log_text( + self.uplink_node.address, "we're the route for this message but it came from here so... no" + ) + return self.send_msg(MeshMessage.fromjson(data["msg"]), data["sender"]) + def mesh_send_route_response(self, data): + self.log_text(self.uplink_node.address, "we're the uplink for this address, sending route response...") + messages.MeshRouteResponseMessage( + src=MESH_ROOT_ADDRESS, + dst=data["dst"], + request_id=data["request_id"], + route=self.uplink_node.address, + ).send() + def log_received_message(self, src_node: MeshNode, msg: messages.MeshMessage): as_json = MeshMessage.tojson(msg) async_to_sync(self.channel_layer.group_send)("mesh_msg_received", { @@ -144,6 +201,7 @@ class MeshConsumer(WebsocketConsumer): "node": address, "text": text, }) + print("MESH %s: [%s] %s" % (self.uplink_node, address, text)) def add_dst_nodes(self, nodes=None, addresses=None): nodes = list(nodes) if nodes else [] diff --git a/src/c3nav/mesh/dataformats.py b/src/c3nav/mesh/dataformats.py index 4cd07806..f4c47ee0 100644 --- a/src/c3nav/mesh/dataformats.py +++ b/src/c3nav/mesh/dataformats.py @@ -132,7 +132,7 @@ class VarArrayFormat(BaseVarFormat): self.child_size = self.child_type.get_min_size() def encode(self, values: Sequence) -> bytes: - data = struct.pack(self.num_fmt, (len(values),)) + data = struct.pack(self.num_fmt, len(values)) for value in values: data += self.child_type.encode(value) return data @@ -153,7 +153,7 @@ class VarArrayFormat(BaseVarFormat): class VarStrFormat(BaseVarFormat): def encode(self, value: str) -> bytes: - return struct.pack(self.num_fmt, (len(str),))+value.encode() + return struct.pack(self.num_fmt, len(str))+value.encode() def decode(self, data: bytes) -> tuple[str, bytes]: num = struct.unpack(self.num_fmt, data[:self.num_size])[0] @@ -238,7 +238,7 @@ class StructType: return data @classmethod - def decode(cls, data: bytes) -> Self: + def decode(cls, data: bytes) -> tuple[Self, bytes]: orig_data = data kwargs = {} no_init_data = {} diff --git a/src/c3nav/mesh/messages.py b/src/c3nav/mesh/messages.py index 42e8cc7e..f4987dff 100644 --- a/src/c3nav/mesh/messages.py +++ b/src/c3nav/mesh/messages.py @@ -67,10 +67,11 @@ class MeshMessage(StructType, union_type_field="msg_id"): raise TypeError('duplicate use of c_struct_name %s' % c_struct_name) MeshMessage.c_structs[c_struct_name] = cls - def send(self, sender=None): + def send(self, sender=None, exclude_uplink_address=None): async_to_sync(channels.layers.get_channel_layer().group_send)(get_mesh_comm_group(self.dst), { "type": "mesh.send", "sender": sender, + "exclude_uplink_address": exclude_uplink_address, "msg": MeshMessage.tojson(self), })