Unify message IDs between network and interfaces
Previously network.py had its own idea of request IDs, and each interface had its own which was sent on the wire. The interface would jump through hoops to translate one to the other. This unifies them so that a message ID is passed when queueing a request, in addition to the method and params. network.py is now solely responsible for message ID management. Apart from being simpler and clearer, this also should be faster as there is much less data structure manipulation and rebuilding happening.
This commit is contained in:
@@ -221,7 +221,6 @@ class Interface(util.PrintError):
|
|||||||
self.pipe.set_timeout(0.0) # Don't wait for data
|
self.pipe.set_timeout(0.0) # Don't wait for data
|
||||||
# Dump network messages. Set at runtime from the console.
|
# Dump network messages. Set at runtime from the console.
|
||||||
self.debug = False
|
self.debug = False
|
||||||
self.message_id = 0
|
|
||||||
self.unsent_requests = []
|
self.unsent_requests = []
|
||||||
self.unanswered_requests = {}
|
self.unanswered_requests = {}
|
||||||
# Set last ping to zero to ensure immediate ping
|
# Set last ping to zero to ensure immediate ping
|
||||||
@@ -241,32 +240,26 @@ class Interface(util.PrintError):
|
|||||||
self.socket.shutdown(socket.SHUT_RDWR)
|
self.socket.shutdown(socket.SHUT_RDWR)
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
|
|
||||||
def queue_request(self, request):
|
def queue_request(self, *args): # method, params, _id
|
||||||
'''Queue a request.'''
|
'''Queue a request, later to be send with send_requests when the
|
||||||
|
socket is available for writing.
|
||||||
|
'''
|
||||||
self.request_time = time.time()
|
self.request_time = time.time()
|
||||||
self.unsent_requests.append(request)
|
self.unsent_requests.append(args)
|
||||||
|
|
||||||
def send_requests(self):
|
def send_requests(self):
|
||||||
'''Sends all queued requests. Returns False on failure.'''
|
'''Sends all queued requests. Returns False on failure.'''
|
||||||
def copy_request(orig):
|
make_dict = lambda (m, p, i): {'method': m, 'params': p, 'id': i}
|
||||||
# Replace ID after making copy - mustn't change caller's copy
|
wire_requests = map(make_dict, self.unsent_requests)
|
||||||
request = orig.copy()
|
|
||||||
request['id'] = self.message_id
|
|
||||||
self.message_id += 1
|
|
||||||
if self.debug:
|
|
||||||
self.print_error("-->", request, orig.get('id'))
|
|
||||||
return request
|
|
||||||
|
|
||||||
requests_as_sent = map(copy_request, self.unsent_requests)
|
|
||||||
try:
|
try:
|
||||||
self.pipe.send_all(requests_as_sent)
|
self.pipe.send_all(wire_requests)
|
||||||
except socket.error, e:
|
except socket.error, e:
|
||||||
self.print_error("socket error:", e)
|
self.print_error("socket error:", e)
|
||||||
return False
|
return False
|
||||||
# unanswered_requests stores the original unmodified user
|
for request in self.unsent_requests:
|
||||||
# request, keyed by wire ID
|
if self.debug:
|
||||||
for n, request in enumerate(self.unsent_requests):
|
self.print_error("-->", request)
|
||||||
self.unanswered_requests[requests_as_sent[n]['id']] = request
|
self.unanswered_requests[request[2]] = request
|
||||||
self.unsent_requests = []
|
self.unsent_requests = []
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -291,37 +284,39 @@ class Interface(util.PrintError):
|
|||||||
|
|
||||||
def get_responses(self):
|
def get_responses(self):
|
||||||
'''Call if there is data available on the socket. Returns a list of
|
'''Call if there is data available on the socket. Returns a list of
|
||||||
notifications and a list of responses. The notifications are
|
(request, response) pairs. Notifications are singleton
|
||||||
singleton unsolicited responses presumably as a result of
|
unsolicited responses presumably as a result of prior
|
||||||
prior subscriptions. The responses are (request, response)
|
subscriptions, so request is None and there is no 'id' member.
|
||||||
pairs. If the connection was closed remotely or the remote
|
Otherwise it is a response, which has an 'id' member and a
|
||||||
server is misbehaving, the last notification will be None.
|
corresponding request. If the connection was closed remotely
|
||||||
|
or the remote server is misbehaving, a (None, None) will appear.
|
||||||
'''
|
'''
|
||||||
notifications, responses = [], []
|
responses = []
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
response = self.pipe.get()
|
response = self.pipe.get()
|
||||||
except util.timeout:
|
except util.timeout:
|
||||||
break
|
break
|
||||||
if response is None:
|
if response is None:
|
||||||
notifications.append(None)
|
responses.append((None, None))
|
||||||
self.closed_remotely = True
|
self.closed_remotely = True
|
||||||
self.print_error("connection closed remotely")
|
self.print_error("connection closed remotely")
|
||||||
break
|
break
|
||||||
if self.debug:
|
if self.debug:
|
||||||
self.print_error("<--", response)
|
self.print_error("<--", response)
|
||||||
wire_id = response.pop('id', None)
|
wire_id = response.get('id', None)
|
||||||
if wire_id is None:
|
if wire_id is None: # Notification
|
||||||
notifications.append(response)
|
responses.append((None, response))
|
||||||
elif wire_id in self.unanswered_requests:
|
|
||||||
request = self.unanswered_requests.pop(wire_id)
|
|
||||||
responses.append((request, response))
|
|
||||||
else:
|
else:
|
||||||
notifications.append(None)
|
request = self.unanswered_requests.pop(wire_id, None)
|
||||||
self.print_error("unknown wire ID", wire_id)
|
if request:
|
||||||
break
|
responses.append((request, response))
|
||||||
|
else:
|
||||||
|
self.print_error("unknown wire ID", wire_id)
|
||||||
|
responses.append(None, None) # Signal
|
||||||
|
break
|
||||||
|
|
||||||
return notifications, responses
|
return responses
|
||||||
|
|
||||||
|
|
||||||
def check_cert(host, cert):
|
def check_cert(host, cert):
|
||||||
|
|||||||
105
lib/network.py
105
lib/network.py
@@ -254,15 +254,26 @@ class Network(util.DaemonThread):
|
|||||||
def is_up_to_date(self):
|
def is_up_to_date(self):
|
||||||
return self.unanswered_requests == {}
|
return self.unanswered_requests == {}
|
||||||
|
|
||||||
def queue_request(self, method, params):
|
def queue_request(self, method, params, interface=None):
|
||||||
self.interface.queue_request({'method': method, 'params': params})
|
# If you want to queue a request on any interface it must go
|
||||||
|
# through this function so message ids are properly tracked
|
||||||
|
if interface is None:
|
||||||
|
interface = self.interface
|
||||||
|
message_id = self.message_id
|
||||||
|
self.message_id += 1
|
||||||
|
interface.queue_request(method, params, message_id)
|
||||||
|
return message_id
|
||||||
|
|
||||||
def send_subscriptions(self):
|
def send_subscriptions(self):
|
||||||
# clear cache
|
# clear cache
|
||||||
self.cached_responses = {}
|
self.cached_responses = {}
|
||||||
self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
|
self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
|
||||||
for r in self.unanswered_requests.values():
|
# Resend unanswered requests
|
||||||
self.interface.queue_request(r[0])
|
requests = self.unanswered_requests.values()
|
||||||
|
self.unanswered_requests = {}
|
||||||
|
for request in requests:
|
||||||
|
message_id = self.queue_request(request[0], request[1])
|
||||||
|
self.unanswered_requests[message_id] = request
|
||||||
for addr in self.subscribed_addresses:
|
for addr in self.subscribed_addresses:
|
||||||
self.queue_request('blockchain.address.subscribe', [addr])
|
self.queue_request('blockchain.address.subscribe', [addr])
|
||||||
self.queue_request('server.banner', [])
|
self.queue_request('server.banner', [])
|
||||||
@@ -488,37 +499,38 @@ class Network(util.DaemonThread):
|
|||||||
callback(response)
|
callback(response)
|
||||||
|
|
||||||
def process_responses(self, interface):
|
def process_responses(self, interface):
|
||||||
notifications, responses = interface.get_responses()
|
responses = interface.get_responses()
|
||||||
|
|
||||||
for request, response in responses:
|
for request, response in responses:
|
||||||
# Client ID was given by the daemon or proxy
|
callback = None
|
||||||
client_id = request.get('id')
|
if request:
|
||||||
if client_id is not None:
|
method, params, message_id = request
|
||||||
if interface != self.interface:
|
# client requests go through self.send() with a
|
||||||
continue
|
# callback, are only sent to the current interface,
|
||||||
_req, callback = self.unanswered_requests.pop(client_id)
|
# and are placed in the unanswered_requests dictionary
|
||||||
|
client_req = self.unanswered_requests.pop(message_id, None)
|
||||||
|
if client_req:
|
||||||
|
assert interface == self.interface
|
||||||
|
callback = client_req[2]
|
||||||
|
# Copy the request method and params to the response
|
||||||
|
response['method'] = method
|
||||||
|
response['params'] = params
|
||||||
else:
|
else:
|
||||||
callback = None
|
if not response: # Closed remotely / misbehaving
|
||||||
# Copy the request method and params to the response
|
self.connection_down(interface.server)
|
||||||
response['method'] = request.get('method')
|
break
|
||||||
response['params'] = request.get('params')
|
# Rewrite response shape to match subscription request response
|
||||||
response['id'] = client_id
|
method = response.get('method')
|
||||||
self.process_response(interface, response, callback)
|
params = response.get('params')
|
||||||
|
if method == 'blockchain.headers.subscribe':
|
||||||
|
response['result'] = params[0]
|
||||||
|
response['params'] = []
|
||||||
|
elif method == 'blockchain.address.subscribe':
|
||||||
|
response['params'] = [params[0]] # addr
|
||||||
|
response['result'] = params[1]
|
||||||
|
|
||||||
for response in notifications:
|
# Response is now in canonical form
|
||||||
if not response: # Closed remotely
|
self.process_response(interface, response, callback)
|
||||||
self.connection_down(interface.server)
|
|
||||||
break
|
|
||||||
# Rewrite response shape to match subscription request response
|
|
||||||
method = response.get('method')
|
|
||||||
params = response.get('params')
|
|
||||||
if method == 'blockchain.headers.subscribe':
|
|
||||||
response['result'] = params[0]
|
|
||||||
response['params'] = []
|
|
||||||
elif method == 'blockchain.address.subscribe':
|
|
||||||
response['params'] = [params[0]] # addr
|
|
||||||
response['result'] = params[1]
|
|
||||||
self.process_response(interface, response, None)
|
|
||||||
|
|
||||||
def send(self, messages, callback):
|
def send(self, messages, callback):
|
||||||
'''Messages is a list of (method, value) tuples'''
|
'''Messages is a list of (method, value) tuples'''
|
||||||
@@ -535,16 +547,11 @@ class Network(util.DaemonThread):
|
|||||||
for sub in subs:
|
for sub in subs:
|
||||||
if sub not in self.subscriptions[callback]:
|
if sub not in self.subscriptions[callback]:
|
||||||
self.subscriptions[callback].append(sub)
|
self.subscriptions[callback].append(sub)
|
||||||
_id = self.message_id
|
|
||||||
self.message_id += len(messages)
|
|
||||||
|
|
||||||
unsent = []
|
unsent = []
|
||||||
for message in messages:
|
for message in messages:
|
||||||
method, params = message
|
if not self.process_request(message, callback):
|
||||||
request = {'id': _id, 'method': method, 'params': params}
|
|
||||||
if not self.process_request(request, callback):
|
|
||||||
unsent.append(message)
|
unsent.append(message)
|
||||||
_id += 1
|
|
||||||
|
|
||||||
if unsent:
|
if unsent:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
@@ -553,12 +560,10 @@ class Network(util.DaemonThread):
|
|||||||
# FIXME: inline this function
|
# FIXME: inline this function
|
||||||
def process_request(self, request, callback):
|
def process_request(self, request, callback):
|
||||||
'''Returns true if the request was processed.'''
|
'''Returns true if the request was processed.'''
|
||||||
method = request['method']
|
method, params = request
|
||||||
params = request['params']
|
|
||||||
_id = request['id']
|
|
||||||
|
|
||||||
if method.startswith('network.'):
|
if method.startswith('network.'):
|
||||||
out = {'id':_id}
|
out = {}
|
||||||
try:
|
try:
|
||||||
f = getattr(self, method[8:])
|
f = getattr(self, method[8:])
|
||||||
out['result'] = f(*params)
|
out['result'] = f(*params)
|
||||||
@@ -585,8 +590,8 @@ class Network(util.DaemonThread):
|
|||||||
|
|
||||||
if self.debug:
|
if self.debug:
|
||||||
self.print_error("-->", request)
|
self.print_error("-->", request)
|
||||||
self.unanswered_requests[_id] = request, callback
|
message_id = self.queue_request(method, params)
|
||||||
self.interface.queue_request(request)
|
self.unanswered_requests[message_id] = method, params, callback
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def connection_down(self, server):
|
def connection_down(self, server):
|
||||||
@@ -603,8 +608,7 @@ class Network(util.DaemonThread):
|
|||||||
def new_interface(self, server, socket):
|
def new_interface(self, server, socket):
|
||||||
self.add_recent_server(server)
|
self.add_recent_server(server)
|
||||||
self.interfaces[server] = interface = Interface(server, socket)
|
self.interfaces[server] = interface = Interface(server, socket)
|
||||||
interface.queue_request({'method': 'blockchain.headers.subscribe',
|
self.queue_request('blockchain.headers.subscribe', [], interface)
|
||||||
'params': []})
|
|
||||||
if server == self.default_server:
|
if server == self.default_server:
|
||||||
self.switch_to_interface(server)
|
self.switch_to_interface(server)
|
||||||
self.notify('interfaces')
|
self.notify('interfaces')
|
||||||
@@ -625,9 +629,8 @@ class Network(util.DaemonThread):
|
|||||||
if interface.has_timed_out():
|
if interface.has_timed_out():
|
||||||
self.connection_down(interface.server)
|
self.connection_down(interface.server)
|
||||||
elif interface.ping_required():
|
elif interface.ping_required():
|
||||||
version_req = {'method': 'server.version',
|
params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
|
||||||
'params': [ELECTRUM_VERSION, PROTOCOL_VERSION]}
|
self.queue_request('server.version', params, interface)
|
||||||
interface.queue_request(version_req)
|
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
# nodes
|
# nodes
|
||||||
@@ -653,8 +656,7 @@ class Network(util.DaemonThread):
|
|||||||
|
|
||||||
def request_chunk(self, interface, data, idx):
|
def request_chunk(self, interface, data, idx):
|
||||||
interface.print_error("requesting chunk %d" % idx)
|
interface.print_error("requesting chunk %d" % idx)
|
||||||
interface.queue_request({'method':'blockchain.block.get_chunk',
|
self.queue_request('blockchain.block.get_chunk', [idx], interface)
|
||||||
'params':[idx]})
|
|
||||||
data['chunk_idx'] = idx
|
data['chunk_idx'] = idx
|
||||||
data['req_time'] = time.time()
|
data['req_time'] = time.time()
|
||||||
|
|
||||||
@@ -675,8 +677,7 @@ class Network(util.DaemonThread):
|
|||||||
|
|
||||||
def request_header(self, interface, data, height):
|
def request_header(self, interface, data, height):
|
||||||
interface.print_error("requesting header %d" % height)
|
interface.print_error("requesting header %d" % height)
|
||||||
interface.queue_request({'method':'blockchain.block.get_header',
|
self.queue_request('blockchain.block.get_header', [height], interface)
|
||||||
'params':[height]})
|
|
||||||
data['header_height'] = height
|
data['header_height'] = height
|
||||||
data['req_time'] = time.time()
|
data['req_time'] = time.time()
|
||||||
if not 'chain' in data:
|
if not 'chain' in data:
|
||||||
|
|||||||
@@ -1,9 +1,5 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
import util, json
|
import util, json
|
||||||
peers = util.get_peers()
|
peers = util.get_peers()
|
||||||
results = util.send_request(peers, {'method':'blockchain.estimatefee','params':[2]})
|
results = util.send_request(peers, 'blockchain.estimatefee', [2])
|
||||||
print json.dumps(results, indent=4)
|
print json.dumps(results, indent=4)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -31,12 +31,9 @@ def analyze(results):
|
|||||||
|
|
||||||
|
|
||||||
peers = util.get_peers()
|
peers = util.get_peers()
|
||||||
results = util.send_request(peers, {'method':'blockchain.headers.subscribe','params':[]})
|
results = util.send_request(peers, 'blockchain.headers.subscribe', [])
|
||||||
|
|
||||||
errors = analyze(results).keys()
|
errors = analyze(results).keys()
|
||||||
|
|
||||||
for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')):
|
for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')):
|
||||||
print "%40s"%n, v.get('block_height'), v.get('utxo_root'), "error" if n in errors else "ok"
|
print "%40s"%n, v.get('block_height'), v.get('utxo_root'), "error" if n in errors else "ok"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ set_verbosity(False)
|
|||||||
|
|
||||||
config = SimpleConfig()
|
config = SimpleConfig()
|
||||||
servers = filter_protocol(protocol = 't')
|
servers = filter_protocol(protocol = 't')
|
||||||
results = util.send_request(servers, {'method':'blockchain.headers.subscribe', 'params':[]})
|
results = util.send_request(servers, 'blockchain.headers.subscribe', [])
|
||||||
|
|
||||||
d = defaultdict(int)
|
d = defaultdict(int)
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ except:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
peers = util.get_peers()
|
peers = util.get_peers()
|
||||||
results = util.send_request(peers, {'method':'blockchain.transaction.get','params':[tx]})
|
results = util.send_request(peers, 'blockchain.transaction.get', [tx])
|
||||||
|
|
||||||
r1 = []
|
r1 = []
|
||||||
r2 = []
|
r2 = []
|
||||||
@@ -17,4 +17,3 @@ for k, v in results.items():
|
|||||||
|
|
||||||
print "Received %d answers"%len(results)
|
print "Received %d answers"%len(results)
|
||||||
print "Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2)))
|
print "Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2)))
|
||||||
|
|
||||||
|
|||||||
@@ -16,13 +16,15 @@ def get_interfaces(servers, timeout=10):
|
|||||||
connecting[server] = Connection(server, socket_queue, config.path)
|
connecting[server] = Connection(server, socket_queue, config.path)
|
||||||
interfaces = {}
|
interfaces = {}
|
||||||
timeout = time.time() + timeout
|
timeout = time.time() + timeout
|
||||||
while time.time() < timeout:
|
count = 0
|
||||||
|
while time.time() < timeout and count < len(servers):
|
||||||
try:
|
try:
|
||||||
server, socket = socket_queue.get(True, 1)
|
server, socket = socket_queue.get(True, 0.3)
|
||||||
except Queue.Empty:
|
except Queue.Empty:
|
||||||
continue
|
continue
|
||||||
if socket:
|
if socket:
|
||||||
interfaces[server] = Interface(server, socket)
|
interfaces[server] = Interface(server, socket)
|
||||||
|
count += 1
|
||||||
return interfaces
|
return interfaces
|
||||||
|
|
||||||
def wait_on_interfaces(interfaces, timeout=10):
|
def wait_on_interfaces(interfaces, timeout=10):
|
||||||
@@ -37,7 +39,7 @@ def wait_on_interfaces(interfaces, timeout=10):
|
|||||||
for interface in wout:
|
for interface in wout:
|
||||||
interface.send_requests()
|
interface.send_requests()
|
||||||
for interface in rout:
|
for interface in rout:
|
||||||
notifications, responses = interface.get_responses()
|
responses = interface.get_responses()
|
||||||
if responses:
|
if responses:
|
||||||
result[interface.server].extend(responses)
|
result[interface.server].extend(responses)
|
||||||
return result
|
return result
|
||||||
@@ -52,25 +54,23 @@ def get_peers():
|
|||||||
return []
|
return []
|
||||||
# 2. get list of peers
|
# 2. get list of peers
|
||||||
interface = interfaces[server]
|
interface = interfaces[server]
|
||||||
interface.queue_request({'id':0, 'method': 'server.peers.subscribe',
|
interface.queue_request('server.peers.subscribe', [], 0)
|
||||||
'params': []})
|
responses = wait_on_interfaces(interfaces).get(server)
|
||||||
responses = wait_on_interfaces(interfaces)
|
|
||||||
responses = responses.get(server)
|
|
||||||
if responses:
|
if responses:
|
||||||
response = responses[0][1] # One response, (req, response) tuple
|
response = responses[0][1] # One response, (req, response) tuple
|
||||||
peers = parse_servers(response.get('result'))
|
peers = parse_servers(response.get('result'))
|
||||||
peers = filter_protocol(peers,'s')
|
peers = filter_protocol(peers,'s')
|
||||||
return peers
|
return peers
|
||||||
|
|
||||||
def send_request(peers, request):
|
def send_request(peers, method, params):
|
||||||
print "Contacting %d servers"%len(peers)
|
print "Contacting %d servers"%len(peers)
|
||||||
interfaces = get_interfaces(peers)
|
interfaces = get_interfaces(peers)
|
||||||
print "%d servers could be reached" % len(interfaces)
|
print "%d servers could be reached" % len(interfaces)
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
if not peer in interfaces:
|
if not peer in interfaces:
|
||||||
print "Connection failed:", peer
|
print "Connection failed:", peer
|
||||||
for i in interfaces.values():
|
for msg_id, i in enumerate(interfaces.values()):
|
||||||
i.queue_request(request)
|
i.queue_request(method, params, msg_id)
|
||||||
responses = wait_on_interfaces(interfaces)
|
responses = wait_on_interfaces(interfaces)
|
||||||
for peer in interfaces:
|
for peer in interfaces:
|
||||||
if not peer in responses:
|
if not peer in responses:
|
||||||
|
|||||||
Reference in New Issue
Block a user