1
0

asynchronous processing: use a queue, handle responses in wallet class

This commit is contained in:
ThomasV
2012-03-23 16:34:34 +01:00
parent 39895f41cc
commit f60f6c28d3
4 changed files with 116 additions and 111 deletions

View File

@@ -25,19 +25,16 @@ DEFAULT_SERVERS = ['electrum.bitcoins.sk','ecdsa.org','electrum.novit.ro'] # li
class Interface:
def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None, sync_cb=None):
self.host = host
self.port = port
self.sync_callback = sync_cb
self.address_callback = address_callback
self.history_callback = history_callback
self.newblock_callback = newblock_callback
self.servers = [] # actual list from IRC
self.rtime = 0
self.blocks = 0
self.message = ''
self.was_updated = True # fixme: use a semaphore
self.is_up_to_date = False
self.is_connected = False
@@ -45,15 +42,17 @@ class Interface:
self.addresses_waiting_for_status = []
self.addresses_waiting_for_history = []
self.tx_event = threading.Event()
self.up_to_date_event = threading.Event()
self.up_to_date_event.clear()
#json
self.message_id = 0
self.messages = {}
self.responses = Queue.Queue()
def is_up_to_date(self):
return not ( self.addresses_waiting_for_status or self.addresses_waiting_for_history )
def send_tx(self, data):
self.tx_event.clear()
self.send([('transaction.broadcast', [data])])
@@ -61,9 +60,6 @@ class Interface:
return self.tx_result
def start_session(self, addresses, version):
pass
def queue_json_response(self, c):
#print repr(c)
@@ -80,76 +76,18 @@ class Interface:
print "received error:", c, method, params
else:
#self.handle_response(method, params, result)
if method == 'address.subscribe':
addr = params[-1]
if addr in self.addresses_waiting_for_status:
self.addresses_waiting_for_status.remove(addr)
elif method == 'address.get_history':
addr = params[0]
if addr in self.addresses_waiting_for_history:
self.addresses_waiting_for_history.remove(addr)
self.responses.put({'method':method, 'params':params, 'result':result})
#self.is_up_to_date = True
def handle_response(self, r):
if r is None:
print "empty item"
return
method = r['method']
params = r['params']
result = r['result']
if method == 'server.banner':
self.message = result
self.was_updated = True
elif method == 'session.poll':
# native poll
blocks, changed_addresses = result
if blocks == -1: raise BaseException("session not found")
self.blocks = int(blocks)
if changed_addresses:
self.is_up_to_date = False
self.was_updated = True
for addr, status in changed_addresses.items():
apply(self.address_callback, (addr, status))
else:
self.is_up_to_date = True
elif method == 'server.peers':
#print "Received server list: ", result
self.servers = map( lambda x:x[1], result )
elif method == 'address.subscribe':
addr = params[-1]
if addr in self.addresses_waiting_for_status:
self.addresses_waiting_for_status.remove(addr)
apply(self.address_callback,(addr, result))
elif method == 'address.get_history':
addr = params[0]
if addr in self.addresses_waiting_for_history:
self.addresses_waiting_for_history.remove(addr)
apply(self.history_callback, (addr, result))
self.was_updated = True
elif method == 'transaction.broadcast':
self.tx_result = result
self.tx_event.set()
elif method == 'numblocks.subscribe':
self.blocks = result
if self.newblock_callback: apply(self.newblock_callback,(result,))
elif method == 'client.version':
pass
else:
print "unknown message:", method, params, result
if self.addresses_waiting_for_status or self.addresses_waiting_for_history:
self.is_up_to_date = False
else:
self.is_up_to_date = True
self.up_to_date_event.set()
def subscribe(self, addresses):
messages = []
for addr in addresses:
@@ -196,11 +134,6 @@ class PollingInterface(Interface):
def poll(self):
self.send([('session.poll', [])])
def update_wallet(self):
while True:
self.poll()
if self.is_up_to_date: break
#if is_new or wallet.remote_url:
# self.was_updated = True
# is_new = wallet.synchronize()
@@ -213,7 +146,7 @@ class PollingInterface(Interface):
def poll_thread(self, poll_interval):
while self.is_connected:
try:
self.update_wallet()
self.poll()
time.sleep(poll_interval)
except socket.gaierror:
break
@@ -281,7 +214,6 @@ class NativeInterface(PollingInterface):
if cmd == 'new_session':
self.session_id, self.message = ast.literal_eval( out )
self.was_updated = True
else:
self.responses.put({'method':method, 'params':params, 'result':out})
@@ -379,9 +311,6 @@ class AsynchronousInterface(Interface):
self.is_connected = False
self.responses.put(None)
def update_wallet(self):
self.up_to_date_event.wait()
def send(self, messages):
out = ''
for m in messages:
@@ -416,8 +345,6 @@ def new_interface(wallet):
else:
host = random.choice( DEFAULT_SERVERS ) # random choice when the wallet is created
port = wallet.port
address_cb = wallet.receive_status_callback
history_cb = wallet.receive_history_callback
if port == 50000:
InterfaceClass = NativeInterface
@@ -429,21 +356,20 @@ def new_interface(wallet):
print "unknown port number: %d. using native protocol."%port
InterfaceClass = NativeInterface
interface = InterfaceClass(host, port, address_cb, history_cb)
interface = InterfaceClass(host, port)
return interface
def loop_interfaces_thread(wallet):
while True:
interface = wallet.interface
try:
addresses = wallet.all_addresses()
version = wallet.electrum_version
wallet.interface.start_session(addresses, version)
while wallet.interface.is_connected:
response = wallet.interface.responses.get()
wallet.interface.handle_response(response)
interface.start_session(addresses, version)
wallet.run()
print "Disconnected"
except socket.error: