diff --git a/electrum/interface.py b/electrum/interface.py index 8da607efe..e47edd024 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -832,32 +832,72 @@ class Interface(Logger): headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE)) return headers - async def request_chunk( + async def request_chunk_below_max_checkpoint( self, - height: int, *, - tip: Optional[int] = None, - can_return_early: bool = False, - ) -> Optional[Tuple[bool, int]]: + height: int, + ) -> None: if not is_non_negative_integer(height): raise Exception(f"{repr(height)} is not a block height") + assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}" index = height // CHUNK_SIZE - if can_return_early and index in self._requested_chunks: + if index in self._requested_chunks: return None - #self.logger.debug(f"requesting chunk from height {height}") - size = CHUNK_SIZE - if tip is not None: - size = min(size, tip - index * CHUNK_SIZE + 1) - size = max(size, 0) + self.logger.debug(f"requesting chunk from height {height}") try: self._requested_chunks.add(index) - headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size) + headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE) finally: self._requested_chunks.discard(index) conn = self.blockchain.connect_chunk(index, data=b"".join(headers)) if not conn: - return conn, 0 - return conn, len(headers) + raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain") + return None + + async def _fast_forward_chain( + self, + *, + height: int, # usually local chain tip + 1 + tip: int, # server tip. we should not request past this. + ) -> int: + """Request some headers starting at `height` to grow the blockchain of this interface. + Returns number of headers we managed to connect, starting at `height`. + """ + if not is_non_negative_integer(height): + raise Exception(f"{repr(height)} is not a block height") + if not is_non_negative_integer(tip): + raise Exception(f"{repr(tip)} is not a block height") + if not (height > constants.net.max_checkpoint() + or height == 0 == constants.net.max_checkpoint()): + raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}") + assert height <= tip, f"{height=} must be <= {tip=}" + # Request a few chunks of headers concurrently. + # tradeoffs: + # - more chunks: higher memory requirements + # - more chunks: higher concurrency => syncing needs fewer network round-trips + # - if a chunk does not connect, bandwidth for all later chunks is wasted + async with OldTaskGroup() as group: + tasks = [] # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]] + index0 = height // CHUNK_SIZE + for chunk_cnt in range(10): + index = index0 + chunk_cnt + start_height = index * CHUNK_SIZE + if start_height > tip: + break + end_height = min(start_height + CHUNK_SIZE - 1, tip) + size = end_height - start_height + 1 + tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size)))) + # try to connect chunks + num_headers = 0 + for index, task in tasks: + headers = task.result() + conn = self.blockchain.connect_chunk(index, data=b"".join(headers)) + if not conn: + break + num_headers += len(headers) + # We started at a chunk boundary, instead of requested `height`. Need to correct for that. + offset = height - index0 * CHUNK_SIZE + return max(0, num_headers - offset) def is_main_server(self) -> bool: return (self.network.interface == self or @@ -1021,9 +1061,9 @@ class Interface(Logger): # We are far from the tip. # It is more efficient to process headers in large batches (CPU/disk_usage/logging). # (but this wastes a little bandwidth, if we are not on a chunk boundary) - # TODO we should request (some) chunks concurrently. would help when we are many chunks behind - could_connect, num_headers = await self.request_chunk(height, tip=next_height) - if not could_connect: + num_headers = await self._fast_forward_chain( + height=height, tip=next_height) + if num_headers == 0: if height <= constants.net.max_checkpoint(): raise GracefulDisconnect('server chain conflicts with checkpoints or genesis') last, height = await self.step(height) @@ -1031,7 +1071,7 @@ class Interface(Logger): # report progress to gui/etc util.trigger_callback('blockchain_updated') util.trigger_callback('network_updated') - height = (height // CHUNK_SIZE * CHUNK_SIZE) + num_headers + height += num_headers assert height <= next_height+1, (height, self.tip) last = ChainResolutionMode.CATCHUP else: diff --git a/electrum/lnverifier.py b/electrum/lnverifier.py index dad87e647..3464da4f9 100644 --- a/electrum/lnverifier.py +++ b/electrum/lnverifier.py @@ -102,7 +102,7 @@ class LNChannelVerifier(NetworkJobOnDefaultServer): header = blockchain.read_header(block_height) if header is None: if block_height <= constants.net.max_checkpoint(): - await self.taskgroup.spawn(self.interface.request_chunk(block_height, can_return_early=True)) + await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=block_height)) continue self.started_verifying_channel.add(short_channel_id) await self.taskgroup.spawn(self.verify_channel(block_height, short_channel_id)) diff --git a/electrum/network.py b/electrum/network.py index 44719ff41..462cdaa74 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -1311,19 +1311,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): # otherwise: return _("Unknown error") - @best_effort_reliable - @catch_server_exceptions - async def request_chunk( - self, - height: int, - *, - tip: Optional[int] = None, - can_return_early: bool = False, - ) -> Optional[Tuple[bool, int]]: - if self.interface is None: # handled by best_effort_reliable - raise RequestTimedOut() - return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early) - @best_effort_reliable @catch_server_exceptions async def get_transaction(self, tx_hash: str, *, timeout=None) -> str: diff --git a/electrum/verifier.py b/electrum/verifier.py index 2e4de8f3b..7205fce2e 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -88,7 +88,7 @@ class SPV(NetworkJobOnDefaultServer): if header is None: if tx_height <= constants.net.max_checkpoint(): # FIXME these requests are not counted (self._requests_sent += 1) - await self.taskgroup.spawn(self.interface.request_chunk(tx_height, can_return_early=True)) + await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=tx_height)) continue # request now self.logger.info(f'requested merkle {tx_hash}')