wallet: recreate Synchronizer and Verifier when switching servers
not that nice but solves races
This commit is contained in:
@@ -143,6 +143,8 @@ class AddressSynchronizer(PrintError):
|
|||||||
interface = self.network.interface
|
interface = self.network.interface
|
||||||
if interface is None:
|
if interface is None:
|
||||||
return # we should get called again soon
|
return # we should get called again soon
|
||||||
|
self.verifier = SPV(self.network, self)
|
||||||
|
self.synchronizer = Synchronizer(self)
|
||||||
await interface.group.spawn(self.verifier.main(interface))
|
await interface.group.spawn(self.verifier.main(interface))
|
||||||
await interface.group.spawn(self.synchronizer.send_subscriptions(interface))
|
await interface.group.spawn(self.synchronizer.send_subscriptions(interface))
|
||||||
await interface.group.spawn(self.synchronizer.handle_status(interface))
|
await interface.group.spawn(self.synchronizer.handle_status(interface))
|
||||||
@@ -151,8 +153,6 @@ class AddressSynchronizer(PrintError):
|
|||||||
def start_threads(self, network):
|
def start_threads(self, network):
|
||||||
self.network = network
|
self.network = network
|
||||||
if self.network is not None:
|
if self.network is not None:
|
||||||
self.verifier = SPV(self.network, self)
|
|
||||||
self.synchronizer = Synchronizer(self)
|
|
||||||
self.network.register_callback(self.on_default_server_changed, ['default_server_changed'])
|
self.network.register_callback(self.on_default_server_changed, ['default_server_changed'])
|
||||||
self.network.trigger_callback('default_server_changed')
|
self.network.trigger_callback('default_server_changed')
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -72,6 +72,14 @@ class NotificationSession(ClientSession):
|
|||||||
class GracefulDisconnect(AIOSafeSilentException): pass
|
class GracefulDisconnect(AIOSafeSilentException): pass
|
||||||
|
|
||||||
|
|
||||||
|
class CustomTaskGroup(TaskGroup):
|
||||||
|
|
||||||
|
def spawn(self, *args, **kwargs):
|
||||||
|
if self._closed:
|
||||||
|
raise asyncio.CancelledError()
|
||||||
|
return super().spawn(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class Interface(PrintError):
|
class Interface(PrintError):
|
||||||
|
|
||||||
def __init__(self, network, server, config_path, proxy):
|
def __init__(self, network, server, config_path, proxy):
|
||||||
@@ -89,7 +97,7 @@ class Interface(PrintError):
|
|||||||
|
|
||||||
# TODO combine?
|
# TODO combine?
|
||||||
self.fut = asyncio.get_event_loop().create_task(self.run())
|
self.fut = asyncio.get_event_loop().create_task(self.run())
|
||||||
self.group = TaskGroup()
|
self.group = CustomTaskGroup()
|
||||||
|
|
||||||
if proxy:
|
if proxy:
|
||||||
username, pw = proxy.get('user'), proxy.get('password')
|
username, pw = proxy.get('user'), proxy.get('password')
|
||||||
@@ -255,7 +263,7 @@ class Interface(PrintError):
|
|||||||
self.tip = new_header['block_height']
|
self.tip = new_header['block_height']
|
||||||
await copy_header_queue.put(new_header)
|
await copy_header_queue.put(new_header)
|
||||||
except concurrent.futures.TimeoutError:
|
except concurrent.futures.TimeoutError:
|
||||||
await asyncio.wait_for(self.session.send_request('server.ping'), 5)
|
await asyncio.wait_for(self.session.send_request('server.ping'), 10)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.fut.cancel()
|
self.fut.cancel()
|
||||||
|
|||||||
@@ -242,9 +242,6 @@ class Network(PrintError):
|
|||||||
self.start_network(deserialize_server(self.default_server)[2],
|
self.start_network(deserialize_server(self.default_server)[2],
|
||||||
deserialize_proxy(self.config.get('proxy')))
|
deserialize_proxy(self.config.get('proxy')))
|
||||||
self.asyncio_loop = asyncio.get_event_loop()
|
self.asyncio_loop = asyncio.get_event_loop()
|
||||||
self.server_info_job = asyncio.Future()
|
|
||||||
# just to not trigger a warning from switch_to_interface the first time we change default_server
|
|
||||||
self.server_info_job.set_result(1)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_instance():
|
def get_instance():
|
||||||
@@ -330,7 +327,6 @@ class Network(PrintError):
|
|||||||
def is_connecting(self):
|
def is_connecting(self):
|
||||||
return self.connection_status == 'connecting'
|
return self.connection_status == 'connecting'
|
||||||
|
|
||||||
@util.aiosafe
|
|
||||||
async def request_server_info(self, interface):
|
async def request_server_info(self, interface):
|
||||||
await interface.ready
|
await interface.ready
|
||||||
session = interface.session
|
session = interface.session
|
||||||
@@ -560,7 +556,6 @@ class Network(PrintError):
|
|||||||
being opened, start a thread to connect. The actual switch will
|
being opened, start a thread to connect. The actual switch will
|
||||||
happen on receipt of the connection notification. Do nothing
|
happen on receipt of the connection notification. Do nothing
|
||||||
if server already is our interface.'''
|
if server already is our interface.'''
|
||||||
old_default_server = self.default_server
|
|
||||||
self.default_server = server
|
self.default_server = server
|
||||||
if server not in self.interfaces:
|
if server not in self.interfaces:
|
||||||
self.interface = None
|
self.interface = None
|
||||||
@@ -570,14 +565,18 @@ class Network(PrintError):
|
|||||||
i = self.interfaces[server]
|
i = self.interfaces[server]
|
||||||
if self.interface != i:
|
if self.interface != i:
|
||||||
self.print_error("switching to", server)
|
self.print_error("switching to", server)
|
||||||
# stop any current interface in order to terminate subscriptions
|
if self.interface is not None:
|
||||||
# fixme: we don't want to close headers sub
|
# Stop any current interface in order to terminate subscriptions,
|
||||||
#self.close_interface(self.interface)
|
# and to cancel tasks in interface.group.
|
||||||
|
# However, for headers sub, give preference to this interface
|
||||||
|
# over unknown ones, i.e. start it again right away.
|
||||||
|
self.close_interface(self.interface)
|
||||||
|
if len(self.interfaces) <= self.num_server:
|
||||||
|
self.start_interface(self.interface.server)
|
||||||
|
|
||||||
self.interface = i
|
self.interface = i
|
||||||
if not self.server_info_job.done():
|
asyncio.get_event_loop().create_task(
|
||||||
self.print_error('cancelled previous request_server_info job, was it too slow? server was:', old_default_server)
|
i.group.spawn(self.request_server_info(i)))
|
||||||
self.server_info_job.cancel()
|
|
||||||
self.server_info_job = asyncio.get_event_loop().create_task(self.request_server_info(i))
|
|
||||||
self.trigger_callback('default_server_changed')
|
self.trigger_callback('default_server_changed')
|
||||||
self.set_status('connected')
|
self.set_status('connected')
|
||||||
self.notify('updated')
|
self.notify('updated')
|
||||||
@@ -876,12 +875,11 @@ class Network(PrintError):
|
|||||||
changed = True
|
changed = True
|
||||||
else:
|
else:
|
||||||
if self.config.is_fee_estimates_update_required():
|
if self.config.is_fee_estimates_update_required():
|
||||||
asyncio.get_event_loop().create_task(self.attempt_fee_estimate_update())
|
await self.interface.group.spawn(self.attempt_fee_estimate_update())
|
||||||
|
|
||||||
if changed:
|
if changed:
|
||||||
self.notify('updated')
|
self.notify('updated')
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
@util.aiosafe
|
|
||||||
async def attempt_fee_estimate_update(self):
|
async def attempt_fee_estimate_update(self):
|
||||||
await asyncio.wait_for(self.request_fee_estimates(self.interface), 5)
|
await asyncio.wait_for(self.request_fee_estimates(self.interface), 5)
|
||||||
|
|||||||
@@ -151,13 +151,13 @@ class Synchronizer(PrintError):
|
|||||||
async def send_subscriptions(self, interface):
|
async def send_subscriptions(self, interface):
|
||||||
while True:
|
while True:
|
||||||
addr = await self.add_queue.get()
|
addr = await self.add_queue.get()
|
||||||
await interface.group.spawn(self.subscribe_to_address(addr))
|
await interface.group.spawn(self.subscribe_to_address, addr)
|
||||||
|
|
||||||
async def handle_status(self, interface):
|
async def handle_status(self, interface):
|
||||||
while True:
|
while True:
|
||||||
h, status = await self.status_queue.get()
|
h, status = await self.status_queue.get()
|
||||||
addr = self.scripthash_to_address[h]
|
addr = self.scripthash_to_address[h]
|
||||||
await interface.group.spawn(self.on_address_status(addr, status))
|
await interface.group.spawn(self.on_address_status, addr, status)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def session(self):
|
def session(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user