1
0

Synchronize watchtower asynchronously:

- remove remote_commitment_to_be_revoked
- pass old ctns to lnsweep.create_sweeptxs_for_watchtower
- store the ctn of sweeptxs in sweepStore database
- request the highest ctn from sweepstore using get_ctn
- send sweeptxs asynchronously in LNWallet.sync_with_watchtower
This commit is contained in:
ThomasV
2019-07-05 14:42:09 +02:00
parent f060e53912
commit f7c05f2602
10 changed files with 189 additions and 137 deletions

View File

@@ -41,10 +41,9 @@ class TxMinedDepth(IntEnum):
create_sweep_txs="""
CREATE TABLE IF NOT EXISTS sweep_txs (
funding_outpoint VARCHAR(34) NOT NULL,
"index" INTEGER NOT NULL,
ctn INTEGER NOT NULL,
prevout VARCHAR(34),
tx VARCHAR,
PRIMARY KEY(funding_outpoint, "index")
tx VARCHAR
)"""
create_channel_info="""
@@ -72,13 +71,6 @@ class SweepStore(SqlDB):
c.execute("SELECT tx FROM sweep_txs WHERE funding_outpoint=? AND prevout=?", (funding_outpoint, prevout))
return [Transaction(bh2u(r[0])) for r in c.fetchall()]
@sql
def get_tx_by_index(self, funding_outpoint, index):
c = self.conn.cursor()
c.execute("""SELECT prevout, tx FROM sweep_txs WHERE funding_outpoint=? AND "index"=?""", (funding_outpoint, index))
r = c.fetchone()[0]
return str(r[0]), bh2u(r[1])
@sql
def list_sweep_tx(self):
c = self.conn.cursor()
@@ -86,11 +78,9 @@ class SweepStore(SqlDB):
return set([r[0] for r in c.fetchall()])
@sql
def add_sweep_tx(self, funding_outpoint, prevout, tx):
def add_sweep_tx(self, funding_outpoint, ctn, prevout, tx):
c = self.conn.cursor()
c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
n = int(c.fetchone()[0])
c.execute("""INSERT INTO sweep_txs (funding_outpoint, "index", prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, n, prevout, bfh(str(tx))))
c.execute("""INSERT INTO sweep_txs (funding_outpoint, ctn, prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, ctn, prevout, bfh(str(tx))))
self.conn.commit()
@sql
@@ -99,14 +89,21 @@ class SweepStore(SqlDB):
c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
return int(c.fetchone()[0])
@sql
def get_ctn(self, outpoint, addr):
if not self._has_channel(outpoint):
self._add_channel(outpoint, addr)
c = self.conn.cursor()
c.execute("SELECT max(ctn) FROM sweep_txs WHERE funding_outpoint=?", (outpoint,))
return int(c.fetchone()[0] or 0)
@sql
def remove_sweep_tx(self, funding_outpoint):
c = self.conn.cursor()
c.execute("DELETE FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
self.conn.commit()
@sql
def add_channel(self, outpoint, address):
def _add_channel(self, outpoint, address):
c = self.conn.cursor()
c.execute("INSERT INTO channel_info (address, outpoint) VALUES (?,?)", (address, outpoint))
self.conn.commit()
@@ -117,8 +114,7 @@ class SweepStore(SqlDB):
c.execute("DELETE FROM channel_info WHERE outpoint=?", (outpoint,))
self.conn.commit()
@sql
def has_channel(self, outpoint):
def _has_channel(self, outpoint):
c = self.conn.cursor()
c.execute("SELECT * FROM channel_info WHERE outpoint=?", (outpoint,))
r = c.fetchone()
@@ -132,9 +128,9 @@ class SweepStore(SqlDB):
return r[0] if r else None
@sql
def list_channel_info(self):
def list_channels(self):
c = self.conn.cursor()
c.execute("SELECT address, outpoint FROM channel_info")
c.execute("SELECT outpoint, address FROM channel_info")
return [(r[0], r[1]) for r in c.fetchall()]
@@ -145,77 +141,22 @@ class LNWatcher(AddressSynchronizer):
def __init__(self, network: 'Network'):
AddressSynchronizer.__init__(self, JsonDB({}, manual_upgrades=False))
self.config = network.config
self.start_network(network)
self.lock = threading.RLock()
self.sweepstore = None
self.channels = {}
if self.config.get('sweepstore', False):
self.sweepstore = SweepStore(os.path.join(network.config.path, "watchtower_db"), network)
self.watchtower = None
if self.config.get('watchtower_url'):
self.set_remote_watchtower()
self.network = network
self.network.register_callback(self.on_network_update,
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated'])
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
# and a queue for seeing which txs are being published
self.tx_progress = {} # type: Dict[str, ListenerItem]
# status gets populated when we run
self.channel_status = {}
def get_channel_status(self, outpoint):
return self.channel_status.get(outpoint, 'unknown')
def set_remote_watchtower(self):
watchtower_url = self.config.get('watchtower_url')
try:
self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None
except:
self.watchtower = None
self.watchtower_queue = asyncio.Queue()
def get_num_tx(self, outpoint):
if not self.sweepstore:
return 0
async def f():
return await self.sweepstore.get_num_tx(outpoint)
return self.network.run_from_another_thread(f())
def list_sweep_tx(self):
if not self.sweepstore:
return []
async def f():
return await self.sweepstore.list_sweep_tx()
return self.network.run_from_another_thread(f())
@ignore_exceptions
@log_exceptions
async def watchtower_task(self):
if not self.watchtower:
return
self.logger.info('watchtower task started')
while True:
outpoint, prevout, tx = await self.watchtower_queue.get()
try:
self.watchtower.add_sweep_tx(outpoint, prevout, tx)
self.logger.info("transaction sent to watchtower")
except ConnectionRefusedError:
self.logger.info('could not reach watchtower, will retry in 5s')
await asyncio.sleep(5)
await self.watchtower_queue.put((outpoint, prevout, tx))
def add_channel(self, outpoint, address):
self.add_address(address)
self.channels[address] = outpoint
#if self.sweepstore:
# if not await self.sweepstore.has_channel(outpoint):
# await self.sweepstore.add_channel(outpoint, address)
async def unwatch_channel(self, address, funding_outpoint):
self.logger.info(f'unwatching {funding_outpoint}')
await self.sweepstore.remove_sweep_tx(funding_outpoint)
await self.sweepstore.remove_channel(funding_outpoint)
if funding_outpoint in self.tx_progress:
self.tx_progress[funding_outpoint].all_done.set()
pass
@log_exceptions
async def on_network_update(self, event, *args):
@@ -281,6 +222,44 @@ class LNWatcher(AddressSynchronizer):
result.update(r)
return keep_watching, result
def get_tx_mined_depth(self, txid: str):
if not txid:
return TxMinedDepth.FREE
tx_mined_depth = self.get_tx_height(txid)
height, conf = tx_mined_depth.height, tx_mined_depth.conf
if conf > 100:
return TxMinedDepth.DEEP
elif conf > 0:
return TxMinedDepth.SHALLOW
elif height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
return TxMinedDepth.MEMPOOL
elif height == TX_HEIGHT_LOCAL:
return TxMinedDepth.FREE
elif height > 0 and conf == 0:
# unverified but claimed to be mined
return TxMinedDepth.MEMPOOL
else:
raise NotImplementedError()
class WatchTower(LNWatcher):
verbosity_filter = 'W'
def __init__(self, network):
LNWatcher.__init__(self, network)
self.network = network
self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network)
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
# and a queue for seeing which txs are being published
self.tx_progress = {} # type: Dict[str, ListenerItem]
async def start_watching(self):
# I need to watch the addresses from sweepstore
l = await self.sweepstore.list_channels()
for outpoint, address in l:
self.add_channel(outpoint, address)
async def do_breach_remedy(self, funding_outpoint, spenders):
for prevout, spender in spenders.items():
if spender is not None:
@@ -303,27 +282,34 @@ class LNWatcher(AddressSynchronizer):
await self.tx_progress[funding_outpoint].tx_queue.put(tx)
return txid
async def add_sweep_tx(self, funding_outpoint: str, prevout: str, tx: str):
if self.sweepstore:
await self.sweepstore.add_sweep_tx(funding_outpoint, prevout, tx)
if self.watchtower:
self.watchtower_queue.put_nowait(funding_outpoint, prevout, tx)
def get_ctn(self, outpoint, addr):
async def f():
return await self.sweepstore.get_ctn(outpoint, addr)
return self.network.run_from_another_thread(f())
def get_tx_mined_depth(self, txid: str):
if not txid:
return TxMinedDepth.FREE
tx_mined_depth = self.get_tx_height(txid)
height, conf = tx_mined_depth.height, tx_mined_depth.conf
if conf > 100:
return TxMinedDepth.DEEP
elif conf > 0:
return TxMinedDepth.SHALLOW
elif height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
return TxMinedDepth.MEMPOOL
elif height == TX_HEIGHT_LOCAL:
return TxMinedDepth.FREE
elif height > 0 and conf == 0:
# unverified but claimed to be mined
return TxMinedDepth.MEMPOOL
else:
raise NotImplementedError()
def get_num_tx(self, outpoint):
async def f():
return await self.sweepstore.get_num_tx(outpoint)
return self.network.run_from_another_thread(f())
def add_sweep_tx(self, funding_outpoint: str, address:str, ctn:int, prevout: str, tx: str):
async def f():
return await self.sweepstore.add_sweep_tx(funding_outpoint, ctn, prevout, tx)
return self.network.run_from_another_thread(f())
def list_sweep_tx(self):
async def f():
return await self.sweepstore.list_sweep_tx()
return self.network.run_from_another_thread(f())
def list_channels(self):
async def f():
return await self.sweepstore.list_channels()
return self.network.run_from_another_thread(f())
async def unwatch_channel(self, address, funding_outpoint):
self.logger.info(f'unwatching {funding_outpoint}')
await self.sweepstore.remove_sweep_tx(funding_outpoint)
await self.sweepstore.remove_channel(funding_outpoint)
if funding_outpoint in self.tx_progress:
self.tx_progress[funding_outpoint].all_done.set()