use database transactions when adding/removing uplink destinations

This commit is contained in:
Laura Klünder 2023-11-07 15:37:45 +01:00
parent 18144dde30
commit ee539f678a

View file

@ -316,20 +316,8 @@ class MeshConsumer(AsyncWebsocketConsumer):
for address in addresses: for address in addresses:
await self.log_text(address, "destination added") await self.log_text(address, "destination added")
# create group name for this address # add ourselves as uplink
group = get_mesh_comm_group(address) await self._add_destination(address)
# if we aren't handling this address yet, join the group
if address not in self.dst_nodes:
await self.channel_layer.group_add(group, self.channel_name)
self.dst_nodes.add(address)
# tell other consumers to leave the group
await self.channel_layer.group_send(group, {
"type": "mesh.dst_node_uplink",
"node": address,
"uplink": self.uplink.node.address
})
# tell the node to dump its current information # tell the node to dump its current information
await self.send_msg( await self.send_msg(
@ -339,27 +327,51 @@ class MeshConsumer(AsyncWebsocketConsumer):
) )
) )
# add the stuff to the db as well @database_sync_to_async
await MeshNode.objects.filter(address__in=addresses).aupdate( def _add_destination(self, address):
uplink=self.uplink, with transaction.atomic():
last_signin=timezone.now(), node = MeshNode.objects.select_for_update().get(address=address)
)
# create group name for this address
group = get_mesh_comm_group(address)
# if we aren't handling this address yet, join the group
if address not in self.dst_nodes:
async_to_sync(self.channel_layer.group_add)(group, self.channel_name)
self.dst_nodes.add(address)
# tell other consumers to leave the group
async_to_sync(self.channel_layer.group_send)(group, {
"type": "mesh.dst_node_uplink",
"node": address,
"uplink": self.uplink.node.address
})
node.uplink = self.uplink,
node.last_signin = timezone.now()
node.save()
async def remove_dst_nodes(self, addresses): async def remove_dst_nodes(self, addresses):
for address in tuple(addresses): for address in tuple(addresses):
await self.log_text(address, "destination removed") await self.log_text(address, "destination removed")
await self._remove_destination(address)
@database_sync_to_async
def _remove_destination(self, address):
with transaction.atomic():
node = MeshNode.objects.select_for_update().get(address=address)
# create group name for this address # create group name for this address
group = get_mesh_comm_group(address) group = get_mesh_comm_group(address)
# leave the group # leave the group
if address in self.dst_nodes: if address in self.dst_nodes:
await self.channel_layer.group_discard(group, self.channel_name) async_to_sync(self.channel_layer.group_discard)(group, self.channel_name)
self.dst_nodes.discard(address) self.dst_nodes.discard(address)
# add the stuff to the db as well node.uplink = None
# todo: shouldn't do this because of race condition? node.save()
await MeshNode.objects.filter(address__in=addresses, uplink=self.uplink).aupdate(uplink=None)
class MeshUIConsumer(AsyncJsonWebsocketConsumer): class MeshUIConsumer(AsyncJsonWebsocketConsumer):