1
0

util: add AsyncHangDetector, and use it for lnpeer._process_message

This commit is contained in:
SomberNight
2024-06-03 18:36:08 +00:00
parent eb6e503556
commit af2c9b081c
2 changed files with 36 additions and 2 deletions

View File

@@ -22,7 +22,7 @@ from . import ecc
from .ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey from .ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey
from . import constants from . import constants
from .util import (bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup, from .util import (bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup,
UnrelatedTransactionException, error_text_bytes_to_safe_str) UnrelatedTransactionException, error_text_bytes_to_safe_str, AsyncHangDetector)
from . import transaction from . import transaction
from .bitcoin import make_op_return, DummyAddress from .bitcoin import make_op_return, DummyAddress
from .transaction import PartialTxOutput, match_script_against_template, Sighash from .transaction import PartialTxOutput, match_script_against_template, Sighash
@@ -240,7 +240,11 @@ class Peer(Logger):
# note: the message handler might be async or non-async. In either case, by default, # note: the message handler might be async or non-async. In either case, by default,
# we wait for it to complete before we return, i.e. before the next message is processed. # we wait for it to complete before we return, i.e. before the next message is processed.
if asyncio.iscoroutinefunction(f): if asyncio.iscoroutinefunction(f):
await f(*args) async with AsyncHangDetector(
message=f"message handler still running for {message_type.upper()}",
logger=self.logger,
):
await f(*args)
else: else:
f(*args) f(*args)

View File

@@ -22,6 +22,7 @@
# SOFTWARE. # SOFTWARE.
import binascii import binascii
import concurrent.futures import concurrent.futures
import logging
import os, sys, re, json import os, sys, re, json
from collections import defaultdict, OrderedDict from collections import defaultdict, OrderedDict
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any, from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
@@ -488,6 +489,35 @@ def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
return do_profile return do_profile
class AsyncHangDetector:
"""Context manager that logs every `n` seconds if encapsulated context still has not exited."""
def __init__(
self,
*,
period_sec: int = 15,
message: str,
logger: logging.Logger = None,
):
self.period_sec = period_sec
self.message = message
self.logger = logger or _logger
async def _monitor(self):
# note: this assumes that the event loop itself is not blocked
t0 = time.monotonic()
while True:
await asyncio.sleep(self.period_sec)
t1 = time.monotonic()
self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)")
async def __aenter__(self):
self.mtask = asyncio.create_task(self._monitor())
async def __aexit__(self, exc_type, exc, tb):
self.mtask.cancel()
def android_ext_dir(): def android_ext_dir():
from android.storage import primary_external_storage_path from android.storage import primary_external_storage_path
return primary_external_storage_path() return primary_external_storage_path()