diff --git a/electrum/onion_message.py b/electrum/onion_message.py index 77ddfd094..beb020a31 100644 --- a/electrum/onion_message.py +++ b/electrum/onion_message.py @@ -26,6 +26,7 @@ import io import os import queue import threading +import time from random import random from typing import TYPE_CHECKING, Optional, List, Sequence @@ -40,7 +41,12 @@ from electrum.lnonion import (get_bolt04_onion_key, OnionPacket, process_onion_p OnionHopsDataSingle, decrypt_onionmsg_data_tlv, encrypt_onionmsg_data_tlv, get_shared_secrets_along_route, new_onion_packet) from electrum.lnutil import LnFeatures -from electrum.util import OldTaskGroup, now +from electrum.util import OldTaskGroup + +# do not use util.now, because it rounds to integers +def now(): + return time.time() + if TYPE_CHECKING: from electrum.lnworker import LNWallet @@ -52,12 +58,7 @@ if TYPE_CHECKING: logger = get_logger(__name__) -REQUEST_REPLY_TIMEOUT = 30 -REQUEST_REPLY_RETRY_DELAY = 5 REQUEST_REPLY_PATHS_MAX = 3 -FORWARD_RETRY_TIMEOUT = 4 -FORWARD_RETRY_DELAY = 2 -FORWARD_MAX_QUEUE = 3 class NoRouteFound(Exception): @@ -386,9 +387,14 @@ class OnionMessageManager(Logger): - association between onion message and their replies - manage re-send attempts, TODO: iterate through routes (both directions)""" - def __init__(self, lnwallet: 'LNWallet', *, - request_reply_timeout=REQUEST_REPLY_TIMEOUT, - request_reply_retry_delay=REQUEST_REPLY_RETRY_DELAY): + SLEEP_DELAY = 1 + REQUEST_REPLY_TIMEOUT = 30 + REQUEST_REPLY_RETRY_DELAY = 5 + FORWARD_RETRY_TIMEOUT = 4 + FORWARD_RETRY_DELAY = 2 + FORWARD_MAX_QUEUE = 3 + + def __init__(self, lnwallet: 'LNWallet'): Logger.__init__(self) self.network = None # type: Optional['Network'] self.taskgroup = None # type: OldTaskGroup @@ -401,9 +407,6 @@ class OnionMessageManager(Logger): self.forwardqueue = queue.PriorityQueue() self.forwardqueue_notempty = asyncio.Event() - self.request_reply_timeout = request_reply_timeout - self.request_reply_retry_delay = request_reply_retry_delay - def start_network(self, *, network: 'Network'): assert network assert self.network is None, "already started" @@ -441,7 +444,7 @@ class OnionMessageManager(Logger): if scheduled > now(): # return to queue self.forwardqueue.put_nowait((scheduled, expires, onion_packet, blinding, node_id)) - await asyncio.sleep(1) # sleep here, as the first queue item wasn't due yet + await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet continue try: @@ -456,17 +459,17 @@ class OnionMessageManager(Logger): ) except BaseException as e: self.logger.debug(f'error while sending {node_id=} e={e!r}') - self.forwardqueue.put_nowait((now() + FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id)) + self.forwardqueue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id)) def submit_forward( self, *, onion_packet: OnionPacket, blinding: bytes, node_id: bytes): - if self.forwardqueue.qsize() >= FORWARD_MAX_QUEUE: + if self.forwardqueue.qsize() >= self.FORWARD_MAX_QUEUE: self.logger.debug('forward queue full, dropping packet') return - expires = now() + FORWARD_RETRY_TIMEOUT + expires = now() + self.FORWARD_RETRY_TIMEOUT queueitem = (now(), expires, onion_packet, blinding, node_id) self.forwardqueue.put_nowait(queueitem) self.forwardqueue_notempty.set() @@ -498,8 +501,9 @@ class OnionMessageManager(Logger): continue if scheduled > now(): # return to queue + self.logger.debug(f'return to queue {key=}, {scheduled - now()}') self.requestreply_queue.put_nowait((scheduled, expires, key)) - await asyncio.sleep(1) # sleep here, as the first queue item wasn't due yet + await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet continue try: @@ -513,7 +517,7 @@ class OnionMessageManager(Logger): await self.lnwallet.add_peer(str(e.peer_address)) else: self.logger.debug(f'resubmit {key=}') - self.requestreply_queue.put_nowait((now() + self.request_reply_retry_delay, expires, key)) + self.requestreply_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key)) def get_requestreply(self, key): with self.pending_lock: @@ -564,7 +568,7 @@ class OnionMessageManager(Logger): } # tuple = (when to process, when it expires, key) - expires = now() + self.request_reply_timeout + expires = now() + self.REQUEST_REPLY_TIMEOUT queueitem = (now(), expires, key) self.requestreply_queue.put_nowait(queueitem) task = asyncio.create_task(self._requestreply_task(key)) diff --git a/tests/test_onion_message.py b/tests/test_onion_message.py index 04ddfe949..46bdc609a 100644 --- a/tests/test_onion_message.py +++ b/tests/test_onion_message.py @@ -24,6 +24,13 @@ from electrum.logging import console_stderr_handler from . import ElectrumTestCase, test_lnpeer from .test_lnpeer import PutIntoOthersQueueTransport, PeerInTests, keypair +TIME_STEP = 0.01 # run tests 100 x faster +OnionMessageManager.SLEEP_DELAY *= TIME_STEP +OnionMessageManager.REQUEST_REPLY_TIMEOUT *= TIME_STEP +OnionMessageManager.REQUEST_REPLY_RETRY_DELAY *= TIME_STEP +OnionMessageManager.FORWARD_RETRY_TIMEOUT *= TIME_STEP +OnionMessageManager.FORWARD_RETRY_DELAY *= TIME_STEP + # test vectors https://github.com/lightning/bolts/pull/759/files path = os.path.join(os.path.dirname(__file__), 'blinded-onion-message-onion-test.json') test_vectors = read_json_file(path) @@ -348,13 +355,13 @@ class TestOnionMessageManager(ElectrumTestCase): lnw = MockLNWallet(local_keypair=k, chans=[], tx_queue=q1, name='test', has_anchors=False) def slow(*args, **kwargs): - time.sleep(2) + time.sleep(2*TIME_STEP) def withreply(key, *args, **kwargs): t.on_onion_message_received_reply({'path_id': {'data': b'electrum' + key}}, {}) def slowwithreply(key, *args, **kwargs): - time.sleep(2) + time.sleep(2*TIME_STEP) t.on_onion_message_received_reply({'path_id': {'data': b'electrum' + key}}, {}) rkey1 = bfh('0102030405060708') @@ -364,22 +371,18 @@ class TestOnionMessageManager(ElectrumTestCase): lnw.peers[self.bob_pub] = MockPeer(self.bob_pub, on_send_message=slow) lnw.peers[self.carol_pub] = MockPeer(self.carol_pub, on_send_message=partial(withreply, rkey1)) lnw.peers[self.dave_pub] = MockPeer(self.dave_pub, on_send_message=partial(slowwithreply, rkey2)) - t = OnionMessageManager(lnw, request_reply_timeout=5, request_reply_retry_delay=1) + t = OnionMessageManager(lnw) t.start_network(network=n) try: - await asyncio.sleep(1) - + await asyncio.sleep(TIME_STEP) self.logger.debug('tests in sequence') - await self.run_test1(t) await self.run_test2(t) await self.run_test3(t, rkey1) await self.run_test4(t, rkey2) await self.run_test5(t) - self.logger.debug('tests in parallel') - async with OldTaskGroup() as group: await group.spawn(self.run_test1(t)) await group.spawn(self.run_test2(t)) @@ -387,7 +390,7 @@ class TestOnionMessageManager(ElectrumTestCase): await group.spawn(self.run_test4(t, rkey2)) await group.spawn(self.run_test5(t)) finally: - await asyncio.sleep(1) + await asyncio.sleep(TIME_STEP) self.logger.debug('stopping manager') await t.stop()