diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index 2c8fb4e84..9325f71d2 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -27,7 +27,7 @@ from .transaction import PartialTxInput, PartialTxOutput, PartialTransaction, Tr from .transaction import script_GetOp, match_script_against_template, OPPushDataGeneric, OPPushDataPubkey from .util import (log_exceptions, ignore_exceptions, BelowDustLimit, OldTaskGroup, age, ca_path, gen_nostr_ann_pow, get_nostr_ann_pow_amount, make_aiohttp_proxy_connector, - get_running_loop, get_asyncio_loop, wait_for2) + get_running_loop, get_asyncio_loop, wait_for2, run_sync_function_on_asyncio_thread) from .lnutil import REDEEM_AFTER_DOUBLE_SPENT_DELAY from .bitcoin import dust_threshold, DummyAddress from .logging import Logger @@ -956,8 +956,8 @@ class SwapManager(Logger): self.is_initialized.set() self.pairs_updated.set() self.pairs_updated.clear() - loop = get_asyncio_loop() - loop.call_soon_threadsafe(trigger) + + run_sync_function_on_asyncio_thread(trigger, block=True) def server_maybe_trigger_liquidity_update(self) -> None: """ diff --git a/electrum/util.py b/electrum/util.py index cc231ba89..e4a56b44d 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -53,6 +53,7 @@ from abc import abstractmethod, ABC import socket import enum from contextlib import nullcontext +import traceback import attr import aiohttp @@ -1709,6 +1710,41 @@ def _set_custom_task_factory(loop: asyncio.AbstractEventLoop): loop.set_task_factory(factory) +def run_sync_function_on_asyncio_thread(func: Callable, *, block: bool) -> None: + """Run a non-async fn on the asyncio thread. Can be called from any thread. + + If the current thread is already the asyncio thread, func is guaranteed + to have been completed when this method returns. + + For any other thread, we only wait for completion if `block` is True. + """ + assert not asyncio.iscoroutinefunction(func), "func must be a non-async function" + asyncio_loop = get_asyncio_loop() + if get_running_loop() == asyncio_loop: # we are running on the asyncio thread + func() + else: # non-asyncio thread + async def wrapper(): + return func() + fut = asyncio.run_coroutine_threadsafe(wrapper(), loop=asyncio_loop) + if block: + fut.result() + else: + # add explicit logging of exceptions, otherwise they might get lost + tb1 = traceback.format_stack()[:-1] + tb1_str = "".join(tb1) + def on_done(fut_: concurrent.futures.Future): + assert fut_.done() + if fut_.cancelled(): + _logger.debug(f"func cancelled. {func=}.") + elif exc := fut_.exception(): + # note: We explicitly log the first part of the traceback, tb1_str. + # The second part gets logged by setting "exc_info". + _logger.error( + f"func errored. {func=}. {exc=}" + f"\n{tb1_str}", exc_info=exc) + fut.add_done_callback(on_done) + + class OrderedDictWithIndex(OrderedDict): """An OrderedDict that keeps track of the positions of keys. @@ -1890,14 +1926,7 @@ class CallbackManager(Logger): self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc) fut.add_done_callback(on_done) else: # non-async cb - # note: the cb needs to run in the asyncio thread - if get_running_loop() == loop: - # run callback immediately, so that it is guaranteed - # to have been executed when this method returns - callback(*args) - else: - # note: if cb raises, asyncio will log the exception - loop.call_soon_threadsafe(callback, *args) + run_sync_function_on_asyncio_thread(partial(callback, *args), block=False) callback_mgr = CallbackManager()