From 097eabed1fd1ccc3952f069ea594c973294a8d37 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Thu, 13 Feb 2025 18:48:11 +0000 Subject: [PATCH] interface: add padding and some noise to protocol messages basic countermeasures against traffic analysis --- contrib/requirements/requirements.txt | 2 +- electrum/interface.py | 103 +++++++++++++++++++++++++- run_electrum | 4 +- 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt index 7dfe07373..404e76995 100644 --- a/contrib/requirements/requirements.txt +++ b/contrib/requirements/requirements.txt @@ -1,7 +1,7 @@ qrcode protobuf>=3.20 qdarkstyle>=3.2 -aiorpcx>=0.22.0,<0.26 +aiorpcx>=0.25.0,<0.26 aiohttp>=3.11.0,<4.0.0 aiohttp_socks>=0.9.2 certifi diff --git a/electrum/interface.py b/electrum/interface.py index 8649652b9..70fb75cad 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -26,6 +26,7 @@ import os import re import ssl import sys +import time import traceback import asyncio import socket @@ -36,12 +37,13 @@ import itertools import logging import hashlib import functools +import random import aiorpcx from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer from aiorpcx.curio import timeout_after, TaskTimeout from aiorpcx.jsonrpc import JSONRPC, CodeMessageError -from aiorpcx.rawsocket import RSClient +from aiorpcx.rawsocket import RSClient, RSTransport import certifi from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy, @@ -265,6 +267,11 @@ class ConnectError(NetworkException): pass class _RSClient(RSClient): + def __init__(self, *, transport=None, **kwargs): + if transport is None: + transport = PaddedRSTransport + RSClient.__init__(self, transport=transport, **kwargs) + async def create_connection(self): try: return await super().create_connection() @@ -273,6 +280,89 @@ class _RSClient(RSClient): raise ConnectError(e) from e +class PaddedRSTransport(RSTransport): + """A raw socket transport that provides basic countermeasures against traffic analysis + by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets. + (it is assumed that a network observer does not see plaintext transport contents, + due to it being wrapped e.g. in TLS) + """ + + MIN_PAYLOAD_SIZE = 1024 + + session: Optional['NotificationSession'] + + def __init__(self, *args, **kwargs): + RSTransport.__init__(self, *args, **kwargs) + self._sbuffer = bytearray() # "send buffer" + self._sbuffer_task = None # type: Optional[asyncio.Task] + self._sbuffer_has_data_evt = asyncio.Event() + self._last_send = time.monotonic() + + async def write(self, message): + await self._can_send.wait() + if self.is_closing(): + return + framed_message = self._framer.frame(message) + self._sbuffer += framed_message + self._sbuffer_has_data_evt.set() + self._maybe_consume_sbuffer() + if not self._sbuffer: + self._sbuffer_has_data_evt.clear() + + def _maybe_consume_sbuffer(self): + if not self._can_send.is_set() or self.is_closing(): + return + buf = self._sbuffer + if not buf: + return + # if there is enough data in the buffer, or if we haven't sent in a while, send now: + if not (len(buf) >= self.MIN_PAYLOAD_SIZE or self._last_send + 1 < time.monotonic()): + return + assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}" + # either (1) pad length to next power of two, to create "lsize" packet: + payload_lsize = len(buf) + total_lsize = max(self.MIN_PAYLOAD_SIZE, 2 ** (payload_lsize.bit_length())) + npad_lsize = total_lsize - payload_lsize + # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages + # and create a packet with half that size ("ssize", s for small) + total_ssize = max(self.MIN_PAYLOAD_SIZE, total_lsize//2) + payload_ssize = buf.rfind(b"\n", 0, total_ssize) + if payload_ssize != -1: + payload_ssize += 1 # for "\n" char + npad_ssize = total_ssize - payload_ssize + else: + npad_ssize = float("inf") + # decide between (1) and (2): + if npad_lsize <= npad_ssize: + npad = npad_lsize + p_idx = payload_lsize + else: + npad = npad_ssize + p_idx = payload_ssize + # pad by adding spaces near end + assert buf[p_idx-2:p_idx] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[p_idx-2:p_idx]=!r}" + self.session.maybe_log( + f"PaddedRSTransport. calling low-level write(). " + f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). " + f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}." + ) + buf2 = buf[:p_idx - 2] + (npad * b" ") + buf[p_idx - 2:p_idx] + self._asyncio_transport.write(buf2) + self._last_send = time.monotonic() + del self._sbuffer[:p_idx] + + async def _poll_sbuffer(self): + while True: + await asyncio.sleep(0.5) # gives time for buffer to grow + await self._sbuffer_has_data_evt.wait() # lowers CPU cost compared to pure polling + self._maybe_consume_sbuffer() + + def connection_made(self, transport: asyncio.BaseTransport): + super().connection_made(transport) + coro = self.session.interface.taskgroup.spawn(self._poll_sbuffer()) + self._sbuffer_task = self.loop.create_task(coro) + + class ServerAddr: def __init__(self, host: str, port: Union[int, str], *, protocol: str = None): @@ -732,8 +822,16 @@ class Interface(Logger): raise GracefulDisconnect('session was closed') async def ping(self): + # We periodically send a "ping" msg to make sure the server knows we are still here. + # Adding a bit of randomness generates some noise against traffic analysis. while True: - await asyncio.sleep(300) + await asyncio.sleep(random.random() * 300) + await self.session.send_request('server.ping') + await self._maybe_send_noise() + + async def _maybe_send_noise(self): + while random.random() < 0.2: + await asyncio.sleep(random.random()) await self.session.send_request('server.ping') async def request_fee_estimates(self): @@ -778,6 +876,7 @@ class Interface(Logger): util.trigger_callback('network_updated') await self.network.switch_unwanted_fork_interface() await self.network.switch_lagging_interface() + await self.taskgroup.spawn(self._maybe_send_noise()) async def _process_header_at_tip(self) -> bool: """Returns: diff --git a/run_electrum b/run_electrum index 4009a1aa2..bb8c1d891 100755 --- a/run_electrum +++ b/run_electrum @@ -75,8 +75,8 @@ def check_imports(): import aiorpcx except ImportError as e: sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install '") - if not ((0, 22, 0) <= aiorpcx._version < (0, 26)): - raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.22.0<=ver<0.26') + if not ((0, 25, 0) <= aiorpcx._version < (0, 26)): + raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.25.0<=ver<0.26') # the following imports are for pyinstaller from google.protobuf import descriptor from google.protobuf import message