network: test if interface is alive before iface.taskgroup.spawn
closes https://github.com/spesmilo/electrum/issues/7677 ``` E/n | network | taskgroup died. Traceback (most recent call last): File "/opt/electrum/electrum/network.py", line 1204, in main [await group.spawn(job) for job in self._jobs] File "/home/voegtlin/.local/lib/python3.8/site-packages/aiorpcx/curio.py", line 297, in __aexit__ await self.join() File "/opt/electrum/electrum/util.py", line 1255, in join task.result() File "/opt/electrum/electrum/network.py", line 1277, in _maintain_sessions await maintain_main_interface() File "/opt/electrum/electrum/network.py", line 1268, in maintain_main_interface await self._ensure_there_is_a_main_interface() File "/opt/electrum/electrum/network.py", line 1245, in _ensure_there_is_a_main_interface await self._switch_to_random_interface() File "/opt/electrum/electrum/network.py", line 648, in _switch_to_random_interface await self.switch_to_interface(random.choice(servers)) File "/opt/electrum/electrum/network.py", line 714, in switch_to_interface await i.taskgroup.spawn(self._request_server_info(i)) File "/home/voegtlin/.local/lib/python3.8/site-packages/aiorpcx/curio.py", line 204, in spawn self._add_task(task) File "/home/voegtlin/.local/lib/python3.8/site-packages/aiorpcx/curio.py", line 150, in _add_task raise RuntimeError('task group terminated') RuntimeError: task group terminated ``` I believe the "suppress spurious cancellations" block was added as SilentTaskGroup raised CancelledError instead of RuntimeError for this scenario.
This commit is contained in:
@@ -489,7 +489,7 @@ class Interface(Logger):
|
||||
self.logger.warning(f"disconnecting due to {repr(e)}")
|
||||
self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
|
||||
finally:
|
||||
self.got_disconnected.set()
|
||||
self.got_disconnected.set() # set this ASAP, ideally before any awaits
|
||||
await self.network.connection_down(self)
|
||||
# if was not 'ready' yet, schedule waiting coroutines:
|
||||
self.ready.cancel()
|
||||
@@ -534,6 +534,9 @@ class Interface(Logger):
|
||||
|
||||
self.ready.set_result(1)
|
||||
|
||||
def is_connected_and_ready(self) -> bool:
|
||||
return self.ready.done() and not self.got_disconnected.is_set()
|
||||
|
||||
async def _save_certificate(self) -> None:
|
||||
if not os.path.exists(self.cert_path):
|
||||
# we may need to retry this a few times, in case the handshake hasn't completed
|
||||
|
||||
@@ -437,7 +437,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
|
||||
def is_connected(self):
|
||||
interface = self.interface
|
||||
return interface is not None and interface.ready.done()
|
||||
return interface is not None and interface.is_connected_and_ready()
|
||||
|
||||
def is_connecting(self):
|
||||
return self.connection_status == 'connecting'
|
||||
@@ -707,8 +707,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
|
||||
i = self.interfaces[server]
|
||||
if old_interface != i:
|
||||
if not i.is_connected_and_ready():
|
||||
return
|
||||
self.logger.info(f"switching to {server}")
|
||||
assert i.ready.done(), "interface we are switching to is not ready yet"
|
||||
blockchain_updated = i.blockchain != self.blockchain()
|
||||
self.interface = i
|
||||
await i.taskgroup.spawn(self._request_server_info(i))
|
||||
@@ -1195,7 +1196,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
await self.taskgroup.spawn(self._run_new_interface(self.default_server))
|
||||
|
||||
async def main():
|
||||
self.logger.info("starting taskgroup.")
|
||||
self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).")
|
||||
try:
|
||||
# note: if a task finishes with CancelledError, that
|
||||
# will NOT raise, and the group will keep the other tasks running
|
||||
@@ -1203,9 +1204,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
await group.spawn(self._maintain_sessions())
|
||||
[await group.spawn(job) for job in self._jobs]
|
||||
except Exception as e:
|
||||
self.logger.exception("taskgroup died.")
|
||||
self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).")
|
||||
finally:
|
||||
self.logger.info("taskgroup stopped.")
|
||||
self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).")
|
||||
asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
|
||||
|
||||
util.trigger_callback('network_updated')
|
||||
@@ -1238,13 +1239,13 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
util.trigger_callback('network_updated')
|
||||
|
||||
async def _ensure_there_is_a_main_interface(self):
|
||||
if self.is_connected():
|
||||
if self.interface:
|
||||
return
|
||||
# if auto_connect is set, try a different server
|
||||
if self.auto_connect and not self.is_connecting():
|
||||
await self._switch_to_random_interface()
|
||||
# if auto_connect is not set, or still no main interface, retry current
|
||||
if not self.is_connected() and not self.is_connecting():
|
||||
if not self.interface and not self.is_connecting():
|
||||
if self._can_retry_addr(self.default_server, urgent=True):
|
||||
await self.switch_to_interface(self.default_server)
|
||||
|
||||
@@ -1271,15 +1272,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)
|
||||
|
||||
while True:
|
||||
try:
|
||||
await maybe_start_new_interfaces()
|
||||
await maintain_healthy_spread_of_connected_servers()
|
||||
await maintain_main_interface()
|
||||
except asyncio.CancelledError:
|
||||
# suppress spurious cancellations
|
||||
group = self.taskgroup
|
||||
if not group or group.joined:
|
||||
raise
|
||||
await maybe_start_new_interfaces()
|
||||
await maintain_healthy_spread_of_connected_servers()
|
||||
await maintain_main_interface()
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
@classmethod
|
||||
|
||||
Reference in New Issue
Block a user