Merge pull request #8814 from SomberNight/202401_lnpeer_chan_reest
lnpeer.reestablish_channel: fix some timing issues for dataloss case
This commit is contained in:
@@ -119,9 +119,6 @@ class ChannelsList(MyTreeView):
|
||||
def on_channel_closed(self, txid):
|
||||
self.main_window.show_error('Channel closed' + '\n' + txid)
|
||||
|
||||
def on_request_sent(self, b):
|
||||
self.main_window.show_message(_('Request sent'))
|
||||
|
||||
def on_failure(self, exc_info):
|
||||
type_, e, tb = exc_info
|
||||
traceback.print_tb(tb)
|
||||
@@ -137,7 +134,7 @@ class ChannelsList(MyTreeView):
|
||||
on_success = self.on_channel_closed
|
||||
def task():
|
||||
return self.network.run_from_another_thread(coro)
|
||||
WaitingDialog(self, 'please wait..', task, on_success, self.on_failure)
|
||||
WaitingDialog(self, _('Please wait...'), task, on_success, self.on_failure)
|
||||
|
||||
def force_close(self, channel_id):
|
||||
self.save_backup = True
|
||||
@@ -161,7 +158,7 @@ class ChannelsList(MyTreeView):
|
||||
def task():
|
||||
coro = self.lnworker.force_close_channel(channel_id)
|
||||
return self.network.run_from_another_thread(coro)
|
||||
WaitingDialog(self, 'please wait..', task, self.on_channel_closed, self.on_failure)
|
||||
WaitingDialog(self, _('Please wait...'), task, self.on_channel_closed, self.on_failure)
|
||||
|
||||
def remove_channel(self, channel_id):
|
||||
if self.main_window.question(_('Are you sure you want to delete this channel? This will purge associated transactions from your wallet history.')):
|
||||
@@ -191,7 +188,9 @@ class ChannelsList(MyTreeView):
|
||||
def task():
|
||||
coro = self.lnworker.request_force_close(channel_id)
|
||||
return self.network.run_from_another_thread(coro)
|
||||
WaitingDialog(self, 'please wait..', task, self.on_request_sent, self.on_failure)
|
||||
def on_done(b):
|
||||
self.main_window.show_message(_('Request scheduled'))
|
||||
WaitingDialog(self, _('Please wait...'), task, on_done, self.on_failure)
|
||||
|
||||
def set_frozen(self, chan, *, for_sending, value):
|
||||
if not self.lnworker.uses_trampoline() or self.lnworker.is_trampoline_peer(chan.node_id):
|
||||
|
||||
@@ -140,6 +140,7 @@ state_transitions = [
|
||||
(cs.OPEN, cs.WE_ARE_TOXIC),
|
||||
(cs.SHUTDOWN, cs.WE_ARE_TOXIC),
|
||||
(cs.REQUESTED_FCLOSE, cs.WE_ARE_TOXIC),
|
||||
(cs.WE_ARE_TOXIC, cs.WE_ARE_TOXIC),
|
||||
#
|
||||
(cs.FORCE_CLOSING, cs.FORCE_CLOSING), # allow multiple attempts
|
||||
(cs.FORCE_CLOSING, cs.CLOSED),
|
||||
@@ -1015,6 +1016,8 @@ class Channel(AbstractChannel):
|
||||
def should_try_to_reestablish_peer(self) -> bool:
|
||||
if self.peer_state != PeerState.DISCONNECTED:
|
||||
return False
|
||||
if self.should_request_force_close:
|
||||
return True
|
||||
return ChannelState.PREOPENING < self._state < ChannelState.CLOSING
|
||||
|
||||
def get_funding_address(self):
|
||||
@@ -1629,6 +1632,8 @@ class Channel(AbstractChannel):
|
||||
if not self.has_unsettled_htlcs():
|
||||
ret.append(ChanCloseOption.COOP_CLOSE)
|
||||
ret.append(ChanCloseOption.REQUEST_REMOTE_FCLOSE)
|
||||
if self.get_state() == ChannelState.WE_ARE_TOXIC:
|
||||
ret.append(ChanCloseOption.REQUEST_REMOTE_FCLOSE)
|
||||
if not self.is_closed() or self.get_state() == ChannelState.REQUESTED_FCLOSE:
|
||||
ret.append(ChanCloseOption.LOCAL_FCLOSE)
|
||||
assert not (self.get_state() == ChannelState.WE_ARE_TOXIC and ChanCloseOption.LOCAL_FCLOSE in ret), "local force-close unsafe if we are toxic"
|
||||
|
||||
@@ -1113,6 +1113,7 @@ class Peer(Logger):
|
||||
async def request_force_close(self, channel_id: bytes):
|
||||
"""Try to trigger the remote peer to force-close."""
|
||||
await self.initialized
|
||||
self.logger.info(f"trying to get remote peer to force-close chan {channel_id.hex()}")
|
||||
# First, we intentionally send a "channel_reestablish" msg with an old state.
|
||||
# Many nodes (but not all) automatically force-close when seeing this.
|
||||
latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2)
|
||||
@@ -1146,7 +1147,11 @@ class Peer(Logger):
|
||||
self.logger.info(f"tried to force-close channel {chan.get_id_for_log()} "
|
||||
f"but close option is not allowed. {chan.get_state()=!r}")
|
||||
|
||||
def on_channel_reestablish(self, chan, msg):
|
||||
def on_channel_reestablish(self, chan: Channel, msg):
|
||||
# Note: it is critical for this message handler to block processing of further messages,
|
||||
# until this msg is processed. If we are behind (lost state), and send chan_reest to the remote,
|
||||
# when the remote realizes we are behind, they might send an "error" message - but the spec mandates
|
||||
# they send chan_reest first. If we processed the error first, we might force-close and lose money!
|
||||
their_next_local_ctn = msg["next_commitment_number"]
|
||||
their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
|
||||
their_local_pcp = msg.get("my_current_per_commitment_point")
|
||||
@@ -1230,40 +1235,23 @@ class Peer(Logger):
|
||||
self.lnworker.save_channel(chan)
|
||||
chan.peer_state = PeerState.BAD
|
||||
# raise after we send channel_reestablish, so the remote can realize they are ahead
|
||||
fut.set_exception(RemoteMisbehaving("remote ahead of us"))
|
||||
# FIXME what if we have multiple chans with peer? timing...
|
||||
fut.set_exception(GracefulDisconnect("remote ahead of us"))
|
||||
elif we_are_ahead:
|
||||
self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
|
||||
self.schedule_force_closing(chan.channel_id)
|
||||
fut.set_exception(RemoteMisbehaving("we are ahead of remote"))
|
||||
# FIXME what if we have multiple chans with peer? timing...
|
||||
fut.set_exception(GracefulDisconnect("we are ahead of remote"))
|
||||
else:
|
||||
# all good
|
||||
fut.set_result((we_must_resend_revoke_and_ack, their_next_local_ctn))
|
||||
|
||||
async def reestablish_channel(self, chan: Channel):
|
||||
await self.initialized
|
||||
def _send_channel_reestablish(self, chan: Channel):
|
||||
assert self.is_initialized()
|
||||
chan_id = chan.channel_id
|
||||
if chan.should_request_force_close:
|
||||
chan.set_state(ChannelState.REQUESTED_FCLOSE)
|
||||
await self.request_force_close(chan_id)
|
||||
chan.should_request_force_close = False
|
||||
return
|
||||
assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
|
||||
if chan.peer_state != PeerState.DISCONNECTED:
|
||||
self.logger.info(
|
||||
f'reestablish_channel was called but channel {chan.get_id_for_log()} '
|
||||
f'already in peer_state {chan.peer_state!r}')
|
||||
return
|
||||
chan.peer_state = PeerState.REESTABLISHING
|
||||
util.trigger_callback('channel', self.lnworker.wallet, chan)
|
||||
# ctns
|
||||
oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
|
||||
latest_local_ctn = chan.get_latest_ctn(LOCAL)
|
||||
next_local_ctn = chan.get_next_ctn(LOCAL)
|
||||
oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
|
||||
latest_remote_ctn = chan.get_latest_ctn(REMOTE)
|
||||
next_remote_ctn = chan.get_next_ctn(REMOTE)
|
||||
# BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
|
||||
chan.hm.discard_unsigned_remote_updates()
|
||||
# send message
|
||||
assert chan.is_static_remotekey_enabled()
|
||||
latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
|
||||
@@ -1284,6 +1272,46 @@ class Peer(Logger):
|
||||
f'(next_local_ctn={next_local_ctn}, '
|
||||
f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
|
||||
|
||||
async def reestablish_channel(self, chan: Channel):
|
||||
await self.initialized
|
||||
chan_id = chan.channel_id
|
||||
if chan.should_request_force_close:
|
||||
if chan.get_state() != ChannelState.WE_ARE_TOXIC:
|
||||
chan.set_state(ChannelState.REQUESTED_FCLOSE)
|
||||
await self.request_force_close(chan_id)
|
||||
chan.should_request_force_close = False
|
||||
return
|
||||
if chan.get_state() == ChannelState.WE_ARE_TOXIC:
|
||||
# Depending on timing, the remote might not know we are behind.
|
||||
# We should let them know, so that they force-close.
|
||||
# We do "request force-close" with ctn=0, instead of leaking our actual ctns,
|
||||
# to decrease the remote's confidence of actual data loss on our part.
|
||||
await self.request_force_close(chan_id)
|
||||
return
|
||||
if chan.get_state() == ChannelState.FORCE_CLOSING:
|
||||
# We likely got here because we found out that we are ahead (i.e. remote lost state).
|
||||
# Depending on timing, the remote might not know they are behind.
|
||||
# We should let them know:
|
||||
self._send_channel_reestablish(chan)
|
||||
return
|
||||
# if we get here, we will try to do a proper reestablish
|
||||
if not (ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING):
|
||||
raise Exception(f"unexpected {chan.get_state()=} for reestablish")
|
||||
if chan.peer_state != PeerState.DISCONNECTED:
|
||||
self.logger.info(
|
||||
f'reestablish_channel was called but channel {chan.get_id_for_log()} '
|
||||
f'already in peer_state {chan.peer_state!r}')
|
||||
return
|
||||
chan.peer_state = PeerState.REESTABLISHING
|
||||
util.trigger_callback('channel', self.lnworker.wallet, chan)
|
||||
# ctns
|
||||
oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
|
||||
next_local_ctn = chan.get_next_ctn(LOCAL)
|
||||
oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
|
||||
# BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
|
||||
chan.hm.discard_unsigned_remote_updates()
|
||||
# send message
|
||||
self._send_channel_reestablish(chan)
|
||||
# wait until we receive their channel_reestablish
|
||||
fut = self.channel_reestablish_msg[chan_id]
|
||||
await fut
|
||||
|
||||
@@ -2904,9 +2904,8 @@ class LNWallet(LNWorker):
|
||||
if self._can_retry_addr(peer, urgent=True):
|
||||
await self._add_peer(peer.host, peer.port, peer.pubkey)
|
||||
for chan in self.channels.values():
|
||||
if chan.is_closed():
|
||||
continue
|
||||
# reestablish
|
||||
# note: we delegate filtering out uninteresting chans to this:
|
||||
if not chan.should_try_to_reestablish_peer():
|
||||
continue
|
||||
peer = self._peers.get(chan.node_id, None)
|
||||
@@ -2961,10 +2960,9 @@ class LNWallet(LNWorker):
|
||||
if channel_id in self.channels:
|
||||
chan = self.channels[channel_id]
|
||||
peer = self._peers.get(chan.node_id)
|
||||
if not peer:
|
||||
raise Exception('Peer not found')
|
||||
chan.should_request_force_close = True
|
||||
peer.close_and_cleanup()
|
||||
if peer:
|
||||
peer.close_and_cleanup() # to force a reconnect
|
||||
elif connect_str:
|
||||
peer = await self.add_peer(connect_str)
|
||||
await peer.request_force_close(channel_id)
|
||||
|
||||
@@ -566,30 +566,50 @@ class TestPeerDirect(TestPeer):
|
||||
await gath
|
||||
|
||||
async def test_reestablish_with_old_state(self):
|
||||
random_seed = os.urandom(32)
|
||||
alice_channel, bob_channel = create_test_channels(random_seed=random_seed)
|
||||
alice_channel_0, bob_channel_0 = create_test_channels(random_seed=random_seed) # these are identical
|
||||
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
|
||||
lnaddr, pay_req = self.prepare_invoice(w2)
|
||||
async def pay():
|
||||
result, log = await w1.pay_invoice(pay_req)
|
||||
self.assertEqual(result, True)
|
||||
gath.cancel()
|
||||
gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await gath
|
||||
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel_0, bob_channel)
|
||||
for chan in (alice_channel_0, bob_channel):
|
||||
chan.peer_state = PeerState.DISCONNECTED
|
||||
async def reestablish():
|
||||
await asyncio.gather(
|
||||
p1.reestablish_channel(alice_channel_0),
|
||||
p2.reestablish_channel(bob_channel))
|
||||
gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
|
||||
with self.assertRaises(lnutil.RemoteMisbehaving):
|
||||
await gath
|
||||
self.assertEqual(alice_channel_0.peer_state, PeerState.BAD)
|
||||
self.assertEqual(bob_channel._state, ChannelState.FORCE_CLOSING)
|
||||
async def f(alice_slow: bool, bob_slow: bool):
|
||||
random_seed = os.urandom(32)
|
||||
alice_channel, bob_channel = create_test_channels(random_seed=random_seed)
|
||||
alice_channel_0, bob_channel_0 = create_test_channels(random_seed=random_seed) # these are identical
|
||||
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
|
||||
lnaddr, pay_req = self.prepare_invoice(w2)
|
||||
async def pay():
|
||||
result, log = await w1.pay_invoice(pay_req)
|
||||
self.assertEqual(result, True)
|
||||
gath.cancel()
|
||||
gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await gath
|
||||
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel_0, bob_channel)
|
||||
for chan in (alice_channel_0, bob_channel):
|
||||
chan.peer_state = PeerState.DISCONNECTED
|
||||
|
||||
async def alice_sends_reest():
|
||||
if alice_slow: await asyncio.sleep(0.05)
|
||||
await p1.reestablish_channel(alice_channel_0)
|
||||
async def bob_sends_reest():
|
||||
if bob_slow: await asyncio.sleep(0.05)
|
||||
await p2.reestablish_channel(bob_channel)
|
||||
|
||||
with self.assertRaises(GracefulDisconnect):
|
||||
async with OldTaskGroup() as group:
|
||||
await group.spawn(p1._message_loop())
|
||||
await group.spawn(p1.htlc_switch())
|
||||
await group.spawn(p2._message_loop())
|
||||
await group.spawn(p2.htlc_switch())
|
||||
await group.spawn(alice_sends_reest)
|
||||
await group.spawn(bob_sends_reest)
|
||||
self.assertEqual(alice_channel_0.peer_state, PeerState.BAD)
|
||||
self.assertEqual(alice_channel_0._state, ChannelState.WE_ARE_TOXIC)
|
||||
self.assertEqual(bob_channel._state, ChannelState.FORCE_CLOSING)
|
||||
|
||||
with self.subTest(msg="both fast"):
|
||||
# FIXME: we want to test the case where both Alice and Bob send channel-reestablish before
|
||||
# receiving what the other sent. This is not a reliable way to do that...
|
||||
await f(alice_slow=False, bob_slow=False)
|
||||
with self.subTest(msg="alice is slow"):
|
||||
await f(alice_slow=True, bob_slow=False)
|
||||
with self.subTest(msg="bob is slow"):
|
||||
await f(alice_slow=False, bob_slow=True)
|
||||
|
||||
@staticmethod
|
||||
def _send_fake_htlc(peer: Peer, chan: Channel) -> UpdateAddHtlc:
|
||||
|
||||
Reference in New Issue
Block a user