diff --git a/electrum/interface.py b/electrum/interface.py index 3fc51e559..981f76aee 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -856,22 +856,47 @@ class Interface(Logger): async def _fast_forward_chain( self, *, - height: int, - tip: int, - ) -> Optional[Tuple[bool, int]]: + height: int, # usually local chain tip + 1 + tip: int, # server tip. we should not request past this. + ) -> int: + """Request some headers starting at `height` to grow the blockchain of this interface. + Returns number of headers we managed to connect, starting at `height`. + """ if not is_non_negative_integer(height): raise Exception(f"{repr(height)} is not a block height") if not is_non_negative_integer(tip): raise Exception(f"{repr(tip)} is not a block height") - index = height // CHUNK_SIZE - size = CHUNK_SIZE - size = min(size, tip - index * CHUNK_SIZE + 1) - size = max(size, 0) - headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size) - conn = self.blockchain.connect_chunk(index, data=b"".join(headers)) - if not conn: - return conn, 0 - return conn, len(headers) + if not (height > constants.net.max_checkpoint() + or height == 0 == constants.net.max_checkpoint()): + raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}") + assert height <= tip, f"{height=} must be <= {tip=}" + # Request a few chunks of headers concurrently. + # tradeoffs: + # - more chunks: higher memory requirements + # - more chunks: higher concurrency => syncing needs fewer network round-trips + # - if a chunk does not connect, bandwidth for all later chunks is wasted + async with OldTaskGroup() as group: + tasks = [] # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]] + index0 = height // CHUNK_SIZE + for chunk_cnt in range(10): + index = index0 + chunk_cnt + start_height = index * CHUNK_SIZE + if start_height > tip: + break + end_height = min(start_height + CHUNK_SIZE - 1, tip) + size = end_height - start_height + 1 + tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size)))) + # try to connect chunks + num_headers = 0 + for index, task in tasks: + headers = task.result() + conn = self.blockchain.connect_chunk(index, data=b"".join(headers)) + if not conn: + break + num_headers += len(headers) + # We started at a chunk boundary, instead of requested `height`. Need to correct for that. + offset = height - index0 * CHUNK_SIZE + return max(0, num_headers - offset) def is_main_server(self) -> bool: return (self.network.interface == self or @@ -1035,10 +1060,9 @@ class Interface(Logger): # We are far from the tip. # It is more efficient to process headers in large batches (CPU/disk_usage/logging). # (but this wastes a little bandwidth, if we are not on a chunk boundary) - # TODO we should request (some) chunks concurrently. would help when we are many chunks behind - could_connect, num_headers = await self._fast_forward_chain( + num_headers = await self._fast_forward_chain( height=height, tip=next_height) - if not could_connect: + if num_headers == 0: if height <= constants.net.max_checkpoint(): raise GracefulDisconnect('server chain conflicts with checkpoints or genesis') last, height = await self.step(height) @@ -1046,7 +1070,7 @@ class Interface(Logger): # report progress to gui/etc util.trigger_callback('blockchain_updated') util.trigger_callback('network_updated') - height = (height // CHUNK_SIZE * CHUNK_SIZE) + num_headers + height += num_headers assert height <= next_height+1, (height, self.tip) last = ChainResolutionMode.CATCHUP else: