scripts: fix "cannot schedule new futures after interpreter shutdown"
- looks like around python3.9, they changed it so that
if we don't block on the main thread, it starts to shut things down
- polling thread.join() makes Ctrl+C work. kind of.
```
$ ./electrum/scripts/txradar.py 6bde84a981e72573666fcc51c81ec3f8f4a813709bf16451dce3f106a114d392
Exception in run: RuntimeError('cannot schedule new futures after interpreter shutdown')
Traceback (most recent call last):
File "/home/user/wspace/electrum/electrum/util.py", line 1218, in wrapper
return await func(*args, **kwargs)
File "/home/user/wspace/electrum/electrum/interface.py", line 649, in wrapper_func
return await func(self, *args, **kwargs)
File "/home/user/wspace/electrum/electrum/interface.py", line 675, in run
await self.open_session(ssl_context=ssl_context)
File "/home/user/wspace/electrum/electrum/interface.py", line 872, in open_session
async with _RSClient(
File "/home/user/.local/lib/python3.10/site-packages/aiorpcx/rawsocket.py", line 167, in __aenter__
_transport, protocol = await self.create_connection()
File "/home/user/wspace/electrum/electrum/interface.py", line 285, in create_connection
return await super().create_connection()
File "/home/user/.local/lib/python3.10/site-packages/aiorpcx/rawsocket.py", line 163, in create_connection
return await connector.create_connection(
File "/usr/lib/python3.10/asyncio/base_events.py", line 1036, in create_connection
infos = await self._ensure_resolved(
File "/usr/lib/python3.10/asyncio/base_events.py", line 1418, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
File "/usr/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
return await self.run_in_executor(
File "/usr/lib/python3.10/asyncio/base_events.py", line 821, in run_in_executor
executor.submit(func, *args), loop=self)
File "/usr/lib/python3.10/concurrent/futures/thread.py", line 169, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
```
This commit is contained in:
@@ -38,3 +38,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -36,3 +36,5 @@ async def f():
|
|||||||
|
|
||||||
# 2. send the subscription
|
# 2. send the subscription
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -28,3 +28,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -31,3 +31,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -8,6 +8,9 @@ https://github.com/lightningnetwork/lightning-rfc/blob/master/09-features.md
|
|||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from aiorpcx import NetAddress
|
||||||
|
|
||||||
from electrum.logging import get_logger, configure_logging
|
from electrum.logging import get_logger, configure_logging
|
||||||
from electrum.simple_config import SimpleConfig
|
from electrum.simple_config import SimpleConfig
|
||||||
@@ -52,7 +55,6 @@ if not os.path.exists(wallet_path):
|
|||||||
|
|
||||||
# open wallet
|
# open wallet
|
||||||
wallet = daemon.load_wallet(wallet_path, password=None, upgrade=True)
|
wallet = daemon.load_wallet(wallet_path, password=None, upgrade=True)
|
||||||
wallet.start_network(network)
|
|
||||||
|
|
||||||
|
|
||||||
async def worker(work_queue: asyncio.Queue, results_queue: asyncio.Queue, flag):
|
async def worker(work_queue: asyncio.Queue, results_queue: asyncio.Queue, flag):
|
||||||
@@ -67,19 +69,15 @@ async def worker(work_queue: asyncio.Queue, results_queue: asyncio.Queue, flag):
|
|||||||
work = await work_queue.get()
|
work = await work_queue.get()
|
||||||
|
|
||||||
# only check non-onion addresses
|
# only check non-onion addresses
|
||||||
addr = None
|
addr = None # type: Optional[NetAddress]
|
||||||
for a in work['addrs']:
|
for a in work['addrs']: # type: NetAddress
|
||||||
if "onion" not in a[0]:
|
if not str(a.host).endswith(".onion"):
|
||||||
addr = a
|
addr = a
|
||||||
if not addr:
|
if not addr:
|
||||||
await results_queue.put(None)
|
await results_queue.put(None)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# handle ipv4/ipv6
|
connect_str = f"{work['pk'].hex()}@{addr}"
|
||||||
if ':' in addr[0]:
|
|
||||||
connect_str = f"{work['pk'].hex()}@[{addr.host}]:{addr.port}"
|
|
||||||
else:
|
|
||||||
connect_str = f"{work['pk'].hex()}@{addr.host}:{addr.port}"
|
|
||||||
|
|
||||||
print(f"worker connecting to {connect_str}")
|
print(f"worker connecting to {connect_str}")
|
||||||
try:
|
try:
|
||||||
@@ -177,3 +175,5 @@ async def node_flag_stats(opt_flag: LnFeatures, presync: False):
|
|||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
node_flag_stats(FLAG, presync=PRESYNC), loop)
|
node_flag_stats(FLAG, presync=PRESYNC), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -27,3 +27,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -25,3 +25,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -34,3 +34,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -36,3 +36,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -61,3 +61,5 @@ async def f():
|
|||||||
stopping_fut.set_result(1)
|
stopping_fut.set_result(1)
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(f(), loop)
|
asyncio.run_coroutine_threadsafe(f(), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ except Exception:
|
|||||||
config = SimpleConfig()
|
config = SimpleConfig()
|
||||||
|
|
||||||
# start network
|
# start network
|
||||||
loop = create_and_start_event_loop()[0]
|
loop, stopping_fut, loop_thread = create_and_start_event_loop()
|
||||||
network = Network(config)
|
network = Network(config)
|
||||||
network.start()
|
network.start()
|
||||||
|
|
||||||
@@ -45,3 +45,5 @@ class Notifier(SynchronizerBase):
|
|||||||
|
|
||||||
notifier = Notifier(network)
|
notifier = Notifier(network)
|
||||||
asyncio.run_coroutine_threadsafe(notifier.watch_queue.put(addr), loop)
|
asyncio.run_coroutine_threadsafe(notifier.watch_queue.put(addr), loop)
|
||||||
|
while loop_thread.is_alive():
|
||||||
|
loop_thread.join(1)
|
||||||
|
|||||||
Reference in New Issue
Block a user