lnpeer: distinguish local and remote pending updates
This commit is contained in:
@@ -77,13 +77,14 @@ class Peer(PrintError):
|
|||||||
self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ
|
self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ
|
||||||
self.attempted_route = {}
|
self.attempted_route = {}
|
||||||
self.orphan_channel_updates = OrderedDict()
|
self.orphan_channel_updates = OrderedDict()
|
||||||
self.pending_updates = defaultdict(bool)
|
self.remote_pending_updates = defaultdict(bool) # true if we sent updates that we have not commited yet
|
||||||
|
self.local_pending_updates = defaultdict(bool) # true if we received updates that we have not commited yet
|
||||||
self._local_changed_events = defaultdict(asyncio.Event)
|
self._local_changed_events = defaultdict(asyncio.Event)
|
||||||
self._remote_changed_events = defaultdict(asyncio.Event)
|
self._remote_changed_events = defaultdict(asyncio.Event)
|
||||||
|
|
||||||
def send_message(self, message_name: str, **kwargs):
|
def send_message(self, message_name: str, **kwargs):
|
||||||
assert type(message_name) is str
|
assert type(message_name) is str
|
||||||
#self.print_error("Sending '%s'"%message_name.upper())
|
self.print_error("Sending '%s'"%message_name.upper())
|
||||||
self.transport.send_bytes(encode_msg(message_name, **kwargs))
|
self.transport.send_bytes(encode_msg(message_name, **kwargs))
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
@@ -770,7 +771,7 @@ class Peer(PrintError):
|
|||||||
# process update_fail_htlc on channel
|
# process update_fail_htlc on channel
|
||||||
chan = self.channels[channel_id]
|
chan = self.channels[channel_id]
|
||||||
chan.receive_fail_htlc(htlc_id)
|
chan.receive_fail_htlc(htlc_id)
|
||||||
self.pending_updates[chan] = True
|
self.local_pending_updates[chan] = True
|
||||||
local_ctn = chan.get_current_ctn(LOCAL)
|
local_ctn = chan.get_current_ctn(LOCAL)
|
||||||
asyncio.ensure_future(self._on_update_fail_htlc(chan, htlc_id, local_ctn))
|
asyncio.ensure_future(self._on_update_fail_htlc(chan, htlc_id, local_ctn))
|
||||||
|
|
||||||
@@ -821,12 +822,13 @@ class Peer(PrintError):
|
|||||||
self.network.path_finder.blacklist.add(short_chan_id)
|
self.network.path_finder.blacklist.add(short_chan_id)
|
||||||
|
|
||||||
def maybe_send_commitment(self, chan: Channel):
|
def maybe_send_commitment(self, chan: Channel):
|
||||||
if not self.pending_updates[chan]:
|
if not self.local_pending_updates[chan] and not self.remote_pending_updates[chan]:
|
||||||
return
|
return
|
||||||
self.print_error('send_commitment')
|
self.print_error('send_commitment')
|
||||||
sig_64, htlc_sigs = chan.sign_next_commitment()
|
sig_64, htlc_sigs = chan.sign_next_commitment()
|
||||||
self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
|
self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
|
||||||
self.pending_updates[chan] = False
|
self.local_pending_updates[chan] = False
|
||||||
|
self.remote_pending_updates[chan] = False
|
||||||
|
|
||||||
async def await_remote(self, chan: Channel, ctn: int):
|
async def await_remote(self, chan: Channel, ctn: int):
|
||||||
self.maybe_send_commitment(chan)
|
self.maybe_send_commitment(chan)
|
||||||
@@ -862,7 +864,7 @@ class Peer(PrintError):
|
|||||||
amount_msat=amount_msat,
|
amount_msat=amount_msat,
|
||||||
payment_hash=payment_hash,
|
payment_hash=payment_hash,
|
||||||
onion_routing_packet=onion.to_bytes())
|
onion_routing_packet=onion.to_bytes())
|
||||||
self.pending_updates[chan] = True
|
self.remote_pending_updates[chan] = True
|
||||||
await self.await_remote(chan, remote_ctn)
|
await self.await_remote(chan, remote_ctn)
|
||||||
return UpdateAddHtlc(**htlc, htlc_id=htlc_id)
|
return UpdateAddHtlc(**htlc, htlc_id=htlc_id)
|
||||||
|
|
||||||
@@ -891,7 +893,7 @@ class Peer(PrintError):
|
|||||||
preimage = update_fulfill_htlc_msg["payment_preimage"]
|
preimage = update_fulfill_htlc_msg["payment_preimage"]
|
||||||
htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big")
|
htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big")
|
||||||
chan.receive_htlc_settle(preimage, htlc_id)
|
chan.receive_htlc_settle(preimage, htlc_id)
|
||||||
self.pending_updates[chan] = True
|
self.local_pending_updates[chan] = True
|
||||||
local_ctn = chan.get_current_ctn(LOCAL)
|
local_ctn = chan.get_current_ctn(LOCAL)
|
||||||
asyncio.ensure_future(self._on_update_fulfill_htlc(chan, htlc_id, preimage, local_ctn))
|
asyncio.ensure_future(self._on_update_fulfill_htlc(chan, htlc_id, preimage, local_ctn))
|
||||||
|
|
||||||
@@ -923,15 +925,18 @@ class Peer(PrintError):
|
|||||||
# add htlc
|
# add htlc
|
||||||
htlc = {'amount_msat': amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry}
|
htlc = {'amount_msat': amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry}
|
||||||
htlc_id = chan.receive_htlc(htlc)
|
htlc_id = chan.receive_htlc(htlc)
|
||||||
|
self.local_pending_updates[chan] = True
|
||||||
local_ctn = chan.get_current_ctn(LOCAL)
|
local_ctn = chan.get_current_ctn(LOCAL)
|
||||||
|
remote_ctn = chan.get_current_ctn(REMOTE)
|
||||||
if processed_onion.are_we_final:
|
if processed_onion.are_we_final:
|
||||||
asyncio.ensure_future(self._maybe_fulfill_htlc(chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion))
|
asyncio.ensure_future(self._maybe_fulfill_htlc(chan, local_ctn, remote_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion))
|
||||||
else:
|
else:
|
||||||
asyncio.ensure_future(self._maybe_forward_htlc(chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion))
|
asyncio.ensure_future(self._maybe_forward_htlc(chan, local_ctn, remote_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion))
|
||||||
|
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def _maybe_forward_htlc(self, chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion):
|
async def _maybe_forward_htlc(self, chan, local_ctn, remote_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion):
|
||||||
await self.await_local(chan, local_ctn)
|
await self.await_local(chan, local_ctn)
|
||||||
|
await self.await_remote(chan, remote_ctn)
|
||||||
# Forward HTLC
|
# Forward HTLC
|
||||||
# FIXME: this is not robust to us going offline before payment is fulfilled
|
# FIXME: this is not robust to us going offline before payment is fulfilled
|
||||||
dph = processed_onion.hop_data.per_hop
|
dph = processed_onion.hop_data.per_hop
|
||||||
@@ -957,7 +962,7 @@ class Peer(PrintError):
|
|||||||
payment_hash=payment_hash,
|
payment_hash=payment_hash,
|
||||||
onion_routing_packet=processed_onion.next_packet.to_bytes()
|
onion_routing_packet=processed_onion.next_packet.to_bytes()
|
||||||
)
|
)
|
||||||
next_peer.pending_updates[next_chan] = True
|
next_peer.remote_pending_updates[next_chan] = True
|
||||||
await next_peer.await_remote(next_chan, next_remote_ctn)
|
await next_peer.await_remote(next_chan, next_remote_ctn)
|
||||||
# wait until we get paid
|
# wait until we get paid
|
||||||
preimage = await next_peer.payment_preimages[payment_hash].get()
|
preimage = await next_peer.payment_preimages[payment_hash].get()
|
||||||
@@ -966,8 +971,9 @@ class Peer(PrintError):
|
|||||||
self.print_error("htlc forwarded successfully")
|
self.print_error("htlc forwarded successfully")
|
||||||
|
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def _maybe_fulfill_htlc(self, chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion):
|
async def _maybe_fulfill_htlc(self, chan, local_ctn, remote_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion):
|
||||||
await self.await_local(chan, local_ctn)
|
await self.await_local(chan, local_ctn)
|
||||||
|
await self.await_remote(chan, remote_ctn)
|
||||||
try:
|
try:
|
||||||
preimage, invoice = self.lnworker.get_invoice(payment_hash)
|
preimage, invoice = self.lnworker.get_invoice(payment_hash)
|
||||||
except UnknownPaymentHash:
|
except UnknownPaymentHash:
|
||||||
@@ -1009,7 +1015,7 @@ class Peer(PrintError):
|
|||||||
channel_id=chan.channel_id,
|
channel_id=chan.channel_id,
|
||||||
id=htlc_id,
|
id=htlc_id,
|
||||||
payment_preimage=preimage)
|
payment_preimage=preimage)
|
||||||
self.pending_updates[chan] = True
|
self.remote_pending_updates[chan] = True
|
||||||
await self.await_remote(chan, remote_ctn)
|
await self.await_remote(chan, remote_ctn)
|
||||||
self.network.trigger_callback('ln_message', self.lnworker, 'Payment received', htlc_id)
|
self.network.trigger_callback('ln_message', self.lnworker, 'Payment received', htlc_id)
|
||||||
|
|
||||||
@@ -1024,7 +1030,7 @@ class Peer(PrintError):
|
|||||||
id=htlc_id,
|
id=htlc_id,
|
||||||
len=len(error_packet),
|
len=len(error_packet),
|
||||||
reason=error_packet)
|
reason=error_packet)
|
||||||
self.pending_updates[chan] = True
|
self.remote_pending_updates[chan] = True
|
||||||
await self.await_remote(chan, remote_ctn)
|
await self.await_remote(chan, remote_ctn)
|
||||||
|
|
||||||
def on_revoke_and_ack(self, payload):
|
def on_revoke_and_ack(self, payload):
|
||||||
@@ -1041,7 +1047,7 @@ class Peer(PrintError):
|
|||||||
feerate =int.from_bytes(payload["feerate_per_kw"], "big")
|
feerate =int.from_bytes(payload["feerate_per_kw"], "big")
|
||||||
chan = self.channels[channel_id]
|
chan = self.channels[channel_id]
|
||||||
chan.update_fee(feerate, False)
|
chan.update_fee(feerate, False)
|
||||||
self.pending_updates[chan] = True
|
self.local_pending_updates[chan] = True
|
||||||
|
|
||||||
async def bitcoin_fee_update(self, chan: Channel):
|
async def bitcoin_fee_update(self, chan: Channel):
|
||||||
"""
|
"""
|
||||||
@@ -1065,7 +1071,7 @@ class Peer(PrintError):
|
|||||||
self.send_message("update_fee",
|
self.send_message("update_fee",
|
||||||
channel_id=chan.channel_id,
|
channel_id=chan.channel_id,
|
||||||
feerate_per_kw=feerate_per_kw)
|
feerate_per_kw=feerate_per_kw)
|
||||||
self.pending_updates[chan] = True
|
self.remote_pending_updates[chan] = True
|
||||||
await self.await_remote(chan, remote_ctn)
|
await self.await_remote(chan, remote_ctn)
|
||||||
|
|
||||||
def on_closing_signed(self, payload):
|
def on_closing_signed(self, payload):
|
||||||
|
|||||||
Reference in New Issue
Block a user