1
0

lnrouter fixes:

- use gossip_queries_req instead of initial_routing_sync
 - add connected nodes to recent peers only after successful init
 - derive timestamp used with gossip_timestamp_filter from channel_db
 - fix query_short_channel_ids:
     1. channel IDs must be sorted with zlib
     2. limit request to 100
     3. do not abuse this to request node_announcements; it is fine not to have all nodes.
 - fix get_recent_peers:
     1. do not set last_connected_date to 'now' if we never connected a node
     2. sql query was misconstructed and was returning only one peer
 - populate FALLBACK_NODE_LIST_MAINNET with nodes that have the requested flags
This commit is contained in:
ThomasV
2019-03-18 11:03:37 +01:00
parent e7888a50be
commit aa398993cf
4 changed files with 86 additions and 87 deletions

View File

@@ -23,6 +23,7 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from datetime import datetime
import time
import random
import queue
@@ -157,9 +158,8 @@ class NodeInfo(Base):
addresses = NodeInfo.parse_addresses_field(payload['addresses'])
alias = payload['alias'].rstrip(b'\x00').hex()
timestamp = int.from_bytes(payload['timestamp'], "big")
now = int(time.time())
return NodeInfo(node_id=node_id, features=features, timestamp=timestamp, alias=alias), [
Address(host=host, port=port, node_id=node_id, last_connected_date=now) for host, port in addresses]
Address(host=host, port=port, node_id=node_id, last_connected_date=None) for host, port in addresses]
@staticmethod
def parse_addresses_field(addresses_field):
@@ -206,8 +206,7 @@ class Address(Base):
node_id = Column(String(66), ForeignKey('node_info.node_id'), primary_key=True)
host = Column(String(256), primary_key=True)
port = Column(Integer, primary_key=True)
last_connected_date = Column(Integer(), nullable=False)
last_connected_date = Column(Integer(), nullable=True)
@@ -273,11 +272,8 @@ class ChannelDB(SqlDB):
@sql
def get_recent_peers(self):
return [LNPeerAddr(x.host, x.port, bytes.fromhex(x.node_id)) for x in self.DBSession \
.query(Address) \
.select_from(NodeInfo) \
.order_by(Address.last_connected_date.desc()) \
.limit(self.NUM_MAX_RECENT_PEERS)]
r = self.DBSession.query(Address).filter(Address.last_connected_date.isnot(None)).order_by(Address.last_connected_date.desc()).limit(self.NUM_MAX_RECENT_PEERS).all()
return [LNPeerAddr(x.host, x.port, bytes.fromhex(x.node_id)) for x in r]
@sql
def get_channel_info(self, channel_id: bytes):
@@ -298,15 +294,6 @@ class ChannelDB(SqlDB):
chan_ids_from_policy = set(x[0] for x in self.DBSession.query(Policy.short_channel_id).filter(expr).all())
if chan_ids_from_policy:
return chan_ids_from_policy
# fetch channels for node_ids missing in node_info. that will also give us node_announcement
expr = not_(ChannelInfo.node1_id.in_(self.DBSession.query(NodeInfo.node_id)))
chan_ids_from_id1 = set(x[0] for x in self.DBSession.query(ChannelInfo.short_channel_id).filter(expr).all())
if chan_ids_from_id1:
return chan_ids_from_id1
expr = not_(ChannelInfo.node2_id.in_(self.DBSession.query(NodeInfo.node_id)))
chan_ids_from_id2 = set(x[0] for x in self.DBSession.query(ChannelInfo.short_channel_id).filter(expr).all())
if chan_ids_from_id2:
return chan_ids_from_id2
return set()
@sql
@@ -318,7 +305,7 @@ class ChannelDB(SqlDB):
self.DBSession.commit()
@sql
#@profiler
@profiler
def on_channel_announcement(self, msg_payloads, trusted=False):
if type(msg_payloads) is dict:
msg_payloads = [msg_payloads]
@@ -342,10 +329,16 @@ class ChannelDB(SqlDB):
for channel_info in new_channels.values():
self.DBSession.add(channel_info)
self.DBSession.commit()
self.print_error('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
#self.print_error('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
self._update_counts()
self.network.trigger_callback('ln_status')
@sql
def get_last_timestamp(self):
from sqlalchemy.sql import func
r = self.DBSession.query(func.max(Policy.timestamp).label('max_timestamp')).one()
return r.max_timestamp or 0
@sql
@profiler
def on_channel_update(self, msg_payloads, trusted=False):
@@ -368,7 +361,8 @@ class ChannelDB(SqlDB):
if not trusted and not verify_sig_for_channel_update(msg_payload, bytes.fromhex(node_id)):
continue
short_channel_id = channel_info.short_channel_id
new_policy = Policy.from_msg(msg_payload, node_id, channel_info.short_channel_id)
new_policy = Policy.from_msg(msg_payload, node_id, short_channel_id)
#self.print_error('on_channel_update', datetime.fromtimestamp(new_policy.timestamp).ctime())
old_policy = self.DBSession.query(Policy).filter_by(short_channel_id=short_channel_id, start_node=node_id).one_or_none()
if old_policy:
if old_policy.timestamp >= new_policy.timestamp:
@@ -378,6 +372,7 @@ class ChannelDB(SqlDB):
if p and p.timestamp >= new_policy.timestamp:
continue
new_policies[(short_channel_id, node_id)] = new_policy
self.print_error('on_channel_update: %d/%d'%(len(new_policies), len(msg_payloads)))
# commit pending removals
self.DBSession.commit()
# add and commit new policies
@@ -386,7 +381,7 @@ class ChannelDB(SqlDB):
self.DBSession.commit()
@sql
#@profiler
@profiler
def on_node_announcement(self, msg_payloads):
if type(msg_payloads) is dict:
msg_payloads = [msg_payloads]
@@ -403,7 +398,13 @@ class ChannelDB(SqlDB):
node_info, node_addresses = NodeInfo.from_msg(msg_payload)
except UnknownEvenFeatureBits:
continue
#self.print_error('received node announcement from', datetime.fromtimestamp(node_info.timestamp).ctime())
node_id = node_info.node_id
# Ignore node if it has no associated channel (DoS protection)
expr = or_(ChannelInfo.node1_id==node_id, ChannelInfo.node2_id==node_id)
if self.DBSession.query(ChannelInfo.short_channel_id).filter(expr).count() == 0:
#self.print_error('ignoring orphan node_announcement')
continue
node = self.DBSession.query(NodeInfo).filter_by(node_id=node_id).one_or_none()
if node and node.timestamp >= node_info.timestamp:
continue
@@ -413,20 +414,13 @@ class ChannelDB(SqlDB):
new_nodes[node_id] = node_info
for addr in node_addresses:
new_addresses[(addr.node_id,addr.host,addr.port)] = addr
self.print_error("on_node_announcements: %d/%d"%(len(new_nodes), len(msg_payloads)))
self.print_error("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads)))
for node_info in new_nodes.values():
self.DBSession.add(node_info)
for new_addr in new_addresses.values():
old_addr = self.DBSession.query(Address).filter_by(node_id=new_addr.node_id, host=new_addr.host, port=new_addr.port).one_or_none()
if old_addr:
old_addr.last_connected_date = new_addr.last_connected_date
else:
if not old_addr:
self.DBSession.add(new_addr)
# TODO if this message is for a new node, and if we have no associated
# channels for this node, we should ignore the message and return here,
# to mitigate DOS. but race condition: the channels we have for this
# node, might be under verification in self.ca_verifier, what then?
self.DBSession.commit()
self._update_counts()
self.network.trigger_callback('ln_status')