@@ -43,7 +43,7 @@ from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
|
|||||||
from aiorpcx.rawsocket import RSClient
|
from aiorpcx.rawsocket import RSClient
|
||||||
import certifi
|
import certifi
|
||||||
|
|
||||||
from .util import ignore_exceptions, log_exceptions, bfh, SilentTaskGroup
|
from .util import ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy
|
||||||
from . import util
|
from . import util
|
||||||
from . import x509
|
from . import x509
|
||||||
from . import pem
|
from . import pem
|
||||||
@@ -277,7 +277,7 @@ class Interface(Logger):
|
|||||||
self.blockchain = None # type: Optional[Blockchain]
|
self.blockchain = None # type: Optional[Blockchain]
|
||||||
self._requested_chunks = set() # type: Set[int]
|
self._requested_chunks = set() # type: Set[int]
|
||||||
self.network = network
|
self.network = network
|
||||||
self._set_proxy(proxy)
|
self.proxy = MySocksProxy.from_proxy_dict(proxy)
|
||||||
self.session = None # type: Optional[NotificationSession]
|
self.session = None # type: Optional[NotificationSession]
|
||||||
self._ipaddr_bucket = None
|
self._ipaddr_bucket = None
|
||||||
|
|
||||||
@@ -310,23 +310,6 @@ class Interface(Logger):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"<Interface {self.diagnostic_name()}>"
|
return f"<Interface {self.diagnostic_name()}>"
|
||||||
|
|
||||||
def _set_proxy(self, proxy: dict):
|
|
||||||
if proxy:
|
|
||||||
username, pw = proxy.get('user'), proxy.get('password')
|
|
||||||
if not username or not pw:
|
|
||||||
auth = None
|
|
||||||
else:
|
|
||||||
auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
|
|
||||||
addr = NetAddress(proxy['host'], proxy['port'])
|
|
||||||
if proxy['mode'] == "socks4":
|
|
||||||
self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS4a, auth)
|
|
||||||
elif proxy['mode'] == "socks5":
|
|
||||||
self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS5, auth)
|
|
||||||
else:
|
|
||||||
raise NotImplementedError # http proxy not available with aiorpcx
|
|
||||||
else:
|
|
||||||
self.proxy = None
|
|
||||||
|
|
||||||
async def is_server_ca_signed(self, ca_ssl_context):
|
async def is_server_ca_signed(self, ca_ssl_context):
|
||||||
"""Given a CA enforcing SSL context, returns True if the connection
|
"""Given a CA enforcing SSL context, returns True if the connection
|
||||||
can be established. Returns False if the server has a self-signed
|
can be established. Returns False if the server has a self-signed
|
||||||
|
|||||||
@@ -251,7 +251,8 @@ class Peer(Logger):
|
|||||||
return await func(self, *args, **kwargs)
|
return await func(self, *args, **kwargs)
|
||||||
except GracefulDisconnect as e:
|
except GracefulDisconnect as e:
|
||||||
self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
|
self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
|
||||||
except (LightningPeerConnectionClosed, IncompatibleLightningFeatures) as e:
|
except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
|
||||||
|
aiorpcx.socks.SOCKSError) as e:
|
||||||
self.logger.info(f"Disconnecting: {repr(e)}")
|
self.logger.info(f"Disconnecting: {repr(e)}")
|
||||||
finally:
|
finally:
|
||||||
self.close_and_cleanup()
|
self.close_and_cleanup()
|
||||||
|
|||||||
@@ -8,12 +8,14 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
import asyncio
|
import asyncio
|
||||||
from asyncio import StreamReader, StreamWriter
|
from asyncio import StreamReader, StreamWriter
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from .crypto import sha256, hmac_oneshot, chacha20_poly1305_encrypt, chacha20_poly1305_decrypt
|
from .crypto import sha256, hmac_oneshot, chacha20_poly1305_encrypt, chacha20_poly1305_decrypt
|
||||||
from .lnutil import (get_ecdh, privkey_to_pubkey, LightningPeerConnectionClosed,
|
from .lnutil import (get_ecdh, privkey_to_pubkey, LightningPeerConnectionClosed,
|
||||||
HandshakeFailed, LNPeerAddr)
|
HandshakeFailed, LNPeerAddr)
|
||||||
from . import ecc
|
from . import ecc
|
||||||
from .util import bh2u
|
from .util import bh2u, MySocksProxy
|
||||||
|
|
||||||
|
|
||||||
class HandshakeState(object):
|
class HandshakeState(object):
|
||||||
prologue = b"lightning"
|
prologue = b"lightning"
|
||||||
@@ -217,17 +219,22 @@ class LNResponderTransport(LNTransportBase):
|
|||||||
class LNTransport(LNTransportBase):
|
class LNTransport(LNTransportBase):
|
||||||
"""Transport initiated by local party."""
|
"""Transport initiated by local party."""
|
||||||
|
|
||||||
def __init__(self, privkey: bytes, peer_addr: LNPeerAddr):
|
def __init__(self, privkey: bytes, peer_addr: LNPeerAddr, *,
|
||||||
|
proxy: Optional[dict]):
|
||||||
LNTransportBase.__init__(self)
|
LNTransportBase.__init__(self)
|
||||||
assert type(privkey) is bytes and len(privkey) == 32
|
assert type(privkey) is bytes and len(privkey) == 32
|
||||||
self.privkey = privkey
|
self.privkey = privkey
|
||||||
self.peer_addr = peer_addr
|
self.peer_addr = peer_addr
|
||||||
|
self.proxy = MySocksProxy.from_proxy_dict(proxy)
|
||||||
|
|
||||||
def name(self):
|
def name(self):
|
||||||
return self.peer_addr.net_addr_str()
|
return self.peer_addr.net_addr_str()
|
||||||
|
|
||||||
async def handshake(self):
|
async def handshake(self):
|
||||||
self.reader, self.writer = await asyncio.open_connection(self.peer_addr.host, self.peer_addr.port)
|
if not self.proxy:
|
||||||
|
self.reader, self.writer = await asyncio.open_connection(self.peer_addr.host, self.peer_addr.port)
|
||||||
|
else:
|
||||||
|
self.reader, self.writer = await self.proxy.open_connection(self.peer_addr.host, self.peer_addr.port)
|
||||||
hs = HandshakeState(self.peer_addr.pubkey)
|
hs = HandshakeState(self.peer_addr.pubkey)
|
||||||
# Get a new ephemeral key
|
# Get a new ephemeral key
|
||||||
epriv, epub = create_ephemeral_key()
|
epriv, epub = create_ephemeral_key()
|
||||||
|
|||||||
@@ -162,6 +162,8 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
|
|||||||
self.features |= LnFeatures.VAR_ONION_OPT
|
self.features |= LnFeatures.VAR_ONION_OPT
|
||||||
self.features |= LnFeatures.PAYMENT_SECRET_OPT
|
self.features |= LnFeatures.PAYMENT_SECRET_OPT
|
||||||
|
|
||||||
|
util.register_callback(self.on_proxy_changed, ['proxy_set'])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def peers(self) -> Mapping[bytes, Peer]:
|
def peers(self) -> Mapping[bytes, Peer]:
|
||||||
"""Returns a read-only copy of peers."""
|
"""Returns a read-only copy of peers."""
|
||||||
@@ -191,6 +193,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
|
|||||||
await self.taskgroup.spawn(peer.main_loop())
|
await self.taskgroup.spawn(peer.main_loop())
|
||||||
try:
|
try:
|
||||||
# FIXME: server.close(), server.wait_closed(), etc... ?
|
# FIXME: server.close(), server.wait_closed(), etc... ?
|
||||||
|
# TODO: onion hidden service?
|
||||||
server = await asyncio.start_server(cb, addr, int(port))
|
server = await asyncio.start_server(cb, addr, int(port))
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
|
self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
|
||||||
@@ -224,7 +227,8 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
|
|||||||
return self._peers[node_id]
|
return self._peers[node_id]
|
||||||
port = int(port)
|
port = int(port)
|
||||||
peer_addr = LNPeerAddr(host, port, node_id)
|
peer_addr = LNPeerAddr(host, port, node_id)
|
||||||
transport = LNTransport(self.node_keypair.privkey, peer_addr)
|
transport = LNTransport(self.node_keypair.privkey, peer_addr,
|
||||||
|
proxy=self.network.proxy)
|
||||||
self._trying_addr_now(peer_addr)
|
self._trying_addr_now(peer_addr)
|
||||||
self.logger.info(f"adding peer {peer_addr}")
|
self.logger.info(f"adding peer {peer_addr}")
|
||||||
peer = Peer(self, node_id, transport)
|
peer = Peer(self, node_id, transport)
|
||||||
@@ -381,6 +385,10 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
|
|||||||
choice = random.choice(addr_list)
|
choice = random.choice(addr_list)
|
||||||
return choice
|
return choice
|
||||||
|
|
||||||
|
def on_proxy_changed(self, event, *args):
|
||||||
|
for peer in self.peers.values():
|
||||||
|
peer.close_and_cleanup()
|
||||||
|
|
||||||
|
|
||||||
class LNGossip(LNWorker):
|
class LNGossip(LNWorker):
|
||||||
max_age = 14*24*3600
|
max_age = 14*24*3600
|
||||||
@@ -1415,7 +1423,8 @@ class LNBackups(Logger):
|
|||||||
async def request_force_close(self, channel_id):
|
async def request_force_close(self, channel_id):
|
||||||
cb = self.channel_backups[channel_id].cb
|
cb = self.channel_backups[channel_id].cb
|
||||||
peer_addr = LNPeerAddr(cb.host, cb.port, cb.node_id)
|
peer_addr = LNPeerAddr(cb.host, cb.port, cb.node_id)
|
||||||
transport = LNTransport(cb.privkey, peer_addr)
|
transport = LNTransport(cb.privkey, peer_addr,
|
||||||
|
proxy=self.network.proxy)
|
||||||
peer = Peer(self, cb.node_id, transport)
|
peer = Peer(self, cb.node_id, transport)
|
||||||
await self.taskgroup.spawn(peer._message_loop())
|
await self.taskgroup.spawn(peer._message_loop())
|
||||||
await peer.initialized
|
await peer.initialized
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ class TestLNTransport(ElectrumTestCase):
|
|||||||
server = server_future.result() # type: asyncio.Server
|
server = server_future.result() # type: asyncio.Server
|
||||||
async def connect():
|
async def connect():
|
||||||
peer_addr = LNPeerAddr('127.0.0.1', 42898, responder_key.get_public_key_bytes())
|
peer_addr = LNPeerAddr('127.0.0.1', 42898, responder_key.get_public_key_bytes())
|
||||||
t = LNTransport(initiator_key.get_secret_bytes(), peer_addr)
|
t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, proxy=None)
|
||||||
await t.handshake()
|
await t.handshake()
|
||||||
t.send_bytes(b'hello from client')
|
t.send_bytes(b'hello from client')
|
||||||
self.assertEqual(await t.read_messages().__anext__(), b'hello from server')
|
self.assertEqual(await t.read_messages().__anext__(), b'hello from server')
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ import random
|
|||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from aiohttp_socks import ProxyConnector, ProxyType
|
from aiohttp_socks import ProxyConnector, ProxyType
|
||||||
|
import aiorpcx
|
||||||
from aiorpcx import TaskGroup
|
from aiorpcx import TaskGroup
|
||||||
import certifi
|
import certifi
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
@@ -1397,3 +1398,33 @@ class NetworkRetryManager(Generic[_NetAddrType]):
|
|||||||
|
|
||||||
def _clear_addr_retry_times(self) -> None:
|
def _clear_addr_retry_times(self) -> None:
|
||||||
self._last_tried_addr.clear()
|
self._last_tried_addr.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class MySocksProxy(aiorpcx.SOCKSProxy):
|
||||||
|
|
||||||
|
async def open_connection(self, host=None, port=None, **kwargs):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
reader = asyncio.StreamReader(loop=loop)
|
||||||
|
protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
|
||||||
|
transport, _ = await self.create_connection(
|
||||||
|
lambda: protocol, host, port, **kwargs)
|
||||||
|
writer = asyncio.StreamWriter(transport, protocol, reader, loop)
|
||||||
|
return reader, writer
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']:
|
||||||
|
if not proxy:
|
||||||
|
return None
|
||||||
|
username, pw = proxy.get('user'), proxy.get('password')
|
||||||
|
if not username or not pw:
|
||||||
|
auth = None
|
||||||
|
else:
|
||||||
|
auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
|
||||||
|
addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
|
||||||
|
if proxy['mode'] == "socks4":
|
||||||
|
ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
|
||||||
|
elif proxy['mode'] == "socks5":
|
||||||
|
ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError # http proxy not available with aiorpcx
|
||||||
|
return ret
|
||||||
|
|||||||
Reference in New Issue
Block a user