swapserver: use taskgroup (follow-up 0083560ee6)
This commit is contained in:
@@ -15,7 +15,7 @@ from .bitcoin import (script_to_p2wsh, opcodes, p2wsh_nested_script, push_script
|
||||
is_segwit_address, construct_witness)
|
||||
from .transaction import PartialTxInput, PartialTxOutput, PartialTransaction, Transaction, TxInput, TxOutpoint
|
||||
from .transaction import script_GetOp, match_script_against_template, OPPushDataGeneric, OPPushDataPubkey
|
||||
from .util import log_exceptions, BelowDustLimit
|
||||
from .util import log_exceptions, BelowDustLimit, OldTaskGroup
|
||||
from .lnutil import REDEEM_AFTER_DOUBLE_SPENT_DELAY, ln_dummy_address
|
||||
from .bitcoin import dust_threshold
|
||||
from .logging import Logger
|
||||
@@ -187,6 +187,7 @@ class SwapManager(Logger):
|
||||
self.server_supports_htlc_first = False
|
||||
self.wallet = wallet
|
||||
self.lnworker = lnworker
|
||||
self.taskgroup = None
|
||||
|
||||
self.swaps = self.wallet.db.get_dict('submarine_swaps') # type: Dict[str, SwapData]
|
||||
self._swaps_by_funding_outpoint = {} # type: Dict[TxOutpoint, SwapData]
|
||||
@@ -216,8 +217,27 @@ class SwapManager(Logger):
|
||||
if swap.is_redeemed:
|
||||
continue
|
||||
self.add_lnwatcher_callback(swap)
|
||||
|
||||
self.taskgroup = OldTaskGroup()
|
||||
coro = self.pay_pending_invoices()
|
||||
asyncio.run_coroutine_threadsafe(network.taskgroup.spawn(coro), network.asyncio_loop)
|
||||
asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(coro), self.network.asyncio_loop)
|
||||
|
||||
async def pay_invoice(self, key):
|
||||
self.logger.info(f'trying to pay invoice {key}')
|
||||
self.invoices_to_pay[key] = 1000000000000 # lock
|
||||
try:
|
||||
invoice = self.wallet.get_invoice(key)
|
||||
success, log = await self.lnworker.pay_invoice(invoice.lightning_invoice, attempts=10)
|
||||
except Exception as e:
|
||||
self.logger.info(f'exception paying {key}, will not retry')
|
||||
self.invoices_to_pay.pop(key, None)
|
||||
return
|
||||
if not success:
|
||||
self.logger.info(f'failed to pay {key}, will retry in 10 minutes')
|
||||
self.invoices_to_pay[key] = now() + 600
|
||||
else:
|
||||
self.logger.info(f'paid invoice {key}')
|
||||
self.invoices_to_pay.pop(key, None)
|
||||
|
||||
async def pay_pending_invoices(self):
|
||||
self.invoices_to_pay = {}
|
||||
@@ -226,24 +246,7 @@ class SwapManager(Logger):
|
||||
for key, not_before in list(self.invoices_to_pay.items()):
|
||||
if now() < not_before:
|
||||
continue
|
||||
swap = self.swaps.get(key)
|
||||
if not swap:
|
||||
continue
|
||||
invoice = self.wallet.get_invoice(key)
|
||||
if not invoice:
|
||||
continue
|
||||
self.logger.info(f'trying to pay invoice {key}')
|
||||
try:
|
||||
success, log = await self.lnworker.pay_invoice(invoice.lightning_invoice, attempts=10)
|
||||
except Exception as e:
|
||||
success = False
|
||||
if success:
|
||||
self.invoices_to_pay.pop(key)
|
||||
continue
|
||||
# retry in 10 minutes
|
||||
self.logger.info(f'failed to pay invoice {key}')
|
||||
self.invoices_to_pay[key] = now() + 600
|
||||
|
||||
await self.taskgroup.spawn(self.pay_invoice(key))
|
||||
|
||||
@log_exceptions
|
||||
async def _claim_swap(self, swap: SwapData) -> None:
|
||||
|
||||
Reference in New Issue
Block a user