simplify interface: use callbacks
This commit is contained in:
178
lib/interface.py
178
lib/interface.py
@@ -25,36 +25,6 @@ from util import print_error, print_msg
|
||||
|
||||
|
||||
DEFAULT_TIMEOUT = 5
|
||||
DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'}
|
||||
|
||||
DEFAULT_SERVERS = {
|
||||
'the9ull.homelinux.org': {'h': '8082', 't': '50001'},
|
||||
'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'electrum.dynaloop.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'electrum.mooo.com': {'h': '8081', 't': '50001'},
|
||||
'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'},
|
||||
'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'},
|
||||
'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'},
|
||||
'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
|
||||
'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}
|
||||
}
|
||||
|
||||
|
||||
|
||||
def filter_protocol(servers, p):
|
||||
l = []
|
||||
for k, protocols in servers.items():
|
||||
if p in protocols:
|
||||
l.append( ':'.join([k, protocols[p], p]) )
|
||||
return l
|
||||
|
||||
|
||||
|
||||
proxy_modes = ['socks4', 'socks5', 'http']
|
||||
|
||||
|
||||
@@ -62,12 +32,9 @@ def pick_random_server():
|
||||
return random.choice( filter_protocol(DEFAULT_SERVERS,'s') )
|
||||
|
||||
|
||||
|
||||
|
||||
class Interface(threading.Thread):
|
||||
|
||||
|
||||
|
||||
def init_server(self, host, port, proxy=None, use_ssl=True):
|
||||
self.host = host
|
||||
self.port = port
|
||||
@@ -78,43 +45,9 @@ class Interface(threading.Thread):
|
||||
#json
|
||||
self.message_id = 0
|
||||
self.unanswered_requests = {}
|
||||
#banner
|
||||
self.banner = ''
|
||||
self.pending_transactions_for_notifications= []
|
||||
|
||||
|
||||
def parse_servers(self, result):
|
||||
""" parse servers list into dict format"""
|
||||
|
||||
servers = {}
|
||||
for item in result:
|
||||
host = item[1]
|
||||
out = {}
|
||||
version = None
|
||||
pruning_level = '-'
|
||||
if len(item) > 2:
|
||||
for v in item[2]:
|
||||
if re.match("[stgh]\d*", v):
|
||||
protocol, port = v[0], v[1:]
|
||||
if port == '': port = DEFAULT_PORTS[protocol]
|
||||
out[protocol] = port
|
||||
elif re.match("v(.?)+", v):
|
||||
version = v[1:]
|
||||
elif re.match("p\d*", v):
|
||||
pruning_level = v[1:]
|
||||
if pruning_level == '': pruning_level = '0'
|
||||
try:
|
||||
is_recent = float(version)>=float(PROTOCOL_VERSION)
|
||||
except:
|
||||
is_recent = False
|
||||
|
||||
if out and is_recent:
|
||||
out['pruning'] = pruning_level
|
||||
servers[host] = out
|
||||
|
||||
return servers
|
||||
|
||||
|
||||
def queue_json_response(self, c):
|
||||
|
||||
# uncomment to debug
|
||||
@@ -127,30 +60,18 @@ class Interface(threading.Thread):
|
||||
print_error("received error:", c)
|
||||
if msg_id is not None:
|
||||
with self.lock:
|
||||
method, params, channel = self.unanswered_requests.pop(msg_id)
|
||||
response_queue = self.responses[channel]
|
||||
response_queue.put((self,{'method':method, 'params':params, 'error':error, 'id':msg_id}))
|
||||
method, params, callback = self.unanswered_requests.pop(msg_id)
|
||||
callback(self,{'method':method, 'params':params, 'error':error, 'id':msg_id})
|
||||
|
||||
return
|
||||
|
||||
if msg_id is not None:
|
||||
with self.lock:
|
||||
method, params, channel = self.unanswered_requests.pop(msg_id)
|
||||
method, params, callback = self.unanswered_requests.pop(msg_id)
|
||||
result = c.get('result')
|
||||
|
||||
if method == 'server.version':
|
||||
self.server_version = result
|
||||
|
||||
elif method == 'server.banner':
|
||||
self.banner = result
|
||||
self.network.trigger_callback('banner')
|
||||
|
||||
elif method == 'server.peers.subscribe':
|
||||
self.servers = self.parse_servers(result)
|
||||
self.network.trigger_callback('peers')
|
||||
|
||||
else:
|
||||
# notification: find the channel(s)
|
||||
# notification
|
||||
method = c.get('method')
|
||||
params = c.get('params')
|
||||
|
||||
@@ -170,31 +91,19 @@ class Interface(threading.Thread):
|
||||
with self.lock:
|
||||
for k,v in self.subscriptions.items():
|
||||
if (method, params) in v:
|
||||
channel = k
|
||||
callback = k
|
||||
break
|
||||
else:
|
||||
print_error( "received unexpected notification", method, params)
|
||||
print_error( self.subscriptions )
|
||||
return
|
||||
|
||||
response_queue = self.responses[channel]
|
||||
response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id}))
|
||||
|
||||
|
||||
callback(self, {'method':method, 'params':params, 'result':result, 'id':msg_id})
|
||||
|
||||
def get_response(self, channel='default', block=True, timeout=10000000000):
|
||||
ir = self.responses[channel].get(block, timeout)
|
||||
if ir:
|
||||
return ir[1]
|
||||
|
||||
def register_channel(self, channel, queue=None):
|
||||
if queue is None:
|
||||
queue = Queue.Queue()
|
||||
with self.lock:
|
||||
self.responses[channel] = queue
|
||||
|
||||
def poke(self, channel):
|
||||
self.responses[channel].put(None)
|
||||
def on_version(self, i, result):
|
||||
self.server_version = result
|
||||
|
||||
|
||||
def init_http(self, host, port, proxy=None, use_ssl=True):
|
||||
@@ -237,7 +146,7 @@ class Interface(threading.Thread):
|
||||
self.send([])
|
||||
|
||||
|
||||
def send_http(self, messages, channel='default'):
|
||||
def send_http(self, messages, callback):
|
||||
import urllib2, json, time, cookielib
|
||||
print_error( "send_http", messages )
|
||||
|
||||
@@ -257,7 +166,7 @@ class Interface(threading.Thread):
|
||||
method, params = m
|
||||
if type(params) != type([]): params = [params]
|
||||
data.append( { 'method':method, 'id':self.message_id, 'params':params } )
|
||||
self.unanswered_requests[self.message_id] = method, params, channel
|
||||
self.unanswered_requests[self.message_id] = method, params, callback
|
||||
self.message_id += 1
|
||||
|
||||
if data:
|
||||
@@ -359,7 +268,7 @@ class Interface(threading.Thread):
|
||||
|
||||
if timeout:
|
||||
# ping the server with server.version, as a real ping does not exist yet
|
||||
self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
|
||||
self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version)
|
||||
continue
|
||||
|
||||
out += msg
|
||||
@@ -381,14 +290,14 @@ class Interface(threading.Thread):
|
||||
self.is_connected = False
|
||||
|
||||
|
||||
def send_tcp(self, messages, channel='default'):
|
||||
def send_tcp(self, messages, callback):
|
||||
"""return the ids of the requests that we sent"""
|
||||
out = ''
|
||||
ids = []
|
||||
for m in messages:
|
||||
method, params = m
|
||||
request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
|
||||
self.unanswered_requests[self.message_id] = method, params, channel
|
||||
self.unanswered_requests[self.message_id] = method, params, callback
|
||||
ids.append(self.message_id)
|
||||
# uncomment to debug
|
||||
# print "-->", request
|
||||
@@ -413,7 +322,7 @@ class Interface(threading.Thread):
|
||||
|
||||
|
||||
def __init__(self, config=None):
|
||||
self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
|
||||
#self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
|
||||
self.proxy = None
|
||||
|
||||
if config is None:
|
||||
@@ -426,9 +335,6 @@ class Interface(threading.Thread):
|
||||
self.connect_event = threading.Event()
|
||||
|
||||
self.subscriptions = {}
|
||||
self.responses = {}
|
||||
self.responses['default'] = Queue.Queue()
|
||||
|
||||
self.lock = threading.Lock()
|
||||
|
||||
self.servers = {} # actual list from IRC
|
||||
@@ -475,7 +381,7 @@ class Interface(threading.Thread):
|
||||
raise BaseException('Unknown protocol: %s'%protocol)
|
||||
|
||||
|
||||
def send(self, messages, channel='default'):
|
||||
def send(self, messages, callback):
|
||||
|
||||
sub = []
|
||||
for message in messages:
|
||||
@@ -485,21 +391,21 @@ class Interface(threading.Thread):
|
||||
|
||||
if sub:
|
||||
with self.lock:
|
||||
if self.subscriptions.get(channel) is None:
|
||||
self.subscriptions[channel] = []
|
||||
if self.subscriptions.get(callback) is None:
|
||||
self.subscriptions[callback] = []
|
||||
for message in sub:
|
||||
if message not in self.subscriptions[channel]:
|
||||
self.subscriptions[channel].append(message)
|
||||
if message not in self.subscriptions[callback]:
|
||||
self.subscriptions[callback].append(message)
|
||||
|
||||
if not self.is_connected:
|
||||
return
|
||||
|
||||
if self.protocol in 'st':
|
||||
with self.lock:
|
||||
out = self.send_tcp(messages, channel)
|
||||
out = self.send_tcp(messages, callback)
|
||||
else:
|
||||
# do not use lock, http is synchronous
|
||||
out = self.send_http(messages, channel)
|
||||
out = self.send_http(messages, callback)
|
||||
|
||||
return out
|
||||
|
||||
@@ -525,6 +431,7 @@ class Interface(threading.Thread):
|
||||
|
||||
|
||||
def set_server(self, server, proxy=None):
|
||||
"todo: remove this"
|
||||
# raise an error if the format isnt correct
|
||||
a,b,c = server.split(':')
|
||||
b = int(b)
|
||||
@@ -540,46 +447,25 @@ class Interface(threading.Thread):
|
||||
self.is_connected = False # this exits the polling loop
|
||||
self.trigger_callback('disconnecting') # for actively disconnecting
|
||||
|
||||
|
||||
def stop(self):
|
||||
if self.is_connected and self.protocol in 'st' and self.s:
|
||||
self.s.shutdown(socket.SHUT_RDWR)
|
||||
self.s.close()
|
||||
|
||||
|
||||
def get_servers(self):
|
||||
if not self.servers:
|
||||
return DEFAULT_SERVERS
|
||||
else:
|
||||
return self.servers
|
||||
|
||||
|
||||
def is_empty(self, channel):
|
||||
q = self.responses.get(channel)
|
||||
if q:
|
||||
return q.empty()
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def get_pending_requests(self, channel):
|
||||
result = []
|
||||
with self.lock:
|
||||
for k, v in self.unanswered_requests.items():
|
||||
a, b, c = v
|
||||
if c == channel: result.append(k)
|
||||
return result
|
||||
|
||||
def is_up_to_date(self, channel):
|
||||
return self.is_empty(channel) and not self.get_pending_requests(channel)
|
||||
def is_up_to_date(self):
|
||||
return self.unanswered_requests == {}
|
||||
|
||||
|
||||
def synchronous_get(self, requests, timeout=100000000):
|
||||
# todo: use generators, unanswered_requests should be a list of arrays...
|
||||
ids = self.send(requests)
|
||||
q = Queue.Queue()
|
||||
ids = self.send(requests, lambda i,r: queue.put(r))
|
||||
id2 = ids[:]
|
||||
res = {}
|
||||
while ids:
|
||||
r = self.responses['default'].get(True, timeout)
|
||||
r = queue.get(True, timeout)
|
||||
_id = r.get('id')
|
||||
if _id in ids:
|
||||
ids.remove(_id)
|
||||
@@ -595,20 +481,16 @@ class Interface(threading.Thread):
|
||||
threading.Thread.start(self)
|
||||
|
||||
|
||||
|
||||
def run(self):
|
||||
self.init_interface()
|
||||
if self.is_connected:
|
||||
self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
|
||||
self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version)
|
||||
self.change_status()
|
||||
self.run_tcp() if self.protocol in 'st' else self.run_http()
|
||||
self.change_status()
|
||||
|
||||
|
||||
def change_status(self):
|
||||
#print "change status", self.server, self.is_connected
|
||||
self.queue.put(self)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user