diff --git a/electrum/commands.py b/electrum/commands.py index a566e7be6..dab95477f 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -1298,7 +1298,7 @@ class Commands: @command('wnl') async def get_watchtower_ctn(self, channel_point, wallet: Abstract_Wallet = None): """ return the local watchtower's ctn of channel. used in regtests """ - return await self.network.local_watchtower.sweepstore.get_ctn(channel_point, None) + return wallet.lnworker.get_watchtower_ctn(channel_point) @command('wnpl') async def rebalance_channels(self, from_scid, dest_scid, amount, password=None, wallet: Abstract_Wallet = None): diff --git a/electrum/daemon.py b/electrum/daemon.py index 46fe96a84..825f2b058 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -362,34 +362,6 @@ class CommandsServer(AuthenticatedServer): return result -class WatchTowerServer(AuthenticatedServer): - - def __init__(self, network: 'Network', port:int): - self.port = port - self.config = network.config - self.network = network - watchtower_user = self.config.WATCHTOWER_SERVER_USER or "" - watchtower_password = self.config.WATCHTOWER_SERVER_PASSWORD or "" - AuthenticatedServer.__init__(self, watchtower_user, watchtower_password) - self.lnwatcher = network.local_watchtower - self.app = web.Application() - self.app.router.add_post("/", self.handle) - self.register_method(self.get_ctn) - self.register_method(self.add_sweep_tx) - - async def run(self): - self.runner = web.AppRunner(self.app) - await self.runner.setup() - site = web.TCPSite(self.runner, host='localhost', port=self.port) - await site.start() - self.logger.info(f"running and listening on port {self.port}") - - async def get_ctn(self, *args): - return await self.lnwatcher.get_ctn(*args) - - async def add_sweep_tx(self, *args): - return await self.lnwatcher.sweepstore.add_sweep_tx(*args) - @@ -397,7 +369,6 @@ class Daemon(Logger): network: Optional[Network] = None gui_object: Optional['gui.BaseElectrumGui'] = None - watchtower: Optional['WatchTowerServer'] = None @profiler def __init__( @@ -460,11 +431,6 @@ class Daemon(Logger): self.logger.info(f"starting network.") assert not self.config.NETWORK_OFFLINE assert self.network - # server-side watchtower - if watchtower_port := self.config.WATCHTOWER_SERVER_PORT: - self.watchtower = WatchTowerServer(self.network, watchtower_port) - asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.watchtower.run), self.asyncio_loop) - self.network.start(jobs=[self.fx.run]) # prepare lightning functionality, also load channel db early if self.config.LIGHTNING_USE_GOSSIP: diff --git a/electrum/gui/qt/__init__.py b/electrum/gui/qt/__init__.py index 7a5067471..1e3468215 100644 --- a/electrum/gui/qt/__init__.py +++ b/electrum/gui/qt/__init__.py @@ -81,7 +81,6 @@ from .main_window import ElectrumWindow from .network_dialog import NetworkDialog from .stylesheet_patcher import patch_qt_stylesheet from .lightning_dialog import LightningDialog -from .watchtower_dialog import WatchtowerDialog from .exception_window import Exception_Hook from .wizard.server_connect import QEServerConnectWizard from .wizard.wallet import QENewWalletWizard @@ -118,7 +117,6 @@ class ElectrumGui(BaseElectrumGui, Logger): network_dialog: Optional['NetworkDialog'] lightning_dialog: Optional['LightningDialog'] - watchtower_dialog: Optional['WatchtowerDialog'] @profiler def __init__(self, *, config: 'SimpleConfig', daemon: 'Daemon', plugins: 'Plugins'): @@ -150,7 +148,6 @@ class ElectrumGui(BaseElectrumGui, Logger): self.network_dialog = None self.lightning_dialog = None - self.watchtower_dialog = None self._num_wizards_in_progress = 0 self._num_wizards_lock = threading.Lock() self.dark_icon = self.config.GUI_QT_DARK_TRAY_ICON @@ -216,8 +213,6 @@ class ElectrumGui(BaseElectrumGui, Logger): m.addAction(_("Network"), self.show_network_dialog) if network and network.lngossip: m.addAction(_("Lightning Network"), self.show_lightning_dialog) - if network and network.local_watchtower: - m.addAction(_("Local Watchtower"), self.show_watchtower_dialog) for window in self.windows: name = window.wallet.basename() submenu = m.addMenu(name) @@ -267,9 +262,6 @@ class ElectrumGui(BaseElectrumGui, Logger): if self.lightning_dialog: self.lightning_dialog.close() self.lightning_dialog = None - if self.watchtower_dialog: - self.watchtower_dialog.close() - self.watchtower_dialog = None # Shut down the timer cleanly self.timer.stop() self.timer = None @@ -303,11 +295,6 @@ class ElectrumGui(BaseElectrumGui, Logger): self.lightning_dialog = LightningDialog(self) self.lightning_dialog.bring_to_top() - def show_watchtower_dialog(self): - if not self.watchtower_dialog: - self.watchtower_dialog = WatchtowerDialog(self) - self.watchtower_dialog.bring_to_top() - def show_network_dialog(self): if self.network_dialog: self.network_dialog.on_event_network_updated() diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index ebb47358f..d5e873558 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -752,8 +752,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener): tools_menu.addAction(_("Electrum preferences"), self.settings_dialog) tools_menu.addAction(_("&Network"), self.gui_object.show_network_dialog).setEnabled(bool(self.network)) - if self.network and self.network.local_watchtower: - tools_menu.addAction(_("Local &Watchtower"), self.gui_object.show_watchtower_dialog) tools_menu.addAction(_("&Plugins"), self.plugins_dialog) tools_menu.addSeparator() tools_menu.addAction(_("&Sign/verify message"), self.sign_verify_message) diff --git a/electrum/gui/qt/watchtower_dialog.py b/electrum/gui/qt/watchtower_dialog.py deleted file mode 100644 index eb8ccc95c..000000000 --- a/electrum/gui/qt/watchtower_dialog.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python -# -# Electrum - lightweight Bitcoin client -# Copyright (C) 2012 thomasv@gitorious -# -# Permission is hereby granted, free of charge, to any person -# obtaining a copy of this software and associated documentation files -# (the "Software"), to deal in the Software without restriction, -# including without limitation the rights to use, copy, modify, merge, -# publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, -# subject to the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS -# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN -# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import enum - -from PyQt6.QtGui import QStandardItemModel, QStandardItem -from PyQt6.QtCore import Qt -from PyQt6.QtWidgets import (QDialog, QVBoxLayout, QPushButton, QLabel) - -from electrum.i18n import _ -from .util import Buttons -from .my_treeview import MyTreeView - - -class WatcherList(MyTreeView): - - class Columns(MyTreeView.BaseColumnsEnum): - OUTPOINT = enum.auto() - TX_COUNT = enum.auto() - STATUS = enum.auto() - - headers = { - Columns.OUTPOINT: _('Outpoint'), - Columns.TX_COUNT: _('Tx'), - Columns.STATUS: _('Status'), - } - - def __init__(self, parent: 'WatchtowerDialog'): - super().__init__( - parent=parent, - stretch_column=self.Columns.OUTPOINT, - ) - self.parent = parent - self.setModel(QStandardItemModel(self)) - self.setSortingEnabled(True) - self.update() - - def update(self): - if self.parent.lnwatcher is None: - return - self.model().clear() - self.update_headers(self.__class__.headers) - lnwatcher = self.parent.lnwatcher - l = lnwatcher.list_sweep_tx() - for outpoint in l: - n = lnwatcher.get_num_tx(outpoint) - status = lnwatcher.get_channel_status(outpoint) - labels = [""] * len(self.Columns) - labels[self.Columns.OUTPOINT] = outpoint - labels[self.Columns.TX_COUNT] = str(n) - labels[self.Columns.STATUS] = status - items = [QStandardItem(e) for e in labels] - self.set_editability(items) - self.model().insertRow(self.model().rowCount(), items) - size = lnwatcher.sweepstore.filesize() - self.parent.size_label.setText('Database size: %.2f Mb'%(size/1024/1024.)) - - -class WatchtowerDialog(QDialog): - - def __init__(self, gui_object): - QDialog.__init__(self) - self.gui_object = gui_object - self.config = gui_object.config - self.network = gui_object.daemon.network - assert self.network - self.lnwatcher = self.network.local_watchtower - self.setWindowTitle(_('Watchtower')) - self.setMinimumSize(600, 200) - self.size_label = QLabel() - self.watcher_list = WatcherList(self) - - vbox = QVBoxLayout(self) - vbox.addWidget(self.size_label) - vbox.addWidget(self.watcher_list) - b = QPushButton(_('Close')) - b.clicked.connect(self.close) - vbox.addLayout(Buttons(b)) - self.watcher_list.update() - - def is_hidden(self): - return self.isMinimized() or self.isHidden() - - def show_or_hide(self): - if self.is_hidden(): - self.bring_to_top() - else: - self.hide() - - def bring_to_top(self): - self.show() - self.raise_() - - def closeEvent(self, event): - self.gui_object.watchtower_dialog = None - event.accept() diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py index 3be963a1a..ce3db8911 100644 --- a/electrum/lnwatcher.py +++ b/electrum/lnwatcher.py @@ -10,13 +10,10 @@ from enum import IntEnum, auto from typing import NamedTuple, Dict from . import util -from .sql_db import SqlDB, sql from .wallet_db import WalletDB from .util import bfh, log_exceptions, ignore_exceptions, TxMinedInfo, random_shuffled_copy from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_FUTURE from .transaction import Transaction, TxOutpoint, PartialTransaction -from .transaction import match_script_against_template -from .lnutil import WITNESS_TEMPLATE_RECEIVED_HTLC, WITNESS_TEMPLATE_OFFERED_HTLC from .logging import Logger @@ -39,101 +36,6 @@ class TxMinedDepth(IntEnum): FREE = auto() -create_sweep_txs=""" -CREATE TABLE IF NOT EXISTS sweep_txs ( -funding_outpoint VARCHAR(34) NOT NULL, -ctn INTEGER NOT NULL, -prevout VARCHAR(34), -tx VARCHAR -)""" - -create_channel_info=""" -CREATE TABLE IF NOT EXISTS channel_info ( -outpoint VARCHAR(34) NOT NULL, -address VARCHAR(32), -PRIMARY KEY(outpoint) -)""" - - -class SweepStore(SqlDB): - - def __init__(self, path, network): - super().__init__(network.asyncio_loop, path) - - def create_database(self): - c = self.conn.cursor() - c.execute(create_channel_info) - c.execute(create_sweep_txs) - self.conn.commit() - - @sql - def get_sweep_tx(self, funding_outpoint, prevout): - c = self.conn.cursor() - c.execute("SELECT tx FROM sweep_txs WHERE funding_outpoint=? AND prevout=?", (funding_outpoint, prevout)) - return [Transaction(r[0].hex()) for r in c.fetchall()] - - @sql - def list_sweep_tx(self): - c = self.conn.cursor() - c.execute("SELECT funding_outpoint FROM sweep_txs") - return set([r[0] for r in c.fetchall()]) - - @sql - def add_sweep_tx(self, funding_outpoint, ctn, prevout, raw_tx): - c = self.conn.cursor() - assert Transaction(raw_tx).is_complete() - c.execute("""INSERT INTO sweep_txs (funding_outpoint, ctn, prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, ctn, prevout, bfh(raw_tx))) - self.conn.commit() - - @sql - def get_num_tx(self, funding_outpoint): - c = self.conn.cursor() - c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,)) - return int(c.fetchone()[0]) - - @sql - def get_ctn(self, outpoint, addr): - if not self._has_channel(outpoint): - self._add_channel(outpoint, addr) - c = self.conn.cursor() - c.execute("SELECT max(ctn) FROM sweep_txs WHERE funding_outpoint=?", (outpoint,)) - return int(c.fetchone()[0] or 0) - - @sql - def remove_sweep_tx(self, funding_outpoint): - c = self.conn.cursor() - c.execute("DELETE FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,)) - self.conn.commit() - - def _add_channel(self, outpoint, address): - c = self.conn.cursor() - c.execute("INSERT INTO channel_info (address, outpoint) VALUES (?,?)", (address, outpoint)) - self.conn.commit() - - @sql - def remove_channel(self, outpoint): - c = self.conn.cursor() - c.execute("DELETE FROM channel_info WHERE outpoint=?", (outpoint,)) - self.conn.commit() - - def _has_channel(self, outpoint): - c = self.conn.cursor() - c.execute("SELECT * FROM channel_info WHERE outpoint=?", (outpoint,)) - r = c.fetchone() - return r is not None - - @sql - def get_address(self, outpoint): - c = self.conn.cursor() - c.execute("SELECT address FROM channel_info WHERE outpoint=?", (outpoint,)) - r = c.fetchone() - return r[0] if r else None - - @sql - def list_channels(self): - c = self.conn.cursor() - c.execute("SELECT outpoint, address FROM channel_info") - return [(r[0], r[1]) for r in c.fetchall()] from .util import EventListener, event_listener @@ -256,58 +158,6 @@ class LNWatcher(Logger, EventListener): self.adb.add_address(o.address) return spender_txid - def inspect_tx_candidate(self, outpoint, n: int) -> Dict[str, str]: - """ - returns a dict of spenders for a transaction of interest. - subscribes to addresses as a side effect. - n==0 => outpoint is a channel funding. - n==1 => outpoint is a commitment or close output: to_local, to_remote or first-stage htlc - n==2 => outpoint is a second-stage htlc - """ - prev_txid, index = outpoint.split(':') - spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index)) - result = {outpoint:spender_txid} - if n == 0: - if spender_txid is None: - self.channel_status[outpoint] = 'open' - elif not self.is_deeply_mined(spender_txid): - self.channel_status[outpoint] = 'closed (%d)' % self.adb.get_tx_height(spender_txid).conf - else: - self.channel_status[outpoint] = 'closed (deep)' - if spender_txid is None: - return result - spender_tx = self.adb.get_transaction(spender_txid) - if n == 1: - # if tx input is not a first-stage HTLC, we can stop recursion - # FIXME: this is not true for anchor channels - if len(spender_tx.inputs()) != 1: - return result - o = spender_tx.inputs()[0] - witness = o.witness_elements() - if not witness: - # This can happen if spender_tx is a local unsigned tx in the wallet history, e.g.: - # channel is coop-closed, outpoint is for our coop-close output, and spender_tx is an - # arbitrary wallet-spend. - return result - redeem_script = witness[-1] - if match_script_against_template(redeem_script, WITNESS_TEMPLATE_OFFERED_HTLC): - #self.logger.info(f"input script matches offered htlc {redeem_script.hex()}") - pass - elif match_script_against_template(redeem_script, WITNESS_TEMPLATE_RECEIVED_HTLC): - #self.logger.info(f"input script matches received htlc {redeem_script.hex()}") - pass - else: - return result - for i, o in enumerate(spender_tx.outputs()): - if o.address is None: - continue - if not self.adb.is_mine(o.address): - self.adb.add_address(o.address) - elif n < 2: - r = self.inspect_tx_candidate(spender_txid+':%d'%i, n+1) - result.update(r) - return result - def get_tx_mined_depth(self, txid: str): if not txid: return TxMinedDepth.FREE @@ -331,92 +181,6 @@ class LNWatcher(Logger, EventListener): return self.get_tx_mined_depth(txid) == TxMinedDepth.DEEP -class WatchTower(LNWatcher): - - LOGGING_SHORTCUT = 'W' - - def __init__(self, network: 'Network'): - adb = AddressSynchronizer(WalletDB('', storage=None, upgrade=True), network.config, name=self.diagnostic_name()) - adb.start_network(network) - LNWatcher.__init__(self, adb, network) - self.network = network - self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network) - # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done, - # and a queue for seeing which txs are being published - self.tx_progress = {} # type: Dict[str, ListenerItem] - - async def stop(self): - await super().stop() - await self.adb.stop() - - def diagnostic_name(self): - return "local_tower" - - async def start_watching(self): - # I need to watch the addresses from sweepstore - lst = await self.sweepstore.list_channels() - for outpoint, address in random_shuffled_copy(lst): - self.add_channel(outpoint, address) - - async def sweep_commitment_transaction(self, funding_outpoint, closing_tx): - spenders = self.inspect_tx_candidate(funding_outpoint, 0) - keep_watching = False - for prevout, spender in spenders.items(): - if spender is not None: - keep_watching |= not self.is_deeply_mined(spender) - continue - sweep_txns = await self.sweepstore.get_sweep_tx(funding_outpoint, prevout) - for tx in sweep_txns: - await self.broadcast_or_log(funding_outpoint, tx) - keep_watching = True - return keep_watching - - async def broadcast_or_log(self, funding_outpoint: str, tx: Transaction): - height = self.adb.get_tx_height(tx.txid()).height - if height != TX_HEIGHT_LOCAL: - return - try: - txid = await self.network.broadcast_transaction(tx) - except Exception as e: - self.logger.info(f'broadcast failure: txid={tx.txid()}, funding_outpoint={funding_outpoint}: {repr(e)}') - else: - self.logger.info(f'broadcast success: txid={tx.txid()}, funding_outpoint={funding_outpoint}') - if funding_outpoint in self.tx_progress: - await self.tx_progress[funding_outpoint].tx_queue.put(tx) - return txid - - async def get_ctn(self, outpoint, addr): - if addr not in self.callbacks.keys(): - self.logger.info(f'watching new channel: {outpoint} {addr}') - self.add_channel(outpoint, addr) - return await self.sweepstore.get_ctn(outpoint, addr) - - def get_num_tx(self, outpoint): - async def f(): - return await self.sweepstore.get_num_tx(outpoint) - return self.network.run_from_another_thread(f()) - - def list_sweep_tx(self): - async def f(): - return await self.sweepstore.list_sweep_tx() - return self.network.run_from_another_thread(f()) - - def list_channels(self): - async def f(): - return await self.sweepstore.list_channels() - return self.network.run_from_another_thread(f()) - - async def unwatch_channel(self, address, funding_outpoint): - await super().unwatch_channel(address, funding_outpoint) - await self.sweepstore.remove_sweep_tx(funding_outpoint) - await self.sweepstore.remove_channel(funding_outpoint) - if funding_outpoint in self.tx_progress: - self.tx_progress[funding_outpoint].all_done.set() - - async def update_channel_state(self, *args, **kwargs): - pass - - class LNWalletWatcher(LNWatcher): diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 6b67ff867..c384a2ebd 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -923,19 +923,10 @@ class LNWallet(LNWorker): def diagnostic_name(self): return self.wallet.diagnostic_name() - @ignore_exceptions - @log_exceptions - async def sync_with_local_watchtower(self): - watchtower = self.network.local_watchtower - if watchtower: - while True: - for chan in self.channels.values(): - await self.sync_channel_with_watchtower(chan, watchtower.sweepstore) - await asyncio.sleep(5) - @ignore_exceptions @log_exceptions async def sync_with_remote_watchtower(self): + self.watchtower_ctns = {} while True: # periodically poll if the user updated 'watchtower_url' await asyncio.sleep(5) @@ -958,6 +949,9 @@ class LNWallet(LNWorker): except aiohttp.client_exceptions.ClientConnectorError: self.logger.info(f'could not contact remote watchtower {watchtower_url}') + def get_watchtower_ctn(self, channel_point): + return self.watchtower_ctns.get(channel_point) + async def sync_channel_with_watchtower(self, chan: Channel, watchtower): outpoint = chan.funding_outpoint.to_str() addr = chan.get_funding_address() @@ -967,6 +961,7 @@ class LNWallet(LNWorker): sweeptxs = chan.create_sweeptxs_for_watchtower(ctn) for tx in sweeptxs: await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize()) + self.watchtower_ctns[outpoint] = ctn def start_network(self, network: 'Network'): super().start_network(network) @@ -985,7 +980,6 @@ class LNWallet(LNWorker): self.maybe_listen(), self.lnwatcher.trigger_callbacks(), # shortcut (don't block) if funding tx locked and verified self.reestablish_peers_and_channels(), - self.sync_with_local_watchtower(), self.sync_with_remote_watchtower(), ]: tg_coro = self.taskgroup.spawn(coro) diff --git a/electrum/network.py b/electrum/network.py index e0a0ff24a..2b10b6ad3 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -68,7 +68,7 @@ if TYPE_CHECKING: from .channel_db import ChannelDB from .lnrouter import LNPathFinder from .lnworker import LNGossip - from .lnwatcher import WatchTower + #from .lnwatcher import WatchTower from .daemon import Daemon from .simple_config import SimpleConfig @@ -290,7 +290,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): channel_db: Optional['ChannelDB'] = None lngossip: Optional['LNGossip'] = None - local_watchtower: Optional['WatchTower'] = None path_finder: Optional['LNPathFinder'] = None def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None): @@ -364,11 +363,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): self._has_ever_managed_to_connect_to_server = False self._was_started = False - # lightning network - if self.config.WATCHTOWER_SERVER_ENABLED: - from . import lnwatcher - self.local_watchtower = lnwatcher.WatchTower(self) - asyncio.ensure_future(self.local_watchtower.start_watching()) def has_internet_connection(self) -> bool: """Our guess whether the device has Internet-connectivity.""" diff --git a/electrum/plugins/watchtower/__init__.py b/electrum/plugins/watchtower/__init__.py new file mode 100644 index 000000000..8cc2c72e0 --- /dev/null +++ b/electrum/plugins/watchtower/__init__.py @@ -0,0 +1,17 @@ +from electrum.i18n import _ + +fullname = _('Watchtower') +description = """ +Watchtower for Electrum. + +Example setup: + + electrum -o setconfig enable_plugin_watchtower True + electrum -o setconfig watchtower_user wtuser + electrum -o setconfig watchtower_password wtpassword + electrum -o setconfig watchtower_port 12345 + electrum daemon -v + +""" + +available_for = ['cmdline'] diff --git a/electrum/plugins/watchtower/cmdline.py b/electrum/plugins/watchtower/cmdline.py new file mode 100644 index 000000000..90981ba06 --- /dev/null +++ b/electrum/plugins/watchtower/cmdline.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# +# Electrum - Lightweight Bitcoin Client +# Copyright (C) 2023 The Electrum Developers +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +from .watchtower import WatchtowerPlugin + +class Plugin(WatchtowerPlugin): + pass + diff --git a/electrum/plugins/watchtower/server.py b/electrum/plugins/watchtower/server.py new file mode 100644 index 000000000..172a0bff1 --- /dev/null +++ b/electrum/plugins/watchtower/server.py @@ -0,0 +1,46 @@ +import os +import asyncio +from collections import defaultdict +from typing import TYPE_CHECKING + +from aiohttp import web + +from electrum.util import log_exceptions, ignore_exceptions +from electrum.logging import Logger +from electrum.util import EventListener +from electrum.lnaddr import lndecode +from electrum.daemon import AuthenticatedServer + + +if TYPE_CHECKING: + from electrum.network import Network + + +class WatchTowerServer(AuthenticatedServer): + + def __init__(self, watchtower, network: 'Network', port:int): + self.port = port + self.config = network.config + self.network = network + watchtower_user = self.config.WATCHTOWER_SERVER_USER or "" + watchtower_password = self.config.WATCHTOWER_SERVER_PASSWORD or "" + AuthenticatedServer.__init__(self, watchtower_user, watchtower_password) + self.lnwatcher = watchtower + self.app = web.Application() + self.app.router.add_post("/", self.handle) + self.register_method(self.get_ctn) + self.register_method(self.add_sweep_tx) + + async def run(self): + self.runner = web.AppRunner(self.app) + await self.runner.setup() + site = web.TCPSite(self.runner, host='localhost', port=self.port) + await site.start() + self.logger.info(f"running and listening on port {self.port}") + + async def get_ctn(self, *args): + return await self.lnwatcher.get_ctn(*args) + + async def add_sweep_tx(self, *args): + return await self.lnwatcher.sweepstore.add_sweep_tx(*args) + diff --git a/electrum/plugins/watchtower/watchtower.py b/electrum/plugins/watchtower/watchtower.py new file mode 100644 index 000000000..778314d8d --- /dev/null +++ b/electrum/plugins/watchtower/watchtower.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python +# +# Electrum - Lightweight Bitcoin Client +# Copyright (C) 2023 The Electrum Developers +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +import asyncio, os +from typing import TYPE_CHECKING +from typing import NamedTuple, Dict + +from electrum.util import log_exceptions, random_shuffled_copy +from electrum.plugin import BasePlugin, hook +from electrum.sql_db import SqlDB, sql +from electrum.lnwatcher import LNWatcher +from electrum.transaction import Transaction, match_script_against_template +from electrum.network import Network +from electrum.address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL +from electrum.wallet_db import WalletDB +from electrum.lnutil import WITNESS_TEMPLATE_RECEIVED_HTLC, WITNESS_TEMPLATE_OFFERED_HTLC + +from .server import WatchTowerServer + +if TYPE_CHECKING: + from electrum.simple_config import SimpleConfig + + +class WatchtowerPlugin(BasePlugin): + + def __init__(self, parent, config: 'SimpleConfig', name): + BasePlugin.__init__(self, parent, config, name) + self.config = config + self.network = Network.get_instance() + if self.network is None: + return + + self.watchtower = WatchTower(self.network) + asyncio.ensure_future(self.watchtower.start_watching()) + if watchtower_port := self.config.WATCHTOWER_SERVER_PORT: + self.server = WatchTowerServer(self.watchtower, self.network, watchtower_port) + asyncio.run_coroutine_threadsafe(self.network.taskgroup.spawn(self.server.run), self.network.asyncio_loop) + + +class WatchTower(LNWatcher): + + LOGGING_SHORTCUT = 'W' + + def __init__(self, network: 'Network'): + adb = AddressSynchronizer(WalletDB('', storage=None, upgrade=True), network.config, name=self.diagnostic_name()) + adb.start_network(network) + LNWatcher.__init__(self, adb, network) + self.network = network + self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network) + # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done, + # and a queue for seeing which txs are being published + self.tx_progress = {} # type: Dict[str, ListenerItem] + + async def stop(self): + await super().stop() + await self.adb.stop() + + def diagnostic_name(self): + return "local_tower" + + @log_exceptions + async def start_watching(self): + # I need to watch the addresses from sweepstore + lst = await self.sweepstore.list_channels() + for outpoint, address in random_shuffled_copy(lst): + self.add_channel(outpoint, address) + + def inspect_tx_candidate(self, outpoint, n: int) -> Dict[str, str]: + """ + returns a dict of spenders for a transaction of interest. + subscribes to addresses as a side effect. + n==0 => outpoint is a channel funding. + n==1 => outpoint is a commitment or close output: to_local, to_remote or first-stage htlc + n==2 => outpoint is a second-stage htlc + """ + prev_txid, index = outpoint.split(':') + spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index)) + result = {outpoint:spender_txid} + if n == 0: + if spender_txid is None: + self.channel_status[outpoint] = 'open' + elif not self.is_deeply_mined(spender_txid): + self.channel_status[outpoint] = 'closed (%d)' % self.adb.get_tx_height(spender_txid).conf + else: + self.channel_status[outpoint] = 'closed (deep)' + if spender_txid is None: + return result + spender_tx = self.adb.get_transaction(spender_txid) + if n == 1: + # if tx input is not a first-stage HTLC, we can stop recursion + # FIXME: this is not true for anchor channels + if len(spender_tx.inputs()) != 1: + return result + o = spender_tx.inputs()[0] + witness = o.witness_elements() + if not witness: + # This can happen if spender_tx is a local unsigned tx in the wallet history, e.g.: + # channel is coop-closed, outpoint is for our coop-close output, and spender_tx is an + # arbitrary wallet-spend. + return result + redeem_script = witness[-1] + if match_script_against_template(redeem_script, WITNESS_TEMPLATE_OFFERED_HTLC): + #self.logger.info(f"input script matches offered htlc {redeem_script.hex()}") + pass + elif match_script_against_template(redeem_script, WITNESS_TEMPLATE_RECEIVED_HTLC): + #self.logger.info(f"input script matches received htlc {redeem_script.hex()}") + pass + else: + return result + for i, o in enumerate(spender_tx.outputs()): + if o.address is None: + continue + if not self.adb.is_mine(o.address): + self.adb.add_address(o.address) + elif n < 2: + r = self.inspect_tx_candidate(spender_txid+':%d'%i, n+1) + result.update(r) + return result + + async def sweep_commitment_transaction(self, funding_outpoint, closing_tx): + spenders = self.inspect_tx_candidate(funding_outpoint, 0) + keep_watching = False + for prevout, spender in spenders.items(): + if spender is not None: + keep_watching |= not self.is_deeply_mined(spender) + continue + sweep_txns = await self.sweepstore.get_sweep_tx(funding_outpoint, prevout) + for tx in sweep_txns: + await self.broadcast_or_log(funding_outpoint, tx) + keep_watching = True + return keep_watching + + async def broadcast_or_log(self, funding_outpoint: str, tx: Transaction): + height = self.adb.get_tx_height(tx.txid()).height + if height != TX_HEIGHT_LOCAL: + return + try: + txid = await self.network.broadcast_transaction(tx) + except Exception as e: + self.logger.info(f'broadcast failure: txid={tx.txid()}, funding_outpoint={funding_outpoint}: {repr(e)}') + else: + self.logger.info(f'broadcast success: txid={tx.txid()}, funding_outpoint={funding_outpoint}') + if funding_outpoint in self.tx_progress: + await self.tx_progress[funding_outpoint].tx_queue.put(tx) + return txid + + async def get_ctn(self, outpoint, addr): + if addr not in self.callbacks.keys(): + self.logger.info(f'watching new channel: {outpoint} {addr}') + self.add_channel(outpoint, addr) + return await self.sweepstore.get_ctn(outpoint, addr) + + def get_num_tx(self, outpoint): + async def f(): + return await self.sweepstore.get_num_tx(outpoint) + return self.network.run_from_another_thread(f()) + + def list_sweep_tx(self): + async def f(): + return await self.sweepstore.list_sweep_tx() + return self.network.run_from_another_thread(f()) + + def list_channels(self): + async def f(): + return await self.sweepstore.list_channels() + return self.network.run_from_another_thread(f()) + + async def unwatch_channel(self, address, funding_outpoint): + await super().unwatch_channel(address, funding_outpoint) + await self.sweepstore.remove_sweep_tx(funding_outpoint) + await self.sweepstore.remove_channel(funding_outpoint) + if funding_outpoint in self.tx_progress: + self.tx_progress[funding_outpoint].all_done.set() + + async def update_channel_state(self, *args, **kwargs): + pass + + + +create_sweep_txs=""" +CREATE TABLE IF NOT EXISTS sweep_txs ( +funding_outpoint VARCHAR(34) NOT NULL, +ctn INTEGER NOT NULL, +prevout VARCHAR(34), +tx VARCHAR +)""" + +create_channel_info=""" +CREATE TABLE IF NOT EXISTS channel_info ( +outpoint VARCHAR(34) NOT NULL, +address VARCHAR(32), +PRIMARY KEY(outpoint) +)""" + + +class SweepStore(SqlDB): + + def __init__(self, path, network): + super().__init__(network.asyncio_loop, path) + + def create_database(self): + c = self.conn.cursor() + c.execute(create_channel_info) + c.execute(create_sweep_txs) + self.conn.commit() + + @sql + def get_sweep_tx(self, funding_outpoint, prevout): + c = self.conn.cursor() + c.execute("SELECT tx FROM sweep_txs WHERE funding_outpoint=? AND prevout=?", (funding_outpoint, prevout)) + return [Transaction(r[0].hex()) for r in c.fetchall()] + + @sql + def list_sweep_tx(self): + c = self.conn.cursor() + c.execute("SELECT funding_outpoint FROM sweep_txs") + return set([r[0] for r in c.fetchall()]) + + @sql + def add_sweep_tx(self, funding_outpoint, ctn, prevout, raw_tx): + c = self.conn.cursor() + assert Transaction(raw_tx).is_complete() + c.execute("""INSERT INTO sweep_txs (funding_outpoint, ctn, prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, ctn, prevout, bytes.fromhex(raw_tx))) + self.conn.commit() + + @sql + def get_num_tx(self, funding_outpoint): + c = self.conn.cursor() + c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,)) + return int(c.fetchone()[0]) + + @sql + def get_ctn(self, outpoint, addr): + if not self._has_channel(outpoint): + self._add_channel(outpoint, addr) + c = self.conn.cursor() + c.execute("SELECT max(ctn) FROM sweep_txs WHERE funding_outpoint=?", (outpoint,)) + return int(c.fetchone()[0] or 0) + + @sql + def remove_sweep_tx(self, funding_outpoint): + c = self.conn.cursor() + c.execute("DELETE FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,)) + self.conn.commit() + + def _add_channel(self, outpoint, address): + c = self.conn.cursor() + c.execute("INSERT INTO channel_info (address, outpoint) VALUES (?,?)", (address, outpoint)) + self.conn.commit() + + @sql + def remove_channel(self, outpoint): + c = self.conn.cursor() + c.execute("DELETE FROM channel_info WHERE outpoint=?", (outpoint,)) + self.conn.commit() + + def _has_channel(self, outpoint): + c = self.conn.cursor() + c.execute("SELECT * FROM channel_info WHERE outpoint=?", (outpoint,)) + r = c.fetchone() + return r is not None + + @sql + def get_address(self, outpoint): + c = self.conn.cursor() + c.execute("SELECT address FROM channel_info WHERE outpoint=?", (outpoint,)) + r = c.fetchone() + return r[0] if r else None + + @sql + def list_channels(self): + c = self.conn.cursor() + c.execute("SELECT outpoint, address FROM channel_info") + return [(r[0], r[1]) for r in c.fetchall()] + + diff --git a/tests/regtest.py b/tests/regtest.py index 44cddbc70..5a6acb14a 100644 --- a/tests/regtest.py +++ b/tests/regtest.py @@ -110,7 +110,7 @@ class TestLightningWatchtower(TestLightning): 'watchtower_url': 'http://wtuser:wtpassword@127.0.0.1:12345', }, 'carol':{ - 'run_watchtower': 'true', + 'enable_plugin_watchtower': 'true', 'watchtower_user': 'wtuser', 'watchtower_password': 'wtpassword', 'watchtower_port': '12345', diff --git a/tests/regtest/regtest.sh b/tests/regtest/regtest.sh index 1600f4cd3..bcb882f9b 100755 --- a/tests/regtest/regtest.sh +++ b/tests/regtest/regtest.sh @@ -458,12 +458,12 @@ if [[ $1 == "watchtower" ]]; then echo "alice pays bob again" invoice2=$($bob add_request 0.01 -m "invoice2" | jq -r ".lightning_invoice") $alice lnpay $invoice2 - alice_ctn=$($alice list_channels | jq '.[0].local_ctn') + bob_ctn=$($bob list_channels | jq '.[0].local_ctn') msg="waiting until watchtower is synchronized" # watchtower needs to be at latest revoked ctn - while watchtower_ctn=$($carol get_watchtower_ctn $channel) && [[ $watchtower_ctn != $((alice_ctn-1)) ]]; do + while watchtower_ctn=$($bob get_watchtower_ctn $channel) && [[ $watchtower_ctn != $((bob_ctn-1)) ]]; do sleep 0.1 - printf "$msg $alice_ctn $watchtower_ctn\r" + printf "$msg $bob_ctn $watchtower_ctn\r" done printf "\n" echo "stopping alice and bob"