1
0

interface: add padding and some noise to protocol messages

basic countermeasures against traffic analysis
This commit is contained in:
SomberNight
2025-02-13 18:48:11 +00:00
parent 2600a3bc74
commit 097eabed1f
3 changed files with 104 additions and 5 deletions

View File

@@ -1,7 +1,7 @@
qrcode
protobuf>=3.20
qdarkstyle>=3.2
aiorpcx>=0.22.0,<0.26
aiorpcx>=0.25.0,<0.26
aiohttp>=3.11.0,<4.0.0
aiohttp_socks>=0.9.2
certifi

View File

@@ -26,6 +26,7 @@ import os
import re
import ssl
import sys
import time
import traceback
import asyncio
import socket
@@ -36,12 +37,13 @@ import itertools
import logging
import hashlib
import functools
import random
import aiorpcx
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
from aiorpcx.rawsocket import RSClient
from aiorpcx.rawsocket import RSClient, RSTransport
import certifi
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
@@ -265,6 +267,11 @@ class ConnectError(NetworkException): pass
class _RSClient(RSClient):
def __init__(self, *, transport=None, **kwargs):
if transport is None:
transport = PaddedRSTransport
RSClient.__init__(self, transport=transport, **kwargs)
async def create_connection(self):
try:
return await super().create_connection()
@@ -273,6 +280,89 @@ class _RSClient(RSClient):
raise ConnectError(e) from e
class PaddedRSTransport(RSTransport):
"""A raw socket transport that provides basic countermeasures against traffic analysis
by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
(it is assumed that a network observer does not see plaintext transport contents,
due to it being wrapped e.g. in TLS)
"""
MIN_PAYLOAD_SIZE = 1024
session: Optional['NotificationSession']
def __init__(self, *args, **kwargs):
RSTransport.__init__(self, *args, **kwargs)
self._sbuffer = bytearray() # "send buffer"
self._sbuffer_task = None # type: Optional[asyncio.Task]
self._sbuffer_has_data_evt = asyncio.Event()
self._last_send = time.monotonic()
async def write(self, message):
await self._can_send.wait()
if self.is_closing():
return
framed_message = self._framer.frame(message)
self._sbuffer += framed_message
self._sbuffer_has_data_evt.set()
self._maybe_consume_sbuffer()
if not self._sbuffer:
self._sbuffer_has_data_evt.clear()
def _maybe_consume_sbuffer(self):
if not self._can_send.is_set() or self.is_closing():
return
buf = self._sbuffer
if not buf:
return
# if there is enough data in the buffer, or if we haven't sent in a while, send now:
if not (len(buf) >= self.MIN_PAYLOAD_SIZE or self._last_send + 1 < time.monotonic()):
return
assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
# either (1) pad length to next power of two, to create "lsize" packet:
payload_lsize = len(buf)
total_lsize = max(self.MIN_PAYLOAD_SIZE, 2 ** (payload_lsize.bit_length()))
npad_lsize = total_lsize - payload_lsize
# or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
# and create a packet with half that size ("ssize", s for small)
total_ssize = max(self.MIN_PAYLOAD_SIZE, total_lsize//2)
payload_ssize = buf.rfind(b"\n", 0, total_ssize)
if payload_ssize != -1:
payload_ssize += 1 # for "\n" char
npad_ssize = total_ssize - payload_ssize
else:
npad_ssize = float("inf")
# decide between (1) and (2):
if npad_lsize <= npad_ssize:
npad = npad_lsize
p_idx = payload_lsize
else:
npad = npad_ssize
p_idx = payload_ssize
# pad by adding spaces near end
assert buf[p_idx-2:p_idx] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[p_idx-2:p_idx]=!r}"
self.session.maybe_log(
f"PaddedRSTransport. calling low-level write(). "
f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
)
buf2 = buf[:p_idx - 2] + (npad * b" ") + buf[p_idx - 2:p_idx]
self._asyncio_transport.write(buf2)
self._last_send = time.monotonic()
del self._sbuffer[:p_idx]
async def _poll_sbuffer(self):
while True:
await asyncio.sleep(0.5) # gives time for buffer to grow
await self._sbuffer_has_data_evt.wait() # lowers CPU cost compared to pure polling
self._maybe_consume_sbuffer()
def connection_made(self, transport: asyncio.BaseTransport):
super().connection_made(transport)
coro = self.session.interface.taskgroup.spawn(self._poll_sbuffer())
self._sbuffer_task = self.loop.create_task(coro)
class ServerAddr:
def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
@@ -732,8 +822,16 @@ class Interface(Logger):
raise GracefulDisconnect('session was closed')
async def ping(self):
# We periodically send a "ping" msg to make sure the server knows we are still here.
# Adding a bit of randomness generates some noise against traffic analysis.
while True:
await asyncio.sleep(300)
await asyncio.sleep(random.random() * 300)
await self.session.send_request('server.ping')
await self._maybe_send_noise()
async def _maybe_send_noise(self):
while random.random() < 0.2:
await asyncio.sleep(random.random())
await self.session.send_request('server.ping')
async def request_fee_estimates(self):
@@ -778,6 +876,7 @@ class Interface(Logger):
util.trigger_callback('network_updated')
await self.network.switch_unwanted_fork_interface()
await self.network.switch_lagging_interface()
await self.taskgroup.spawn(self._maybe_send_noise())
async def _process_header_at_tip(self) -> bool:
"""Returns:

View File

@@ -75,8 +75,8 @@ def check_imports():
import aiorpcx
except ImportError as e:
sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install <module-name>'")
if not ((0, 22, 0) <= aiorpcx._version < (0, 26)):
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.22.0<=ver<0.26')
if not ((0, 25, 0) <= aiorpcx._version < (0, 26)):
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.25.0<=ver<0.26')
# the following imports are for pyinstaller
from google.protobuf import descriptor
from google.protobuf import message