Deobfuscate names
The name 'requestreply' do not mean anything. If something can either be a request or a reply, perhaps we can call it 'message', instead of introducing a new name? In general, coming up with new names comes at a cost, because you are forcing other developers to learn and use your terminology. Please minimize that.
This commit is contained in:
committed by
Sander van Grieken
parent
aeaec452a0
commit
d814796484
@@ -401,8 +401,8 @@ class OnionMessageManager(Logger):
|
||||
|
||||
self.pending = {}
|
||||
self.pending_lock = threading.Lock()
|
||||
self.requestreply_queue = asyncio.PriorityQueue()
|
||||
self.forwardqueue = asyncio.PriorityQueue()
|
||||
self.send_queue = asyncio.PriorityQueue()
|
||||
self.forward_queue = asyncio.PriorityQueue()
|
||||
|
||||
def start_network(self, *, network: 'Network'):
|
||||
assert network
|
||||
@@ -415,7 +415,7 @@ class OnionMessageManager(Logger):
|
||||
self.logger.info("starting taskgroup.")
|
||||
try:
|
||||
async with self.taskgroup as group:
|
||||
await group.spawn(self.process_request_reply_queue())
|
||||
await group.spawn(self.process_send_queue())
|
||||
await group.spawn(self.process_forward_queue())
|
||||
except Exception as e:
|
||||
self.logger.exception("taskgroup died.")
|
||||
@@ -427,13 +427,13 @@ class OnionMessageManager(Logger):
|
||||
|
||||
async def process_forward_queue(self):
|
||||
while True:
|
||||
scheduled, expires, onion_packet, blinding, node_id = await self.forwardqueue.get()
|
||||
scheduled, expires, onion_packet, blinding, node_id = await self.forward_queue.get()
|
||||
if expires <= now():
|
||||
self.logger.debug(f'forward expired {node_id=}')
|
||||
continue
|
||||
if scheduled > now():
|
||||
# return to queue
|
||||
self.forwardqueue.put_nowait((scheduled, expires, onion_packet, blinding, node_id))
|
||||
self.forward_queue.put_nowait((scheduled, expires, onion_packet, blinding, node_id))
|
||||
await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet
|
||||
continue
|
||||
|
||||
@@ -449,24 +449,24 @@ class OnionMessageManager(Logger):
|
||||
)
|
||||
except BaseException as e:
|
||||
self.logger.debug(f'error while sending {node_id=} e={e!r}')
|
||||
self.forwardqueue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id))
|
||||
self.forward_queue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id))
|
||||
|
||||
def submit_forward(
|
||||
self, *,
|
||||
onion_packet: OnionPacket,
|
||||
blinding: bytes,
|
||||
node_id: bytes):
|
||||
if self.forwardqueue.qsize() >= self.FORWARD_MAX_QUEUE:
|
||||
if self.forward_queue.qsize() >= self.FORWARD_MAX_QUEUE:
|
||||
self.logger.debug('forward queue full, dropping packet')
|
||||
return
|
||||
expires = now() + self.FORWARD_RETRY_TIMEOUT
|
||||
queueitem = (now(), expires, onion_packet, blinding, node_id)
|
||||
self.forwardqueue.put_nowait(queueitem)
|
||||
self.forward_queue.put_nowait(queueitem)
|
||||
|
||||
async def process_request_reply_queue(self):
|
||||
async def process_send_queue(self):
|
||||
while True:
|
||||
scheduled, expires, key = await self.requestreply_queue.get()
|
||||
requestreply = self.get_requestreply(key)
|
||||
scheduled, expires, key = await self.send_queue.get()
|
||||
requestreply = self.get_pending_message(key)
|
||||
if requestreply is None:
|
||||
self.logger.debug(f'no data for key {key=}')
|
||||
continue
|
||||
@@ -475,32 +475,32 @@ class OnionMessageManager(Logger):
|
||||
continue
|
||||
if expires <= now():
|
||||
self.logger.debug(f'expired {key=}')
|
||||
self._set_requestreply_result(key, Timeout())
|
||||
self._set_message_result(key, Timeout())
|
||||
continue
|
||||
if scheduled > now():
|
||||
# return to queue
|
||||
self.logger.debug(f'return to queue {key=}, {scheduled - now()}')
|
||||
self.requestreply_queue.put_nowait((scheduled, expires, key))
|
||||
self.send_queue.put_nowait((scheduled, expires, key))
|
||||
await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet
|
||||
continue
|
||||
|
||||
try:
|
||||
self._send_pending_requestreply(key)
|
||||
self._send_pending_message(key)
|
||||
except BaseException as e:
|
||||
self.logger.debug(f'error while sending {key=} {e!r}')
|
||||
self._set_requestreply_result(key, copy.copy(e))
|
||||
self._set_message_result(key, copy.copy(e))
|
||||
# NOTE: above, when passing the caught exception instance e directly it leads to GeneratorExit() in
|
||||
if isinstance(e, NoRouteFound) and e.peer_address:
|
||||
await self.lnwallet.add_peer(str(e.peer_address))
|
||||
else:
|
||||
self.logger.debug(f'resubmit {key=}')
|
||||
self.requestreply_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key))
|
||||
self.send_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key))
|
||||
|
||||
def get_requestreply(self, key):
|
||||
def get_pending_message(self, key):
|
||||
with self.pending_lock:
|
||||
return self.pending.get(key)
|
||||
|
||||
def _set_requestreply_result(self, key, result):
|
||||
def _set_message_result(self, key, result):
|
||||
with self.pending_lock:
|
||||
requestreply = self.pending.get(key)
|
||||
if requestreply is None:
|
||||
@@ -509,7 +509,7 @@ class OnionMessageManager(Logger):
|
||||
self.pending[key]['result'] = result
|
||||
requestreply['ev'].set()
|
||||
|
||||
def _remove_requestreply(self, key):
|
||||
def _remove_pending_message(self, key):
|
||||
with self.pending_lock:
|
||||
requestreply = self.pending.get(key)
|
||||
if requestreply is None:
|
||||
@@ -517,7 +517,7 @@ class OnionMessageManager(Logger):
|
||||
requestreply['ev'].set()
|
||||
del self.pending[key]
|
||||
|
||||
def submit_requestreply(
|
||||
def submit_send(
|
||||
self, *,
|
||||
payload: dict,
|
||||
node_id_or_blinded_path: bytes,
|
||||
@@ -533,7 +533,7 @@ class OnionMessageManager(Logger):
|
||||
key = os.urandom(8)
|
||||
assert type(key) is bytes and len(key) >= 8
|
||||
|
||||
self.logger.debug(f'submit_requestreply {key=} {payload=} {node_id_or_blinded_path=}')
|
||||
self.logger.debug(f'submit_send {key=} {payload=} {node_id_or_blinded_path=}')
|
||||
|
||||
with self.pending_lock:
|
||||
if key in self.pending:
|
||||
@@ -547,12 +547,12 @@ class OnionMessageManager(Logger):
|
||||
# tuple = (when to process, when it expires, key)
|
||||
expires = now() + self.REQUEST_REPLY_TIMEOUT
|
||||
queueitem = (now(), expires, key)
|
||||
self.requestreply_queue.put_nowait(queueitem)
|
||||
task = asyncio.create_task(self._requestreply_task(key))
|
||||
self.send_queue.put_nowait(queueitem)
|
||||
task = asyncio.create_task(self._wait_task(key))
|
||||
return task
|
||||
|
||||
async def _requestreply_task(self, key):
|
||||
requestreply = self.get_requestreply(key)
|
||||
async def _wait_task(self, key):
|
||||
requestreply = self.get_pending_message(key)
|
||||
assert requestreply
|
||||
if requestreply is None:
|
||||
return
|
||||
@@ -563,21 +563,21 @@ class OnionMessageManager(Logger):
|
||||
self.logger.debug(f'wait task end {key}')
|
||||
|
||||
try:
|
||||
requestreply = self.get_requestreply(key)
|
||||
requestreply = self.get_pending_message(key)
|
||||
assert requestreply
|
||||
result = requestreply.get('result')
|
||||
if isinstance(result, Exception):
|
||||
raise result # raising in the task requires caller to explicitly extract exception.
|
||||
return result
|
||||
finally:
|
||||
self._remove_requestreply(key)
|
||||
self._remove_pending_message(key)
|
||||
|
||||
def _send_pending_requestreply(self, key):
|
||||
def _send_pending_message(self, key):
|
||||
"""adds reply_path to payload"""
|
||||
data = self.get_requestreply(key)
|
||||
data = self.get_pending_message(key)
|
||||
payload = data.get('payload')
|
||||
node_id_or_blinded_path = data.get('node_id_or_blinded_path')
|
||||
self.logger.debug(f'send_requestreply {key=} {payload=} {node_id_or_blinded_path=}')
|
||||
self.logger.debug(f'send_pending_message {key=} {payload=} {node_id_or_blinded_path=}')
|
||||
|
||||
final_payload = copy.deepcopy(payload)
|
||||
|
||||
@@ -624,12 +624,12 @@ class OnionMessageManager(Logger):
|
||||
self.logger.warning('not a reply to our request (unknown path_id prefix)')
|
||||
return
|
||||
key = correl_data[8:]
|
||||
requestreply = self.get_requestreply(key)
|
||||
requestreply = self.get_pending_message(key)
|
||||
if requestreply is None:
|
||||
self.logger.warning('not a reply to our request (unknown request)')
|
||||
return
|
||||
|
||||
self._set_requestreply_result(key, (recipient_data, payload))
|
||||
self._set_message_result(key, (recipient_data, payload))
|
||||
|
||||
def on_onion_message_received_unsolicited(self, recipient_data, payload):
|
||||
self.logger.debug('unsolicited onion_message received')
|
||||
|
||||
@@ -307,7 +307,7 @@ class TestOnionMessageManager(ElectrumTestCase):
|
||||
self.eve_pub = self.eve.get_public_key_bytes(compressed=True)
|
||||
|
||||
async def run_test1(self, t):
|
||||
t1 = t.submit_requestreply(
|
||||
t1 = t.submit_send(
|
||||
payload={'message': {'text': 'alice_timeout'.encode('utf-8')}},
|
||||
node_id_or_blinded_path=self.alice_pub)
|
||||
|
||||
@@ -315,7 +315,7 @@ class TestOnionMessageManager(ElectrumTestCase):
|
||||
await t1
|
||||
|
||||
async def run_test2(self, t):
|
||||
t2 = t.submit_requestreply(
|
||||
t2 = t.submit_send(
|
||||
payload={'message': {'text': 'bob_slow_timeout'.encode('utf-8')}},
|
||||
node_id_or_blinded_path=self.bob_pub)
|
||||
|
||||
@@ -323,7 +323,7 @@ class TestOnionMessageManager(ElectrumTestCase):
|
||||
await t2
|
||||
|
||||
async def run_test3(self, t, rkey):
|
||||
t3 = t.submit_requestreply(
|
||||
t3 = t.submit_send(
|
||||
payload={'message': {'text': 'carol_with_immediate_reply'.encode('utf-8')}},
|
||||
node_id_or_blinded_path=self.carol_pub,
|
||||
key=rkey)
|
||||
@@ -332,7 +332,7 @@ class TestOnionMessageManager(ElectrumTestCase):
|
||||
self.assertEqual(t3_result, ({'path_id': {'data': b'electrum' + rkey}}, {}))
|
||||
|
||||
async def run_test4(self, t, rkey):
|
||||
t4 = t.submit_requestreply(
|
||||
t4 = t.submit_send(
|
||||
payload={'message': {'text': 'dave_with_slow_reply'.encode('utf-8')}},
|
||||
node_id_or_blinded_path=self.dave_pub,
|
||||
key=rkey)
|
||||
@@ -341,7 +341,7 @@ class TestOnionMessageManager(ElectrumTestCase):
|
||||
self.assertEqual(t4_result, ({'path_id': {'data': b'electrum' + rkey}}, {}))
|
||||
|
||||
async def run_test5(self, t):
|
||||
t5 = t.submit_requestreply(
|
||||
t5 = t.submit_send(
|
||||
payload={'message': {'text': 'no_peer'.encode('utf-8')}},
|
||||
node_id_or_blinded_path=self.eve_pub)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user