This reverts commit dc6c481406, as it introduced an issue of its own:
while add_address was running on one thread, synchronizer._reset could be running on another.
By the time the "enqueue" coro ran, it would use a new add_queue, and
addr would no longer be in requested_addrs, leading to the KeyError shown below.
```
I/w | wallet.Standard_Wallet.[test_segwit_2] | starting taskgroup.
I | lnworker.LNWallet.[test_segwit_2] | starting taskgroup.
E/i | interface.[testnet.qtornado.com:51002] | Exception in run: KeyError('tb1q3wmgf8n5eettnj50pzgnfrrpdpjmwn37x7nzsc5780kk4je9v4hspym8mu')
Traceback (most recent call last):
File ".../electrum/electrum/util.py", line 1243, in wrapper
return await func(*args, **kwargs)
File ".../electrum/electrum/interface.py", line 506, in wrapper_func
return await func(self, *args, **kwargs)
File ".../electrum/electrum/interface.py", line 529, in run
await self.open_session(ssl_context)
File ".../electrum/electrum/interface.py", line 679, in open_session
async with self.taskgroup as group:
File ".../aiorpcX/aiorpcx/curio.py", line 304, in __aexit__
await self.join()
File ".../electrum/electrum/util.py", line 1339, in join
task.result()
File ".../electrum/electrum/synchronizer.py", line 80, in _run_tasks
async with taskgroup as group:
File ".../aiorpcX/aiorpcx/curio.py", line 304, in __aexit__
await self.join()
File ".../electrum/electrum/util.py", line 1339, in join
task.result()
File ".../electrum/electrum/synchronizer.py", line 127, in subscribe_to_address
self.requested_addrs.remove(addr)
KeyError: 'tb1q3wmgf8n5eettnj50pzgnfrrpdpjmwn37x7nzsc5780kk4je9v4hspym8mu'
```
48 lines
1.2 KiB
Python
Executable File
48 lines
1.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import asyncio
|
|
|
|
from electrum.network import Network
|
|
from electrum.util import print_msg, create_and_start_event_loop
|
|
from electrum.synchronizer import SynchronizerBase
|
|
from electrum.simple_config import SimpleConfig
|
|
|
|
|
|
# Read the address to watch from the first command-line argument; without
# it, print the usage line and exit with status 1.
try:
    addr = sys.argv[1]
except IndexError:
    # Indexing sys.argv (a list) can only fail with IndexError, so catch
    # exactly that instead of every Exception.
    print("usage: watch_address <bitcoin_address>")
    sys.exit(1)
|
|
|
|
# Default configuration object handed to the Network below.
config = SimpleConfig()

# Start the event loop first, then the network.
# Only the first element returned by the helper (the loop) is used here.
loop_handles = create_and_start_event_loop()
loop = loop_handles[0]
network = Network(config)
network.start()
|
|
|
|
|
|
class Notifier(SynchronizerBase):
    """Watch addresses fed through ``watch_queue`` and print their status.

    Addresses are remembered in ``watched_addresses`` so they can be passed
    to ``_add_address`` again whenever ``main`` is (re)started.
    NOTE(review): ``_add_address`` and the caller of ``main`` /
    ``_on_address_status`` live in ``SynchronizerBase``, which is not
    visible here -- confirm their exact semantics there.
    """

    def __init__(self, network):
        """Initialise the base class and the per-instance bookkeeping."""
        super().__init__(network)
        # Queue through which new addresses to watch are handed over.
        self.watch_queue = asyncio.Queue()
        # Every address that has been taken off the queue so far.
        self.watched_addresses = set()

    async def main(self):
        """Re-add known addresses, then serve the watch queue forever."""
        # resend existing subscriptions if we were restarted
        for known_address in self.watched_addresses:
            await self._add_address(known_address)
        # main loop: block until an address arrives, record it, add it
        while True:
            new_address = await self.watch_queue.get()
            self.watched_addresses.add(new_address)
            await self._add_address(new_address)

    async def _on_address_status(self, addr, status):
        """Print the address together with its reported status."""
        message = f"addr {addr}, status {status}"
        print_msg(message)
|
|
|
|
|
|
# Build the notifier, then submit the queue put() coroutine to `loop` via the
# thread-safe scheduling call so the command-line address gets watched.
notifier = Notifier(network)
enqueue_request = notifier.watch_queue.put(addr)
asyncio.run_coroutine_threadsafe(enqueue_request, loop)
|