From 8579cf7f3da5de9246fbef1e5104d62a7f27b4fa Mon Sep 17 00:00:00 2001
From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com>
Date: Thu, 17 Jul 2025 14:43:04 -0400
Subject: [PATCH] Add snapshot retrieval retries and fallback

fetch_latest_snapshot now requests up to three manifest events and
returns the first one whose chunks can all be fetched and verified,
or None when none of them can.

Chunk retrieval moves into _fetch_chunks_with_retry, which requests each
chunk up to MAX_RETRIES times, sleeping RETRY_DELAY seconds between
attempts, and returns None instead of raising when a chunk is missing or
fails its checksum.  Chunk content that is not valid base64 is treated
as a failed attempt, and skipped manifests and chunks are logged.

---
Review note: the hunks below were revised after the original commit, so
the post-image blob id on the first "index" line is stale.

 src/nostr/client.py                  | 116 ++++++++++++++++++---------
 src/tests/test_nostr_dummy_client.py |  40 +++++++++
 2 files changed, 121 insertions(+), 35 deletions(-)

diff --git a/src/nostr/client.py b/src/nostr/client.py
index e59fd28..ea2d817 100644
--- a/src/nostr/client.py
+++ b/src/nostr/client.py
@@ -407,6 +407,81 @@ class NostrClient:
         logger.info("publish_snapshot completed in %.2f seconds", duration)
         return manifest, manifest_id
 
+    async def _fetch_chunks_with_retry(
+        self, manifest_event
+    ) -> tuple[Manifest, list[bytes]] | None:
+        """Retrieve all chunks referenced by ``manifest_event`` with retries.
+
+        Each chunk is requested up to ``MAX_RETRIES`` times, sleeping
+        ``RETRY_DELAY`` seconds between attempts, and is accepted only when
+        its SHA-256 digest equals the hash recorded in the manifest.
+
+        Returns ``(manifest, chunks)`` and records the manifest on ``self``.
+        Returns ``None`` when the manifest cannot be parsed or a chunk is
+        still missing or corrupt after the final attempt.
+        """
+
+        pubkey = self.keys.public_key()
+        timeout = timedelta(seconds=10)
+
+        try:
+            data = json.loads(manifest_event.content())
+            manifest = Manifest(
+                ver=data["ver"],
+                algo=data["algo"],
+                chunks=[ChunkMeta(**c) for c in data["chunks"]],
+                delta_since=(
+                    int(data["delta_since"])
+                    if data.get("delta_since") is not None
+                    else None
+                ),
+            )
+        except Exception:
+            # Best effort: an unreadable manifest is skipped, not fatal.
+            logger.warning("Skipping unparsable manifest event", exc_info=True)
+            return None
+
+        chunks: list[bytes] = []
+        for meta in manifest.chunks:
+            attempt = 0
+            chunk_bytes: bytes | None = None
+            while attempt < MAX_RETRIES:
+                cf = Filter().author(pubkey).kind(Kind(KIND_SNAPSHOT_CHUNK))
+                if meta.event_id:
+                    cf = cf.id(EventId.parse(meta.event_id))
+                else:
+                    cf = cf.identifier(meta.id)
+                cf = cf.limit(1)
+                cev = (await self.client.fetch_events(cf, timeout)).to_vec()
+                if cev:
+                    try:
+                        candidate = base64.b64decode(cev[0].content().encode("utf-8"))
+                    except ValueError:
+                        # binascii.Error subclasses ValueError; malformed base64
+                        # counts as a failed attempt instead of aborting the fetch.
+                        logger.debug("Chunk %s is not valid base64", meta.id)
+                    else:
+                        if hashlib.sha256(candidate).hexdigest() == meta.hash:
+                            chunk_bytes = candidate
+                            break
+                attempt += 1
+                if attempt < MAX_RETRIES:
+                    await asyncio.sleep(RETRY_DELAY)
+            if chunk_bytes is None:
+                logger.warning(
+                    "Chunk %s unavailable after %d attempts", meta.id, MAX_RETRIES
+                )
+                return None
+            chunks.append(chunk_bytes)
+
+        # Record the manifest only after every chunk has been verified.
+        man_id = getattr(manifest_event, "id", None)
+        if hasattr(man_id, "to_hex"):
+            man_id = man_id.to_hex()
+        self.current_manifest = manifest
+        self.current_manifest_id = man_id
+        return manifest, chunks
+
     async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None:
         """Retrieve the latest manifest and all snapshot chunks."""
         if self.offline_mode or not self.relays:
