|
|
|
|
@@ -26,6 +26,7 @@ import os
|
|
|
|
|
import re
|
|
|
|
|
import ssl
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
import traceback
|
|
|
|
|
import asyncio
|
|
|
|
|
import socket
|
|
|
|
|
@@ -36,12 +37,13 @@ import itertools
|
|
|
|
|
import logging
|
|
|
|
|
import hashlib
|
|
|
|
|
import functools
|
|
|
|
|
import random
|
|
|
|
|
|
|
|
|
|
import aiorpcx
|
|
|
|
|
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
|
|
|
|
|
from aiorpcx.curio import timeout_after, TaskTimeout
|
|
|
|
|
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
|
|
|
|
|
from aiorpcx.rawsocket import RSClient
|
|
|
|
|
from aiorpcx.rawsocket import RSClient, RSTransport
|
|
|
|
|
import certifi
|
|
|
|
|
|
|
|
|
|
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
|
|
|
|
|
@@ -138,6 +140,7 @@ class NotificationSession(RPCSession):
|
|
|
|
|
self.cache = {}
|
|
|
|
|
self._msg_counter = itertools.count(start=1)
|
|
|
|
|
self.interface = interface
|
|
|
|
|
self.taskgroup = interface.taskgroup
|
|
|
|
|
self.cost_hard_limit = 0 # disable aiorpcx resource limits
|
|
|
|
|
|
|
|
|
|
async def handle_request(self, request):
|
|
|
|
|
@@ -273,6 +276,111 @@ class _RSClient(RSClient):
|
|
|
|
|
raise ConnectError(e) from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PaddedRSTransport(RSTransport):
|
|
|
|
|
"""A raw socket transport that provides basic countermeasures against traffic analysis
|
|
|
|
|
by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
|
|
|
|
|
(it is assumed that a network observer does not see plaintext transport contents,
|
|
|
|
|
due to it being wrapped e.g. in TLS)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
MIN_PACKET_SIZE = 1024
|
|
|
|
|
WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
|
|
|
|
|
|
|
|
|
|
session: Optional['RPCSession']
|
|
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
|
RSTransport.__init__(self, *args, **kwargs)
|
|
|
|
|
self._sbuffer = bytearray() # "send buffer"
|
|
|
|
|
self._sbuffer_task = None # type: Optional[asyncio.Task]
|
|
|
|
|
self._sbuffer_has_data_evt = asyncio.Event()
|
|
|
|
|
self._last_send = time.monotonic()
|
|
|
|
|
self._force_send = False # type: bool
|
|
|
|
|
|
|
|
|
|
# note: this does not call super().write() but is a complete reimplementation
|
|
|
|
|
async def write(self, message):
|
|
|
|
|
await self._can_send.wait()
|
|
|
|
|
if self.is_closing():
|
|
|
|
|
return
|
|
|
|
|
framed_message = self._framer.frame(message)
|
|
|
|
|
self._sbuffer += framed_message
|
|
|
|
|
self._sbuffer_has_data_evt.set()
|
|
|
|
|
self._maybe_consume_sbuffer()
|
|
|
|
|
|
|
|
|
|
def _maybe_consume_sbuffer(self) -> None:
|
|
|
|
|
"""Maybe take some data from sbuffer and send it on the wire."""
|
|
|
|
|
if not self._can_send.is_set() or self.is_closing():
|
|
|
|
|
return
|
|
|
|
|
buf = self._sbuffer
|
|
|
|
|
if not buf:
|
|
|
|
|
return
|
|
|
|
|
# if there is enough data in the buffer, or if we haven't sent in a while, send now:
|
|
|
|
|
if not (
|
|
|
|
|
self._force_send
|
|
|
|
|
or len(buf) >= self.MIN_PACKET_SIZE
|
|
|
|
|
or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
|
|
|
|
|
):
|
|
|
|
|
return
|
|
|
|
|
assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
|
|
|
|
|
# either (1) pad length to next power of two, to create "lsize" packet:
|
|
|
|
|
payload_lsize = len(buf)
|
|
|
|
|
total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
|
|
|
|
|
npad_lsize = total_lsize - payload_lsize
|
|
|
|
|
# or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
|
|
|
|
|
# and create a packet with half that size ("ssize", s for small)
|
|
|
|
|
total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
|
|
|
|
|
payload_ssize = buf.rfind(b"\n", 0, total_ssize)
|
|
|
|
|
if payload_ssize != -1:
|
|
|
|
|
payload_ssize += 1 # for "\n" char
|
|
|
|
|
npad_ssize = total_ssize - payload_ssize
|
|
|
|
|
else:
|
|
|
|
|
npad_ssize = float("inf")
|
|
|
|
|
# decide between (1) and (2):
|
|
|
|
|
if self._force_send or npad_lsize <= npad_ssize:
|
|
|
|
|
# (1) create "lsize" packet: consume full buffer
|
|
|
|
|
npad = npad_lsize
|
|
|
|
|
p_idx = payload_lsize
|
|
|
|
|
else:
|
|
|
|
|
# (2) create "ssize" packet: consume some, but defer some for later
|
|
|
|
|
npad = npad_ssize
|
|
|
|
|
p_idx = payload_ssize
|
|
|
|
|
# pad by adding spaces near end
|
|
|
|
|
# self.session.maybe_log(
|
|
|
|
|
# f"PaddedRSTransport. calling low-level write(). "
|
|
|
|
|
# f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
|
|
|
|
|
# f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
|
|
|
|
|
# )
|
|
|
|
|
json_rpc_terminator = buf[p_idx-2:p_idx]
|
|
|
|
|
assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
|
|
|
|
|
buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
|
|
|
|
|
self._asyncio_transport.write(buf2)
|
|
|
|
|
self._last_send = time.monotonic()
|
|
|
|
|
del self._sbuffer[:p_idx]
|
|
|
|
|
if not self._sbuffer:
|
|
|
|
|
self._sbuffer_has_data_evt.clear()
|
|
|
|
|
|
|
|
|
|
async def _poll_sbuffer(self):
|
|
|
|
|
while True:
|
|
|
|
|
await self._sbuffer_has_data_evt.wait() # to avoid busy-waiting
|
|
|
|
|
self._maybe_consume_sbuffer()
|
|
|
|
|
# If there is still data in the buffer, sleep until it would time out.
|
|
|
|
|
# note: If the transport is ~idle, when we wake up, we will send the current buf data,
|
|
|
|
|
# but if busy, we might wake up to completely new buffer contents. Either is fine.
|
|
|
|
|
if len(self._sbuffer) > 0:
|
|
|
|
|
timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
|
|
|
|
|
timeout_rel = max(0.0, timeout_abs - time.monotonic())
|
|
|
|
|
await asyncio.sleep(timeout_rel)
|
|
|
|
|
|
|
|
|
|
def connection_made(self, transport: asyncio.BaseTransport):
|
|
|
|
|
super().connection_made(transport)
|
|
|
|
|
if isinstance(self.session, NotificationSession):
|
|
|
|
|
coro = self.session.taskgroup.spawn(self._poll_sbuffer())
|
|
|
|
|
self._sbuffer_task = self.loop.create_task(coro)
|
|
|
|
|
else:
|
|
|
|
|
# This a short-lived "fetch_certificate"-type session.
|
|
|
|
|
# No polling here, we always force-empty the buffer.
|
|
|
|
|
self._force_send = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ServerAddr:
|
|
|
|
|
|
|
|
|
|
def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
|
|
|
|
|
@@ -605,9 +713,13 @@ class Interface(Logger):
|
|
|
|
|
sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
|
|
|
|
|
sslc.check_hostname = False
|
|
|
|
|
sslc.verify_mode = ssl.CERT_NONE
|
|
|
|
|
async with _RSClient(session_factory=RPCSession,
|
|
|
|
|
host=self.host, port=self.port,
|
|
|
|
|
ssl=sslc, proxy=self.proxy) as session:
|
|
|
|
|
async with _RSClient(
|
|
|
|
|
session_factory=RPCSession,
|
|
|
|
|
host=self.host, port=self.port,
|
|
|
|
|
ssl=sslc,
|
|
|
|
|
proxy=self.proxy,
|
|
|
|
|
transport=PaddedRSTransport,
|
|
|
|
|
) as session:
|
|
|
|
|
asyncio_transport = session.transport._asyncio_transport # type: asyncio.BaseTransport
|
|
|
|
|
ssl_object = asyncio_transport.get_extra_info("ssl_object") # type: ssl.SSLObject
|
|
|
|
|
return ssl_object.getpeercert(binary_form=True)
|
|
|
|
|
@@ -676,9 +788,13 @@ class Interface(Logger):
|
|
|
|
|
|
|
|
|
|
async def open_session(self, sslc, exit_early=False):
|
|
|
|
|
session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
|
|
|
|
|
async with _RSClient(session_factory=session_factory,
|
|
|
|
|
host=self.host, port=self.port,
|
|
|
|
|
ssl=sslc, proxy=self.proxy) as session:
|
|
|
|
|
async with _RSClient(
|
|
|
|
|
session_factory=session_factory,
|
|
|
|
|
host=self.host, port=self.port,
|
|
|
|
|
ssl=sslc,
|
|
|
|
|
proxy=self.proxy,
|
|
|
|
|
transport=PaddedRSTransport,
|
|
|
|
|
) as session:
|
|
|
|
|
self.session = session # type: NotificationSession
|
|
|
|
|
self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
|
|
|
|
|
try:
|
|
|
|
|
@@ -730,8 +846,16 @@ class Interface(Logger):
|
|
|
|
|
raise GracefulDisconnect('session was closed')
|
|
|
|
|
|
|
|
|
|
async def ping(self):
|
|
|
|
|
# We periodically send a "ping" msg to make sure the server knows we are still here.
|
|
|
|
|
# Adding a bit of randomness generates some noise against traffic analysis.
|
|
|
|
|
while True:
|
|
|
|
|
await asyncio.sleep(300)
|
|
|
|
|
await asyncio.sleep(random.random() * 300)
|
|
|
|
|
await self.session.send_request('server.ping')
|
|
|
|
|
await self._maybe_send_noise()
|
|
|
|
|
|
|
|
|
|
async def _maybe_send_noise(self):
|
|
|
|
|
while random.random() < 0.2:
|
|
|
|
|
await asyncio.sleep(random.random())
|
|
|
|
|
await self.session.send_request('server.ping')
|
|
|
|
|
|
|
|
|
|
async def request_fee_estimates(self):
|
|
|
|
|
@@ -778,6 +902,7 @@ class Interface(Logger):
|
|
|
|
|
util.trigger_callback('network_updated')
|
|
|
|
|
await self.network.switch_unwanted_fork_interface()
|
|
|
|
|
await self.network.switch_lagging_interface()
|
|
|
|
|
await self.taskgroup.spawn(self._maybe_send_noise())
|
|
|
|
|
|
|
|
|
|
async def _process_header_at_tip(self) -> bool:
|
|
|
|
|
"""Returns:
|
|
|
|
|
|