Revert "wallet.is_up_to_date: fix flickering during sync due to race"
This reverts commit dc6c481406 as it introduced its own issue:
while add_address was running on one thread, synchronizer._reset could be running on another,
and by the time the "enqueue" coro would run, it would use a new add_queue and
addr would not be in requested_addrs anymore...
```
I/w | wallet.Standard_Wallet.[test_segwit_2] | starting taskgroup.
I | lnworker.LNWallet.[test_segwit_2] | starting taskgroup.
E/i | interface.[testnet.qtornado.com:51002] | Exception in run: KeyError('tb1q3wmgf8n5eettnj50pzgnfrrpdpjmwn37x7nzsc5780kk4je9v4hspym8mu')
Traceback (most recent call last):
File ".../electrum/electrum/util.py", line 1243, in wrapper
return await func(*args, **kwargs)
File ".../electrum/electrum/interface.py", line 506, in wrapper_func
return await func(self, *args, **kwargs)
File ".../electrum/electrum/interface.py", line 529, in run
await self.open_session(ssl_context)
File ".../electrum/electrum/interface.py", line 679, in open_session
async with self.taskgroup as group:
File ".../aiorpcX/aiorpcx/curio.py", line 304, in __aexit__
await self.join()
File ".../electrum/electrum/util.py", line 1339, in join
task.result()
File ".../electrum/electrum/synchronizer.py", line 80, in _run_tasks
async with taskgroup as group:
File ".../aiorpcX/aiorpcx/curio.py", line 304, in __aexit__
await self.join()
File ".../electrum/electrum/util.py", line 1339, in join
task.result()
File ".../electrum/electrum/synchronizer.py", line 127, in subscribe_to_address
self.requested_addrs.remove(addr)
KeyError: 'tb1q3wmgf8n5eettnj50pzgnfrrpdpjmwn37x7nzsc5780kk4je9v4hspym8mu'
```
This commit is contained in:
@@ -212,7 +212,7 @@ class AddressSynchronizer(Logger, EventListener):
|
||||
self.db.history[address] = []
|
||||
self.set_up_to_date(False)
|
||||
if self.synchronizer:
|
||||
self.synchronizer.add_address(address)
|
||||
self.synchronizer.add(address)
|
||||
|
||||
def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False):
|
||||
"""Returns a set of transaction hashes from the wallet history that are
|
||||
|
||||
@@ -32,12 +32,12 @@ class Notifier(SynchronizerBase):
|
||||
async def main(self):
|
||||
# resend existing subscriptions if we were restarted
|
||||
for addr in self.watched_addresses:
|
||||
self.add_address(addr)
|
||||
await self._add_address(addr)
|
||||
# main loop
|
||||
while True:
|
||||
addr = await self.watch_queue.get()
|
||||
self.watched_addresses.add(addr)
|
||||
self.add_address(addr)
|
||||
await self._add_address(addr)
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
print_msg(f"addr {addr}, status {status}")
|
||||
|
||||
@@ -83,17 +83,16 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
# we are being cancelled now
|
||||
self.session.unsubscribe(self.status_queue)
|
||||
|
||||
def add_address(self, addr: str) -> None:
|
||||
"""Add an address to subscribe to.
|
||||
Thread-safe. When the method returns, the Synchronizer (e.g. is_up_to_date) will know about the address.
|
||||
"""
|
||||
def add(self, addr):
|
||||
# FIXME is_up_to_date does not take addr into account until _add_address executes
|
||||
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
|
||||
|
||||
async def _add_address(self, addr: str):
|
||||
# note: this method is async as add_queue.put_nowait is not thread-safe.
|
||||
if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
|
||||
if addr in self.requested_addrs: return
|
||||
self.requested_addrs.add(addr)
|
||||
async def enqueue():
|
||||
# note: this method is async as add_queue.put_nowait is not thread-safe.
|
||||
self.add_queue.put_nowait(addr)
|
||||
asyncio.run_coroutine_threadsafe(enqueue(), self.asyncio_loop)
|
||||
self.add_queue.put_nowait(addr)
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
"""Handle the change of the status of an address."""
|
||||
@@ -247,7 +246,7 @@ class Synchronizer(SynchronizerBase):
|
||||
await self._request_missing_txs(history, allow_server_not_finding_tx=True)
|
||||
# add addresses to bootstrap
|
||||
for addr in random_shuffled_copy(self.adb.get_addresses()):
|
||||
self.add_address(addr)
|
||||
await self._add_address(addr)
|
||||
# main loop
|
||||
while True:
|
||||
await asyncio.sleep(0.1)
|
||||
@@ -273,12 +272,12 @@ class Notifier(SynchronizerBase):
|
||||
async def main(self):
|
||||
# resend existing subscriptions if we were restarted
|
||||
for addr in self.watched_addresses:
|
||||
self.add_address(addr)
|
||||
await self._add_address(addr)
|
||||
# main loop
|
||||
while True:
|
||||
addr, url = await self._start_watching_queue.get()
|
||||
self.watched_addresses[addr].append(url)
|
||||
self.add_address(addr)
|
||||
await self._add_address(addr)
|
||||
|
||||
async def start_watching_addr(self, addr: str, url: str):
|
||||
await self._start_watching_queue.put((addr, url))
|
||||
|
||||
Reference in New Issue
Block a user