1
0

logging: basics

This commit is contained in:
SomberNight
2019-04-26 18:52:26 +02:00
parent 4d64e132d7
commit 3385a94753
68 changed files with 681 additions and 563 deletions

View File

@@ -42,7 +42,7 @@ from aiorpcx import TaskGroup
from aiohttp import ClientResponse
from . import util
from .util import (PrintError, print_error, log_exceptions, ignore_exceptions,
from .util import (log_exceptions, ignore_exceptions,
bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter,
is_hash256_str, is_non_negative_integer)
@@ -56,6 +56,11 @@ from .interface import (Interface, serialize_server, deserialize_server,
from .version import PROTOCOL_VERSION
from .simple_config import SimpleConfig
from .i18n import _
from .logging import get_logger, Logger
_logger = get_logger(__name__)
NODES_RETRY_INTERVAL = 60
SERVER_RETRY_INTERVAL = 10
@@ -214,7 +219,7 @@ class UntrustedServerReturnedError(Exception):
INSTANCE = None
class Network(PrintError):
class Network(Logger):
"""The Network class manages a set of connections to remote electrum
servers, each connected socket is handled by an Interface() object.
"""
@@ -224,6 +229,8 @@ class Network(PrintError):
global INSTANCE
INSTANCE = self
Logger.__init__(self)
self.asyncio_loop = asyncio.get_event_loop()
assert self.asyncio_loop.is_running(), "event loop not running"
self._loop_thread = None # type: threading.Thread # set by caller; only used for sanity checks
@@ -232,7 +239,7 @@ class Network(PrintError):
config = {} # Do not use mutables as default values!
self.config = SimpleConfig(config) if isinstance(config, dict) else config # type: SimpleConfig
blockchain.read_blockchains(self.config)
self.print_error("blockchains", list(map(lambda b: b.forkpoint, blockchain.blockchains.values())))
self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None) # type: Optional[Dict]
self._blockchain = blockchain.get_best_chain()
# Server for addresses and transactions
@@ -242,7 +249,7 @@ class Network(PrintError):
try:
deserialize_server(self.default_server)
except:
self.print_error('Warning: failed to parse server-string; falling back to random.')
self.logger.warning('failed to parse server-string; falling back to random.')
self.default_server = None
if not self.default_server:
self.default_server = pick_random_server()
@@ -351,12 +358,12 @@ class Network(PrintError):
async def _server_is_lagging(self):
sh = self.get_server_height()
if not sh:
self.print_error('no height for main interface')
self.logger.info('no height for main interface')
return True
lh = self.get_local_height()
result = (lh - sh) > 1
if result:
self.print_error(f'{self.default_server} is lagging ({sh} vs {lh})')
self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
return result
def _set_status(self, status):
@@ -381,7 +388,7 @@ class Network(PrintError):
addr = await session.send_request('server.donation_address')
if not bitcoin.is_address(addr):
if addr: # ignore empty string
self.print_error(f"invalid donation address from server: {repr(addr)}")
self.logger.info(f"invalid donation address from server: {repr(addr)}")
addr = ''
self.donation_address = addr
async def get_server_peers():
@@ -416,7 +423,7 @@ class Network(PrintError):
for i in FEE_ETA_TARGETS:
fee_tasks.append((i, await group.spawn(session.send_request('blockchain.estimatefee', [i]))))
self.config.mempool_fees = histogram = histogram_task.result()
self.print_error(f'fee_histogram {histogram}')
self.logger.info(f'fee_histogram {histogram}')
self.notify('fee_histogram')
fee_estimates_eta = {}
for nblock_target, task in fee_tasks:
@@ -424,7 +431,7 @@ class Network(PrintError):
fee_estimates_eta[nblock_target] = fee
if fee < 0: continue
self.config.update_fee_estimates(nblock_target, fee)
self.print_error(f'fee_estimates {fee_estimates_eta}')
self.logger.info(f'fee_estimates {fee_estimates_eta}')
self.notify('fee')
def get_status_value(self, key):
@@ -490,7 +497,7 @@ class Network(PrintError):
def _start_interface(self, server: str):
if server not in self.interfaces and server not in self.connecting:
if server == self.default_server:
self.print_error(f"connecting to {server} as new interface")
self.logger.info(f"connecting to {server} as new interface")
self._set_status('connecting')
self.connecting.add(server)
self.server_queue.put(server)
@@ -509,7 +516,7 @@ class Network(PrintError):
if not hasattr(socket, "_getaddrinfo"):
socket._getaddrinfo = socket.getaddrinfo
if proxy:
self.print_error('setting proxy', proxy)
self.logger.info(f'setting proxy {proxy}')
# prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy
socket.getaddrinfo = lambda *args: [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))]
else:
@@ -542,7 +549,7 @@ class Network(PrintError):
except dns.exception.DNSException as e:
pass
except BaseException as e:
print_error(f'dnspython failed to resolve dns (AAAA) with error: {e}')
_logger.info(f'dnspython failed to resolve dns (AAAA) with error: {e}')
# try IPv4
try:
answers = dns.resolver.query(host, dns.rdatatype.A)
@@ -554,7 +561,7 @@ class Network(PrintError):
raise socket.gaierror(11001, 'getaddrinfo failed') from e
except BaseException as e:
# Possibly internal error in dnspython :( see #4483
print_error(f'dnspython failed to resolve dns (A) with error: {e}')
_logger.info(f'dnspython failed to resolve dns (A) with error: {e}')
if addrs:
return addrs
# Fall back to original socket.getaddrinfo to resolve dns.
@@ -641,24 +648,24 @@ class Network(PrintError):
filtered = list(filter(lambda iface: iface.blockchain.check_hash(pref_height, pref_hash),
interfaces))
if filtered:
self.print_error("switching to preferred fork")
self.logger.info("switching to preferred fork")
chosen_iface = random.choice(filtered)
await self.switch_to_interface(chosen_iface.server)
return
else:
self.print_error("tried to switch to preferred fork but no interfaces are on it")
self.logger.info("tried to switch to preferred fork but no interfaces are on it")
# try to switch to best chain
if self.blockchain().parent is None:
return # already on best chain
filtered = list(filter(lambda iface: iface.blockchain.parent is None,
interfaces))
if filtered:
self.print_error("switching to best chain")
self.logger.info("switching to best chain")
chosen_iface = random.choice(filtered)
await self.switch_to_interface(chosen_iface.server)
else:
# FIXME switch to best available?
self.print_error("tried to switch to best chain but no interfaces are on it")
self.logger.info("tried to switch to best chain but no interfaces are on it")
async def switch_to_interface(self, server: str):
"""Switch to server as our main interface. If no connection exists,
@@ -685,7 +692,7 @@ class Network(PrintError):
i = self.interfaces[server]
if old_interface != i:
self.print_error("switching to", server)
self.logger.info(f"switching to {server}")
blockchain_updated = i.blockchain != self.blockchain()
self.interface = i
await i.group.spawn(self._request_server_info(i))
@@ -739,8 +746,7 @@ class Network(PrintError):
try:
await asyncio.wait_for(interface.ready, timeout)
except BaseException as e:
#traceback.print_exc()
self.print_error(f"couldn't launch iface {server} -- {repr(e)}")
self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
await interface.close()
return
else:
@@ -854,14 +860,14 @@ class Network(PrintError):
except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
raise # pass-through
except aiorpcx.jsonrpc.CodeMessageError as e:
self.print_error(f"broadcast_transaction error: {repr(e)}")
self.logger.info(f"broadcast_transaction error: {repr(e)}")
raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(e.message)) from e
except BaseException as e: # intentional BaseException for sanity!
self.print_error(f"broadcast_transaction error2: {repr(e)}")
self.logger.info(f"broadcast_transaction error2: {repr(e)}")
send_exception_to_crash_reporter(e)
raise TxBroadcastUnknownError() from e
if out != tx.txid():
self.print_error(f"unexpected txid for broadcast_transaction: {out} != {tx.txid()}")
self.logger.info(f"unexpected txid for broadcast_transaction: {out} != {tx.txid()}")
raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
@staticmethod
@@ -1103,7 +1109,7 @@ class Network(PrintError):
self.main_taskgroup = main_taskgroup = SilentTaskGroup()
assert not self.interface and not self.interfaces
assert not self.connecting and not self.server_queue
self.print_error('starting network')
self.logger.info('starting network')
self.disconnected_servers = set([])
self.protocol = deserialize_server(self.default_server)[2]
self.server_queue = queue.Queue()
@@ -1120,7 +1126,7 @@ class Network(PrintError):
await group.spawn(self._maintain_sessions())
[await group.spawn(job) for job in self._jobs]
except Exception as e:
traceback.print_exc(file=sys.stderr)
self.logger.exception('')
raise e
asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
@@ -1132,11 +1138,11 @@ class Network(PrintError):
@log_exceptions
async def _stop(self, full_shutdown=False):
self.print_error("stopping network")
self.logger.info("stopping network")
try:
await asyncio.wait_for(self.main_taskgroup.cancel_remaining(), timeout=2)
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
self.print_error(f"exc during main_taskgroup cancellation: {repr(e)}")
self.logger.info(f"exc during main_taskgroup cancellation: {repr(e)}")
self.main_taskgroup = None # type: TaskGroup
self.interface = None # type: Interface
self.interfaces = {} # type: Dict[str, Interface]
@@ -1179,7 +1185,7 @@ class Network(PrintError):
# FIXME this should try to honour "healthy spread of connected servers"
self._start_random_interface()
if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
self.print_error('network: retrying connections')
self.logger.info('network: retrying connections')
self.disconnected_servers = set([])
self.nodes_retry_time = now
async def maintain_healthy_spread_of_connected_servers():
@@ -1187,7 +1193,7 @@ class Network(PrintError):
random.shuffle(interfaces)
for iface in interfaces:
if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
self.print_error(f"disconnecting from {iface.server}. too many connected "
self.logger.info(f"disconnecting from {iface.server}. too many connected "
f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
await self._close_interface(iface)
async def maintain_main_interface():