1
0

Merge pull request #10429 from f321x/fix_deadlock

plugin: make DeviceMgr.run non-blocking, fix lock
This commit is contained in:
ghost43
2026-01-21 15:28:20 +00:00
committed by GitHub
2 changed files with 17 additions and 3 deletions

View File

@@ -36,6 +36,7 @@ from urllib.parse import urlparse
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
import concurrent
from concurrent.futures import Future
import zipimport
from functools import wraps, partial
from itertools import chain
@@ -1040,6 +1041,7 @@ class DeviceMgr(ThreadJob):
self._recognised_vendor = {} # type: Dict[int, HW_PluginBase] # vendor_id -> Plugin
# Custom enumerate functions for devices we don't know about.
self._enumerate_func = set() # Needs self.lock.
self._ongoing_timeout_checks = {} # type: Dict[str, Future]
self.lock = threading.RLock()
@@ -1053,10 +1055,16 @@ class DeviceMgr(ThreadJob):
"""Handle device timeouts. Runs in the context of the Plugins
thread."""
with self.lock:
clients = list(self.clients.keys())
clients = list(self.clients.items())
cutoff = time.time() - self.config.get_session_timeout()
for client in clients:
client.timeout(cutoff)
for client, client_id in clients:
if fut := self._ongoing_timeout_checks.get(client_id):
if not fut.done():
continue
# scheduling the timeout check prevents blocking the Plugins DaemonThread if the
# _hwd_comms_executor Thread is blocked (e.g. due to it awaiting user input).
fut = _hwd_comms_executor.submit(client.timeout, cutoff)
self._ongoing_timeout_checks[client_id] = fut
def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
for pair in device_pairs:
@@ -1113,6 +1121,8 @@ class DeviceMgr(ThreadJob):
with self.lock:
client = self._client_by_id(id_)
self.clients.pop(client, None)
if fut := self._ongoing_timeout_checks.pop(id_, None):
fut.cancel()
if client:
client.close()

View File

@@ -378,10 +378,14 @@ class DaemonThread(threading.Thread, Logger):
# malformed or malicious server responses
with self.job_lock:
for job in self.jobs:
start = time.perf_counter()
try:
job.run()
except Exception as e:
self.logger.exception('')
duration = time.perf_counter() - start
if duration > 0.5:
self.logger.warning(f"thread job {job} blocked {self} DaemonThread for {duration:.2f} s")
def remove_jobs(self, jobs):
with self.job_lock: