diff --git a/electrum/onion_message.py b/electrum/onion_message.py index b01fd90d2..f043fca58 100644 --- a/electrum/onion_message.py +++ b/electrum/onion_message.py @@ -24,7 +24,6 @@ import asyncio import copy import io import os -import queue import threading import time from random import random @@ -402,10 +401,8 @@ class OnionMessageManager(Logger): self.pending = {} self.pending_lock = threading.Lock() - self.requestreply_queue = queue.PriorityQueue() - self.requestreply_queue_notempty = asyncio.Event() - self.forwardqueue = queue.PriorityQueue() - self.forwardqueue_notempty = asyncio.Event() + self.requestreply_queue = asyncio.PriorityQueue() + self.forwardqueue = asyncio.PriorityQueue() def start_network(self, *, network: 'Network'): assert network @@ -430,14 +427,7 @@ class OnionMessageManager(Logger): async def process_forward_queue(self): while True: - try: - scheduled, expires, onion_packet, blinding, node_id = self.forwardqueue.get_nowait() - except queue.Empty: - self.logger.info(f'forward queue empty') - self.forwardqueue_notempty.clear() - await self.forwardqueue_notempty.wait() - continue - + scheduled, expires, onion_packet, blinding, node_id = await self.forwardqueue.get() if expires <= now(): self.logger.debug(f'forward expired {node_id=}') continue @@ -472,21 +462,10 @@ class OnionMessageManager(Logger): expires = now() + self.FORWARD_RETRY_TIMEOUT queueitem = (now(), expires, onion_packet, blinding, node_id) self.forwardqueue.put_nowait(queueitem) - self.forwardqueue_notempty.set() async def process_request_reply_queue(self): while True: - try: - scheduled, expires, key = self.requestreply_queue.get_nowait() - except queue.Empty: - self.logger.info(f'requestreply queue empty') - try: - self.requestreply_queue_notempty.clear() - await self.requestreply_queue_notempty.wait() # NOTE: quirk, see note below - except Exception as e: - self.logger.info(f'Exception e={e!r}') - continue - + scheduled, expires, key = await self.requestreply_queue.get() requestreply = self.get_requestreply(key) if requestreply is None: self.logger.debug(f'no data for key {key=}') @@ -511,7 +490,6 @@ class OnionMessageManager(Logger): self.logger.debug(f'error while sending {key=} {e!r}') self._set_requestreply_result(key, copy.copy(e)) # NOTE: above, when passing the caught exception instance e directly it leads to GeneratorExit() in - # queue_notempty.wait() later (??). pass a copy instead. if isinstance(e, NoRouteFound) and e.peer_address: await self.lnwallet.add_peer(str(e.peer_address)) else: @@ -571,7 +549,6 @@ class OnionMessageManager(Logger): queueitem = (now(), expires, key) self.requestreply_queue.put_nowait(queueitem) task = asyncio.create_task(self._requestreply_task(key)) - self.requestreply_queue_notempty.set() return task async def _requestreply_task(self, key):