interface: split request_chunk, based on "can_return_early" param
This commit is contained in:
@@ -831,28 +831,43 @@ class Interface(Logger):
|
||||
headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
|
||||
return headers
|
||||
|
||||
async def request_chunk(
|
||||
async def request_chunk_below_max_checkpoint(
|
||||
self,
|
||||
height: int,
|
||||
*,
|
||||
tip: Optional[int] = None,
|
||||
can_return_early: bool = False,
|
||||
height: int,
|
||||
) -> None:
|
||||
if not is_non_negative_integer(height):
|
||||
raise Exception(f"{repr(height)} is not a block height")
|
||||
assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
|
||||
index = height // CHUNK_SIZE
|
||||
if index in self._requested_chunks:
|
||||
return None
|
||||
self.logger.debug(f"requesting chunk from height {height}")
|
||||
try:
|
||||
self._requested_chunks.add(index)
|
||||
headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
|
||||
finally:
|
||||
self._requested_chunks.discard(index)
|
||||
conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
|
||||
if not conn:
|
||||
raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
|
||||
return None
|
||||
|
||||
async def _fast_forward_chain(
|
||||
self,
|
||||
*,
|
||||
height: int,
|
||||
tip: int,
|
||||
) -> Optional[Tuple[bool, int]]:
|
||||
if not is_non_negative_integer(height):
|
||||
raise Exception(f"{repr(height)} is not a block height")
|
||||
if not is_non_negative_integer(tip):
|
||||
raise Exception(f"{repr(tip)} is not a block height")
|
||||
index = height // CHUNK_SIZE
|
||||
if can_return_early and index in self._requested_chunks:
|
||||
return None
|
||||
#self.logger.debug(f"requesting chunk from height {height}")
|
||||
size = CHUNK_SIZE
|
||||
if tip is not None:
|
||||
size = min(size, tip - index * CHUNK_SIZE + 1)
|
||||
size = max(size, 0)
|
||||
try:
|
||||
self._requested_chunks.add(index)
|
||||
headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
|
||||
finally:
|
||||
self._requested_chunks.discard(index)
|
||||
size = min(size, tip - index * CHUNK_SIZE + 1)
|
||||
size = max(size, 0)
|
||||
headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=size)
|
||||
conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
|
||||
if not conn:
|
||||
return conn, 0
|
||||
@@ -1021,7 +1036,8 @@ class Interface(Logger):
|
||||
# It is more efficient to process headers in large batches (CPU/disk_usage/logging).
|
||||
# (but this wastes a little bandwidth, if we are not on a chunk boundary)
|
||||
# TODO we should request (some) chunks concurrently. would help when we are many chunks behind
|
||||
could_connect, num_headers = await self.request_chunk(height, tip=next_height)
|
||||
could_connect, num_headers = await self._fast_forward_chain(
|
||||
height=height, tip=next_height)
|
||||
if not could_connect:
|
||||
if height <= constants.net.max_checkpoint():
|
||||
raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
|
||||
|
||||
@@ -102,7 +102,7 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||
header = blockchain.read_header(block_height)
|
||||
if header is None:
|
||||
if block_height <= constants.net.max_checkpoint():
|
||||
await self.taskgroup.spawn(self.interface.request_chunk(block_height, can_return_early=True))
|
||||
await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=block_height))
|
||||
continue
|
||||
self.started_verifying_channel.add(short_channel_id)
|
||||
await self.taskgroup.spawn(self.verify_channel(block_height, short_channel_id))
|
||||
|
||||
@@ -1309,19 +1309,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
||||
# otherwise:
|
||||
return _("Unknown error")
|
||||
|
||||
@best_effort_reliable
|
||||
@catch_server_exceptions
|
||||
async def request_chunk(
|
||||
self,
|
||||
height: int,
|
||||
*,
|
||||
tip: Optional[int] = None,
|
||||
can_return_early: bool = False,
|
||||
) -> Optional[Tuple[bool, int]]:
|
||||
if self.interface is None: # handled by best_effort_reliable
|
||||
raise RequestTimedOut()
|
||||
return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
|
||||
|
||||
@best_effort_reliable
|
||||
@catch_server_exceptions
|
||||
async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
|
||||
|
||||
@@ -88,7 +88,7 @@ class SPV(NetworkJobOnDefaultServer):
|
||||
if header is None:
|
||||
if tx_height <= constants.net.max_checkpoint():
|
||||
# FIXME these requests are not counted (self._requests_sent += 1)
|
||||
await self.taskgroup.spawn(self.interface.request_chunk(tx_height, can_return_early=True))
|
||||
await self.taskgroup.spawn(self.interface.request_chunk_below_max_checkpoint(height=tx_height))
|
||||
continue
|
||||
# request now
|
||||
self.logger.info(f'requested merkle {tx_hash}')
|
||||
|
||||
Reference in New Issue
Block a user