From ee539f678a1c0e9b897a6f211882940f1dcb7115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laura=20Kl=C3=BCnder?= Date: Tue, 7 Nov 2023 15:37:45 +0100 Subject: [PATCH] use database transactions when adding/removing uplink destinations --- src/c3nav/mesh/consumers.py | 58 ++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/src/c3nav/mesh/consumers.py b/src/c3nav/mesh/consumers.py index 827aa1f7..353f7d42 100644 --- a/src/c3nav/mesh/consumers.py +++ b/src/c3nav/mesh/consumers.py @@ -316,20 +316,8 @@ class MeshConsumer(AsyncWebsocketConsumer): for address in addresses: await self.log_text(address, "destination added") - # create group name for this address - group = get_mesh_comm_group(address) - - # if we aren't handling this address yet, join the group - if address not in self.dst_nodes: - await self.channel_layer.group_add(group, self.channel_name) - self.dst_nodes.add(address) - - # tell other consumers to leave the group - await self.channel_layer.group_send(group, { - "type": "mesh.dst_node_uplink", - "node": address, - "uplink": self.uplink.node.address - }) + # add ourselves as uplink + await self._add_destination(address) # tell the node to dump its current information await self.send_msg( @@ -339,27 +327,51 @@ class MeshConsumer(AsyncWebsocketConsumer): ) ) - # add the stuff to the db as well - await MeshNode.objects.filter(address__in=addresses).aupdate( - uplink=self.uplink, - last_signin=timezone.now(), - ) + @database_sync_to_async + def _add_destination(self, address): + with transaction.atomic(): + node = MeshNode.objects.select_for_update().get(address=address) + + # create group name for this address + group = get_mesh_comm_group(address) + + # if we aren't handling this address yet, join the group + if address not in self.dst_nodes: + async_to_sync(self.channel_layer.group_add)(group, self.channel_name) + self.dst_nodes.add(address) + + # tell other consumers to leave the group + async_to_sync(self.channel_layer.group_send)(group, { + "type": "mesh.dst_node_uplink", + "node": address, + "uplink": self.uplink.node.address + }) + + node.uplink = self.uplink, + node.last_signin = timezone.now() + node.save() async def remove_dst_nodes(self, addresses): for address in tuple(addresses): await self.log_text(address, "destination removed") + await self._remove_destination(address) + + @database_sync_to_async + def _remove_destination(self, address): + with transaction.atomic(): + node = MeshNode.objects.select_for_update().get(address=address) + # create group name for this address group = get_mesh_comm_group(address) # leave the group if address in self.dst_nodes: - await self.channel_layer.group_discard(group, self.channel_name) + async_to_sync(self.channel_layer.group_discard)(group, self.channel_name) self.dst_nodes.discard(address) - # add the stuff to the db as well - # todo: shouldn't do this because of race condition? - await MeshNode.objects.filter(address__in=addresses, uplink=self.uplink).aupdate(uplink=None) + node.uplink = None + node.save() class MeshUIConsumer(AsyncJsonWebsocketConsumer):