1
0

TxBatcher: store fee policy names instead of fee descriptors

This allows to change the fee policy of batches dynamically.
Config.fee_policy is a mapping between policy names and descriptors.
This commit is contained in:
ThomasV
2025-05-25 14:44:42 +02:00
parent b91c5d18cb
commit 58480a69db
5 changed files with 29 additions and 23 deletions

View File

@@ -196,7 +196,7 @@ class LNWatcher(Logger, EventListener):
def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool: def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool:
""" returns False if it was dust """ """ returns False if it was dust """
try: try:
self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info, self.config.FEE_POLICY_LIGHTNING) self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info)
except BelowDustLimit: except BelowDustLimit:
return False return False
return True return True

View File

@@ -734,9 +734,10 @@ Warning: setting this to too low will result in lots of payment failures."""),
TEST_SHUTDOWN_FEE_RANGE = ConfigVar('test_shutdown_fee_range', default=None) TEST_SHUTDOWN_FEE_RANGE = ConfigVar('test_shutdown_fee_range', default=None)
TEST_SHUTDOWN_LEGACY = ConfigVar('test_shutdown_legacy', default=False, type_=bool) TEST_SHUTDOWN_LEGACY = ConfigVar('test_shutdown_legacy', default=False, type_=bool)
FEE_POLICY = ConfigVar('fee_policy', default='eta:2', type_=str) # exposed to GUI # fee_policy is a dict: fee_policy_name -> fee_policy_descriptor
FEE_POLICY_LIGHTNING = ConfigVar('fee_policy_lightning', default='eta:2', type_=str) # for txbatcher (sweeping) FEE_POLICY = ConfigVar('fee_policy.default', default='eta:2', type_=str) # exposed to GUI
FEE_POLICY_SWAPS = ConfigVar('fee_policy_swaps', default='eta:2', type_=str) # for txbatcher (sweeping and sending if we are a swapserver) FEE_POLICY_LIGHTNING = ConfigVar('fee_policy.lnwatcher', default='eta:2', type_=str) # for txbatcher (sweeping)
FEE_POLICY_SWAPS = ConfigVar('fee_policy.swaps', default='eta:2', type_=str) # for txbatcher (sweeping and sending if we are a swapserver)
RPC_USERNAME = ConfigVar('rpcuser', default=None, type_=str) RPC_USERNAME = ConfigVar('rpcuser', default=None, type_=str)
RPC_PASSWORD = ConfigVar('rpcpassword', default=None, type_=str) RPC_PASSWORD = ConfigVar('rpcpassword', default=None, type_=str)

View File

@@ -461,7 +461,7 @@ class SwapManager(Logger):
can_be_batched=can_be_batched, can_be_batched=can_be_batched,
) )
try: try:
self.wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.config.FEE_POLICY_SWAPS) self.wallet.txbatcher.add_sweep_input('swaps', sweep_info)
except BelowDustLimit: except BelowDustLimit:
self.logger.info('utxo value below dust threshold') self.logger.info('utxo value below dust threshold')
return return
@@ -496,7 +496,7 @@ class SwapManager(Logger):
swap = self.swaps[key] swap = self.swaps[key]
if not swap.is_funded(): if not swap.is_funded():
output = self.create_funding_output(swap) output = self.create_funding_output(swap)
self.wallet.txbatcher.add_payment_output('swaps', output, self.config.FEE_POLICY_SWAPS) self.wallet.txbatcher.add_payment_output('swaps', output)
swap._payment_pending = True swap._payment_pending = True
else: else:
self.logger.info(f'key not in swaps {key}') self.logger.info(f'key not in swaps {key}')

View File

@@ -96,12 +96,12 @@ class TxBatcher(Logger):
self.password_future = None self.password_future = None
@locked @locked
def add_payment_output(self, key: str, output: 'PartialTxOutput', fee_policy_descriptor: str): def add_payment_output(self, key: str, output: 'PartialTxOutput'):
batch = self._maybe_create_new_batch(key, fee_policy_descriptor) batch = self._maybe_create_new_batch(key, fee_policy_name=key)
batch.add_payment_output(output) batch.add_payment_output(output)
@locked @locked
def add_sweep_input(self, key: str, sweep_info: 'SweepInfo', fee_policy_descriptor: str): def add_sweep_input(self, key: str, sweep_info: 'SweepInfo'):
if sweep_info.txin and sweep_info.txout: if sweep_info.txin and sweep_info.txout:
# detect legacy htlc using name and csv delay # detect legacy htlc using name and csv delay
if sweep_info.name in ['received-htlc', 'offered-htlc'] and sweep_info.csv_delay == 0: if sweep_info.name in ['received-htlc', 'offered-htlc'] and sweep_info.csv_delay == 0:
@@ -109,23 +109,21 @@ class TxBatcher(Logger):
self.logger.info(f'received {sweep_info.name}') self.logger.info(f'received {sweep_info.name}')
self._legacy_htlcs[sweep_info.txin.prevout] = sweep_info self._legacy_htlcs[sweep_info.txin.prevout] = sweep_info
return return
fee_policy_name = key
if not sweep_info.can_be_batched: if not sweep_info.can_be_batched:
# create a batch only for that input # create a batch only for that input
key = sweep_info.txin.prevout.to_str() key = sweep_info.txin.prevout.to_str()
batch = self._maybe_create_new_batch(key, fee_policy_descriptor) batch = self._maybe_create_new_batch(key, fee_policy_name)
batch.add_sweep_input(sweep_info) batch.add_sweep_input(sweep_info)
def _maybe_create_new_batch(self, key, fee_policy_descriptor: str): def _maybe_create_new_batch(self, key, fee_policy_name: str):
assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!" assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!"
if key not in self.storage: if key not in self.storage:
self.logger.info(f'creating new batch: {key}') self.logger.info(f'creating new batch: {key}')
self.storage[key] = { 'fee_policy': fee_policy_descriptor, 'txids': [], 'prevout': None } self.storage[key] = { 'fee_policy_name': fee_policy_name, 'txids': [], 'prevout': None }
self.tx_batches[key] = batch = TxBatch(self.wallet, self.storage[key]) self.tx_batches[key] = batch = TxBatch(self.wallet, self.storage[key])
if self.taskgroup: if self.taskgroup:
asyncio.ensure_future(self.taskgroup.spawn(self.run_batch(key, batch))) asyncio.ensure_future(self.taskgroup.spawn(self.run_batch(key, batch)))
elif self.storage[key]['fee_policy'] != fee_policy_descriptor:
# maybe update policy?
self.logger.warning('fee policy passed to txbatcher inconsistent with existing batch')
return self.tx_batches[key] return self.tx_batches[key]
@locked @locked
@@ -233,11 +231,18 @@ class TxBatch(Logger):
# list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined. # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined.
self._prevout = storage.get('prevout') self._prevout = storage.get('prevout')
self._batch_txids = storage['txids'] self._batch_txids = storage['txids']
self.fee_policy = FeePolicy(storage['fee_policy']) self._fee_policy_name = storage.get('fee_policy_name', 'default')
self._base_tx = None # current batch tx. last element of batch_txids self._base_tx = None # current batch tx. last element of batch_txids
self._parent_tx = None self._parent_tx = None
self._unconfirmed_sweeps = set() # list of inputs we are sweeping (until spending tx is confirmed) self._unconfirmed_sweeps = set() # list of inputs we are sweeping (until spending tx is confirmed)
@property
def fee_policy(self):
# this assumes the descriptor is in config.fee_policy
cv_name = 'fee_policy' + '.' + self._fee_policy_name
descriptor = self.wallet.config.get(cv_name, 'eta:2')
return FeePolicy(descriptor)
@log_exceptions @log_exceptions
async def run(self): async def run(self):
while not self.is_done(): while not self.is_done():

View File

@@ -70,7 +70,7 @@ class TestTxBatcher(ElectrumTestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.config = SimpleConfig({'electrum_path': self.electrum_path}) self.config = SimpleConfig({'electrum_path': self.electrum_path})
self.fee_policy_descriptor = 'feerate:5000' self.config.FEE_POLICY = 'feerate:5000'
async def asyncSetUp(self): async def asyncSetUp(self):
await super().asyncSetUp() await super().asyncSetUp()
@@ -109,12 +109,12 @@ class TestTxBatcher(ElectrumTestCase):
self.logger.info(f'wallet balance {wallet.get_balance()}') self.logger.info(f'wallet balance {wallet.get_balance()}')
# payment 1 -> tx1(output1) # payment 1 -> tx1(output1)
output1 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 10_000) output1 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 10_000)
wallet.txbatcher.add_payment_output('default', output1, self.fee_policy_descriptor) wallet.txbatcher.add_payment_output('default', output1)
tx1 = await self.network.next_tx() tx1 = await self.network.next_tx()
assert output1 in tx1.outputs() assert output1 in tx1.outputs()
# payment 2 -> tx2(output1, output2) # payment 2 -> tx2(output1, output2)
output2 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 20_000) output2 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 20_000)
wallet.txbatcher.add_payment_output('default', output2, self.fee_policy_descriptor) wallet.txbatcher.add_payment_output('default', output2)
tx1_prime = await self.network.next_tx() tx1_prime = await self.network.next_tx()
assert wallet.adb.get_transaction(tx1_prime.txid()) is not None assert wallet.adb.get_transaction(tx1_prime.txid()) is not None
assert len(tx1_prime.outputs()) == 3 assert len(tx1_prime.outputs()) == 3
@@ -152,14 +152,14 @@ class TestTxBatcher(ElectrumTestCase):
# to_self_payment tx1 # to_self_payment tx1
output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000) output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000)
wallet.txbatcher.add_payment_output('default', output1, self.fee_policy_descriptor) wallet.txbatcher.add_payment_output('default', output1)
tx1 = await self.network.next_tx() tx1 = await self.network.next_tx()
assert len(tx1.outputs()) == 2 assert len(tx1.outputs()) == 2
assert output1 in tx1.outputs() assert output1 in tx1.outputs()
# outgoing payment tx2 # outgoing payment tx2
output2 = PartialTxOutput.from_address_and_value("tb1qkfn0fude7z789uys2u7sf80kd4805zpvs3na0h", 90_000) output2 = PartialTxOutput.from_address_and_value("tb1qkfn0fude7z789uys2u7sf80kd4805zpvs3na0h", 90_000)
wallet.txbatcher.add_payment_output('default', output2, self.fee_policy_descriptor) wallet.txbatcher.add_payment_output('default', output2)
# before tx1 gets confirmed, txbatch.create_transaction will raise notenoughfunds # before tx1 gets confirmed, txbatch.create_transaction will raise notenoughfunds
await asyncio.sleep(wallet.txbatcher.SLEEP_INTERVAL) await asyncio.sleep(wallet.txbatcher.SLEEP_INTERVAL)
@@ -208,14 +208,14 @@ class TestTxBatcher(ElectrumTestCase):
name='swap claim', name='swap claim',
can_be_batched=True, can_be_batched=True,
) )
wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.fee_policy_descriptor) wallet.txbatcher.add_sweep_input('default', sweep_info)
tx = await self.network.next_tx() tx = await self.network.next_tx()
txid = tx.txid() txid = tx.txid()
self.assertEqual(SWAP_CLAIM_TX, str(tx)) self.assertEqual(SWAP_CLAIM_TX, str(tx))
# add a new payment, reusing the same input # add a new payment, reusing the same input
# this tests that txin.make_witness() can be called more than once # this tests that txin.make_witness() can be called more than once
output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000) output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000)
wallet.txbatcher.add_payment_output('swaps', output1, self.fee_policy_descriptor) wallet.txbatcher.add_payment_output('default', output1)
new_tx = await self.network.next_tx() new_tx = await self.network.next_tx()
# check that we batched with previous tx # check that we batched with previous tx
assert new_tx.inputs()[0].prevout == tx.inputs()[0].prevout == txin.prevout assert new_tx.inputs()[0].prevout == tx.inputs()[0].prevout == txin.prevout