network: auto-switch servers to preferred fork (or longest chain)
If auto_connect is enabled, allow jumping between forks too. (Previously auto_connect was only switching servers on a given fork, not across forks) If there is a preferred fork set, jump to that (and stay); if there isn't, always jump to the longest fork.
This commit is contained in:
@@ -32,7 +32,7 @@ import json
|
||||
import sys
|
||||
import ipaddress
|
||||
import asyncio
|
||||
from typing import NamedTuple, Optional, Sequence, List
|
||||
from typing import NamedTuple, Optional, Sequence, List, Dict
|
||||
import traceback
|
||||
|
||||
import dns
|
||||
@@ -172,10 +172,9 @@ class Network(PrintError):
|
||||
self.config = SimpleConfig(config) if isinstance(config, dict) else config
|
||||
self.num_server = 10 if not self.config.get('oneserver') else 0
|
||||
blockchain.blockchains = blockchain.read_blockchains(self.config)
|
||||
self.print_error("blockchains", list(blockchain.blockchains.keys()))
|
||||
self.blockchain_index = config.get('blockchain_index', 0)
|
||||
if self.blockchain_index not in blockchain.blockchains.keys():
|
||||
self.blockchain_index = 0
|
||||
self.print_error("blockchains", list(blockchain.blockchains))
|
||||
self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None) # type: Optional[Dict]
|
||||
self._blockchain_index = 0
|
||||
# Server for addresses and transactions
|
||||
self.default_server = self.config.get('server', None)
|
||||
# Sanitize default server
|
||||
@@ -213,11 +212,10 @@ class Network(PrintError):
|
||||
# retry times
|
||||
self.server_retry_time = time.time()
|
||||
self.nodes_retry_time = time.time()
|
||||
# kick off the network. interface is the main server we are currently
|
||||
# communicating with. interfaces is the set of servers we are connecting
|
||||
# to or have an ongoing connection with
|
||||
# the main server we are currently communicating with
|
||||
self.interface = None # type: Interface
|
||||
self.interfaces = {}
|
||||
# set of servers we have an ongoing connection with
|
||||
self.interfaces = {} # type: Dict[str, Interface]
|
||||
self.auto_connect = self.config.get('auto_connect', True)
|
||||
self.connecting = set()
|
||||
self.server_queue = None
|
||||
@@ -227,8 +225,8 @@ class Network(PrintError):
|
||||
#self.asyncio_loop.set_debug(1)
|
||||
self._run_forever = asyncio.Future()
|
||||
self._thread = threading.Thread(target=self.asyncio_loop.run_until_complete,
|
||||
args=(self._run_forever,),
|
||||
name='Network')
|
||||
args=(self._run_forever,),
|
||||
name='Network')
|
||||
self._thread.start()
|
||||
|
||||
def run_from_another_thread(self, coro):
|
||||
@@ -523,20 +521,40 @@ class Network(PrintError):
|
||||
|
||||
async def switch_lagging_interface(self):
|
||||
'''If auto_connect and lagging, switch interface'''
|
||||
if await self._server_is_lagging() and self.auto_connect:
|
||||
if self.auto_connect and await self._server_is_lagging():
|
||||
# switch to one that has the correct header (not height)
|
||||
header = self.blockchain().read_header(self.get_local_height())
|
||||
def filt(x):
|
||||
a = x[1].tip_header
|
||||
b = header
|
||||
assert type(a) is type(b)
|
||||
return a == b
|
||||
|
||||
with self.interfaces_lock: interfaces_items = list(self.interfaces.items())
|
||||
filtered = list(map(lambda x: x[0], filter(filt, interfaces_items)))
|
||||
best_header = self.blockchain().read_header(self.get_local_height())
|
||||
with self.interfaces_lock: interfaces = list(self.interfaces.values())
|
||||
filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
|
||||
if filtered:
|
||||
choice = random.choice(filtered)
|
||||
await self.switch_to_interface(choice)
|
||||
chosen_iface = random.choice(filtered)
|
||||
await self.switch_to_interface(chosen_iface.server)
|
||||
|
||||
async def switch_unwanted_fork_interface(self):
|
||||
"""If auto_connect and main interface is not on preferred fork,
|
||||
try to switch to preferred fork.
|
||||
"""
|
||||
if not self.auto_connect:
|
||||
return
|
||||
with self.interfaces_lock: interfaces = list(self.interfaces.values())
|
||||
# try to switch to preferred fork
|
||||
if self._blockchain_preferred_block:
|
||||
pref_height = self._blockchain_preferred_block['height']
|
||||
pref_hash = self._blockchain_preferred_block['hash']
|
||||
filtered = list(filter(lambda iface: iface.blockchain.check_hash(pref_height, pref_hash),
|
||||
interfaces))
|
||||
if filtered:
|
||||
chosen_iface = random.choice(filtered)
|
||||
await self.switch_to_interface(chosen_iface.server)
|
||||
return
|
||||
# try to switch to longest chain
|
||||
if self.blockchain().parent_id is None:
|
||||
return # already on longest chain
|
||||
filtered = list(filter(lambda iface: iface.blockchain.parent_id is None,
|
||||
interfaces))
|
||||
if filtered:
|
||||
chosen_iface = random.choice(filtered)
|
||||
await self.switch_to_interface(chosen_iface.server)
|
||||
|
||||
async def switch_to_interface(self, server: str):
|
||||
"""Switch to server as our main interface. If no connection exists,
|
||||
@@ -704,8 +722,8 @@ class Network(PrintError):
|
||||
def blockchain(self) -> Blockchain:
|
||||
interface = self.interface
|
||||
if interface and interface.blockchain is not None:
|
||||
self.blockchain_index = interface.blockchain.forkpoint
|
||||
return blockchain.blockchains[self.blockchain_index]
|
||||
self._blockchain_index = interface.blockchain.forkpoint
|
||||
return blockchain.blockchains[self._blockchain_index]
|
||||
|
||||
def get_blockchains(self):
|
||||
out = {} # blockchain_id -> list(interfaces)
|
||||
@@ -724,24 +742,42 @@ class Network(PrintError):
|
||||
await self.connection_down(interface.server)
|
||||
return ifaces
|
||||
|
||||
async def follow_chain(self, chain_id):
|
||||
bc = blockchain.blockchains.get(chain_id)
|
||||
if bc:
|
||||
self.blockchain_index = chain_id
|
||||
self.config.set_key('blockchain_index', chain_id)
|
||||
with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
|
||||
for iface in interfaces_values:
|
||||
if iface.blockchain == bc:
|
||||
await self.switch_to_interface(iface.server)
|
||||
break
|
||||
else:
|
||||
raise Exception('blockchain not found', chain_id)
|
||||
def _set_preferred_chain(self, chain: Blockchain):
|
||||
height = chain.get_max_forkpoint()
|
||||
header_hash = chain.get_hash(height)
|
||||
self._blockchain_preferred_block = {
|
||||
'height': height,
|
||||
'hash': header_hash,
|
||||
}
|
||||
self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block)
|
||||
|
||||
if self.interface:
|
||||
net_params = self.get_parameters()
|
||||
host, port, protocol = deserialize_server(self.interface.server)
|
||||
net_params = net_params._replace(host=host, port=port, protocol=protocol)
|
||||
await self.set_parameters(net_params)
|
||||
async def follow_chain_given_id(self, chain_id: int) -> None:
|
||||
bc = blockchain.blockchains.get(chain_id)
|
||||
if not bc:
|
||||
raise Exception('blockchain {} not found'.format(chain_id))
|
||||
self._set_preferred_chain(bc)
|
||||
# select server on this chain
|
||||
with self.interfaces_lock: interfaces = list(self.interfaces.values())
|
||||
interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
|
||||
if len(interfaces_on_selected_chain) == 0: return
|
||||
chosen_iface = random.choice(interfaces_on_selected_chain)
|
||||
# switch to server (and save to config)
|
||||
net_params = self.get_parameters()
|
||||
host, port, protocol = deserialize_server(chosen_iface.server)
|
||||
net_params = net_params._replace(host=host, port=port, protocol=protocol)
|
||||
await self.set_parameters(net_params)
|
||||
|
||||
async def follow_chain_given_server(self, server_str: str) -> None:
|
||||
# note that server_str should correspond to a connected interface
|
||||
iface = self.interfaces.get(server_str)
|
||||
if iface is None:
|
||||
return
|
||||
self._set_preferred_chain(iface.blockchain)
|
||||
# switch to server (and save to config)
|
||||
net_params = self.get_parameters()
|
||||
host, port, protocol = deserialize_server(server_str)
|
||||
net_params = net_params._replace(host=host, port=port, protocol=protocol)
|
||||
await self.set_parameters(net_params)
|
||||
|
||||
def get_local_height(self):
|
||||
return self.blockchain().height()
|
||||
|
||||
Reference in New Issue
Block a user