util.CallbackManager: make sure exceptions in cbs are logged
this resolves the existing FIXME
This commit is contained in:
@@ -21,6 +21,7 @@
|
|||||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
# SOFTWARE.
|
# SOFTWARE.
|
||||||
import binascii
|
import binascii
|
||||||
|
import concurrent.futures
|
||||||
import os, sys, re, json
|
import os, sys, re, json
|
||||||
from collections import defaultdict, OrderedDict
|
from collections import defaultdict, OrderedDict
|
||||||
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
|
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
|
||||||
@@ -1729,11 +1730,12 @@ def randrange(bound: int) -> int:
|
|||||||
return secrets.randbelow(bound - 1) + 1
|
return secrets.randbelow(bound - 1) + 1
|
||||||
|
|
||||||
|
|
||||||
class CallbackManager:
|
class CallbackManager(Logger):
|
||||||
# callbacks set by the GUI or any thread
|
# callbacks set by the GUI or any thread
|
||||||
# guarantee: the callbacks will always get triggered from the asyncio thread.
|
# guarantee: the callbacks will always get triggered from the asyncio thread.
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
Logger.__init__(self)
|
||||||
self.callback_lock = threading.Lock()
|
self.callback_lock = threading.Lock()
|
||||||
self.callbacks = defaultdict(list) # note: needs self.callback_lock
|
self.callbacks = defaultdict(list) # note: needs self.callback_lock
|
||||||
self._running_cb_futs = set()
|
self._running_cb_futs = set()
|
||||||
@@ -1759,18 +1761,25 @@ class CallbackManager:
|
|||||||
with self.callback_lock:
|
with self.callback_lock:
|
||||||
callbacks = self.callbacks[event][:]
|
callbacks = self.callbacks[event][:]
|
||||||
for callback in callbacks:
|
for callback in callbacks:
|
||||||
# FIXME: if callback throws, we will lose the traceback
|
if asyncio.iscoroutinefunction(callback): # async cb
|
||||||
if asyncio.iscoroutinefunction(callback):
|
|
||||||
fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
|
fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
|
||||||
# keep strong references around to avoid GC issues:
|
# keep strong references around to avoid GC issues:
|
||||||
self._running_cb_futs.add(fut)
|
self._running_cb_futs.add(fut)
|
||||||
fut.add_done_callback(lambda fut_: self._running_cb_futs.remove(fut_))
|
def on_done(fut_: concurrent.futures.Future):
|
||||||
elif get_running_loop() == loop:
|
assert fut_.done()
|
||||||
# run callback immediately, so that it is guaranteed
|
self._running_cb_futs.remove(fut_)
|
||||||
# to have been executed when this method returns
|
if exc := fut_.exception():
|
||||||
callback(*args)
|
self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
|
||||||
else:
|
fut.add_done_callback(on_done)
|
||||||
loop.call_soon_threadsafe(callback, *args)
|
else: # non-async cb
|
||||||
|
# note: the cb needs to run in the asyncio thread
|
||||||
|
if get_running_loop() == loop:
|
||||||
|
# run callback immediately, so that it is guaranteed
|
||||||
|
# to have been executed when this method returns
|
||||||
|
callback(*args)
|
||||||
|
else:
|
||||||
|
# note: if cb raises, asyncio will log the exception
|
||||||
|
loop.call_soon_threadsafe(callback, *args)
|
||||||
|
|
||||||
|
|
||||||
callback_mgr = CallbackManager()
|
callback_mgr = CallbackManager()
|
||||||
|
|||||||
Reference in New Issue
Block a user