1
0

Remove unnecessary events

We do not need asyncio Events in order to signal that a queue is
empty. Instead, we should use asyncio queues.
This commit is contained in:
ThomasV
2025-01-16 18:43:36 +01:00
committed by Sander van Grieken
parent 86432f55ee
commit aeaec452a0

View File

@@ -24,7 +24,6 @@ import asyncio
import copy import copy
import io import io
import os import os
import queue
import threading import threading
import time import time
from random import random from random import random
@@ -402,10 +401,8 @@ class OnionMessageManager(Logger):
self.pending = {} self.pending = {}
self.pending_lock = threading.Lock() self.pending_lock = threading.Lock()
self.requestreply_queue = queue.PriorityQueue() self.requestreply_queue = asyncio.PriorityQueue()
self.requestreply_queue_notempty = asyncio.Event() self.forwardqueue = asyncio.PriorityQueue()
self.forwardqueue = queue.PriorityQueue()
self.forwardqueue_notempty = asyncio.Event()
def start_network(self, *, network: 'Network'): def start_network(self, *, network: 'Network'):
assert network assert network
@@ -430,14 +427,7 @@ class OnionMessageManager(Logger):
async def process_forward_queue(self): async def process_forward_queue(self):
while True: while True:
try: scheduled, expires, onion_packet, blinding, node_id = await self.forwardqueue.get()
scheduled, expires, onion_packet, blinding, node_id = self.forwardqueue.get_nowait()
except queue.Empty:
self.logger.info(f'forward queue empty')
self.forwardqueue_notempty.clear()
await self.forwardqueue_notempty.wait()
continue
if expires <= now(): if expires <= now():
self.logger.debug(f'forward expired {node_id=}') self.logger.debug(f'forward expired {node_id=}')
continue continue
@@ -472,21 +462,10 @@ class OnionMessageManager(Logger):
expires = now() + self.FORWARD_RETRY_TIMEOUT expires = now() + self.FORWARD_RETRY_TIMEOUT
queueitem = (now(), expires, onion_packet, blinding, node_id) queueitem = (now(), expires, onion_packet, blinding, node_id)
self.forwardqueue.put_nowait(queueitem) self.forwardqueue.put_nowait(queueitem)
self.forwardqueue_notempty.set()
async def process_request_reply_queue(self): async def process_request_reply_queue(self):
while True: while True:
try: scheduled, expires, key = await self.requestreply_queue.get()
scheduled, expires, key = self.requestreply_queue.get_nowait()
except queue.Empty:
self.logger.info(f'requestreply queue empty')
try:
self.requestreply_queue_notempty.clear()
await self.requestreply_queue_notempty.wait() # NOTE: quirk, see note below
except Exception as e:
self.logger.info(f'Exception e={e!r}')
continue
requestreply = self.get_requestreply(key) requestreply = self.get_requestreply(key)
if requestreply is None: if requestreply is None:
self.logger.debug(f'no data for key {key=}') self.logger.debug(f'no data for key {key=}')
@@ -511,7 +490,6 @@ class OnionMessageManager(Logger):
self.logger.debug(f'error while sending {key=} {e!r}') self.logger.debug(f'error while sending {key=} {e!r}')
self._set_requestreply_result(key, copy.copy(e)) self._set_requestreply_result(key, copy.copy(e))
# NOTE: above, when passing the caught exception instance e directly it leads to GeneratorExit() in # NOTE: above, when passing the caught exception instance e directly it leads to GeneratorExit() in
# queue_notempty.wait() later (??). pass a copy instead.
if isinstance(e, NoRouteFound) and e.peer_address: if isinstance(e, NoRouteFound) and e.peer_address:
await self.lnwallet.add_peer(str(e.peer_address)) await self.lnwallet.add_peer(str(e.peer_address))
else: else:
@@ -571,7 +549,6 @@ class OnionMessageManager(Logger):
queueitem = (now(), expires, key) queueitem = (now(), expires, key)
self.requestreply_queue.put_nowait(queueitem) self.requestreply_queue.put_nowait(queueitem)
task = asyncio.create_task(self._requestreply_task(key)) task = asyncio.create_task(self._requestreply_task(key))
self.requestreply_queue_notempty.set()
return task return task
async def _requestreply_task(self, key): async def _requestreply_task(self, key):