From 6a56fd756bb58080fffb43b61a19c62bb4eeec97 Mon Sep 17 00:00:00 2001
From: SomberNight
Date: Tue, 15 Jul 2025 22:53:35 +0000
Subject: [PATCH 1/2] interface: split request_chunk, based on "can_return_early" param

---
 electrum/interface.py  | 48 ++++++++++++++++++++++++++++--------------
 electrum/lnverifier.py |  2 +-
 electrum/network.py    | 13 ------------
 electrum/verifier.py   |  2 +-
 4 files changed, 34 insertions(+), 31 deletions(-)

diff --git a/electrum/interface.py b/electrum/interface.py
index 5e572ab0c..3fc51e559 100644
--- a/electrum/interface.py
+++ b/electrum/interface.py
@@ -831,28 +831,43 @@ class Interface(Logger):
         headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
         return headers
 
-    async def request_chunk(
+    async def request_chunk_below_max_checkpoint(
         self,
-        height: int, *,
-        tip: Optional[int] = None,
-        can_return_early: bool = False,
+        height: int,
+    ) -> None:
+        if not is_non_negative_integer(height):
+            raise Exception(f"{repr(height)} is not a block height")
+        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
+        index = height // CHUNK_SIZE
+        if index in self._requested_chunks:
+            return None
+        self.logger.debug(f"requesting chunk from height {height}")
+        try:
+            self._requested_chunks.add(index)
+            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
+        finally:
+            self._requested_chunks.discard(index)
+        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
+        if not conn:
+            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
+        return None
+
+    async def _fast_forward_chain(
+        self,
+        *,
+        height: int,
+        tip: int,
     ) -> Optional[Tuple[bool, int]]:
         if not is_non_negative_integer(height):
             raise Exception(f"{repr(height)} is not a block height")
+        if not is_non_negative_integer(tip):
+            raise Exception(f"{repr(tip)} is not a block height")
         index = height // CHUNK_SIZE
-        if can_return_early and index in self._requested_chunks:
-            return None
-        #self.logger.debug(f"requesting chunk from height {height}")
         size = CHUNK_SIZE
-        if tip is not None:
-            size = min(size, tip - index * CHUNK_SIZE + 1)
-            size = max(size, 0)
-        try:
-            self._requested_chunks.add(index)
-            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
-        finally:
-            self._requested_chunks.discard(index)
+        size = min(size, tip - index * CHUNK_SIZE + 1)
+        size = max(size, 0)
+        headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
         conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
         if not conn:
             return conn, 0
@@ -1021,7 +1036,8 @@ class Interface(Logger):
             # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
             # (but this wastes a little bandwidth, if we are not on a chunk boundary)
             # TODO we should request (some) chunks concurrently. would help when we are many chunks behind
-            could_connect, num_headers = await self.request_chunk(height, tip=next_height)
+            could_connect, num_headers = await self._fast_forward_chain(
+                height=height, tip=next_height)
             if not could_connect:
                 if height <= constants.net.max_checkpoint():
                     raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
diff --git a/electrum/lnverifier.py b/electrum/lnverifier.py
index dad87e647..3464da4f9 100644
--- a/electrum/lnverifier.py
+++ b/electrum/lnverifier.py
@@ -102,7 +102,7 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
             header = blockchain.read_header(block_height)
             if header is None:
                 if block_height <= constants.net.max_checkpoint():
-                    await self.taskgroup.spawn(self.interface.request_chunk(block_height, can_return_early=True))
+                    await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=block_height))
                 continue
             self.started_verifying_channel.add(short_channel_id)
             await self.taskgroup.spawn(self.verify_channel(block_height, short_channel_id))
diff --git a/electrum/network.py b/electrum/network.py
index e1b41b8a5..2171ce25e 100644
--- a/electrum/network.py
+++ b/electrum/network.py
@@ -1309,19 +1309,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
         # otherwise:
         return _("Unknown error")
 
-    @best_effort_reliable
-    @catch_server_exceptions
-    async def request_chunk(
-        self,
-        height: int,
-        *,
-        tip: Optional[int] = None,
-        can_return_early: bool = False,
-    ) -> Optional[Tuple[bool, int]]:
-        if self.interface is None:  # handled by best_effort_reliable
-            raise RequestTimedOut()
-        return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
-
     @best_effort_reliable
     @catch_server_exceptions
     async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
diff --git a/electrum/verifier.py b/electrum/verifier.py
index 2e4de8f3b..7205fce2e 100644
--- a/electrum/verifier.py
+++ b/electrum/verifier.py
@@ -88,7 +88,7 @@ class SPV(NetworkJobOnDefaultServer):
             if header is None:
                 if tx_height <= constants.net.max_checkpoint():
                     # FIXME these requests are not counted (self._requests_sent += 1)
-                    await self.taskgroup.spawn(self.interface.request_chunk(tx_height, can_return_early=True))
+                    await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=tx_height))
                 continue
             # request now
             self.logger.info(f'requested merkle {tx_hash}')

From 3ceb59d58e643ec19207d23472d7e30bf4b7592f Mon Sep 17 00:00:00 2001
From: SomberNight
Date: Wed, 16 Jul 2025 00:15:19 +0000
Subject: [PATCH 2/2] interface: parallel header-chunks download

We request chunks concurrently.
This makes header-sync much faster when we are many blocks behind.
notes:
- all chunks are downloaded from the same interface, just for simplicity
- we request up to 10 chunks concurrently (so 10*2016 headers)
  - more chunks: higher memory requirements
  - more chunks: higher concurrency => syncing needs fewer network round-trips
  - if a chunk does not connect, bandwidth for all later chunks is wasted
  - we can tweak the constant or make it dynamic or make it a configvar, etc, later
- without this, we progress the chain tip by around 1 chunk per second
  - 52k blocks (1 year on mainnet) takes around 26 seconds
- this is probably not *that* interesting for mainnet, but for testnet3, which
  sometimes has 200x the block-rate of mainnet, it is extremely useful
---
 electrum/interface.py | 56 ++++++++++++++++++++++++++++++------------
 1 file changed, 40 insertions(+), 16 deletions(-)

diff --git a/electrum/interface.py b/electrum/interface.py
index 3fc51e559..981f76aee 100644
--- a/electrum/interface.py
+++ b/electrum/interface.py
@@ -856,22 +856,47 @@ class Interface(Logger):
     async def _fast_forward_chain(
         self,
         *,
-        height: int,
-        tip: int,
-    ) -> Optional[Tuple[bool, int]]:
+        height: int,  # usually local chain tip + 1
+        tip: int,  # server tip. we should not request past this.
+    ) -> int:
+        """Request some headers starting at `height` to grow the blockchain of this interface.
+        Returns number of headers we managed to connect, starting at `height`.
+        """
         if not is_non_negative_integer(height):
             raise Exception(f"{repr(height)} is not a block height")
         if not is_non_negative_integer(tip):
             raise Exception(f"{repr(tip)} is not a block height")
-        index = height // CHUNK_SIZE
-        size = CHUNK_SIZE
-        size = min(size, tip - index * CHUNK_SIZE + 1)
-        size = max(size, 0)
-        headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
-        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
-        if not conn:
-            return conn, 0
-        return conn, len(headers)
+        if not (height > constants.net.max_checkpoint()
+                or height == 0 == constants.net.max_checkpoint()):
+            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
+        assert height <= tip, f"{height=} must be <= {tip=}"
+        # Request a few chunks of headers concurrently.
+        # tradeoffs:
+        # - more chunks: higher memory requirements
+        # - more chunks: higher concurrency => syncing needs fewer network round-trips
+        # - if a chunk does not connect, bandwidth for all later chunks is wasted
+        async with OldTaskGroup() as group:
+            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
+            index0 = height // CHUNK_SIZE
+            for chunk_cnt in range(10):
+                index = index0 + chunk_cnt
+                start_height = index * CHUNK_SIZE
+                if start_height > tip:
+                    break
+                end_height = min(start_height + CHUNK_SIZE - 1, tip)
+                size = end_height - start_height + 1
+                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
+        # try to connect chunks
+        num_headers = 0
+        for index, task in tasks:
+            headers = task.result()
+            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
+            if not conn:
+                break
+            num_headers += len(headers)
+        # We started at a chunk boundary, instead of requested `height`. Need to correct for that.
+        offset = height - index0 * CHUNK_SIZE
+        return max(0, num_headers - offset)
 
     def is_main_server(self) -> bool:
         return (self.network.interface == self or
@@ -1035,10 +1060,9 @@ class Interface(Logger):
             # We are far from the tip.
             # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
             # (but this wastes a little bandwidth, if we are not on a chunk boundary)
-            # TODO we should request (some) chunks concurrently. would help when we are many chunks behind
-            could_connect, num_headers = await self._fast_forward_chain(
+            num_headers = await self._fast_forward_chain(
                 height=height, tip=next_height)
-            if not could_connect:
+            if num_headers == 0:
                 if height <= constants.net.max_checkpoint():
                     raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                 last, height = await self.step(height)
@@ -1046,7 +1070,7 @@ class Interface(Logger):
             # report progress to gui/etc
             util.trigger_callback('blockchain_updated')
             util.trigger_callback('network_updated')
-            height = (height // CHUNK_SIZE * CHUNK_SIZE) + num_headers
+            height += num_headers
             assert height <= next_height+1, (height, self.tip)
             last = ChainResolutionMode.CATCHUP
         else:
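
For reference, the fan-out pattern that patch 2 adds to _fast_forward_chain can be reduced to a small
standalone sketch: request up to 10 chunks concurrently, then try to connect them in order and stop at
the first one that does not connect. This is only an illustration of the approach, not part of the
patches: fetch_chunk() and connect_chunk() below are hypothetical stand-ins for
Interface.get_block_headers() and Blockchain.connect_chunk(), and plain asyncio.TaskGroup is used
instead of Electrum's OldTaskGroup.

    import asyncio

    CHUNK_SIZE = 2016
    MAX_PARALLEL_CHUNKS = 10

    async def fetch_chunk(start_height: int, count: int) -> list:
        # placeholder for a network request returning `count` raw 80-byte headers
        await asyncio.sleep(0.01)
        return [b"\x00" * 80 for _ in range(count)]

    def connect_chunk(index: int, headers: list) -> bool:
        # placeholder for validating the chunk and appending it to the local chain
        return True

    async def fast_forward(*, height: int, tip: int) -> int:
        # fan out: request up to MAX_PARALLEL_CHUNKS chunks concurrently, never past `tip`
        index0 = height // CHUNK_SIZE
        tasks = []
        async with asyncio.TaskGroup() as group:  # Python 3.11+
            for i in range(MAX_PARALLEL_CHUNKS):
                index = index0 + i
                start = index * CHUNK_SIZE
                if start > tip:
                    break
                count = min(CHUNK_SIZE, tip - start + 1)
                tasks.append((index, group.create_task(fetch_chunk(start, count))))
        # connect in order; a chunk that fails to connect invalidates all later ones
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            if not connect_chunk(index, headers):
                break
            num_headers += len(headers)
        # we started at a chunk boundary, not at `height` itself, so correct for the offset
        return max(0, num_headers - (height - index0 * CHUNK_SIZE))

    if __name__ == "__main__":
        print(asyncio.run(fast_forward(height=100_000, tip=150_000)))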