1
0

fix recovery procedure

This commit is contained in:
ThomasV
2012-03-30 14:15:05 +02:00
parent d0dd8c847a
commit 81bb04378f
3 changed files with 76 additions and 79 deletions

View File

@@ -42,20 +42,15 @@ class Interface:
self.rtime = 0
self.is_connected = True
#only asynchrnous
self.addresses_waiting_for_status = []
self.addresses_waiting_for_history = []
self.poll_interval = 1
#json
self.message_id = 0
self.responses = Queue.Queue()
def is_up_to_date(self):
return self.responses.empty() and not ( self.addresses_waiting_for_status or self.addresses_waiting_for_history )
def poke(self):
# push a fake response so that the getting thread exits its loop
self.responses.put(None)
def queue_json_response(self, c):
#print repr(c)
@@ -70,26 +65,13 @@ class Interface:
if error:
print "received error:", c, method, params
else:
self.update_waiting_lists(method, params)
self.responses.put({'method':method, 'params':params, 'result':result})
def update_waiting_lists(self, method, params):
if method == 'blockchain.address.subscribe':
addr = params[-1]
if addr in self.addresses_waiting_for_status:
self.addresses_waiting_for_status.remove(addr)
elif method == 'blockchain.address.get_history':
addr = params[0]
if addr in self.addresses_waiting_for_history:
self.addresses_waiting_for_history.remove(addr)
def subscribe(self, addresses):
messages = []
for addr in addresses:
messages.append(('blockchain.address.subscribe', [addr]))
self.addresses_waiting_for_status.append(addr)
self.send(messages)
@@ -140,11 +122,11 @@ class PollingInterface(Interface):
#else:
# return False
def poll_thread(self, poll_interval):
def poll_thread(self):
while self.is_connected:
try:
self.poll()
time.sleep(poll_interval)
time.sleep(self.poll_interval)
except socket.gaierror:
break
except socket.error:
@@ -166,7 +148,7 @@ class NativeInterface(PollingInterface):
def start_session(self, addresses, version):
self.send([('session.new', [ version, addresses ])] )
self.send([('server.peers.subscribe',[])])
thread.start_new_thread(self.poll_thread, (5,))
thread.start_new_thread(self.poll_thread, ())
def send(self, messages):
import time
@@ -186,7 +168,7 @@ class NativeInterface(PollingInterface):
params = self.session_id
if cmd == 'address.subscribe':
params = [ self.session_id] + params
params = [ self.session_id ] + params
if cmd in ['h', 'tx']:
str_params = params[0]
@@ -212,16 +194,16 @@ class NativeInterface(PollingInterface):
if cmd == 'h':
out = old_to_new(out)
if cmd in[ 'peers','h','poll']:
if cmd in ['peers','h','poll']:
out = ast.literal_eval( out )
if out=='': out=None #fixme
if out == '':
out = None
if cmd == 'new_session':
self.session_id, msg = ast.literal_eval( out )
self.responses.put({'method':'server.banner', 'params':[], 'result':msg})
else:
self.update_waiting_lists(method, params)
self.responses.put({'method':method, 'params':params, 'result':out})
@@ -231,7 +213,7 @@ class HttpInterface(PollingInterface):
def start(self):
self.session_id = None
thread.start_new_thread(self.poll_thread, (15,))
thread.start_new_thread(self.poll_thread, ())
def poll(self):
if self.session_id:
@@ -280,6 +262,13 @@ class HttpInterface(PollingInterface):
for item in response:
self.queue_json_response(item)
if response:
self.poll_interval = 1
else:
if self.poll_interval < 15:
self.poll_interval += 1
#print self.poll_interval, response
self.rtime = time.time() - t1
self.is_connected = True
@@ -313,8 +302,7 @@ class AsynchronousInterface(Interface):
traceback.print_exc(file=sys.stdout)
self.is_connected = False
# push None so that the getting thread exits its loop
self.responses.put(None)
self.poke()
def send(self, messages):
out = ''
@@ -327,7 +315,6 @@ class AsynchronousInterface(Interface):
def get_history(self, addr):
self.send([('blockchain.address.get_history', [addr])])
self.addresses_waiting_for_history.append(addr)
def start(self):
self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )