1
0

move watchtower to a plugin.

remove watchtower dialog in qt
This commit is contained in:
ThomasV
2024-12-20 14:02:54 +01:00
parent 7113cec4c7
commit 29a8c41025
14 changed files with 404 additions and 426 deletions

View File

@@ -1298,7 +1298,7 @@ class Commands:
@command('wnl')
async def get_watchtower_ctn(self, channel_point, wallet: Abstract_Wallet = None):
""" return the local watchtower's ctn of channel. used in regtests """
return await self.network.local_watchtower.sweepstore.get_ctn(channel_point, None)
return wallet.lnworker.get_watchtower_ctn(channel_point)
@command('wnpl')
async def rebalance_channels(self, from_scid, dest_scid, amount, password=None, wallet: Abstract_Wallet = None):

View File

@@ -362,34 +362,6 @@ class CommandsServer(AuthenticatedServer):
return result
class WatchTowerServer(AuthenticatedServer):
def __init__(self, network: 'Network', port:int):
self.port = port
self.config = network.config
self.network = network
watchtower_user = self.config.WATCHTOWER_SERVER_USER or ""
watchtower_password = self.config.WATCHTOWER_SERVER_PASSWORD or ""
AuthenticatedServer.__init__(self, watchtower_user, watchtower_password)
self.lnwatcher = network.local_watchtower
self.app = web.Application()
self.app.router.add_post("/", self.handle)
self.register_method(self.get_ctn)
self.register_method(self.add_sweep_tx)
async def run(self):
self.runner = web.AppRunner(self.app)
await self.runner.setup()
site = web.TCPSite(self.runner, host='localhost', port=self.port)
await site.start()
self.logger.info(f"running and listening on port {self.port}")
async def get_ctn(self, *args):
return await self.lnwatcher.get_ctn(*args)
async def add_sweep_tx(self, *args):
return await self.lnwatcher.sweepstore.add_sweep_tx(*args)
@@ -397,7 +369,6 @@ class Daemon(Logger):
network: Optional[Network] = None
gui_object: Optional['gui.BaseElectrumGui'] = None
watchtower: Optional['WatchTowerServer'] = None
@profiler
def __init__(
@@ -460,11 +431,6 @@ class Daemon(Logger):
self.logger.info(f"starting network.")
assert not self.config.NETWORK_OFFLINE
assert self.network
# server-side watchtower
if watchtower_port := self.config.WATCHTOWER_SERVER_PORT:
self.watchtower = WatchTowerServer(self.network, watchtower_port)
asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.watchtower.run), self.asyncio_loop)
self.network.start(jobs=[self.fx.run])
# prepare lightning functionality, also load channel db early
if self.config.LIGHTNING_USE_GOSSIP:

View File

@@ -81,7 +81,6 @@ from .main_window import ElectrumWindow
from .network_dialog import NetworkDialog
from .stylesheet_patcher import patch_qt_stylesheet
from .lightning_dialog import LightningDialog
from .watchtower_dialog import WatchtowerDialog
from .exception_window import Exception_Hook
from .wizard.server_connect import QEServerConnectWizard
from .wizard.wallet import QENewWalletWizard
@@ -118,7 +117,6 @@ class ElectrumGui(BaseElectrumGui, Logger):
network_dialog: Optional['NetworkDialog']
lightning_dialog: Optional['LightningDialog']
watchtower_dialog: Optional['WatchtowerDialog']
@profiler
def __init__(self, *, config: 'SimpleConfig', daemon: 'Daemon', plugins: 'Plugins'):
@@ -150,7 +148,6 @@ class ElectrumGui(BaseElectrumGui, Logger):
self.network_dialog = None
self.lightning_dialog = None
self.watchtower_dialog = None
self._num_wizards_in_progress = 0
self._num_wizards_lock = threading.Lock()
self.dark_icon = self.config.GUI_QT_DARK_TRAY_ICON
@@ -216,8 +213,6 @@ class ElectrumGui(BaseElectrumGui, Logger):
m.addAction(_("Network"), self.show_network_dialog)
if network and network.lngossip:
m.addAction(_("Lightning Network"), self.show_lightning_dialog)
if network and network.local_watchtower:
m.addAction(_("Local Watchtower"), self.show_watchtower_dialog)
for window in self.windows:
name = window.wallet.basename()
submenu = m.addMenu(name)
@@ -267,9 +262,6 @@ class ElectrumGui(BaseElectrumGui, Logger):
if self.lightning_dialog:
self.lightning_dialog.close()
self.lightning_dialog = None
if self.watchtower_dialog:
self.watchtower_dialog.close()
self.watchtower_dialog = None
# Shut down the timer cleanly
self.timer.stop()
self.timer = None
@@ -303,11 +295,6 @@ class ElectrumGui(BaseElectrumGui, Logger):
self.lightning_dialog = LightningDialog(self)
self.lightning_dialog.bring_to_top()
def show_watchtower_dialog(self):
if not self.watchtower_dialog:
self.watchtower_dialog = WatchtowerDialog(self)
self.watchtower_dialog.bring_to_top()
def show_network_dialog(self):
if self.network_dialog:
self.network_dialog.on_event_network_updated()

View File

@@ -752,8 +752,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
tools_menu.addAction(_("Electrum preferences"), self.settings_dialog)
tools_menu.addAction(_("&Network"), self.gui_object.show_network_dialog).setEnabled(bool(self.network))
if self.network and self.network.local_watchtower:
tools_menu.addAction(_("Local &Watchtower"), self.gui_object.show_watchtower_dialog)
tools_menu.addAction(_("&Plugins"), self.plugins_dialog)
tools_menu.addSeparator()
tools_menu.addAction(_("&Sign/verify message"), self.sign_verify_message)

View File

@@ -1,118 +0,0 @@
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2012 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import enum
from PyQt6.QtGui import QStandardItemModel, QStandardItem
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import (QDialog, QVBoxLayout, QPushButton, QLabel)
from electrum.i18n import _
from .util import Buttons
from .my_treeview import MyTreeView
class WatcherList(MyTreeView):
class Columns(MyTreeView.BaseColumnsEnum):
OUTPOINT = enum.auto()
TX_COUNT = enum.auto()
STATUS = enum.auto()
headers = {
Columns.OUTPOINT: _('Outpoint'),
Columns.TX_COUNT: _('Tx'),
Columns.STATUS: _('Status'),
}
def __init__(self, parent: 'WatchtowerDialog'):
super().__init__(
parent=parent,
stretch_column=self.Columns.OUTPOINT,
)
self.parent = parent
self.setModel(QStandardItemModel(self))
self.setSortingEnabled(True)
self.update()
def update(self):
if self.parent.lnwatcher is None:
return
self.model().clear()
self.update_headers(self.__class__.headers)
lnwatcher = self.parent.lnwatcher
l = lnwatcher.list_sweep_tx()
for outpoint in l:
n = lnwatcher.get_num_tx(outpoint)
status = lnwatcher.get_channel_status(outpoint)
labels = [""] * len(self.Columns)
labels[self.Columns.OUTPOINT] = outpoint
labels[self.Columns.TX_COUNT] = str(n)
labels[self.Columns.STATUS] = status
items = [QStandardItem(e) for e in labels]
self.set_editability(items)
self.model().insertRow(self.model().rowCount(), items)
size = lnwatcher.sweepstore.filesize()
self.parent.size_label.setText('Database size: %.2f Mb'%(size/1024/1024.))
class WatchtowerDialog(QDialog):
def __init__(self, gui_object):
QDialog.__init__(self)
self.gui_object = gui_object
self.config = gui_object.config
self.network = gui_object.daemon.network
assert self.network
self.lnwatcher = self.network.local_watchtower
self.setWindowTitle(_('Watchtower'))
self.setMinimumSize(600, 200)
self.size_label = QLabel()
self.watcher_list = WatcherList(self)
vbox = QVBoxLayout(self)
vbox.addWidget(self.size_label)
vbox.addWidget(self.watcher_list)
b = QPushButton(_('Close'))
b.clicked.connect(self.close)
vbox.addLayout(Buttons(b))
self.watcher_list.update()
def is_hidden(self):
return self.isMinimized() or self.isHidden()
def show_or_hide(self):
if self.is_hidden():
self.bring_to_top()
else:
self.hide()
def bring_to_top(self):
self.show()
self.raise_()
def closeEvent(self, event):
self.gui_object.watchtower_dialog = None
event.accept()

