1
0

replace wallet.interface everywhere

This commit is contained in:
thomasv
2013-09-12 14:58:42 +02:00
parent 907dca6eb9
commit 6b6c508976
10 changed files with 87 additions and 86 deletions

View File

@@ -65,6 +65,8 @@ class Blockchain(threading.Thread):
if not result: continue
i, result = result
if not result: continue
header = result.get('result')
height = header.get('block_height')
self.servers_height[i.server] = height

View File

@@ -381,6 +381,12 @@ class Interface(threading.Thread):
raise BaseException('Unknown protocol: %s'%protocol)
def stop_subscriptions(self):
for callback in self.subscriptions.keys():
callback(self, None)
self.subscriptions = {}
def send(self, messages, callback):
sub = []
@@ -430,23 +436,6 @@ class Interface(threading.Thread):
return proxy
def set_server(self, server, proxy=None):
"todo: remove this"
# raise an error if the format isnt correct
a,b,c = server.split(':')
b = int(b)
assert c in 'stgh'
# set the server
if server != self.server or proxy != self.proxy:
print "changing server:", server, proxy
self.server = server
self.proxy = proxy
if self.is_connected and self.protocol in 'st' and self.s:
self.s.shutdown(socket.SHUT_RDWR)
self.s.close()
self.is_connected = False # this exits the polling loop
self.trigger_callback('disconnecting') # for actively disconnecting
def stop(self):
if self.is_connected and self.protocol in 'st' and self.s:

View File

@@ -45,9 +45,9 @@ class Network(threading.Thread):
self.interfaces = {}
self.queue = Queue.Queue()
self.default_server = self.config.get('server')
self.servers_list = filter_protocol(DEFAULT_SERVERS,'s')
self.disconnected_servers = []
self.callbacks = {}
#banner
self.servers = []
self.banner = ''
@@ -66,13 +66,17 @@ class Network(threading.Thread):
def random_server(self):
if len(self.servers_list) <= len(self.interfaces.keys()):
return
choice_list = []
l = filter_protocol(self.get_servers(), 's')
for s in l:
if s in self.disconnected_servers or s in self.interfaces.keys():
continue
else:
choice_list.append(s)
while True:
server = random.choice( self.servers_list )
if server not in self.interfaces.keys(): break
if not choice_list: return
server = random.choice( choice_list )
return server
@@ -116,11 +120,11 @@ class Network(threading.Thread):
def set_server(self, server, proxy):
subscriptions = self.interface.subscriptions
i = self.interface
self.default_server = server
self.start_interface(server)
self.interface = self.interfaces[server]
self.resend_subscriptions(subscriptions)
i.stop_subscriptions() # fixme: it should not stop all subscriptions, and send 'unsubscribe'
self.trigger_callback('disconnecting') # for actively disconnecting
@@ -138,8 +142,9 @@ class Network(threading.Thread):
if i == self.interface:
i.send([('server.banner',[])], self.on_banner)
i.send([('server.peers.subscribe',[])], self.on_peers)
self.trigger_callback('connected')
else:
self.servers_list.remove(i.server)
self.disconnected_servers.append(i.server)
self.interfaces.pop(i.server)
self.start_random_interface()
@@ -154,6 +159,7 @@ class Network(threading.Thread):
self.blockchain.queue.put((i,result))
def on_peers(self, i, r):
if not r: return
self.servers = self.parse_servers(r.get('result'))
self.trigger_callback('peers')
@@ -200,12 +206,6 @@ class Network(threading.Thread):
return servers
def resend_subscriptions(self, subscriptions):
for channel, messages in subscriptions.items():
if messages:
self.interface.send(messages, channel)
if __name__ == "__main__":

View File

@@ -224,7 +224,6 @@ class Wallet:
def update(self):
self.up_to_date = False
#self.interface.poke('synchronizer')
while not self.is_up_to_date():
time.sleep(0.1)
@@ -1039,7 +1038,7 @@ class Wallet:
print_error("received transaction that is no longer referenced in history", tx_hash)
return
self.transactions[tx_hash] = tx
self.interface.pending_transactions_for_notifications.append(tx)
self.network.interface.pending_transactions_for_notifications.append(tx)
self.save_transactions()
if self.verifier and tx_height>0:
self.verifier.add(tx_hash, tx_height)
@@ -1188,7 +1187,7 @@ class Wallet:
def send_tx(self, tx):
# asynchronous
self.tx_event.clear()
self.interface.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast)
self.network.interface.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast)
return tx.hash()
def on_broadcast(self, i, result):
@@ -1320,7 +1319,7 @@ class Wallet:
# assert not self.is_mine(_addr)
ext_requests.append( ('blockchain.address.get_history', [_addr]) )
ext_h = self.interface.synchronous_get(ext_requests)
ext_h = self.network.interface.synchronous_get(ext_requests)
print_error("sync:", ext_requests, ext_h)
height = None
for h in ext_h:
@@ -1362,7 +1361,6 @@ class Wallet:
def start_threads(self, network):
from verifier import TxVerifier
self.network = network
self.interface = network.interface
self.verifier = TxVerifier(self.network, self.storage)
self.verifier.start()
self.set_verifier(self.verifier)
@@ -1385,7 +1383,7 @@ class WalletSynchronizer(threading.Thread):
self.daemon = True
self.wallet = wallet
wallet.synchronizer = self
self.interface = self.wallet.interface
self.network = self.wallet.network
#self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
self.was_updated = True
self.running = False
@@ -1403,15 +1401,25 @@ class WalletSynchronizer(threading.Thread):
messages = []
for addr in addresses:
messages.append(('blockchain.address.subscribe', [addr]))
self.interface.send( messages, lambda i,r: self.queue.put(r))
self.network.interface.send( messages, lambda i,r: self.queue.put(r))
def run(self):
if not self.interface.is_connected:
print_error( "synchronizer: waiting for interface")
self.interface.connect_event.wait()
with self.lock:
self.running = True
with self.lock: self.running = True
while self.is_running():
interface = self.network.interface
if not interface.is_connected:
print_error("synchronizer: waiting for interface")
interface.connect_event.wait()
self.run_interface(interface)
def run_interface(self, interface):
print_error("synchronizer: connected to", interface.server)
requested_tx = []
missing_tx = []
@@ -1423,12 +1431,10 @@ class WalletSynchronizer(threading.Thread):
for tx_hash, tx_height in history:
if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx:
missing_tx.append( (tx_hash, tx_height) )
print_error("missing tx", missing_tx)
# wait until we are connected, in case the user is not connected
while not self.interface.is_connected:
time.sleep(1)
if missing_tx:
print_error("missing tx", missing_tx)
# subscriptions
self.subscribe_to_addresses(self.wallet.addresses(True, next=True))
@@ -1443,12 +1449,12 @@ class WalletSynchronizer(threading.Thread):
# request missing transactions
for tx_hash, tx_height in missing_tx:
if (tx_hash, tx_height) not in requested_tx:
self.interface.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r))
interface.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r))
requested_tx.append( (tx_hash, tx_height) )
missing_tx = []
# detect if situation has changed
if self.interface.is_up_to_date() and self.queue.empty():
if interface.is_up_to_date() and self.queue.empty():
if not self.wallet.is_up_to_date():
self.wallet.set_up_to_date(True)
self.was_updated = True
@@ -1462,10 +1468,16 @@ class WalletSynchronizer(threading.Thread):
self.was_updated = False
# 2. get a response
r = self.queue.get(block=True, timeout=10000000000)
try:
r = self.queue.get(block=True, timeout=1)
except Queue.Empty:
continue
# poke sends None. (needed during stop)
if not r: continue
if interface != self.network.interface:
break
if not r:
continue
# 3. handle response
method = r['method']
@@ -1480,7 +1492,7 @@ class WalletSynchronizer(threading.Thread):
addr = params[0]
if self.wallet.get_status(self.wallet.get_history(addr)) != result:
if requested_histories.get(addr) is None:
self.interface.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r))
interface.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r))
requested_histories[addr] = result
elif method == 'blockchain.address.get_history':