Merge pull request #606 from PR0M3TH3AN/codex/add-fetch-chunks-with-retry-helper

Add nostr snapshot retries and fallback
Authored by thePR0M3TH3AN on 2025-07-17 14:49:48 -04:00; committed by GitHub.
2 changed files with 100 additions and 35 deletions
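In short, the diff does two things: each snapshot chunk fetch is retried up to MAX_RETRIES times (sleeping RETRY_DELAY between attempts) and is accepted only if its SHA-256 matches the manifest entry, and fetch_latest_snapshot now pulls up to three recent manifests and falls back to an older one when a chunk cannot be recovered. As a minimal sketch of the per-chunk loop, with the relay query abstracted behind an async callable (fetch_once and fetch_verified are illustrative names, not part of this PR):

# Illustrative sketch only: the per-chunk retry/verify loop from the diff,
# with the relay query hidden behind an async callable.
import asyncio
import hashlib
from typing import Awaitable, Callable, Optional

async def fetch_verified(
    fetch_once: Callable[[], Awaitable[Optional[bytes]]],
    expected_hash: str,
    max_retries: int,
    retry_delay: float,
) -> Optional[bytes]:
    attempt = 0
    while attempt < max_retries:
        candidate = await fetch_once()
        # Accept the chunk only if its SHA-256 matches the manifest entry.
        if candidate is not None and hashlib.sha256(candidate).hexdigest() == expected_hash:
            return candidate
        attempt += 1
        if attempt < max_retries:
            await asyncio.sleep(retry_delay)
    return None  # caller treats this as "fall back to an older manifest"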

View File

@@ -407,21 +407,16 @@ class NostrClient:
         logger.info("publish_snapshot completed in %.2f seconds", duration)
         return manifest, manifest_id
 
-    async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None:
-        """Retrieve the latest manifest and all snapshot chunks."""
-        if self.offline_mode or not self.relays:
-            return None
-        await self._connect_async()
+    async def _fetch_chunks_with_retry(
+        self, manifest_event
+    ) -> tuple[Manifest, list[bytes]] | None:
+        """Retrieve all chunks referenced by ``manifest_event`` with retries."""
         pubkey = self.keys.public_key()
-        f = Filter().author(pubkey).kind(Kind(KIND_MANIFEST)).limit(1)
         timeout = timedelta(seconds=10)
-        events = (await self.client.fetch_events(f, timeout)).to_vec()
-        if not events:
-            return None
-        manifest_event = events[0]
-        manifest_raw = manifest_event.content()
-        data = json.loads(manifest_raw)
+        try:
+            data = json.loads(manifest_event.content())
             manifest = Manifest(
                 ver=data["ver"],
                 algo=data["algo"],
@@ -432,9 +427,14 @@ class NostrClient:
                     else None
                 ),
             )
+        except Exception:
+            return None
         chunks: list[bytes] = []
         for meta in manifest.chunks:
+            attempt = 0
+            chunk_bytes: bytes | None = None
+            while attempt < MAX_RETRIES:
                 cf = Filter().author(pubkey).kind(Kind(KIND_SNAPSHOT_CHUNK))
                 if meta.event_id:
                     cf = cf.id(EventId.parse(meta.event_id))
@@ -442,20 +442,45 @@ class NostrClient:
                     cf = cf.identifier(meta.id)
                 cf = cf.limit(1)
                 cev = (await self.client.fetch_events(cf, timeout)).to_vec()
-            if not cev:
-                raise ValueError(f"Missing chunk {meta.id}")
-            chunk_bytes = base64.b64decode(cev[0].content().encode("utf-8"))
-            if hashlib.sha256(chunk_bytes).hexdigest() != meta.hash:
-                raise ValueError(f"Checksum mismatch for chunk {meta.id}")
+                if cev:
+                    candidate = base64.b64decode(cev[0].content().encode("utf-8"))
+                    if hashlib.sha256(candidate).hexdigest() == meta.hash:
+                        chunk_bytes = candidate
+                        break
+                attempt += 1
+                if attempt < MAX_RETRIES:
+                    await asyncio.sleep(RETRY_DELAY)
+            if chunk_bytes is None:
+                return None
             chunks.append(chunk_bytes)
-        self.current_manifest = manifest
         man_id = getattr(manifest_event, "id", None)
         if hasattr(man_id, "to_hex"):
             man_id = man_id.to_hex()
+        self.current_manifest = manifest
         self.current_manifest_id = man_id
         return manifest, chunks
+
+    async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None:
+        """Retrieve the latest manifest and all snapshot chunks."""
+        if self.offline_mode or not self.relays:
+            return None
+        await self._connect_async()
+        pubkey = self.keys.public_key()
+        f = Filter().author(pubkey).kind(Kind(KIND_MANIFEST)).limit(3)
+        timeout = timedelta(seconds=10)
+        events = (await self.client.fetch_events(f, timeout)).to_vec()
+        if not events:
+            return None
+        for manifest_event in events:
+            result = await self._fetch_chunks_with_retry(manifest_event)
+            if result is not None:
+                return result
+        return None
 
     async def publish_delta(self, delta_bytes: bytes, manifest_id: str) -> str:
         """Publish a delta event referencing a manifest."""
         if self.offline_mode or not self.relays:
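A hedged caller-side sketch of how the new behaviour surfaces: fetch_latest_snapshot returns the first (Manifest, chunks) pair it can fully verify, or None once every candidate manifest fails. restore_latest is a hypothetical helper, not part of this diff; the gzip framing matches what the test below decompresses.

# Hypothetical caller-side helper (not in this PR) showing how the fallback
# is consumed. Chunks concatenate into the gzip-compressed snapshot payload.
import gzip

async def restore_latest(client) -> bytes | None:
    result = await client.fetch_latest_snapshot()
    if result is None:
        # Offline, no manifests on the relays, or every candidate manifest
        # had a chunk that stayed missing or corrupt after MAX_RETRIES attempts.
        return None
    manifest, chunks = result  # manifest carries chunk metadata if needed
    return gzip.decompress(b"".join(chunks))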

View File

@@ -65,3 +65,43 @@ def test_publish_and_fetch_deltas(dummy_nostr_client):
     assert relay.manifests[-1].delta_since == second_ts
     deltas = asyncio.run(client.fetch_deltas_since(0))
     assert deltas == [d1, d2]
+
+
+def test_fetch_snapshot_fallback_on_missing_chunk(dummy_nostr_client, monkeypatch):
+    import os
+    import gzip
+
+    client, relay = dummy_nostr_client
+    monkeypatch.setattr("nostr.client.MAX_RETRIES", 3)
+    monkeypatch.setattr("nostr.client.RETRY_DELAY", 0)
+
+    data1 = os.urandom(60000)
+    manifest1, _ = asyncio.run(client.publish_snapshot(data1))
+    data2 = os.urandom(60000)
+    manifest2, _ = asyncio.run(client.publish_snapshot(data2))
+
+    missing = manifest2.chunks[0]
+    if missing.event_id:
+        relay.chunks.pop(missing.event_id, None)
+    relay.chunks.pop(missing.id, None)
+    relay.filters.clear()
+
+    fetched_manifest, chunk_bytes = asyncio.run(client.fetch_latest_snapshot())
+    assert gzip.decompress(b"".join(chunk_bytes)) == data1
+    assert [c.event_id for c in fetched_manifest.chunks] == [
+        c.event_id for c in manifest1.chunks
+    ]
+
+    attempts = sum(
+        1
+        for f in relay.filters
+        if getattr(f, "kind_val", None) == KIND_SNAPSHOT_CHUNK
+        and (
+            missing.id in getattr(f, "ids", [])
+            or (missing.event_id and missing.event_id in getattr(f, "ids", []))
+        )
+    )
+    assert attempts == 3
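A companion check one might add on top of this (not part of the PR): if the chunks of every published snapshot are removed, the fallback loop exhausts its candidates and fetch_latest_snapshot returns None. The sketch assumes the same dummy fixture, module-level asyncio import, and relay internals (relay.chunks) used in the test above.

# Hypothetical companion test, not included in this PR: with every chunk
# gone, all fallback candidates fail and fetch_latest_snapshot returns None.
def test_fetch_snapshot_none_when_all_chunks_missing(dummy_nostr_client, monkeypatch):
    import os

    client, relay = dummy_nostr_client
    monkeypatch.setattr("nostr.client.MAX_RETRIES", 1)
    monkeypatch.setattr("nostr.client.RETRY_DELAY", 0)

    manifest, _ = asyncio.run(client.publish_snapshot(os.urandom(60000)))
    for meta in manifest.chunks:
        if meta.event_id:
            relay.chunks.pop(meta.event_id, None)
        relay.chunks.pop(meta.id, None)

    assert asyncio.run(client.fetch_latest_snapshot()) is None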