interface: add padding and some noise to protocol messages
basic countermeasures against traffic analysis
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
qrcode
|
qrcode
|
||||||
protobuf>=3.20
|
protobuf>=3.20
|
||||||
qdarkstyle>=3.2
|
qdarkstyle>=3.2
|
||||||
aiorpcx>=0.22.0,<0.26
|
aiorpcx>=0.25.0,<0.26
|
||||||
aiohttp>=3.11.0,<4.0.0
|
aiohttp>=3.11.0,<4.0.0
|
||||||
aiohttp_socks>=0.9.2
|
aiohttp_socks>=0.9.2
|
||||||
certifi
|
certifi
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import os
|
|||||||
import re
|
import re
|
||||||
import ssl
|
import ssl
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
import asyncio
|
import asyncio
|
||||||
import socket
|
import socket
|
||||||
@@ -36,12 +37,13 @@ import itertools
|
|||||||
import logging
|
import logging
|
||||||
import hashlib
|
import hashlib
|
||||||
import functools
|
import functools
|
||||||
|
import random
|
||||||
|
|
||||||
import aiorpcx
|
import aiorpcx
|
||||||
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
|
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
|
||||||
from aiorpcx.curio import timeout_after, TaskTimeout
|
from aiorpcx.curio import timeout_after, TaskTimeout
|
||||||
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
|
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
|
||||||
from aiorpcx.rawsocket import RSClient
|
from aiorpcx.rawsocket import RSClient, RSTransport
|
||||||
import certifi
|
import certifi
|
||||||
|
|
||||||
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
|
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
|
||||||
@@ -265,6 +267,11 @@ class ConnectError(NetworkException): pass
|
|||||||
|
|
||||||
|
|
||||||
class _RSClient(RSClient):
|
class _RSClient(RSClient):
|
||||||
|
def __init__(self, *, transport=None, **kwargs):
|
||||||
|
if transport is None:
|
||||||
|
transport = PaddedRSTransport
|
||||||
|
RSClient.__init__(self, transport=transport, **kwargs)
|
||||||
|
|
||||||
async def create_connection(self):
|
async def create_connection(self):
|
||||||
try:
|
try:
|
||||||
return await super().create_connection()
|
return await super().create_connection()
|
||||||
@@ -273,6 +280,110 @@ class _RSClient(RSClient):
|
|||||||
raise ConnectError(e) from e
|
raise ConnectError(e) from e
|
||||||
|
|
||||||
|
|
||||||
|
class PaddedRSTransport(RSTransport):
|
||||||
|
"""A raw socket transport that provides basic countermeasures against traffic analysis
|
||||||
|
by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
|
||||||
|
(it is assumed that a network observer does not see plaintext transport contents,
|
||||||
|
due to it being wrapped e.g. in TLS)
|
||||||
|
"""
|
||||||
|
|
||||||
|
MIN_PACKET_SIZE = 1024
|
||||||
|
WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
|
||||||
|
|
||||||
|
session: Optional['RPCSession']
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
RSTransport.__init__(self, *args, **kwargs)
|
||||||
|
self._sbuffer = bytearray() # "send buffer"
|
||||||
|
self._sbuffer_task = None # type: Optional[asyncio.Task]
|
||||||
|
self._sbuffer_has_data_evt = asyncio.Event()
|
||||||
|
self._last_send = time.monotonic()
|
||||||
|
self._force_send = False # type: bool
|
||||||
|
|
||||||
|
# note: this does not call super().write() but is a complete reimplementation
|
||||||
|
async def write(self, message):
|
||||||
|
await self._can_send.wait()
|
||||||
|
if self.is_closing():
|
||||||
|
return
|
||||||
|
framed_message = self._framer.frame(message)
|
||||||
|
self._sbuffer += framed_message
|
||||||
|
self._sbuffer_has_data_evt.set()
|
||||||
|
self._maybe_consume_sbuffer()
|
||||||
|
|
||||||
|
def _maybe_consume_sbuffer(self):
|
||||||
|
if not self._can_send.is_set() or self.is_closing():
|
||||||
|
return
|
||||||
|
buf = self._sbuffer
|
||||||
|
if not buf:
|
||||||
|
return
|
||||||
|
# if there is enough data in the buffer, or if we haven't sent in a while, send now:
|
||||||
|
if not (
|
||||||
|
self._force_send
|
||||||
|
or len(buf) >= self.MIN_PACKET_SIZE
|
||||||
|
or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
|
||||||
|
):
|
||||||
|
return
|
||||||
|
assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
|
||||||
|
# either (1) pad length to next power of two, to create "lsize" packet:
|
||||||
|
payload_lsize = len(buf)
|
||||||
|
total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
|
||||||
|
npad_lsize = total_lsize - payload_lsize
|
||||||
|
# or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
|
||||||
|
# and create a packet with half that size ("ssize", s for small)
|
||||||
|
total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
|
||||||
|
payload_ssize = buf.rfind(b"\n", 0, total_ssize)
|
||||||
|
if payload_ssize != -1:
|
||||||
|
payload_ssize += 1 # for "\n" char
|
||||||
|
npad_ssize = total_ssize - payload_ssize
|
||||||
|
else:
|
||||||
|
npad_ssize = float("inf")
|
||||||
|
# decide between (1) and (2):
|
||||||
|
if self._force_send or npad_lsize <= npad_ssize:
|
||||||
|
# (1) create "lsize" packet: consume full buffer
|
||||||
|
npad = npad_lsize
|
||||||
|
p_idx = payload_lsize
|
||||||
|
else:
|
||||||
|
# (2) create "ssize" packet: consume some, but defer some for later
|
||||||
|
npad = npad_ssize
|
||||||
|
p_idx = payload_ssize
|
||||||
|
# pad by adding spaces near end
|
||||||
|
# self.session.maybe_log(
|
||||||
|
# f"PaddedRSTransport. calling low-level write(). "
|
||||||
|
# f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
|
||||||
|
# f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
|
||||||
|
# )
|
||||||
|
json_rpc_terminator = buf[p_idx-2:p_idx]
|
||||||
|
assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
|
||||||
|
buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
|
||||||
|
self._asyncio_transport.write(buf2)
|
||||||
|
self._last_send = time.monotonic()
|
||||||
|
del self._sbuffer[:p_idx]
|
||||||
|
if not self._sbuffer:
|
||||||
|
self._sbuffer_has_data_evt.clear()
|
||||||
|
|
||||||
|
async def _poll_sbuffer(self):
|
||||||
|
while True:
|
||||||
|
await self._sbuffer_has_data_evt.wait() # to avoid busy-waiting
|
||||||
|
self._maybe_consume_sbuffer()
|
||||||
|
# If there is still data in the buffer, sleep until it would time out.
|
||||||
|
# note: If the transport is ~idle, when we wake up, we will send the current buf data,
|
||||||
|
# but if busy, we might wake up to completely new buffer contents. Either is fine.
|
||||||
|
if len(self._sbuffer) > 0:
|
||||||
|
timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
|
||||||
|
timeout_rel = max(0.0, timeout_abs - time.monotonic())
|
||||||
|
await asyncio.sleep(timeout_rel)
|
||||||
|
|
||||||
|
def connection_made(self, transport: asyncio.BaseTransport):
|
||||||
|
super().connection_made(transport)
|
||||||
|
if isinstance(self.session, NotificationSession):
|
||||||
|
coro = self.session.interface.taskgroup.spawn(self._poll_sbuffer())
|
||||||
|
self._sbuffer_task = self.loop.create_task(coro)
|
||||||
|
else:
|
||||||
|
# This a short-lived "fetch_certificate"-type session.
|
||||||
|
# No polling here, we always force-empty the buffer.
|
||||||
|
self._force_send = True
|
||||||
|
|
||||||
|
|
||||||
class ServerAddr:
|
class ServerAddr:
|
||||||
|
|
||||||
def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
|
def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
|
||||||
@@ -730,8 +841,16 @@ class Interface(Logger):
|
|||||||
raise GracefulDisconnect('session was closed')
|
raise GracefulDisconnect('session was closed')
|
||||||
|
|
||||||
async def ping(self):
|
async def ping(self):
|
||||||
|
# We periodically send a "ping" msg to make sure the server knows we are still here.
|
||||||
|
# Adding a bit of randomness generates some noise against traffic analysis.
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(300)
|
await asyncio.sleep(random.random() * 300)
|
||||||
|
await self.session.send_request('server.ping')
|
||||||
|
await self._maybe_send_noise()
|
||||||
|
|
||||||
|
async def _maybe_send_noise(self):
|
||||||
|
while random.random() < 0.2:
|
||||||
|
await asyncio.sleep(random.random())
|
||||||
await self.session.send_request('server.ping')
|
await self.session.send_request('server.ping')
|
||||||
|
|
||||||
async def request_fee_estimates(self):
|
async def request_fee_estimates(self):
|
||||||
@@ -778,6 +897,7 @@ class Interface(Logger):
|
|||||||
util.trigger_callback('network_updated')
|
util.trigger_callback('network_updated')
|
||||||
await self.network.switch_unwanted_fork_interface()
|
await self.network.switch_unwanted_fork_interface()
|
||||||
await self.network.switch_lagging_interface()
|
await self.network.switch_lagging_interface()
|
||||||
|
await self.taskgroup.spawn(self._maybe_send_noise())
|
||||||
|
|
||||||
async def _process_header_at_tip(self) -> bool:
|
async def _process_header_at_tip(self) -> bool:
|
||||||
"""Returns:
|
"""Returns:
|
||||||
|
|||||||
@@ -75,8 +75,8 @@ def check_imports():
|
|||||||
import aiorpcx
|
import aiorpcx
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install <module-name>'")
|
sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install <module-name>'")
|
||||||
if not ((0, 22, 0) <= aiorpcx._version < (0, 26)):
|
if not ((0, 25, 0) <= aiorpcx._version < (0, 26)):
|
||||||
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.22.0<=ver<0.26')
|
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.25.0<=ver<0.26')
|
||||||
# the following imports are for pyinstaller
|
# the following imports are for pyinstaller
|
||||||
from google.protobuf import descriptor
|
from google.protobuf import descriptor
|
||||||
from google.protobuf import message
|
from google.protobuf import message
|
||||||
|
|||||||
Reference in New Issue
Block a user