View File

@@ -10,13 +10,10 @@ from enum import IntEnum, auto
from typing import NamedTuple, Dict
from . import util
from .sql_db import SqlDB, sql
from .wallet_db import WalletDB
from .util import bfh, log_exceptions, ignore_exceptions, TxMinedInfo, random_shuffled_copy
from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_FUTURE
from .transaction import Transaction, TxOutpoint, PartialTransaction
from .transaction import match_script_against_template
from .lnutil import WITNESS_TEMPLATE_RECEIVED_HTLC, WITNESS_TEMPLATE_OFFERED_HTLC
from .logging import Logger
@@ -39,101 +36,6 @@ class TxMinedDepth(IntEnum):
FREE = auto()
create_sweep_txs="""
CREATE TABLE IF NOT EXISTS sweep_txs (
funding_outpoint VARCHAR(34) NOT NULL,
ctn INTEGER NOT NULL,
prevout VARCHAR(34),
tx VARCHAR
)"""
create_channel_info="""
CREATE TABLE IF NOT EXISTS channel_info (
outpoint VARCHAR(34) NOT NULL,
address VARCHAR(32),
PRIMARY KEY(outpoint)
)"""
class SweepStore(SqlDB):
def __init__(self, path, network):
super().__init__(network.asyncio_loop, path)
def create_database(self):
c = self.conn.cursor()
c.execute(create_channel_info)
c.execute(create_sweep_txs)
self.conn.commit()
@sql
def get_sweep_tx(self, funding_outpoint, prevout):
c = self.conn.cursor()
c.execute("SELECT tx FROM sweep_txs WHERE funding_outpoint=? AND prevout=?", (funding_outpoint, prevout))
return [Transaction(r[0].hex()) for r in c.fetchall()]
@sql
def list_sweep_tx(self):
c = self.conn.cursor()
c.execute("SELECT funding_outpoint FROM sweep_txs")
return set([r[0] for r in c.fetchall()])
@sql
def add_sweep_tx(self, funding_outpoint, ctn, prevout, raw_tx):
c = self.conn.cursor()
assert Transaction(raw_tx).is_complete()
c.execute("""INSERT INTO sweep_txs (funding_outpoint, ctn, prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, ctn, prevout, bfh(raw_tx)))
self.conn.commit()
@sql
def get_num_tx(self, funding_outpoint):
c = self.conn.cursor()
c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
return int(c.fetchone()[0])
@sql
def get_ctn(self, outpoint, addr):
if not self._has_channel(outpoint):
self._add_channel(outpoint, addr)
c = self.conn.cursor()
c.execute("SELECT max(ctn) FROM sweep_txs WHERE funding_outpoint=?", (outpoint,))
return int(c.fetchone()[0] or 0)
@sql
def remove_sweep_tx(self, funding_outpoint):
c = self.conn.cursor()
c.execute("DELETE FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
self.conn.commit()
def _add_channel(self, outpoint, address):
c = self.conn.cursor()
c.execute("INSERT INTO channel_info (address, outpoint) VALUES (?,?)", (address, outpoint))
self.conn.commit()
@sql
def remove_channel(self, outpoint):
c = self.conn.cursor()
c.execute("DELETE FROM channel_info WHERE outpoint=?", (outpoint,))
self.conn.commit()
def _has_channel(self, outpoint):
c = self.conn.cursor()
c.execute("SELECT * FROM channel_info WHERE outpoint=?", (outpoint,))
r = c.fetchone()
return r is not None
@sql
def get_address(self, outpoint):
c = self.conn.cursor()
c.execute("SELECT address FROM channel_info WHERE outpoint=?", (outpoint,))
r = c.fetchone()
return r[0] if r else None
@sql
def list_channels(self):
c = self.conn.cursor()
c.execute("SELECT outpoint, address FROM channel_info")
return [(r[0], r[1]) for r in c.fetchall()]
from .util import EventListener, event_listener
@@ -256,58 +158,6 @@ class LNWatcher(Logger, EventListener):
self.adb.add_address(o.address)
return spender_txid
def inspect_tx_candidate(self, outpoint, n: int) -> Dict[str, str]:
"""
returns a dict of spenders for a transaction of interest.
subscribes to addresses as a side effect.
n==0 => outpoint is a channel funding.
n==1 => outpoint is a commitment or close output: to_local, to_remote or first-stage htlc
n==2 => outpoint is a second-stage htlc
"""
prev_txid, index = outpoint.split(':')
spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index))
result = {outpoint:spender_txid}
if n == 0:
if spender_txid is None:
self.channel_status[outpoint] = 'open'
elif not self.is_deeply_mined(spender_txid):
self.channel_status[outpoint] = 'closed (%d)' % self.adb.get_tx_height(spender_txid).conf
else:
self.channel_status[outpoint] = 'closed (deep)'
if spender_txid is None:
return result
spender_tx = self.adb.get_transaction(spender_txid)
if n == 1:
# if tx input is not a first-stage HTLC, we can stop recursion
# FIXME: this is not true for anchor channels
if len(spender_tx.inputs()) != 1:
return result
o = spender_tx.inputs()[0]
witness = o.witness_elements()
if not witness:
# This can happen if spender_tx is a local unsigned tx in the wallet history, e.g.:
# channel is coop-closed, outpoint is for our coop-close output, and spender_tx is an
# arbitrary wallet-spend.
return result
redeem_script = witness[-1]
if match_script_against_template(redeem_script, WITNESS_TEMPLATE_OFFERED_HTLC):
#self.logger.info(f"input script matches offered htlc {redeem_script.hex()}")
pass
elif match_script_against_template(redeem_script, WITNESS_TEMPLATE_RECEIVED_HTLC):
#self.logger.info(f"input script matches received htlc {redeem_script.hex()}")
pass
else:
return result
for i, o in enumerate(spender_tx.outputs()):
if o.address is None:
continue
if not self.adb.is_mine(o.address):
self.adb.add_address(o.address)
elif n < 2:
r = self.inspect_tx_candidate(spender_txid+':%d'%i, n+1)
result.update(r)
return result
def get_tx_mined_depth(self, txid: str):
if not txid:
return TxMinedDepth.FREE
@@ -331,92 +181,6 @@ class LNWatcher(Logger, EventListener):
return self.get_tx_mined_depth(txid) == TxMinedDepth.DEEP
class WatchTower(LNWatcher):
LOGGING_SHORTCUT = 'W'
def __init__(self, network: 'Network'):
adb = AddressSynchronizer(WalletDB('', storage=None, upgrade=True), network.config, name=self.diagnostic_name())
adb.start_network(network)
LNWatcher.__init__(self, adb, network)
self.network = network
self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network)
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
# and a queue for seeing which txs are being published
self.tx_progress = {} # type: Dict[str, ListenerItem]
async def stop(self):
await super().stop()
await self.adb.stop()
def diagnostic_name(self):
return "local_tower"
async def start_watching(self):
# I need to watch the addresses from sweepstore
lst = await self.sweepstore.list_channels()
for outpoint, address in random_shuffled_copy(lst):
self.add_channel(outpoint, address)
async def sweep_commitment_transaction(self, funding_outpoint, closing_tx):
spenders = self.inspect_tx_candidate(funding_outpoint, 0)
keep_watching = False
for prevout, spender in spenders.items():
if spender is not None:
keep_watching |= not self.is_deeply_mined(spender)
continue
sweep_txns = await self.sweepstore.get_sweep_tx(funding_outpoint, prevout)
for tx in sweep_txns:
await self.broadcast_or_log(funding_outpoint, tx)
keep_watching = True
return keep_watching
async def broadcast_or_log(self, funding_outpoint: str, tx: Transaction):
height = self.adb.get_tx_height(tx.txid()).height
if height != TX_HEIGHT_LOCAL:
return
try:
txid = await self.network.broadcast_transaction(tx)
except Exception as e:
self.logger.info(f'broadcast failure: txid={tx.txid()}, funding_outpoint={funding_outpoint}: {repr(e)}')
else:
self.logger.info(f'broadcast success: txid={tx.txid()}, funding_outpoint={funding_outpoint}')
if funding_outpoint in self.tx_progress:
await self.tx_progress[funding_outpoint].tx_queue.put(tx)
return txid
async def get_ctn(self, outpoint, addr):
if addr not in self.callbacks.keys():
self.logger.info(f'watching new channel: {outpoint} {addr}')
self.add_channel(outpoint, addr)
return await self.sweepstore.get_ctn(outpoint, addr)
def get_num_tx(self, outpoint):
async def f():
return await self.sweepstore.get_num_tx(outpoint)
return self.network.run_from_another_thread(f())
def list_sweep_tx(self):
async def f():
return await self.sweepstore.list_sweep_tx()
return self.network.run_from_another_thread(f())
def list_channels(self):
async def f():
return await self.sweepstore.list_channels()
return self.network.run_from_another_thread(f())
async def unwatch_channel(self, address, funding_outpoint):
await super().unwatch_channel(address, funding_outpoint)
await self.sweepstore.remove_sweep_tx(funding_outpoint)
await self.sweepstore.remove_channel(funding_outpoint)
if funding_outpoint in self.tx_progress:
self.tx_progress[funding_outpoint].all_done.set()
async def update_channel_state(self, *args, **kwargs):
pass
class LNWalletWatcher(LNWatcher):

View File

@@ -923,19 +923,10 @@ class LNWallet(LNWorker):
def diagnostic_name(self):
return self.wallet.diagnostic_name()
@ignore_exceptions
@log_exceptions
async def sync_with_local_watchtower(self):
watchtower = self.network.local_watchtower
if watchtower:
while True:
for chan in self.channels.values():
await self.sync_channel_with_watchtower(chan, watchtower.sweepstore)
await asyncio.sleep(5)
@ignore_exceptions
@log_exceptions
async def sync_with_remote_watchtower(self):
self.watchtower_ctns = {}
while True:
# periodically poll if the user updated 'watchtower_url'
await asyncio.sleep(5)
@@ -958,6 +949,9 @@ class LNWallet(LNWorker):
except aiohttp.client_exceptions.ClientConnectorError:
self.logger.info(f'could not contact remote watchtower {watchtower_url}')
def get_watchtower_ctn(self, channel_point):
return self.watchtower_ctns.get(channel_point)
async def sync_channel_with_watchtower(self, chan: Channel, watchtower):
outpoint = chan.funding_outpoint.to_str()
addr = chan.get_funding_address()
@@ -967,6 +961,7 @@ class LNWallet(LNWorker):
sweeptxs = chan.create_sweeptxs_for_watchtower(ctn)
for tx in sweeptxs:
await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize())
self.watchtower_ctns[outpoint] = ctn
def start_network(self, network: 'Network'):
super().start_network(network)
@@ -985,7 +980,6 @@ class LNWallet(LNWorker):
self.maybe_listen(),
self.lnwatcher.trigger_callbacks(), # shortcut (don't block) if funding tx locked and verified
self.reestablish_peers_and_channels(),
self.sync_with_local_watchtower(),
self.sync_with_remote_watchtower(),
]:
tg_coro = self.taskgroup.spawn(coro)

View File

@@ -68,7 +68,7 @@ if TYPE_CHECKING:
from .channel_db import ChannelDB
from .lnrouter import LNPathFinder
from .lnworker import LNGossip
from .lnwatcher import WatchTower
#from .lnwatcher import WatchTower
from .daemon import Daemon
from .simple_config import SimpleConfig
@@ -290,7 +290,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
channel_db: Optional['ChannelDB'] = None
lngossip: Optional['LNGossip'] = None
local_watchtower: Optional['WatchTower'] = None
path_finder: Optional['LNPathFinder'] = None
def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None):
@@ -364,11 +363,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
self._has_ever_managed_to_connect_to_server = False
self._was_started = False
# lightning network
if self.config.WATCHTOWER_SERVER_ENABLED:
from . import lnwatcher
self.local_watchtower = lnwatcher.WatchTower(self)
asyncio.ensure_future(self.local_watchtower.start_watching())
def has_internet_connection(self) -> bool:
"""Our guess whether the device has Internet-connectivity."""

View File

@@ -0,0 +1,17 @@
from electrum.i18n import _
fullname = _('Watchtower')
description = """
Watchtower for Electrum.
Example setup:
electrum -o setconfig enable_plugin_watchtower True
electrum -o setconfig watchtower_user wtuser
electrum -o setconfig watchtower_password wtpassword
electrum -o setconfig watchtower_port 12345
electrum daemon -v
"""
available_for = ['cmdline']

View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python
#
# Electrum - Lightweight Bitcoin Client
# Copyright (C) 2023 The Electrum Developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from .watchtower import WatchtowerPlugin
class Plugin(WatchtowerPlugin):
pass

View File

@@ -0,0 +1,46 @@
import os
import asyncio
from collections import defaultdict
from typing import TYPE_CHECKING
from aiohttp import web
from electrum.util import log_exceptions, ignore_exceptions
from electrum.logging import Logger
from electrum.util import EventListener
from electrum.lnaddr import lndecode
from electrum.daemon import AuthenticatedServer
if TYPE_CHECKING:
from electrum.network import Network
class WatchTowerServer(AuthenticatedServer):
def __init__(self, watchtower, network: 'Network', port:int):
self.port = port
self.config = network.config
self.network = network
watchtower_user = self.config.WATCHTOWER_SERVER_USER or ""
watchtower_password = self.config.WATCHTOWER_SERVER_PASSWORD or ""
AuthenticatedServer.__init__(self, watchtower_user, watchtower_password)
self.lnwatcher = watchtower
self.app = web.Application()
self.app.router.add_post("/", self.handle)
self.register_method(self.get_ctn)
self.register_method(self.add_sweep_tx)
async def run(self):
self.runner = web.AppRunner(self.app)
await self.runner.setup()
site = web.TCPSite(self.runner, host='localhost', port=self.port)
await site.start()
self.logger.info(f"running and listening on port {self.port}")
async def get_ctn(self, *args):
return await self.lnwatcher.get_ctn(*args)
async def add_sweep_tx(self, *args):
return await self.lnwatcher.sweepstore.add_sweep_tx(*args)

View File

@@ -0,0 +1,299 @@
#!/usr/bin/env python
#
# Electrum - Lightweight Bitcoin Client
# Copyright (C) 2023 The Electrum Developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio, os
from typing import TYPE_CHECKING
from typing import NamedTuple, Dict
from electrum.util import log_exceptions, random_shuffled_copy
from electrum.plugin import BasePlugin, hook
from electrum.sql_db import SqlDB, sql
from electrum.lnwatcher import LNWatcher
from electrum.transaction import Transaction, match_script_against_template
from electrum.network import Network
from electrum.address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL
from electrum.wallet_db import WalletDB
from electrum.lnutil import WITNESS_TEMPLATE_RECEIVED_HTLC, WITNESS_TEMPLATE_OFFERED_HTLC
from .server import WatchTowerServer
if TYPE_CHECKING:
from electrum.simple_config import SimpleConfig
class WatchtowerPlugin(BasePlugin):
def __init__(self, parent, config: 'SimpleConfig', name):
BasePlugin.__init__(self, parent, config, name)
self.config = config
self.network = Network.get_instance()
if self.network is None:
return
self.watchtower = WatchTower(self.network)
asyncio.ensure_future(self.watchtower.start_watching())
if watchtower_port := self.config.WATCHTOWER_SERVER_PORT:
self.server = WatchTowerServer(self.watchtower, self.network, watchtower_port)
asyncio.run_coroutine_threadsafe(self.network.taskgroup.spawn(self.server.run), self.network.asyncio_loop)
class WatchTower(LNWatcher):
LOGGING_SHORTCUT = 'W'
def __init__(self, network: 'Network'):
adb = AddressSynchronizer(WalletDB('', storage=None, upgrade=True), network.config, name=self.diagnostic_name())
adb.start_network(network)
LNWatcher.__init__(self, adb, network)
self.network = network
self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network)
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
# and a queue for seeing which txs are being published
self.tx_progress = {} # type: Dict[str, ListenerItem]
async def stop(self):
await super().stop()
await self.adb.stop()
def diagnostic_name(self):
return "local_tower"
@log_exceptions
async def start_watching(self):
# I need to watch the addresses from sweepstore
lst = await self.sweepstore.list_channels()
for outpoint, address in random_shuffled_copy(lst):
self.add_channel(outpoint, address)
def inspect_tx_candidate(self, outpoint, n: int) -> Dict[str, str]:
"""
returns a dict of spenders for a transaction of interest.
subscribes to addresses as a side effect.
n==0 => outpoint is a channel funding.
n==1 => outpoint is a commitment or close output: to_local, to_remote or first-stage htlc
n==2 => outpoint is a second-stage htlc
"""
prev_txid, index = outpoint.split(':')
spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index))
result = {outpoint:spender_txid}
if n == 0:
if spender_txid is None:
self.channel_status[outpoint] = 'open'
elif not self.is_deeply_mined(spender_txid):
self.channel_status[outpoint] = 'closed (%d)' % self.adb.get_tx_height(spender_txid).conf
else:
self.channel_status[outpoint] = 'closed (deep)'
if spender_txid is None:
return result
spender_tx = self.adb.get_transaction(spender_txid)
if n == 1:
# if tx input is not a first-stage HTLC, we can stop recursion
# FIXME: this is not true for anchor channels
if len(spender_tx.inputs()) != 1:
return result
o = spender_tx.inputs()[0]
witness = o.witness_elements()
if not witness:
# This can happen if spender_tx is a local unsigned tx in the wallet history, e.g.:
# channel is coop-closed, outpoint is for our coop-close output, and spender_tx is an
# arbitrary wallet-spend.
return result
redeem_script = witness[-1]
if match_script_against_template(redeem_script, WITNESS_TEMPLATE_OFFERED_HTLC):
#self.logger.info(f"input script matches offered htlc {redeem_script.hex()}")
pass
elif match_script_against_template(redeem_script, WITNESS_TEMPLATE_RECEIVED_HTLC):
#self.logger.info(f"input script matches received htlc {redeem_script.hex()}")
pass
else:
return result
for i, o in enumerate(spender_tx.outputs()):
if o.address is None:
continue
if not self.adb.is_mine(o.address):
self.adb.add_address(o.address)
elif n < 2:
r = self.inspect_tx_candidate(spender_txid+':%d'%i, n+1)
result.update(r)
return result
async def sweep_commitment_transaction(self, funding_outpoint, closing_tx):
spenders = self.inspect_tx_candidate(funding_outpoint, 0)
keep_watching = False
for prevout, spender in spenders.items():
if spender is not None:
keep_watching |= not self.is_deeply_mined(spender)
continue
sweep_txns = await self.sweepstore.get_sweep_tx(funding_outpoint, prevout)
for tx in sweep_txns:
await self.broadcast_or_log(funding_outpoint, tx)
keep_watching = True
return keep_watching
async def broadcast_or_log(self, funding_outpoint: str, tx: Transaction):
height = self.adb.get_tx_height(tx.txid()).height
if height != TX_HEIGHT_LOCAL:
return
try:
txid = await self.network.broadcast_transaction(tx)
except Exception as e:
self.logger.info(f'broadcast failure: txid={tx.txid()}, funding_outpoint={funding_outpoint}: {repr(e)}')
else:
self.logger.info(f'broadcast success: txid={tx.txid()}, funding_outpoint={funding_outpoint}')
if funding_outpoint in self.tx_progress:
await self.tx_progress[funding_outpoint].tx_queue.put(tx)
return txid
async def get_ctn(self, outpoint, addr):
if addr not in self.callbacks.keys():
self.logger.info(f'watching new channel: {outpoint} {addr}')
self.add_channel(outpoint, addr)
return await self.sweepstore.get_ctn(outpoint, addr)
def get_num_tx(self, outpoint):
async def f():
return await self.sweepstore.get_num_tx(outpoint)
return self.network.run_from_another_thread(f())
def list_sweep_tx(self):
async def f():
return await self.sweepstore.list_sweep_tx()
return self.network.run_from_another_thread(f())
def list_channels(self):
async def f():
return await self.sweepstore.list_channels()
return self.network.run_from_another_thread(f())
async def unwatch_channel(self, address, funding_outpoint):
await super().unwatch_channel(address, funding_outpoint)
await self.sweepstore.remove_sweep_tx(funding_outpoint)
await self.sweepstore.remove_channel(funding_outpoint)
if funding_outpoint in self.tx_progress:
self.tx_progress[funding_outpoint].all_done.set()
async def update_channel_state(self, *args, **kwargs):
pass
create_sweep_txs="""
CREATE TABLE IF NOT EXISTS sweep_txs (
funding_outpoint VARCHAR(34) NOT NULL,
ctn INTEGER NOT NULL,
prevout VARCHAR(34),
tx VARCHAR
)"""
create_channel_info="""
CREATE TABLE IF NOT EXISTS channel_info (
outpoint VARCHAR(34) NOT NULL,
address VARCHAR(32),
PRIMARY KEY(outpoint)
)"""
class SweepStore(SqlDB):
def __init__(self, path, network):
super().__init__(network.asyncio_loop, path)
def create_database(self):
c = self.conn.cursor()
c.execute(create_channel_info)
c.execute(create_sweep_txs)
self.conn.commit()
@sql
def get_sweep_tx(self, funding_outpoint, prevout):
c = self.conn.cursor()
c.execute("SELECT tx FROM sweep_txs WHERE funding_outpoint=? AND prevout=?", (funding_outpoint, prevout))
return [Transaction(r[0].hex()) for r in c.fetchall()]
@sql
def list_sweep_tx(self):
c = self.conn.cursor()
c.execute("SELECT funding_outpoint FROM sweep_txs")
return set([r[0] for r in c.fetchall()])
@sql
def add_sweep_tx(self, funding_outpoint, ctn, prevout, raw_tx):
c = self.conn.cursor()
assert Transaction(raw_tx).is_complete()
c.execute("""INSERT INTO sweep_txs (funding_outpoint, ctn, prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, ctn, prevout, bytes.fromhex(raw_tx)))
self.conn.commit()
@sql
def get_num_tx(self, funding_outpoint):
c = self.conn.cursor()
c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
return int(c.fetchone()[0])
@sql
def get_ctn(self, outpoint, addr):
if not self._has_channel(outpoint):
self._add_channel(outpoint, addr)
c = self.conn.cursor()
c.execute("SELECT max(ctn) FROM sweep_txs WHERE funding_outpoint=?", (outpoint,))
return int(c.fetchone()[0] or 0)
@sql
def remove_sweep_tx(self, funding_outpoint):
c = self.conn.cursor()
c.execute("DELETE FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
self.conn.commit()
def _add_channel(self, outpoint, address):
c = self.conn.cursor()
c.execute("INSERT INTO channel_info (address, outpoint) VALUES (?,?)", (address, outpoint))
self.conn.commit()
@sql
def remove_channel(self, outpoint):
c = self.conn.cursor()
c.execute("DELETE FROM channel_info WHERE outpoint=?", (outpoint,))
self.conn.commit()
def _has_channel(self, outpoint):
c = self.conn.cursor()
c.execute("SELECT * FROM channel_info WHERE outpoint=?", (outpoint,))
r = c.fetchone()
return r is not None
@sql
def get_address(self, outpoint):
c = self.conn.cursor()
c.execute("SELECT address FROM channel_info WHERE outpoint=?", (outpoint,))
r = c.fetchone()
return r[0] if r else None
@sql
def list_channels(self):
c = self.conn.cursor()
c.execute("SELECT outpoint, address FROM channel_info")
return [(r[0], r[1]) for r in c.fetchall()]