Revert "interface: add padding and some noise to protocol messages"
Unforeseen issues. Needs more work..
This reverts commit 097eabed1f.
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
qrcode
|
||||
protobuf>=3.20
|
||||
qdarkstyle>=3.2
|
||||
aiorpcx>=0.25.0,<0.26
|
||||
aiorpcx>=0.22.0,<0.26
|
||||
aiohttp>=3.11.0,<4.0.0
|
||||
aiohttp_socks>=0.9.2
|
||||
certifi
|
||||
|
||||
@@ -26,7 +26,6 @@ import os
|
||||
import re
|
||||
import ssl
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import asyncio
|
||||
import socket
|
||||
@@ -37,13 +36,12 @@ import itertools
|
||||
import logging
|
||||
import hashlib
|
||||
import functools
|
||||
import random
|
||||
|
||||
import aiorpcx
|
||||
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
|
||||
from aiorpcx.curio import timeout_after, TaskTimeout
|
||||
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
|
||||
from aiorpcx.rawsocket import RSClient, RSTransport
|
||||
from aiorpcx.rawsocket import RSClient
|
||||
import certifi
|
||||
|
||||
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
|
||||
@@ -267,11 +265,6 @@ class ConnectError(NetworkException): pass
|
||||
|
||||
|
||||
class _RSClient(RSClient):
|
||||
def __init__(self, *, transport=None, **kwargs):
|
||||
if transport is None:
|
||||
transport = PaddedRSTransport
|
||||
RSClient.__init__(self, transport=transport, **kwargs)
|
||||
|
||||
async def create_connection(self):
|
||||
try:
|
||||
return await super().create_connection()
|
||||
@@ -280,89 +273,6 @@ class _RSClient(RSClient):
|
||||
raise ConnectError(e) from e
|
||||
|
||||
|
||||
class PaddedRSTransport(RSTransport):
|
||||
"""A raw socket transport that provides basic countermeasures against traffic analysis
|
||||
by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
|
||||
(it is assumed that a network observer does not see plaintext transport contents,
|
||||
due to it being wrapped e.g. in TLS)
|
||||
"""
|
||||
|
||||
MIN_PAYLOAD_SIZE = 1024
|
||||
|
||||
session: Optional['NotificationSession']
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
RSTransport.__init__(self, *args, **kwargs)
|
||||
self._sbuffer = bytearray() # "send buffer"
|
||||
self._sbuffer_task = None # type: Optional[asyncio.Task]
|
||||
self._sbuffer_has_data_evt = asyncio.Event()
|
||||
self._last_send = time.monotonic()
|
||||
|
||||
async def write(self, message):
|
||||
await self._can_send.wait()
|
||||
if self.is_closing():
|
||||
return
|
||||
framed_message = self._framer.frame(message)
|
||||
self._sbuffer += framed_message
|
||||
self._sbuffer_has_data_evt.set()
|
||||
self._maybe_consume_sbuffer()
|
||||
if not self._sbuffer:
|
||||
self._sbuffer_has_data_evt.clear()
|
||||
|
||||
def _maybe_consume_sbuffer(self):
|
||||
if not self._can_send.is_set() or self.is_closing():
|
||||
return
|
||||
buf = self._sbuffer
|
||||
if not buf:
|
||||
return
|
||||
# if there is enough data in the buffer, or if we haven't sent in a while, send now:
|
||||
if not (len(buf) >= self.MIN_PAYLOAD_SIZE or self._last_send + 1 < time.monotonic()):
|
||||
return
|
||||
assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
|
||||
# either (1) pad length to next power of two, to create "lsize" packet:
|
||||
payload_lsize = len(buf)
|
||||
total_lsize = max(self.MIN_PAYLOAD_SIZE, 2 ** (payload_lsize.bit_length()))
|
||||
npad_lsize = total_lsize - payload_lsize
|
||||
# or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
|
||||
# and create a packet with half that size ("ssize", s for small)
|
||||
total_ssize = max(self.MIN_PAYLOAD_SIZE, total_lsize//2)
|
||||
payload_ssize = buf.rfind(b"\n", 0, total_ssize)
|
||||
if payload_ssize != -1:
|
||||
payload_ssize += 1 # for "\n" char
|
||||
npad_ssize = total_ssize - payload_ssize
|
||||
else:
|
||||
npad_ssize = float("inf")
|
||||
# decide between (1) and (2):
|
||||
if npad_lsize <= npad_ssize:
|
||||
npad = npad_lsize
|
||||
p_idx = payload_lsize
|
||||
else:
|
||||
npad = npad_ssize
|
||||
p_idx = payload_ssize
|
||||
# pad by adding spaces near end
|
||||
assert buf[p_idx-2:p_idx] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[p_idx-2:p_idx]=!r}"
|
||||
self.session.maybe_log(
|
||||
f"PaddedRSTransport. calling low-level write(). "
|
||||
f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
|
||||
f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
|
||||
)
|
||||
buf2 = buf[:p_idx - 2] + (npad * b" ") + buf[p_idx - 2:p_idx]
|
||||
self._asyncio_transport.write(buf2)
|
||||
self._last_send = time.monotonic()
|
||||
del self._sbuffer[:p_idx]
|
||||
|
||||
async def _poll_sbuffer(self):
|
||||
while True:
|
||||
await asyncio.sleep(0.5) # gives time for buffer to grow
|
||||
await self._sbuffer_has_data_evt.wait() # lowers CPU cost compared to pure polling
|
||||
self._maybe_consume_sbuffer()
|
||||
|
||||
def connection_made(self, transport: asyncio.BaseTransport):
|
||||
super().connection_made(transport)
|
||||
coro = self.session.interface.taskgroup.spawn(self._poll_sbuffer())
|
||||
self._sbuffer_task = self.loop.create_task(coro)
|
||||
|
||||
|
||||
class ServerAddr:
|
||||
|
||||
def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
|
||||
@@ -822,16 +732,8 @@ class Interface(Logger):
|
||||
raise GracefulDisconnect('session was closed')
|
||||
|
||||
async def ping(self):
|
||||
# We periodically send a "ping" msg to make sure the server knows we are still here.
|
||||
# Adding a bit of randomness generates some noise against traffic analysis.
|
||||
while True:
|
||||
await asyncio.sleep(random.random() * 300)
|
||||
await self.session.send_request('server.ping')
|
||||
await self._maybe_send_noise()
|
||||
|
||||
async def _maybe_send_noise(self):
|
||||
while random.random() < 0.2:
|
||||
await asyncio.sleep(random.random())
|
||||
await asyncio.sleep(300)
|
||||
await self.session.send_request('server.ping')
|
||||
|
||||
async def request_fee_estimates(self):
|
||||
@@ -876,7 +778,6 @@ class Interface(Logger):
|
||||
util.trigger_callback('network_updated')
|
||||
await self.network.switch_unwanted_fork_interface()
|
||||
await self.network.switch_lagging_interface()
|
||||
await self.taskgroup.spawn(self._maybe_send_noise())
|
||||
|
||||
async def _process_header_at_tip(self) -> bool:
|
||||
"""Returns:
|
||||
|
||||
@@ -75,8 +75,8 @@ def check_imports():
|
||||
import aiorpcx
|
||||
except ImportError as e:
|
||||
sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install <module-name>'")
|
||||
if not ((0, 25, 0) <= aiorpcx._version < (0, 26)):
|
||||
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.25.0<=ver<0.26')
|
||||
if not ((0, 22, 0) <= aiorpcx._version < (0, 26)):
|
||||
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.22.0<=ver<0.26')
|
||||
# the following imports are for pyinstaller
|
||||
from google.protobuf import descriptor
|
||||
from google.protobuf import message
|
||||
|
||||
Reference in New Issue
Block a user