restructure synchronizer
fix CLI notify cmd. fix merchant websockets.
This commit is contained in:
@@ -24,12 +24,15 @@
|
||||
# SOFTWARE.
|
||||
import asyncio
|
||||
import hashlib
|
||||
from typing import Dict, List
|
||||
from collections import defaultdict
|
||||
|
||||
from aiorpcx import TaskGroup, run_in_thread
|
||||
|
||||
from .transaction import Transaction
|
||||
from .util import bh2u, PrintError
|
||||
from .util import bh2u, make_aiohttp_session
|
||||
from .bitcoin import address_to_scripthash
|
||||
from .network import NetworkJobOnDefaultServer
|
||||
|
||||
|
||||
def history_status(h):
|
||||
@@ -41,7 +44,68 @@ def history_status(h):
|
||||
return bh2u(hashlib.sha256(status.encode('ascii')).digest())
|
||||
|
||||
|
||||
class Synchronizer(PrintError):
|
||||
class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
"""Subscribe over the network to a set of addresses, and monitor their statuses.
|
||||
Every time a status changes, run a coroutine provided by the subclass.
|
||||
"""
|
||||
def __init__(self, network):
|
||||
NetworkJobOnDefaultServer.__init__(self, network)
|
||||
self.asyncio_loop = network.asyncio_loop
|
||||
|
||||
def _reset(self):
|
||||
super()._reset()
|
||||
self.requested_addrs = set()
|
||||
self.scripthash_to_address = {}
|
||||
self._processed_some_notifications = False # so that we don't miss them
|
||||
# Queues
|
||||
self.add_queue = asyncio.Queue()
|
||||
self.status_queue = asyncio.Queue()
|
||||
|
||||
async def _start_tasks(self):
|
||||
try:
|
||||
async with self.group as group:
|
||||
await group.spawn(self.send_subscriptions())
|
||||
await group.spawn(self.handle_status())
|
||||
await group.spawn(self.main())
|
||||
finally:
|
||||
# we are being cancelled now
|
||||
self.session.unsubscribe(self.status_queue)
|
||||
|
||||
def add(self, addr):
|
||||
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
|
||||
|
||||
async def _add_address(self, addr):
|
||||
if addr in self.requested_addrs: return
|
||||
self.requested_addrs.add(addr)
|
||||
await self.add_queue.put(addr)
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
"""Handle the change of the status of an address."""
|
||||
raise NotImplementedError() # implemented by subclasses
|
||||
|
||||
async def send_subscriptions(self):
|
||||
async def subscribe_to_address(addr):
|
||||
h = address_to_scripthash(addr)
|
||||
self.scripthash_to_address[h] = addr
|
||||
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
||||
self.requested_addrs.remove(addr)
|
||||
|
||||
while True:
|
||||
addr = await self.add_queue.get()
|
||||
await self.group.spawn(subscribe_to_address, addr)
|
||||
|
||||
async def handle_status(self):
|
||||
while True:
|
||||
h, status = await self.status_queue.get()
|
||||
addr = self.scripthash_to_address[h]
|
||||
await self.group.spawn(self._on_address_status, addr, status)
|
||||
self._processed_some_notifications = True
|
||||
|
||||
async def main(self):
|
||||
raise NotImplementedError() # implemented by subclasses
|
||||
|
||||
|
||||
class Synchronizer(SynchronizerBase):
|
||||
'''The synchronizer keeps the wallet up-to-date with its set of
|
||||
addresses and their transactions. It subscribes over the network
|
||||
to wallet addresses, gets the wallet to generate new addresses
|
||||
@@ -51,16 +115,12 @@ class Synchronizer(PrintError):
|
||||
'''
|
||||
def __init__(self, wallet):
|
||||
self.wallet = wallet
|
||||
self.network = wallet.network
|
||||
self.asyncio_loop = wallet.network.asyncio_loop
|
||||
SynchronizerBase.__init__(self, wallet.network)
|
||||
|
||||
def _reset(self):
|
||||
super()._reset()
|
||||
self.requested_tx = {}
|
||||
self.requested_histories = {}
|
||||
self.requested_addrs = set()
|
||||
self.scripthash_to_address = {}
|
||||
self._processed_some_notifications = False # so that we don't miss them
|
||||
# Queues
|
||||
self.add_queue = asyncio.Queue()
|
||||
self.status_queue = asyncio.Queue()
|
||||
|
||||
def diagnostic_name(self):
|
||||
return '{}:{}'.format(self.__class__.__name__, self.wallet.diagnostic_name())
|
||||
@@ -70,14 +130,6 @@ class Synchronizer(PrintError):
|
||||
and not self.requested_histories
|
||||
and not self.requested_tx)
|
||||
|
||||
def add(self, addr):
|
||||
asyncio.run_coroutine_threadsafe(self._add(addr), self.asyncio_loop)
|
||||
|
||||
async def _add(self, addr):
|
||||
if addr in self.requested_addrs: return
|
||||
self.requested_addrs.add(addr)
|
||||
await self.add_queue.put(addr)
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
history = self.wallet.history.get(addr, [])
|
||||
if history_status(history) == status:
|
||||
@@ -144,30 +196,6 @@ class Synchronizer(PrintError):
|
||||
# callbacks
|
||||
self.wallet.network.trigger_callback('new_transaction', self.wallet, tx)
|
||||
|
||||
async def send_subscriptions(self, group: TaskGroup):
|
||||
async def subscribe_to_address(addr):
|
||||
h = address_to_scripthash(addr)
|
||||
self.scripthash_to_address[h] = addr
|
||||
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
||||
self.requested_addrs.remove(addr)
|
||||
|
||||
while True:
|
||||
addr = await self.add_queue.get()
|
||||
await group.spawn(subscribe_to_address, addr)
|
||||
|
||||
async def handle_status(self, group: TaskGroup):
|
||||
while True:
|
||||
h, status = await self.status_queue.get()
|
||||
addr = self.scripthash_to_address[h]
|
||||
await group.spawn(self._on_address_status, addr, status)
|
||||
self._processed_some_notifications = True
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
s = self.wallet.network.interface.session
|
||||
assert s is not None
|
||||
return s
|
||||
|
||||
async def main(self):
|
||||
self.wallet.set_up_to_date(False)
|
||||
# request missing txns, if any
|
||||
@@ -178,7 +206,7 @@ class Synchronizer(PrintError):
|
||||
await self._request_missing_txs(history)
|
||||
# add addresses to bootstrap
|
||||
for addr in self.wallet.get_addresses():
|
||||
await self._add(addr)
|
||||
await self._add_address(addr)
|
||||
# main loop
|
||||
while True:
|
||||
await asyncio.sleep(0.1)
|
||||
@@ -189,3 +217,37 @@ class Synchronizer(PrintError):
|
||||
self._processed_some_notifications = False
|
||||
self.wallet.set_up_to_date(up_to_date)
|
||||
self.wallet.network.trigger_callback('wallet_updated', self.wallet)
|
||||
|
||||
|
||||
class Notifier(SynchronizerBase):
|
||||
"""Watch addresses. Every time the status of an address changes,
|
||||
an HTTP POST is sent to the corresponding URL.
|
||||
"""
|
||||
def __init__(self, network):
|
||||
SynchronizerBase.__init__(self, network)
|
||||
self.watched_addresses = defaultdict(list) # type: Dict[str, List[str]]
|
||||
self.start_watching_queue = asyncio.Queue()
|
||||
|
||||
async def main(self):
|
||||
# resend existing subscriptions if we were restarted
|
||||
for addr in self.watched_addresses:
|
||||
await self._add_address(addr)
|
||||
# main loop
|
||||
while True:
|
||||
addr, url = await self.start_watching_queue.get()
|
||||
self.watched_addresses[addr].append(url)
|
||||
await self._add_address(addr)
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
self.print_error('new status for addr {}'.format(addr))
|
||||
headers = {'content-type': 'application/json'}
|
||||
data = {'address': addr, 'status': status}
|
||||
for url in self.watched_addresses[addr]:
|
||||
try:
|
||||
async with make_aiohttp_session(proxy=self.network.proxy, headers=headers) as session:
|
||||
async with session.post(url, json=data, headers=headers) as resp:
|
||||
await resp.text()
|
||||
except Exception as e:
|
||||
self.print_error(str(e))
|
||||
else:
|
||||
self.print_error('Got Response for {}'.format(addr))
|
||||
|
||||
Reference in New Issue
Block a user