diff --git a/electrum/onion_message.py b/electrum/onion_message.py index f043fca58..64f89927f 100644 --- a/electrum/onion_message.py +++ b/electrum/onion_message.py @@ -401,8 +401,8 @@ class OnionMessageManager(Logger): self.pending = {} self.pending_lock = threading.Lock() - self.requestreply_queue = asyncio.PriorityQueue() - self.forwardqueue = asyncio.PriorityQueue() + self.send_queue = asyncio.PriorityQueue() + self.forward_queue = asyncio.PriorityQueue() def start_network(self, *, network: 'Network'): assert network @@ -415,7 +415,7 @@ class OnionMessageManager(Logger): self.logger.info("starting taskgroup.") try: async with self.taskgroup as group: - await group.spawn(self.process_request_reply_queue()) + await group.spawn(self.process_send_queue()) await group.spawn(self.process_forward_queue()) except Exception as e: self.logger.exception("taskgroup died.") @@ -427,13 +427,13 @@ class OnionMessageManager(Logger): async def process_forward_queue(self): while True: - scheduled, expires, onion_packet, blinding, node_id = await self.forwardqueue.get() + scheduled, expires, onion_packet, blinding, node_id = await self.forward_queue.get() if expires <= now(): self.logger.debug(f'forward expired {node_id=}') continue if scheduled > now(): # return to queue - self.forwardqueue.put_nowait((scheduled, expires, onion_packet, blinding, node_id)) + self.forward_queue.put_nowait((scheduled, expires, onion_packet, blinding, node_id)) await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet continue @@ -449,24 +449,24 @@ class OnionMessageManager(Logger): ) except BaseException as e: self.logger.debug(f'error while sending {node_id=} e={e!r}') - self.forwardqueue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id)) + self.forward_queue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id)) def submit_forward( self, *, onion_packet: OnionPacket, blinding: bytes, node_id: bytes): - if self.forwardqueue.qsize() >= self.FORWARD_MAX_QUEUE: + if self.forward_queue.qsize() >= self.FORWARD_MAX_QUEUE: self.logger.debug('forward queue full, dropping packet') return expires = now() + self.FORWARD_RETRY_TIMEOUT queueitem = (now(), expires, onion_packet, blinding, node_id) - self.forwardqueue.put_nowait(queueitem) + self.forward_queue.put_nowait(queueitem) - async def process_request_reply_queue(self): + async def process_send_queue(self): while True: - scheduled, expires, key = await self.requestreply_queue.get() - requestreply = self.get_requestreply(key) + scheduled, expires, key = await self.send_queue.get() + requestreply = self.get_pending_message(key) if requestreply is None: self.logger.debug(f'no data for key {key=}') continue @@ -475,32 +475,32 @@ class OnionMessageManager(Logger): continue if expires <= now(): self.logger.debug(f'expired {key=}') - self._set_requestreply_result(key, Timeout()) + self._set_message_result(key, Timeout()) continue if scheduled > now(): # return to queue self.logger.debug(f'return to queue {key=}, {scheduled - now()}') - self.requestreply_queue.put_nowait((scheduled, expires, key)) + self.send_queue.put_nowait((scheduled, expires, key)) await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet continue try: - self._send_pending_requestreply(key) + self._send_pending_message(key) except BaseException as e: self.logger.debug(f'error while sending {key=} {e!r}') - self._set_requestreply_result(key, copy.copy(e)) + self._set_message_result(key, copy.copy(e)) # NOTE: above, when passing the caught exception instance e directly it leads to GeneratorExit() in if isinstance(e, NoRouteFound) and e.peer_address: await self.lnwallet.add_peer(str(e.peer_address)) else: self.logger.debug(f'resubmit {key=}') - self.requestreply_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key)) + self.send_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key)) - def get_requestreply(self, key): + def get_pending_message(self, key): with self.pending_lock: return self.pending.get(key) - def _set_requestreply_result(self, key, result): + def _set_message_result(self, key, result): with self.pending_lock: requestreply = self.pending.get(key) if requestreply is None: @@ -509,7 +509,7 @@ class OnionMessageManager(Logger): self.pending[key]['result'] = result requestreply['ev'].set() - def _remove_requestreply(self, key): + def _remove_pending_message(self, key): with self.pending_lock: requestreply = self.pending.get(key) if requestreply is None: @@ -517,7 +517,7 @@ class OnionMessageManager(Logger): requestreply['ev'].set() del self.pending[key] - def submit_requestreply( + def submit_send( self, *, payload: dict, node_id_or_blinded_path: bytes, @@ -533,7 +533,7 @@ class OnionMessageManager(Logger): key = os.urandom(8) assert type(key) is bytes and len(key) >= 8 - self.logger.debug(f'submit_requestreply {key=} {payload=} {node_id_or_blinded_path=}') + self.logger.debug(f'submit_send {key=} {payload=} {node_id_or_blinded_path=}') with self.pending_lock: if key in self.pending: @@ -547,12 +547,12 @@ class OnionMessageManager(Logger): # tuple = (when to process, when it expires, key) expires = now() + self.REQUEST_REPLY_TIMEOUT queueitem = (now(), expires, key) - self.requestreply_queue.put_nowait(queueitem) - task = asyncio.create_task(self._requestreply_task(key)) + self.send_queue.put_nowait(queueitem) + task = asyncio.create_task(self._wait_task(key)) return task - async def _requestreply_task(self, key): - requestreply = self.get_requestreply(key) + async def _wait_task(self, key): + requestreply = self.get_pending_message(key) assert requestreply if requestreply is None: return @@ -563,21 +563,21 @@ class OnionMessageManager(Logger): self.logger.debug(f'wait task end {key}') try: - requestreply = self.get_requestreply(key) + requestreply = self.get_pending_message(key) assert requestreply result = requestreply.get('result') if isinstance(result, Exception): raise result # raising in the task requires caller to explicitly extract exception. return result finally: - self._remove_requestreply(key) + self._remove_pending_message(key) - def _send_pending_requestreply(self, key): + def _send_pending_message(self, key): """adds reply_path to payload""" - data = self.get_requestreply(key) + data = self.get_pending_message(key) payload = data.get('payload') node_id_or_blinded_path = data.get('node_id_or_blinded_path') - self.logger.debug(f'send_requestreply {key=} {payload=} {node_id_or_blinded_path=}') + self.logger.debug(f'send_pending_message {key=} {payload=} {node_id_or_blinded_path=}') final_payload = copy.deepcopy(payload) @@ -624,12 +624,12 @@ class OnionMessageManager(Logger): self.logger.warning('not a reply to our request (unknown path_id prefix)') return key = correl_data[8:] - requestreply = self.get_requestreply(key) + requestreply = self.get_pending_message(key) if requestreply is None: self.logger.warning('not a reply to our request (unknown request)') return - self._set_requestreply_result(key, (recipient_data, payload)) + self._set_message_result(key, (recipient_data, payload)) def on_onion_message_received_unsolicited(self, recipient_data, payload): self.logger.debug('unsolicited onion_message received') diff --git a/tests/test_onion_message.py b/tests/test_onion_message.py index 46bdc609a..13eee113a 100644 --- a/tests/test_onion_message.py +++ b/tests/test_onion_message.py @@ -307,7 +307,7 @@ class TestOnionMessageManager(ElectrumTestCase): self.eve_pub = self.eve.get_public_key_bytes(compressed=True) async def run_test1(self, t): - t1 = t.submit_requestreply( + t1 = t.submit_send( payload={'message': {'text': 'alice_timeout'.encode('utf-8')}}, node_id_or_blinded_path=self.alice_pub) @@ -315,7 +315,7 @@ class TestOnionMessageManager(ElectrumTestCase): await t1 async def run_test2(self, t): - t2 = t.submit_requestreply( + t2 = t.submit_send( payload={'message': {'text': 'bob_slow_timeout'.encode('utf-8')}}, node_id_or_blinded_path=self.bob_pub) @@ -323,7 +323,7 @@ class TestOnionMessageManager(ElectrumTestCase): await t2 async def run_test3(self, t, rkey): - t3 = t.submit_requestreply( + t3 = t.submit_send( payload={'message': {'text': 'carol_with_immediate_reply'.encode('utf-8')}}, node_id_or_blinded_path=self.carol_pub, key=rkey) @@ -332,7 +332,7 @@ class TestOnionMessageManager(ElectrumTestCase): self.assertEqual(t3_result, ({'path_id': {'data': b'electrum' + rkey}}, {})) async def run_test4(self, t, rkey): - t4 = t.submit_requestreply( + t4 = t.submit_send( payload={'message': {'text': 'dave_with_slow_reply'.encode('utf-8')}}, node_id_or_blinded_path=self.dave_pub, key=rkey) @@ -341,7 +341,7 @@ class TestOnionMessageManager(ElectrumTestCase): self.assertEqual(t4_result, ({'path_id': {'data': b'electrum' + rkey}}, {})) async def run_test5(self, t): - t5 = t.submit_requestreply( + t5 = t.submit_send( payload={'message': {'text': 'no_peer'.encode('utf-8')}}, node_id_or_blinded_path=self.eve_pub)