mv NetworkJobOnDefaultServer to util
break ref cycles
This commit is contained in:
@@ -883,54 +883,3 @@ class Network(PrintError):
|
||||
await self.interface.group.spawn(self._request_fee_estimates, self.interface)
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
|
||||
class NetworkJobOnDefaultServer(PrintError):
|
||||
"""An abstract base class for a job that runs on the main network
|
||||
interface. Every time the main interface changes, the job is
|
||||
restarted, and some of its internals are reset.
|
||||
"""
|
||||
def __init__(self, network: Network):
|
||||
asyncio.set_event_loop(network.asyncio_loop)
|
||||
self.network = network
|
||||
self.interface = None # type: Interface
|
||||
self._restart_lock = asyncio.Lock()
|
||||
self._reset()
|
||||
asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
|
||||
network.register_callback(self._restart, ['default_server_changed'])
|
||||
|
||||
def _reset(self):
|
||||
"""Initialise fields. Called every time the underlying
|
||||
server connection changes.
|
||||
"""
|
||||
self.group = SilentTaskGroup()
|
||||
|
||||
async def _start(self, interface):
|
||||
self.interface = interface
|
||||
await interface.group.spawn(self._start_tasks)
|
||||
|
||||
async def _start_tasks(self):
|
||||
"""Start tasks in self.group. Called every time the underlying
|
||||
server connection changes.
|
||||
"""
|
||||
raise NotImplementedError() # implemented by subclasses
|
||||
|
||||
async def stop(self):
|
||||
await self.group.cancel_remaining()
|
||||
|
||||
@aiosafe
|
||||
async def _restart(self, *args):
|
||||
interface = self.network.interface
|
||||
if interface is None:
|
||||
return # we should get called again soon
|
||||
|
||||
async with self._restart_lock:
|
||||
await self.stop()
|
||||
self._reset()
|
||||
await self._start(interface)
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
s = self.interface.session
|
||||
assert s is not None
|
||||
return s
|
||||
|
||||
@@ -30,9 +30,8 @@ from collections import defaultdict
|
||||
from aiorpcx import TaskGroup, run_in_thread
|
||||
|
||||
from .transaction import Transaction
|
||||
from .util import bh2u, make_aiohttp_session
|
||||
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer
|
||||
from .bitcoin import address_to_scripthash
|
||||
from .network import NetworkJobOnDefaultServer
|
||||
|
||||
|
||||
def history_status(h):
|
||||
|
||||
@@ -906,3 +906,54 @@ class SilentTaskGroup(TaskGroup):
|
||||
if self._closed:
|
||||
raise asyncio.CancelledError()
|
||||
return super().spawn(*args, **kwargs)
|
||||
|
||||
|
||||
class NetworkJobOnDefaultServer(PrintError):
|
||||
"""An abstract base class for a job that runs on the main network
|
||||
interface. Every time the main interface changes, the job is
|
||||
restarted, and some of its internals are reset.
|
||||
"""
|
||||
def __init__(self, network):
|
||||
asyncio.set_event_loop(network.asyncio_loop)
|
||||
self.network = network
|
||||
self.interface = None
|
||||
self._restart_lock = asyncio.Lock()
|
||||
self._reset()
|
||||
asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
|
||||
network.register_callback(self._restart, ['default_server_changed'])
|
||||
|
||||
def _reset(self):
|
||||
"""Initialise fields. Called every time the underlying
|
||||
server connection changes.
|
||||
"""
|
||||
self.group = SilentTaskGroup()
|
||||
|
||||
async def _start(self, interface):
|
||||
self.interface = interface
|
||||
await interface.group.spawn(self._start_tasks)
|
||||
|
||||
async def _start_tasks(self):
|
||||
"""Start tasks in self.group. Called every time the underlying
|
||||
server connection changes.
|
||||
"""
|
||||
raise NotImplementedError() # implemented by subclasses
|
||||
|
||||
async def stop(self):
|
||||
await self.group.cancel_remaining()
|
||||
|
||||
@aiosafe
|
||||
async def _restart(self, *args):
|
||||
interface = self.network.interface
|
||||
if interface is None:
|
||||
return # we should get called again soon
|
||||
|
||||
async with self._restart_lock:
|
||||
await self.stop()
|
||||
self._reset()
|
||||
await self._start(interface)
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
s = self.interface.session
|
||||
assert s is not None
|
||||
return s
|
||||
|
||||
@@ -26,13 +26,12 @@ from typing import Sequence, Optional
|
||||
|
||||
import aiorpcx
|
||||
|
||||
from .util import bh2u, VerifiedTxInfo
|
||||
from .util import bh2u, VerifiedTxInfo, NetworkJobOnDefaultServer
|
||||
from .bitcoin import Hash, hash_decode, hash_encode
|
||||
from .transaction import Transaction
|
||||
from .blockchain import hash_header
|
||||
from .interface import GracefulDisconnect
|
||||
from . import constants
|
||||
from .network import NetworkJobOnDefaultServer
|
||||
|
||||
|
||||
class MerkleVerificationFailure(Exception): pass
|
||||
|
||||
Reference in New Issue
Block a user