lnpeer: log both sent and recv msgs; use pubkey for incoming transports
Previously for incoming transports, the diagnostic_name (for log messages)
was just "responder" -- not sufficient to distinguish peers.
We now use the pubkey instead.
For outgoing transports it is f"{host}:{port}" (unchanged).
We could just use the pubkey for both uniformly; but it is quite long, and
it is hard to distinguish them at a glance.
This commit is contained in:
@@ -169,7 +169,7 @@ class AbstractChannel(Logger, ABC):
|
|||||||
old_state = self._state
|
old_state = self._state
|
||||||
if (old_state, state) not in state_transitions:
|
if (old_state, state) not in state_transitions:
|
||||||
raise Exception(f"Transition not allowed: {old_state.name} -> {state.name}")
|
raise Exception(f"Transition not allowed: {old_state.name} -> {state.name}")
|
||||||
self.logger.debug(f'Setting channel state: {old_state.name} -> {state.name}')
|
self.logger.debug(f'({self.get_id_for_log()}) Setting channel state: {old_state.name} -> {state.name}')
|
||||||
self._state = state
|
self._state = state
|
||||||
self.storage['state'] = self._state.name
|
self.storage['state'] = self._state.name
|
||||||
if self.lnworker:
|
if self.lnworker:
|
||||||
|
|||||||
@@ -65,6 +65,11 @@ LN_P2P_NETWORK_TIMEOUT = 20
|
|||||||
class Peer(Logger):
|
class Peer(Logger):
|
||||||
LOGGING_SHORTCUT = 'P'
|
LOGGING_SHORTCUT = 'P'
|
||||||
|
|
||||||
|
ORDERED_MESSAGES = (
|
||||||
|
'accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed')
|
||||||
|
SPAMMY_MESSAGES = (
|
||||||
|
'ping', 'pong', 'channel_announcement', 'node_announcement', 'channel_update',)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
lnworker: Union['LNGossip', 'LNWallet'],
|
lnworker: Union['LNGossip', 'LNWallet'],
|
||||||
@@ -91,7 +96,6 @@ class Peer(Logger):
|
|||||||
self.reply_channel_range = asyncio.Queue()
|
self.reply_channel_range = asyncio.Queue()
|
||||||
# gossip uses a single queue to preserve message order
|
# gossip uses a single queue to preserve message order
|
||||||
self.gossip_queue = asyncio.Queue()
|
self.gossip_queue = asyncio.Queue()
|
||||||
self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed']
|
|
||||||
self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered
|
self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered
|
||||||
self.temp_id_to_id = {} # to forward error messages
|
self.temp_id_to_id = {} # to forward error messages
|
||||||
self.funding_created_sent = set() # for channels in PREOPENING
|
self.funding_created_sent = set() # for channels in PREOPENING
|
||||||
@@ -109,7 +113,8 @@ class Peer(Logger):
|
|||||||
|
|
||||||
def send_message(self, message_name: str, **kwargs):
|
def send_message(self, message_name: str, **kwargs):
|
||||||
assert type(message_name) is str
|
assert type(message_name) is str
|
||||||
self.logger.debug(f"Sending {message_name.upper()}")
|
if message_name not in self.SPAMMY_MESSAGES:
|
||||||
|
self.logger.debug(f"Sending {message_name.upper()}")
|
||||||
if message_name.upper() != "INIT" and not self.is_initialized():
|
if message_name.upper() != "INIT" and not self.is_initialized():
|
||||||
raise Exception("tried to send message before we are initialized")
|
raise Exception("tried to send message before we are initialized")
|
||||||
raw_msg = encode_msg(message_name, **kwargs)
|
raw_msg = encode_msg(message_name, **kwargs)
|
||||||
@@ -184,10 +189,12 @@ class Peer(Logger):
|
|||||||
except UnknownOptionalMsgType as e:
|
except UnknownOptionalMsgType as e:
|
||||||
self.logger.info(f"received unknown message from peer. ignoring: {e!r}")
|
self.logger.info(f"received unknown message from peer. ignoring: {e!r}")
|
||||||
return
|
return
|
||||||
|
if message_type not in self.SPAMMY_MESSAGES:
|
||||||
|
self.logger.debug(f"Received {message_type.upper()}")
|
||||||
# only process INIT if we are a backup
|
# only process INIT if we are a backup
|
||||||
if self.is_channel_backup is True and message_type != 'init':
|
if self.is_channel_backup is True and message_type != 'init':
|
||||||
return
|
return
|
||||||
if message_type in self.ordered_messages:
|
if message_type in self.ORDERED_MESSAGES:
|
||||||
chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
|
chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
|
||||||
self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
|
self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -157,6 +157,9 @@ class LNTransportBase:
|
|||||||
def close(self):
|
def close(self):
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
|
|
||||||
|
def remote_pubkey(self) -> Optional[bytes]:
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
class LNResponderTransport(LNTransportBase):
|
class LNResponderTransport(LNTransportBase):
|
||||||
"""Transport initiated by remote party."""
|
"""Transport initiated by remote party."""
|
||||||
@@ -166,9 +169,12 @@ class LNResponderTransport(LNTransportBase):
|
|||||||
self.reader = reader
|
self.reader = reader
|
||||||
self.writer = writer
|
self.writer = writer
|
||||||
self.privkey = privkey
|
self.privkey = privkey
|
||||||
|
self._pubkey = None # remote pubkey
|
||||||
|
|
||||||
def name(self):
|
def name(self):
|
||||||
return "responder"
|
pubkey = self.remote_pubkey()
|
||||||
|
pubkey_hex = pubkey.hex() if pubkey else pubkey
|
||||||
|
return f"{pubkey_hex}(in)"
|
||||||
|
|
||||||
async def handshake(self, **kwargs):
|
async def handshake(self, **kwargs):
|
||||||
hs = HandshakeState(privkey_to_pubkey(self.privkey))
|
hs = HandshakeState(privkey_to_pubkey(self.privkey))
|
||||||
@@ -221,8 +227,12 @@ class LNResponderTransport(LNTransportBase):
|
|||||||
_p = aead_decrypt(temp_k3, 0, hs.update(c), t)
|
_p = aead_decrypt(temp_k3, 0, hs.update(c), t)
|
||||||
self.rk, self.sk = get_bolt8_hkdf(ck, b'')
|
self.rk, self.sk = get_bolt8_hkdf(ck, b'')
|
||||||
self.init_counters(ck)
|
self.init_counters(ck)
|
||||||
|
self._pubkey = rs
|
||||||
return rs
|
return rs
|
||||||
|
|
||||||
|
def remote_pubkey(self) -> Optional[bytes]:
|
||||||
|
return self._pubkey
|
||||||
|
|
||||||
|
|
||||||
class LNTransport(LNTransportBase):
|
class LNTransport(LNTransportBase):
|
||||||
"""Transport initiated by local party."""
|
"""Transport initiated by local party."""
|
||||||
@@ -276,3 +286,6 @@ class LNTransport(LNTransportBase):
|
|||||||
self.writer.write(msg)
|
self.writer.write(msg)
|
||||||
self.sk, self.rk = get_bolt8_hkdf(hs.ck, b'')
|
self.sk, self.rk = get_bolt8_hkdf(hs.ck, b'')
|
||||||
self.init_counters(ck)
|
self.init_counters(ck)
|
||||||
|
|
||||||
|
def remote_pubkey(self) -> Optional[bytes]:
|
||||||
|
return self.peer_addr.pubkey
|
||||||
|
|||||||
Reference in New Issue
Block a user