1
0

ln: restore channels correctly after restart

* save funding_locked_received: if a node already sent us
funding_locked, save it to avoid superfluous messages

* use Queues instead of Futures: this ensure that we don't error if we
receive two messages of the same type, and in avoids having to delete
futures in finally blocks. A queue monitor could be added to detect
queue elements that are not popped.

* request initial routing sync: since we don't store the graph yet, it
is better to request the graph from the Peer so that we can route

* channel_state cleanup: now each channel should have a state, which is
initialized to OPENING and only marked OPEN once we have verified that
the funding_tx has been mined
This commit is contained in:
Janus
2018-05-29 18:12:48 +02:00
committed by ThomasV
parent aafbe74a28
commit ae3971259d
2 changed files with 73 additions and 111 deletions

View File

@@ -12,7 +12,7 @@ import asyncio
from . import constants
from .bitcoin import sha256, COIN
from .util import bh2u, bfh
from .util import bh2u, bfh, PrintError
from .constants import set_testnet, set_simnet
from .simple_config import SimpleConfig
from .network import Network
@@ -86,7 +86,7 @@ node_list = [
class LNWorker:
class LNWorker(PrintError):
def __init__(self, wallet, network):
self.wallet = wallet
@@ -100,7 +100,7 @@ class LNWorker:
self.path_finder = lnrouter.LNPathFinder(self.channel_db)
self.channels = [reconstruct_namedtuples(x) for x in wallet.storage.get("channels", {})]
peer_list = network.config.get('lightning_peers', node_list)
self.channel_state = {}
self.channel_state = {chan.channel_id: "OPENING" for chan in self.channels}
for host, port, pubkey in peer_list:
self.add_peer(host, int(port), pubkey)
# wait until we see confirmations
@@ -110,49 +110,65 @@ class LNWorker:
def add_peer(self, host, port, pubkey):
node_id = bfh(pubkey)
channels = list(filter(lambda x: x.node_id == node_id, self.channels))
peer = Peer(host, int(port), node_id, self.privkey, self.network, self.channel_db, self.path_finder, self.channel_state, channels)
peer = Peer(host, int(port), node_id, self.privkey, self.network, self.channel_db, self.path_finder, self.channel_state, channels, request_initial_sync=True)
self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop()))
self.peers[node_id] = peer
def save_channel(self, openchannel):
if openchannel.channel_id not in self.channel_state:
self.channel_state[openchannel.channel_id] = "OPENING"
self.channels = [openchannel] # TODO multiple channels
dumped = serialize_channels(self.channels)
self.wallet.storage.put("channels", dumped)
self.wallet.storage.write()
def save_short_chan_id(self, chan):
"""
Checks if the Funding TX has been mined. If it has save the short channel ID to disk and return the new OpenChannel.
If the Funding TX has not been mined, return None
"""
assert self.channel_state[chan.channel_id] == "OPENING"
peer = self.peers[chan.node_id]
conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
if conf >= chan.constraints.funding_txn_minimum_depth:
block_height, tx_pos = self.wallet.get_txpos(chan.funding_outpoint.txid)
if tx_pos == -1:
self.print_error('funding tx is not yet SPV verified.. but there are '
'already enough confirmations (currently {})'.format(conf))
return None
chan = chan._replace(short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index))
self.save_channel(chan)
return chan
return None
def on_network_update(self, event, *args):
for chan in self.channels:
if self.channel_state[chan.channel_id] == "OPEN":
continue
chan = self.save_short_chan_id(chan)
if not chan:
self.print_error("network update but funding tx is still not at sufficient depth")
continue
peer = self.peers[chan.node_id]
conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
if conf >= chan.constraints.funding_txn_minimum_depth:
block_height, tx_pos = self.wallet.get_txpos(chan.funding_outpoint.txid)
if tx_pos == -1:
self.print_error('funding tx is not yet SPV verified.. but there are '
'already enough confirmations (currently {})'.format(conf))
return
if chan.channel_id not in self.channel_state or self.channel_state[chan.channel_id] != "OPENING":
return
asyncio.run_coroutine_threadsafe(self.set_local_funding_locked_result(peer, chan, block_height, tx_pos), asyncio.get_event_loop())
asyncio.run_coroutine_threadsafe(self.wait_funding_locked_and_mark_open(peer, chan), asyncio.get_event_loop())
# aiosafe because we don't wait for result
@aiosafe
async def set_local_funding_locked_result(self, peer, chan, block_height, tx_pos):
channel_id = chan.channel_id
short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
try:
peer.local_funding_locked[channel_id].set_result(short_channel_id)
except (asyncio.InvalidStateError, KeyError) as e:
# FIXME race condition if updates come in quickly, set_result might be called multiple times
# or self.local_funding_locked[channel_id] might be deleted already
self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e))
openchannel = await peer.funding_locked(chan)
self.save_channel(openchannel)
print("CHANNEL OPENING COMPLETED")
async def wait_funding_locked_and_mark_open(self, peer, chan):
if self.channel_state[chan.channel_id] == "OPEN":
return
if not chan.local_state.funding_locked_received:
chan = await peer.funding_locked(chan)
self.save_channel(chan)
self.print_error("CHANNEL OPENING COMPLETED")
self.channel_state[chan.channel_id] = "OPEN"
# not aiosafe because we call .result() which will propagate an exception
async def _open_channel_coroutine(self, node_id, amount, push_msat, password):
peer = self.peers[bfh(node_id)]
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32))
self.print_error("SAVING OPENING CHANNEL")
self.save_channel(openingchannel)
def open_channel(self, node_id, local_amt, push_amt, pw):