Merge pull request #9250 from accumulator/network_tor_stream_isolation
network: use TOR stream isolation
This commit is contained in:
@@ -44,7 +44,7 @@ from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
|
|||||||
from aiorpcx.rawsocket import RSClient
|
from aiorpcx.rawsocket import RSClient
|
||||||
import certifi
|
import certifi
|
||||||
|
|
||||||
from .util import (ignore_exceptions, log_exceptions, bfh, MySocksProxy,
|
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
|
||||||
is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
|
is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
|
||||||
is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
|
is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
|
||||||
from . import util
|
from . import util
|
||||||
@@ -375,7 +375,7 @@ class Interface(Logger):
|
|||||||
|
|
||||||
LOGGING_SHORTCUT = 'i'
|
LOGGING_SHORTCUT = 'i'
|
||||||
|
|
||||||
def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]):
|
def __init__(self, *, network: 'Network', server: ServerAddr):
|
||||||
self.ready = network.asyncio_loop.create_future()
|
self.ready = network.asyncio_loop.create_future()
|
||||||
self.got_disconnected = asyncio.Event()
|
self.got_disconnected = asyncio.Event()
|
||||||
self.server = server
|
self.server = server
|
||||||
@@ -394,8 +394,9 @@ class Interface(Logger):
|
|||||||
# addresses...? e.g. 192.168.x.x
|
# addresses...? e.g. 192.168.x.x
|
||||||
if util.is_localhost(server.host):
|
if util.is_localhost(server.host):
|
||||||
self.logger.info(f"looks like localhost: not using proxy for this server")
|
self.logger.info(f"looks like localhost: not using proxy for this server")
|
||||||
proxy = None
|
self.proxy = None
|
||||||
self.proxy = MySocksProxy.from_proxy_dict(proxy)
|
else:
|
||||||
|
self.proxy = ESocksProxy.from_network_settings(network)
|
||||||
|
|
||||||
# Latest block header and corresponding height, as claimed by the server.
|
# Latest block header and corresponding height, as claimed by the server.
|
||||||
# Note that these values are updated before they are verified.
|
# Note that these values are updated before they are verified.
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import re
|
|||||||
import hashlib
|
import hashlib
|
||||||
import asyncio
|
import asyncio
|
||||||
from asyncio import StreamReader, StreamWriter
|
from asyncio import StreamReader, StreamWriter
|
||||||
from typing import Optional
|
from typing import Optional, TYPE_CHECKING
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
from typing import NamedTuple, List, Tuple, Mapping, Optional, TYPE_CHECKING, Union, Dict, Set, Sequence
|
from typing import NamedTuple, List, Tuple, Mapping, Optional, TYPE_CHECKING, Union, Dict, Set, Sequence
|
||||||
|
|
||||||
@@ -17,12 +17,18 @@ from aiorpcx import NetAddress
|
|||||||
import electrum_ecc as ecc
|
import electrum_ecc as ecc
|
||||||
|
|
||||||
from .crypto import sha256, hmac_oneshot, chacha20_poly1305_encrypt, chacha20_poly1305_decrypt, get_ecdh, privkey_to_pubkey
|
from .crypto import sha256, hmac_oneshot, chacha20_poly1305_encrypt, chacha20_poly1305_decrypt, get_ecdh, privkey_to_pubkey
|
||||||
from .util import MySocksProxy
|
from .util import ESocksProxy
|
||||||
|
|
||||||
|
|
||||||
class LightningPeerConnectionClosed(Exception): pass
|
class LightningPeerConnectionClosed(Exception): pass
|
||||||
class HandshakeFailed(Exception): pass
|
class HandshakeFailed(Exception): pass
|
||||||
class ConnStringFormatError(Exception): pass
|
class ConnStringFormatError(Exception): pass
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from electrum.network import Network
|
||||||
|
|
||||||
|
|
||||||
class HandshakeState(object):
|
class HandshakeState(object):
|
||||||
prologue = b"lightning"
|
prologue = b"lightning"
|
||||||
protocol_name = b"Noise_XK_secp256k1_ChaChaPoly_SHA256"
|
protocol_name = b"Noise_XK_secp256k1_ChaChaPoly_SHA256"
|
||||||
@@ -342,18 +348,18 @@ class LNTransport(LNTransportBase):
|
|||||||
"""Transport initiated by local party."""
|
"""Transport initiated by local party."""
|
||||||
|
|
||||||
def __init__(self, privkey: bytes, peer_addr: LNPeerAddr, *,
|
def __init__(self, privkey: bytes, peer_addr: LNPeerAddr, *,
|
||||||
proxy: Optional[dict]):
|
e_proxy: Optional['ESocksProxy']):
|
||||||
LNTransportBase.__init__(self)
|
LNTransportBase.__init__(self)
|
||||||
assert type(privkey) is bytes and len(privkey) == 32
|
assert type(privkey) is bytes and len(privkey) == 32
|
||||||
self.privkey = privkey
|
self.privkey = privkey
|
||||||
self.peer_addr = peer_addr
|
self.peer_addr = peer_addr
|
||||||
self.proxy = MySocksProxy.from_proxy_dict(proxy)
|
self.e_proxy = e_proxy
|
||||||
|
|
||||||
async def handshake(self):
|
async def handshake(self):
|
||||||
if not self.proxy:
|
if not self.e_proxy:
|
||||||
self.reader, self.writer = await asyncio.open_connection(self.peer_addr.host, self.peer_addr.port)
|
self.reader, self.writer = await asyncio.open_connection(self.peer_addr.host, self.peer_addr.port)
|
||||||
else:
|
else:
|
||||||
self.reader, self.writer = await self.proxy.open_connection(self.peer_addr.host, self.peer_addr.port)
|
self.reader, self.writer = await self.e_proxy.open_connection(self.peer_addr.host, self.peer_addr.port)
|
||||||
hs = HandshakeState(self.peer_addr.pubkey)
|
hs = HandshakeState(self.peer_addr.pubkey)
|
||||||
# Get a new ephemeral key
|
# Get a new ephemeral key
|
||||||
epriv, epub = create_ephemeral_key()
|
epriv, epub = create_ephemeral_key()
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ from electrum_ecc import ecdsa_der_sig_from_ecdsa_sig64
|
|||||||
|
|
||||||
from . import constants, util
|
from . import constants, util
|
||||||
from . import keystore
|
from . import keystore
|
||||||
from .util import profiler, chunks, OldTaskGroup
|
from .util import profiler, chunks, OldTaskGroup, ESocksProxy
|
||||||
from .invoices import Invoice, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LN_EXPIRY_NEVER
|
from .invoices import Invoice, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LN_EXPIRY_NEVER
|
||||||
from .invoices import BaseInvoice
|
from .invoices import BaseInvoice
|
||||||
from .util import NetworkRetryManager, JsonRPCClient, NotEnoughFunds
|
from .util import NetworkRetryManager, JsonRPCClient, NotEnoughFunds
|
||||||
@@ -356,7 +356,7 @@ class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
|
|||||||
if node_id == self.node_keypair.pubkey:
|
if node_id == self.node_keypair.pubkey:
|
||||||
raise ErrorAddingPeer("cannot connect to self")
|
raise ErrorAddingPeer("cannot connect to self")
|
||||||
transport = LNTransport(self.node_keypair.privkey, peer_addr,
|
transport = LNTransport(self.node_keypair.privkey, peer_addr,
|
||||||
proxy=self.network.proxy)
|
e_proxy=ESocksProxy.from_network_settings(self.network))
|
||||||
peer = await self._add_peer_from_transport(node_id=node_id, transport=transport)
|
peer = await self._add_peer_from_transport(node_id=node_id, transport=transport)
|
||||||
assert peer
|
assert peer
|
||||||
return peer
|
return peer
|
||||||
@@ -3054,7 +3054,7 @@ class LNWallet(LNWorker):
|
|||||||
async def _request_fclose(addresses):
|
async def _request_fclose(addresses):
|
||||||
for host, port, timestamp in addresses:
|
for host, port, timestamp in addresses:
|
||||||
peer_addr = LNPeerAddr(host, port, node_id)
|
peer_addr = LNPeerAddr(host, port, node_id)
|
||||||
transport = LNTransport(privkey, peer_addr, proxy=self.network.proxy)
|
transport = LNTransport(privkey, peer_addr, e_proxy=ESocksProxy.from_network_settings(self.network))
|
||||||
peer = Peer(self, node_id, transport, is_channel_backup=True)
|
peer = Peer(self, node_id, transport, is_channel_backup=True)
|
||||||
try:
|
try:
|
||||||
async with OldTaskGroup(wait=any) as group:
|
async with OldTaskGroup(wait=any) as group:
|
||||||
|
|||||||
@@ -323,7 +323,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
|||||||
|
|
||||||
self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
|
self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
|
||||||
|
|
||||||
self.proxy = None
|
self.proxy = None # type: Optional[dict]
|
||||||
self.is_proxy_tor = None
|
self.is_proxy_tor = None
|
||||||
self._init_parameters_from_config()
|
self._init_parameters_from_config()
|
||||||
|
|
||||||
@@ -885,7 +885,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
|||||||
self._set_status(ConnectionState.CONNECTING)
|
self._set_status(ConnectionState.CONNECTING)
|
||||||
self._trying_addr_now(server)
|
self._trying_addr_now(server)
|
||||||
|
|
||||||
interface = Interface(network=self, server=server, proxy=self.proxy)
|
interface = Interface(network=self, server=server)
|
||||||
# note: using longer timeouts here as DNS can sometimes be slow!
|
# note: using longer timeouts here as DNS can sometimes be slow!
|
||||||
timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
|
timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
|
||||||
try:
|
try:
|
||||||
@@ -1479,7 +1479,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
|||||||
timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
||||||
responses = dict()
|
responses = dict()
|
||||||
async def get_response(server: ServerAddr):
|
async def get_response(server: ServerAddr):
|
||||||
interface = Interface(network=self, server=server, proxy=self.proxy)
|
interface = Interface(network=self, server=server)
|
||||||
try:
|
try:
|
||||||
await util.wait_for2(interface.ready, timeout)
|
await util.wait_for2(interface.ready, timeout)
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
|
|||||||
@@ -1944,7 +1944,7 @@ class NetworkRetryManager(Generic[_NetAddrType]):
|
|||||||
self._last_tried_addr.clear()
|
self._last_tried_addr.clear()
|
||||||
|
|
||||||
|
|
||||||
class MySocksProxy(aiorpcx.SOCKSProxy):
|
class ESocksProxy(aiorpcx.SOCKSProxy):
|
||||||
# note: proxy will not leak DNS as create_connection()
|
# note: proxy will not leak DNS as create_connection()
|
||||||
# sets (local DNS) resolve=False by default
|
# sets (local DNS) resolve=False by default
|
||||||
|
|
||||||
@@ -1958,12 +1958,17 @@ class MySocksProxy(aiorpcx.SOCKSProxy):
|
|||||||
return reader, writer
|
return reader, writer
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']:
|
def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
|
||||||
if not proxy:
|
if not network or not network.proxy:
|
||||||
return None
|
return None
|
||||||
|
proxy = network.proxy
|
||||||
username, pw = proxy.get('user'), proxy.get('password')
|
username, pw = proxy.get('user'), proxy.get('password')
|
||||||
if not username or not pw:
|
if not username or not pw:
|
||||||
auth = None
|
# is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
|
||||||
|
if network.is_proxy_tor:
|
||||||
|
auth = aiorpcx.socks.SOCKSRandomAuth()
|
||||||
|
else:
|
||||||
|
auth = None
|
||||||
else:
|
else:
|
||||||
auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
|
auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
|
||||||
addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
|
addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ class TestLNTransport(ElectrumTestCase):
|
|||||||
responder_shaked.set()
|
responder_shaked.set()
|
||||||
async def connect(port: int):
|
async def connect(port: int):
|
||||||
peer_addr = LNPeerAddr('127.0.0.1', port, responder_key.get_public_key_bytes())
|
peer_addr = LNPeerAddr('127.0.0.1', port, responder_key.get_public_key_bytes())
|
||||||
t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, proxy=None)
|
t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, e_proxy=None)
|
||||||
await t.handshake()
|
await t.handshake()
|
||||||
async with OldTaskGroup() as group:
|
async with OldTaskGroup() as group:
|
||||||
await group.spawn(read_messages(t, messages_sent_by_server))
|
await group.spawn(read_messages(t, messages_sent_by_server))
|
||||||
|
|||||||
@@ -18,14 +18,14 @@ class MockNetwork:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.asyncio_loop = util.get_asyncio_loop()
|
self.asyncio_loop = util.get_asyncio_loop()
|
||||||
self.taskgroup = OldTaskGroup()
|
self.taskgroup = OldTaskGroup()
|
||||||
|
self.proxy = None
|
||||||
|
|
||||||
class MockInterface(Interface):
|
class MockInterface(Interface):
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
self.config = config
|
self.config = config
|
||||||
network = MockNetwork()
|
network = MockNetwork()
|
||||||
network.config = config
|
network.config = config
|
||||||
super().__init__(network=network, server=ServerAddr.from_str('mock-server:50000:t'), proxy=None)
|
super().__init__(network=network, server=ServerAddr.from_str('mock-server:50000:t'))
|
||||||
self.q = asyncio.Queue()
|
self.q = asyncio.Queue()
|
||||||
self.blockchain = blockchain.Blockchain(config=self.config, forkpoint=0,
|
self.blockchain = blockchain.Blockchain(config=self.config, forkpoint=0,
|
||||||
parent=None, forkpoint_hash=constants.net.GENESIS, prev_hash=None)
|
parent=None, forkpoint_hash=constants.net.GENESIS, prev_hash=None)
|
||||||
|
|||||||
Reference in New Issue
Block a user