lnpeer: only set initialized after both sent AND received "init"
had a trace where we tried to send "funding_locked" before being initialized:
D | lnpeer.Peer.[iq7zhmhck54vcax2vlrdcavq2m32wao7ekh6jyeglmnuuvv3js57r4id.onion:9735] | Sending FUNDING_LOCKED
E | lnworker.LNWallet | Exception in on_update_open_channel: AttributeError("'LNTransport' object has no attribute 'sk'")
Traceback (most recent call last):
File "...\electrum\electrum\util.py", line 999, in wrapper
return await func(*args, **kwargs)
File "...\electrum\electrum\lnworker.py", line 674, in on_update_open_channel
peer.send_funding_locked(chan)
File "...\electrum\electrum\lnpeer.py", line 876, in send_funding_locked
self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)
File "...\electrum\electrum\lnpeer.py", line 102, in send_message
self.transport.send_bytes(raw_msg)
File "...\electrum\electrum\lntransport.py", line 93, in send_bytes
lc = aead_encrypt(self.sk, self.sn(), b'', l)
AttributeError: 'LNTransport' object has no attribute 'sk'
This commit is contained in:
@@ -65,6 +65,8 @@ def channel_id_from_funding_tx(funding_txid: str, funding_index: int) -> Tuple[b
|
|||||||
class Peer(Logger):
|
class Peer(Logger):
|
||||||
|
|
||||||
def __init__(self, lnworker: Union['LNGossip', 'LNWallet'], pubkey:bytes, transport: LNTransportBase):
|
def __init__(self, lnworker: Union['LNGossip', 'LNWallet'], pubkey:bytes, transport: LNTransportBase):
|
||||||
|
self._sent_init = False # type: bool
|
||||||
|
self._received_init = False # type: bool
|
||||||
self.initialized = asyncio.Event()
|
self.initialized = asyncio.Event()
|
||||||
self.querying = asyncio.Event()
|
self.querying = asyncio.Event()
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
@@ -97,6 +99,8 @@ class Peer(Logger):
|
|||||||
def send_message(self, message_name: str, **kwargs):
|
def send_message(self, message_name: str, **kwargs):
|
||||||
assert type(message_name) is str
|
assert type(message_name) is str
|
||||||
self.logger.debug(f"Sending {message_name.upper()}")
|
self.logger.debug(f"Sending {message_name.upper()}")
|
||||||
|
if message_name.upper() != "INIT" and not self.initialized.is_set():
|
||||||
|
raise Exception("tried to send message before we are initialized")
|
||||||
raw_msg = encode_msg(message_name, **kwargs)
|
raw_msg = encode_msg(message_name, **kwargs)
|
||||||
self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
|
self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
|
||||||
self.transport.send_bytes(raw_msg)
|
self.transport.send_bytes(raw_msg)
|
||||||
@@ -116,6 +120,7 @@ class Peer(Logger):
|
|||||||
if isinstance(self.transport, LNTransport):
|
if isinstance(self.transport, LNTransport):
|
||||||
await self.transport.handshake()
|
await self.transport.handshake()
|
||||||
self.send_message("init", gflen=0, lflen=1, localfeatures=self.localfeatures)
|
self.send_message("init", gflen=0, lflen=1, localfeatures=self.localfeatures)
|
||||||
|
self._sent_init = True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def channels(self) -> Dict[bytes, Channel]:
|
def channels(self) -> Dict[bytes, Channel]:
|
||||||
@@ -176,7 +181,7 @@ class Peer(Logger):
|
|||||||
self.funding_created[channel_id].put_nowait(payload)
|
self.funding_created[channel_id].put_nowait(payload)
|
||||||
|
|
||||||
def on_init(self, payload):
|
def on_init(self, payload):
|
||||||
if self.initialized.is_set():
|
if self._received_init:
|
||||||
self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
|
self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
|
||||||
return
|
return
|
||||||
# if they required some even flag we don't have, they will close themselves
|
# if they required some even flag we don't have, they will close themselves
|
||||||
@@ -199,7 +204,9 @@ class Peer(Logger):
|
|||||||
self.localfeatures |= 1 << get_ln_flag_pair_of_bit(flag)
|
self.localfeatures |= 1 << get_ln_flag_pair_of_bit(flag)
|
||||||
if isinstance(self.transport, LNTransport):
|
if isinstance(self.transport, LNTransport):
|
||||||
self.channel_db.add_recent_peer(self.transport.peer_addr)
|
self.channel_db.add_recent_peer(self.transport.peer_addr)
|
||||||
self.initialized.set()
|
self._received_init = True
|
||||||
|
if self._sent_init and self._received_init:
|
||||||
|
self.initialized.set()
|
||||||
|
|
||||||
def on_node_announcement(self, payload):
|
def on_node_announcement(self, payload):
|
||||||
self.gossip_queue.put_nowait(('node_announcement', payload))
|
self.gossip_queue.put_nowait(('node_announcement', payload))
|
||||||
@@ -436,6 +443,8 @@ class Peer(Logger):
|
|||||||
await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
|
await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
|
||||||
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
|
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
|
||||||
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
|
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
|
||||||
|
if self._sent_init and self._received_init:
|
||||||
|
self.initialized.set()
|
||||||
async for msg in self.transport.read_messages():
|
async for msg in self.transport.read_messages():
|
||||||
self.process_message(msg)
|
self.process_message(msg)
|
||||||
await asyncio.sleep(.01)
|
await asyncio.sleep(.01)
|
||||||
|
|||||||
@@ -670,7 +670,7 @@ class LNWallet(LNWorker):
|
|||||||
|
|
||||||
if chan.get_state() == channel_states.FUNDED:
|
if chan.get_state() == channel_states.FUNDED:
|
||||||
peer = self.peers.get(chan.node_id)
|
peer = self.peers.get(chan.node_id)
|
||||||
if peer:
|
if peer and peer.initialized.is_set():
|
||||||
peer.send_funding_locked(chan)
|
peer.send_funding_locked(chan)
|
||||||
|
|
||||||
elif chan.get_state() == channel_states.OPEN:
|
elif chan.get_state() == channel_states.OPEN:
|
||||||
|
|||||||
Reference in New Issue
Block a user