@@ -414,47 +489,18 @@ class NostrClient:
         await self._connect_async()
 
         pubkey = self.keys.public_key()
-        f = Filter().author(pubkey).kind(Kind(KIND_MANIFEST)).limit(1)
+        f = Filter().author(pubkey).kind(Kind(KIND_MANIFEST)).limit(3)
         timeout = timedelta(seconds=10)
         events = (await self.client.fetch_events(f, timeout)).to_vec()
         if not events:
             return None
-        manifest_event = events[0]
-        manifest_raw = manifest_event.content()
-        data = json.loads(manifest_raw)
-        manifest = Manifest(
-            ver=data["ver"],
-            algo=data["algo"],
-            chunks=[ChunkMeta(**c) for c in data["chunks"]],
-            delta_since=(
-                int(data["delta_since"])
-                if data.get("delta_since") is not None
-                else None
-            ),
-        )
 
-        chunks: list[bytes] = []
-        for meta in manifest.chunks:
-            cf = Filter().author(pubkey).kind(Kind(KIND_SNAPSHOT_CHUNK))
-            if meta.event_id:
-                cf = cf.id(EventId.parse(meta.event_id))
-            else:
-                cf = cf.identifier(meta.id)
-            cf = cf.limit(1)
-            cev = (await self.client.fetch_events(cf, timeout)).to_vec()
-            if not cev:
-                raise ValueError(f"Missing chunk {meta.id}")
-            chunk_bytes = base64.b64decode(cev[0].content().encode("utf-8"))
-            if hashlib.sha256(chunk_bytes).hexdigest() != meta.hash:
-                raise ValueError(f"Checksum mismatch for chunk {meta.id}")
-            chunks.append(chunk_bytes)
+        for manifest_event in events:
+            result = await self._fetch_chunks_with_retry(manifest_event)
+            if result is not None:
+                return result
 
-        self.current_manifest = manifest
-        man_id = getattr(manifest_event, "id", None)
-        if hasattr(man_id, "to_hex"):
-            man_id = man_id.to_hex()
-        self.current_manifest_id = man_id
-        return manifest, chunks
+        return None
 
     async def publish_delta(self, delta_bytes: bytes, manifest_id: str) -> str:
         """Publish a delta event referencing a manifest."""
diff --git a/src/tests/test_nostr_dummy_client.py b/src/tests/test_nostr_dummy_client.py
index fd91682..ed6ccd5 100644
--- a/src/tests/test_nostr_dummy_client.py
+++ b/src/tests/test_nostr_dummy_client.py
@@ -65,3 +65,43 @@ def test_publish_and_fetch_deltas(dummy_nostr_client):
     assert relay.manifests[-1].delta_since == second_ts
     deltas = asyncio.run(client.fetch_deltas_since(0))
     assert deltas == [d1, d2]
+
+
+def test_fetch_snapshot_fallback_on_missing_chunk(dummy_nostr_client, monkeypatch):
+    import os
+    import gzip
+
+    client, relay = dummy_nostr_client
+    monkeypatch.setattr("nostr.client.MAX_RETRIES", 3)
+    monkeypatch.setattr("nostr.client.RETRY_DELAY", 0)
+
+    data1 = os.urandom(60000)
+    manifest1, _ = asyncio.run(client.publish_snapshot(data1))
+
+    data2 = os.urandom(60000)
+    manifest2, _ = asyncio.run(client.publish_snapshot(data2))
+
+    missing = manifest2.chunks[0]
+    if missing.event_id:
+        relay.chunks.pop(missing.event_id, None)
+    relay.chunks.pop(missing.id, None)
+
+    relay.filters.clear()
+
+    fetched_manifest, chunk_bytes = asyncio.run(client.fetch_latest_snapshot())
+
+    assert gzip.decompress(b"".join(chunk_bytes)) == data1
+    assert [c.event_id for c in fetched_manifest.chunks] == [
+        c.event_id for c in manifest1.chunks
+    ]
+
+    attempts = sum(
+        1
+        for f in relay.filters
+        if getattr(f, "kind_val", None) == KIND_SNAPSHOT_CHUNK
+        and (
+            missing.id in getattr(f, "ids", [])
+            or (missing.event_id and missing.event_id in getattr(f, "ids", []))
+        )
+    )
+    assert attempts == 3