some more message parsing fixes and routing implementation
This commit is contained in:
parent
b40b396eb4
commit
59ebdd74bb
3 changed files with 69 additions and 10 deletions
|
@ -6,7 +6,7 @@ from django.utils import timezone
|
||||||
|
|
||||||
from c3nav.mesh.utils import get_mesh_comm_group
|
from c3nav.mesh.utils import get_mesh_comm_group
|
||||||
from c3nav.mesh import messages
|
from c3nav.mesh import messages
|
||||||
from c3nav.mesh.messages import MeshMessage, MESH_BROADCAST_ADDRESS, MeshMessageType
|
from c3nav.mesh.messages import MeshMessage, MESH_BROADCAST_ADDRESS, MeshMessageType, MESH_ROOT_ADDRESS
|
||||||
from c3nav.mesh.models import MeshNode, NodeMessage
|
from c3nav.mesh.models import MeshNode, NodeMessage
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,9 +30,9 @@ class MeshConsumer(WebsocketConsumer):
|
||||||
# remove all other destinations
|
# remove all other destinations
|
||||||
self.remove_dst_nodes(self.dst_nodes)
|
self.remove_dst_nodes(self.dst_nodes)
|
||||||
|
|
||||||
def send_msg(self, msg, sender=None):
|
def send_msg(self, msg, sender=None, exclude_uplink_address=None):
|
||||||
# print("sending", msg)
|
#print("sending", msg, MeshMessage.encode(msg).hex(' ', 1))
|
||||||
# self.log_text(msg.dst, "sending %s" % msg)
|
#self.log_text(msg.dst, "sending %s" % msg)
|
||||||
self.send(bytes_data=MeshMessage.encode(msg))
|
self.send(bytes_data=MeshMessage.encode(msg))
|
||||||
async_to_sync(self.channel_layer.group_send)("mesh_msg_sent", {
|
async_to_sync(self.channel_layer.group_send)("mesh_msg_sent", {
|
||||||
"type": "mesh.msg_sent",
|
"type": "mesh.msg_sent",
|
||||||
|
@ -54,9 +54,29 @@ class MeshConsumer(WebsocketConsumer):
|
||||||
return
|
return
|
||||||
|
|
||||||
if msg.dst != messages.MESH_ROOT_ADDRESS and msg.dst != messages.MESH_PARENT_ADDRESS:
|
if msg.dst != messages.MESH_ROOT_ADDRESS and msg.dst != messages.MESH_PARENT_ADDRESS:
|
||||||
|
# message not adressed to us, forward it
|
||||||
print('Received message for forwarding:', msg)
|
print('Received message for forwarding:', msg)
|
||||||
# todo: this message isn't for us, forward it
|
|
||||||
return
|
if not self.uplink_node:
|
||||||
|
self.log_text(self.uplink_node, "received message not for us before sign in message, ignoring...")
|
||||||
|
print('no sign in yet, ignoring')
|
||||||
|
return
|
||||||
|
|
||||||
|
# trace messages collect node adresses before forwarding
|
||||||
|
if isinstance(msg, messages.MeshRouteTraceMessage):
|
||||||
|
print('adding ourselves to trace message before forwarding')
|
||||||
|
self.log_text(MESH_ROOT_ADDRESS, "adding ourselves to trace message before forwarding")
|
||||||
|
msg.trace.append(MESH_ROOT_ADDRESS)
|
||||||
|
|
||||||
|
msg.send(exclude_uplink_address=self.uplink_node.address)
|
||||||
|
|
||||||
|
# don't handle this message unless it's a broadcast message
|
||||||
|
if msg.dst != messages.MESH_BROADCAST_ADDRESS:
|
||||||
|
# don't handle this message unless it's a broadcast message
|
||||||
|
self.log_text(MESH_ROOT_ADDRESS, "received non-broadcast message not for us, forwarding...")
|
||||||
|
return
|
||||||
|
print('it\'s a broadcast so it\'s also for us')
|
||||||
|
self.log_text(MESH_ROOT_ADDRESS, "received broadcast message, forwarding and handling...")
|
||||||
|
|
||||||
#print('Received message:', msg)
|
#print('Received message:', msg)
|
||||||
|
|
||||||
|
@ -103,6 +123,24 @@ class MeshConsumer(WebsocketConsumer):
|
||||||
if isinstance(msg, messages.MeshRemoveDestinationsMessage):
|
if isinstance(msg, messages.MeshRemoveDestinationsMessage):
|
||||||
self.remove_dst_nodes(addresses=msg.addresses)
|
self.remove_dst_nodes(addresses=msg.addresses)
|
||||||
|
|
||||||
|
if isinstance(msg, messages.MeshRouteRequestMessage):
|
||||||
|
if msg.address == MESH_ROOT_ADDRESS:
|
||||||
|
self.log_text(MESH_ROOT_ADDRESS, "route request about us, start a trace")
|
||||||
|
messages.MeshRouteTraceMessage(
|
||||||
|
src=MESH_ROOT_ADDRESS,
|
||||||
|
dst=msg.src,
|
||||||
|
request_id=msg.request_id,
|
||||||
|
trace=[MESH_ROOT_ADDRESS],
|
||||||
|
).send()
|
||||||
|
else:
|
||||||
|
# todo: find a way to send a "no route" message if there is no route
|
||||||
|
self.log_text(MESH_ROOT_ADDRESS, "requesting route response responsible uplink")
|
||||||
|
async_to_sync(self.channel_layer.group_send)(get_mesh_comm_group(msg.address), {
|
||||||
|
"type": "mesh.send_route_response",
|
||||||
|
"request_id": msg.request_id,
|
||||||
|
"dst": msg.src,
|
||||||
|
})
|
||||||
|
|
||||||
def mesh_uplink_consumer(self, data):
|
def mesh_uplink_consumer(self, data):
|
||||||
# message handler: if we are not the given uplink, leave this group
|
# message handler: if we are not the given uplink, leave this group
|
||||||
if data["name"] != self.channel_name:
|
if data["name"] != self.channel_name:
|
||||||
|
@ -116,8 +154,27 @@ class MeshConsumer(WebsocketConsumer):
|
||||||
self.remove_dst_nodes((data["address"], ))
|
self.remove_dst_nodes((data["address"], ))
|
||||||
|
|
||||||
def mesh_send(self, data):
|
def mesh_send(self, data):
|
||||||
|
if self.uplink_node.address == data["exclude_uplink_address"]:
|
||||||
|
if data["msg"]["dst"] == MESH_BROADCAST_ADDRESS:
|
||||||
|
self.log_text(
|
||||||
|
self.uplink_node.address, "not forwarding this broadcast message via us since it came from here"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.log_text(
|
||||||
|
self.uplink_node.address, "we're the route for this message but it came from here so... no"
|
||||||
|
)
|
||||||
|
return
|
||||||
self.send_msg(MeshMessage.fromjson(data["msg"]), data["sender"])
|
self.send_msg(MeshMessage.fromjson(data["msg"]), data["sender"])
|
||||||
|
|
||||||
|
def mesh_send_route_response(self, data):
|
||||||
|
self.log_text(self.uplink_node.address, "we're the uplink for this address, sending route response...")
|
||||||
|
messages.MeshRouteResponseMessage(
|
||||||
|
src=MESH_ROOT_ADDRESS,
|
||||||
|
dst=data["dst"],
|
||||||
|
request_id=data["request_id"],
|
||||||
|
route=self.uplink_node.address,
|
||||||
|
).send()
|
||||||
|
|
||||||
def log_received_message(self, src_node: MeshNode, msg: messages.MeshMessage):
|
def log_received_message(self, src_node: MeshNode, msg: messages.MeshMessage):
|
||||||
as_json = MeshMessage.tojson(msg)
|
as_json = MeshMessage.tojson(msg)
|
||||||
async_to_sync(self.channel_layer.group_send)("mesh_msg_received", {
|
async_to_sync(self.channel_layer.group_send)("mesh_msg_received", {
|
||||||
|
@ -144,6 +201,7 @@ class MeshConsumer(WebsocketConsumer):
|
||||||
"node": address,
|
"node": address,
|
||||||
"text": text,
|
"text": text,
|
||||||
})
|
})
|
||||||
|
print("MESH %s: [%s] %s" % (self.uplink_node, address, text))
|
||||||
|
|
||||||
def add_dst_nodes(self, nodes=None, addresses=None):
|
def add_dst_nodes(self, nodes=None, addresses=None):
|
||||||
nodes = list(nodes) if nodes else []
|
nodes = list(nodes) if nodes else []
|
||||||
|
|
|
@ -132,7 +132,7 @@ class VarArrayFormat(BaseVarFormat):
|
||||||
self.child_size = self.child_type.get_min_size()
|
self.child_size = self.child_type.get_min_size()
|
||||||
|
|
||||||
def encode(self, values: Sequence) -> bytes:
|
def encode(self, values: Sequence) -> bytes:
|
||||||
data = struct.pack(self.num_fmt, (len(values),))
|
data = struct.pack(self.num_fmt, len(values))
|
||||||
for value in values:
|
for value in values:
|
||||||
data += self.child_type.encode(value)
|
data += self.child_type.encode(value)
|
||||||
return data
|
return data
|
||||||
|
@ -153,7 +153,7 @@ class VarArrayFormat(BaseVarFormat):
|
||||||
|
|
||||||
class VarStrFormat(BaseVarFormat):
|
class VarStrFormat(BaseVarFormat):
|
||||||
def encode(self, value: str) -> bytes:
|
def encode(self, value: str) -> bytes:
|
||||||
return struct.pack(self.num_fmt, (len(str),))+value.encode()
|
return struct.pack(self.num_fmt, len(str))+value.encode()
|
||||||
|
|
||||||
def decode(self, data: bytes) -> tuple[str, bytes]:
|
def decode(self, data: bytes) -> tuple[str, bytes]:
|
||||||
num = struct.unpack(self.num_fmt, data[:self.num_size])[0]
|
num = struct.unpack(self.num_fmt, data[:self.num_size])[0]
|
||||||
|
@ -238,7 +238,7 @@ class StructType:
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def decode(cls, data: bytes) -> Self:
|
def decode(cls, data: bytes) -> tuple[Self, bytes]:
|
||||||
orig_data = data
|
orig_data = data
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
no_init_data = {}
|
no_init_data = {}
|
||||||
|
|
|
@ -67,10 +67,11 @@ class MeshMessage(StructType, union_type_field="msg_id"):
|
||||||
raise TypeError('duplicate use of c_struct_name %s' % c_struct_name)
|
raise TypeError('duplicate use of c_struct_name %s' % c_struct_name)
|
||||||
MeshMessage.c_structs[c_struct_name] = cls
|
MeshMessage.c_structs[c_struct_name] = cls
|
||||||
|
|
||||||
def send(self, sender=None):
|
def send(self, sender=None, exclude_uplink_address=None):
|
||||||
async_to_sync(channels.layers.get_channel_layer().group_send)(get_mesh_comm_group(self.dst), {
|
async_to_sync(channels.layers.get_channel_layer().group_send)(get_mesh_comm_group(self.dst), {
|
||||||
"type": "mesh.send",
|
"type": "mesh.send",
|
||||||
"sender": sender,
|
"sender": sender,
|
||||||
|
"exclude_uplink_address": exclude_uplink_address,
|
||||||
"msg": MeshMessage.tojson(self),
|
"msg": MeshMessage.tojson(self),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue