interface: parallel header-chunks download
We request chunks concurrently. This makes header-sync much faster
when we are many blocks behind.
notes:
- all chunks are downloaded from the same interface, just for simplicity
- we request up to 10 chunks concurrently (so 10*2016 headers)
- more chunks: higher memory requirements
- more chunks: higher concurrency => syncing needs fewer network round-trips
- if a chunk does not connect, bandwidth for all later chunks is wasted
- we can tweak the constant or make it dynamic or make it a configvar, etc, later
- without this, we progress the chain tip by around 1 chunk per second
- 52k blocks (1 year on mainnet) takes around 26 seconds
- this is probably not *that* interesting for mainnet,
but for testnet3, that sometimes has 200x the block-rate of mainnet,
it is extremely useful
This commit is contained in:
@@ -856,22 +856,47 @@ class Interface(Logger):
|
||||
async def _fast_forward_chain(
|
||||
self,
|
||||
*,
|
||||
height: int,
|
||||
tip: int,
|
||||
) -> Optional[Tuple[bool, int]]:
|
||||
height: int, # usually local chain tip + 1
|
||||
tip: int, # server tip. we should not request past this.
|
||||
) -> int:
|
||||
"""Request some headers starting at `height` to grow the blockchain of this interface.
|
||||
Returns number of headers we managed to connect, starting at `height`.
|
||||
"""
|
||||
if not is_non_negative_integer(height):
|
||||
raise Exception(f"{repr(height)} is not a block height")
|
||||
if not is_non_negative_integer(tip):
|
||||
raise Exception(f"{repr(tip)} is not a block height")
|
||||
index = height // CHUNK_SIZE
|
||||
size = CHUNK_SIZE
|
||||
size = min(size, tip - index * CHUNK_SIZE + 1)
|
||||
size = max(size, 0)
|
||||
headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
|
||||
conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
|
||||
if not conn:
|
||||
return conn, 0
|
||||
return conn, len(headers)
|
||||
if not (height > constants.net.max_checkpoint()
|
||||
or height == 0 == constants.net.max_checkpoint()):
|
||||
raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
|
||||
assert height <= tip, f"{height=} must be <= {tip=}"
|
||||
# Request a few chunks of headers concurrently.
|
||||
# tradeoffs:
|
||||
# - more chunks: higher memory requirements
|
||||
# - more chunks: higher concurrency => syncing needs fewer network round-trips
|
||||
# - if a chunk does not connect, bandwidth for all later chunks is wasted
|
||||
async with OldTaskGroup() as group:
|
||||
tasks = [] # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
|
||||
index0 = height // CHUNK_SIZE
|
||||
for chunk_cnt in range(10):
|
||||
index = index0 + chunk_cnt
|
||||
start_height = index * CHUNK_SIZE
|
||||
if start_height > tip:
|
||||
break
|
||||
end_height = min(start_height + CHUNK_SIZE - 1, tip)
|
||||
size = end_height - start_height + 1
|
||||
tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
|
||||
# try to connect chunks
|
||||
num_headers = 0
|
||||
for index, task in tasks:
|
||||
headers = task.result()
|
||||
conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
|
||||
if not conn:
|
||||
break
|
||||
num_headers += len(headers)
|
||||
# We started at a chunk boundary, instead of requested `height`. Need to correct for that.
|
||||
offset = height - index0 * CHUNK_SIZE
|
||||
return max(0, num_headers - offset)
|
||||
|
||||
def is_main_server(self) -> bool:
|
||||
return (self.network.interface == self or
|
||||
@@ -1035,10 +1060,9 @@ class Interface(Logger):
|
||||
# We are far from the tip.
|
||||
# It is more efficient to process headers in large batches (CPU/disk_usage/logging).
|
||||
# (but this wastes a little bandwidth, if we are not on a chunk boundary)
|
||||
# TODO we should request (some) chunks concurrently. would help when we are many chunks behind
|
||||
could_connect, num_headers = await self._fast_forward_chain(
|
||||
num_headers = await self._fast_forward_chain(
|
||||
height=height, tip=next_height)
|
||||
if not could_connect:
|
||||
if num_headers == 0:
|
||||
if height <= constants.net.max_checkpoint():
|
||||
raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
|
||||
last, height = await self.step(height)
|
||||
@@ -1046,7 +1070,7 @@ class Interface(Logger):
|
||||
# report progress to gui/etc
|
||||
util.trigger_callback('blockchain_updated')
|
||||
util.trigger_callback('network_updated')
|
||||
height = (height // CHUNK_SIZE * CHUNK_SIZE) + num_headers
|
||||
height += num_headers
|
||||
assert height <= next_height+1, (height, self.tip)
|
||||
last = ChainResolutionMode.CATCHUP
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user