diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 39c6cf943..dc40eb040 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -2742,13 +2742,9 @@ class Peer(Logger, EventListener): return closing_tx.txid() async def htlc_switch(self): - # In this loop, an item of chan.unfulfilled_htlcs may go through 4 stages: - # - 1. not forwarded yet: (None, onion_packet_hex) - # - 2. forwarded: (forwarding_key, onion_packet_hex) - # - 3. processed: (forwarding_key, None), not irrevocably removed yet - # - 4. done: (forwarding_key, None), irrevocably removed - await self.initialized + # don't context switch in a htlc switch iteration as htlc sets are shared between peers + assert not inspect.iscoroutinefunction(self._run_htlc_switch_iteration) while True: await self.ping_if_required() self._htlc_switch_iterdone_event.set() @@ -2764,75 +2760,84 @@ class Peer(Logger, EventListener): await group.spawn(self.downstream_htlc_resolved_event.wait()) self._htlc_switch_iterstart_event.set() self._htlc_switch_iterstart_event.clear() - self._maybe_cleanup_received_htlcs_pending_removal() - for chan_id, chan in self.channels.items(): - if not chan.can_update_ctx(proposer=LOCAL): + self._run_htlc_switch_iteration() + + @util.profiler(min_threshold=0.02) + def _run_htlc_switch_iteration(self): + self._maybe_cleanup_received_htlcs_pending_removal() + # In this loop, an item of chan.unfulfilled_htlcs may go through 4 stages: + # - 1. not forwarded yet: (None, onion_packet_hex) + # - 2. forwarded: (forwarding_key, onion_packet_hex) + # - 3. processed: (forwarding_key, None), not irrevocably removed yet + # - 4. done: (forwarding_key, None), irrevocably removed + for chan_id, chan in self.channels.items(): + if not chan.can_update_ctx(proposer=LOCAL): + continue + self.maybe_send_commitment(chan) + done = set() + unfulfilled = chan.unfulfilled_htlcs + for htlc_id, (onion_packet_hex, forwarding_key) in unfulfilled.items(): + if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): continue - self.maybe_send_commitment(chan) - done = set() - unfulfilled = chan.unfulfilled_htlcs - for htlc_id, (onion_packet_hex, forwarding_key) in unfulfilled.items(): - if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): - continue - htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id) - if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): - assert onion_packet_hex is None - self.lnworker.maybe_cleanup_mpp(chan.get_scid_or_local_alias(), htlc) - if forwarding_key: - self.lnworker.maybe_cleanup_forwarding(forwarding_key) - done.add(htlc_id) - continue - if onion_packet_hex is None: - # has been processed already - continue - error_reason = None # type: Optional[OnionRoutingFailure] - error_bytes = None # type: Optional[bytes] - preimage = None - onion_packet_bytes = bytes.fromhex(onion_packet_hex) - onion_packet = None + htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id) + if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id): + assert onion_packet_hex is None + self.lnworker.maybe_cleanup_mpp(chan.get_scid_or_local_alias(), htlc) + if forwarding_key: + self.lnworker.maybe_cleanup_forwarding(forwarding_key) + done.add(htlc_id) + continue + if onion_packet_hex is None: + # has been processed already + continue + error_reason = None # type: Optional[OnionRoutingFailure] + error_bytes = None # type: Optional[bytes] + preimage = None + onion_packet_bytes = bytes.fromhex(onion_packet_hex) + onion_packet = None + try: + onion_packet = OnionPacket.from_bytes(onion_packet_bytes) + except OnionRoutingFailure as e: + error_reason = e + else: try: - onion_packet = OnionPacket.from_bytes(onion_packet_bytes) + preimage, _forwarding_key, error_bytes = self.process_unfulfilled_htlc( + chan=chan, + htlc=htlc, + forwarding_key=forwarding_key, + onion_packet_bytes=onion_packet_bytes, + onion_packet=onion_packet) + if _forwarding_key: + assert forwarding_key is None + unfulfilled[htlc_id] = onion_packet_hex, _forwarding_key except OnionRoutingFailure as e: - error_reason = e + error_bytes = construct_onion_error(e, onion_packet.public_key, self.privkey, self.network.get_local_height()) + if error_bytes: + error_bytes = obfuscate_onion_error(error_bytes, onion_packet.public_key, our_onion_private_key=self.privkey) + + if preimage or error_reason or error_bytes: + if preimage: + self.lnworker.set_request_status(htlc.payment_hash, PR_PAID) + if not self.lnworker.enable_htlc_settle: + continue + self.fulfill_htlc(chan, htlc.htlc_id, preimage) + elif error_bytes: + self.fail_htlc( + chan=chan, + htlc_id=htlc.htlc_id, + error_bytes=error_bytes) else: - try: - preimage, _forwarding_key, error_bytes = self.process_unfulfilled_htlc( - chan=chan, - htlc=htlc, - forwarding_key=forwarding_key, - onion_packet_bytes=onion_packet_bytes, - onion_packet=onion_packet) - if _forwarding_key: - assert forwarding_key is None - unfulfilled[htlc_id] = onion_packet_hex, _forwarding_key - except OnionRoutingFailure as e: - error_bytes = construct_onion_error(e, onion_packet.public_key, self.privkey, self.network.get_local_height()) - if error_bytes: - error_bytes = obfuscate_onion_error(error_bytes, onion_packet.public_key, our_onion_private_key=self.privkey) + self.fail_malformed_htlc( + chan=chan, + htlc_id=htlc.htlc_id, + reason=error_reason) + # blank onion field to mark it as processed + unfulfilled[htlc_id] = None, forwarding_key - if preimage or error_reason or error_bytes: - if preimage: - self.lnworker.set_request_status(htlc.payment_hash, PR_PAID) - if not self.lnworker.enable_htlc_settle: - continue - self.fulfill_htlc(chan, htlc.htlc_id, preimage) - elif error_bytes: - self.fail_htlc( - chan=chan, - htlc_id=htlc.htlc_id, - error_bytes=error_bytes) - else: - self.fail_malformed_htlc( - chan=chan, - htlc_id=htlc.htlc_id, - reason=error_reason) - # blank onion field to mark it as processed - unfulfilled[htlc_id] = None, forwarding_key - - # cleanup - for htlc_id in done: - unfulfilled.pop(htlc_id) - self.maybe_send_commitment(chan) + # cleanup + for htlc_id in done: + unfulfilled.pop(htlc_id) + self.maybe_send_commitment(chan) def _maybe_cleanup_received_htlcs_pending_removal(self) -> None: done = set()