mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Add snapshot retrieval retries and fallback
This commit is contained in:
@@ -407,6 +407,60 @@ class NostrClient:
|
||||
logger.info("publish_snapshot completed in %.2f seconds", duration)
|
||||
return manifest, manifest_id
|
||||
|
||||
async def _fetch_chunks_with_retry(
|
||||
self, manifest_event
|
||||
) -> tuple[Manifest, list[bytes]] | None:
|
||||
"""Retrieve all chunks referenced by ``manifest_event`` with retries."""
|
||||
|
||||
pubkey = self.keys.public_key()
|
||||
timeout = timedelta(seconds=10)
|
||||
|
||||
try:
|
||||
data = json.loads(manifest_event.content())
|
||||
manifest = Manifest(
|
||||
ver=data["ver"],
|
||||
algo=data["algo"],
|
||||
chunks=[ChunkMeta(**c) for c in data["chunks"]],
|
||||
delta_since=(
|
||||
int(data["delta_since"])
|
||||
if data.get("delta_since") is not None
|
||||
else None
|
||||
),
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
chunks: list[bytes] = []
|
||||
for meta in manifest.chunks:
|
||||
attempt = 0
|
||||
chunk_bytes: bytes | None = None
|
||||
while attempt < MAX_RETRIES:
|
||||
cf = Filter().author(pubkey).kind(Kind(KIND_SNAPSHOT_CHUNK))
|
||||
if meta.event_id:
|
||||
cf = cf.id(EventId.parse(meta.event_id))
|
||||
else:
|
||||
cf = cf.identifier(meta.id)
|
||||
cf = cf.limit(1)
|
||||
cev = (await self.client.fetch_events(cf, timeout)).to_vec()
|
||||
if cev:
|
||||
candidate = base64.b64decode(cev[0].content().encode("utf-8"))
|
||||
if hashlib.sha256(candidate).hexdigest() == meta.hash:
|
||||
chunk_bytes = candidate
|
||||
break
|
||||
attempt += 1
|
||||
if attempt < MAX_RETRIES:
|
||||
await asyncio.sleep(RETRY_DELAY)
|
||||
if chunk_bytes is None:
|
||||
return None
|
||||
chunks.append(chunk_bytes)
|
||||
|
||||
man_id = getattr(manifest_event, "id", None)
|
||||
if hasattr(man_id, "to_hex"):
|
||||
man_id = man_id.to_hex()
|
||||
self.current_manifest = manifest
|
||||
self.current_manifest_id = man_id
|
||||
return manifest, chunks
|
||||
|
||||
async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None:
|
||||
"""Retrieve the latest manifest and all snapshot chunks."""
|
||||
if self.offline_mode or not self.relays:
|
||||
@@ -414,47 +468,18 @@ class NostrClient:
|
||||
await self._connect_async()
|
||||
|
||||
pubkey = self.keys.public_key()
|
||||
f = Filter().author(pubkey).kind(Kind(KIND_MANIFEST)).limit(1)
|
||||
f = Filter().author(pubkey).kind(Kind(KIND_MANIFEST)).limit(3)
|
||||
timeout = timedelta(seconds=10)
|
||||
events = (await self.client.fetch_events(f, timeout)).to_vec()
|
||||
if not events:
|
||||
return None
|
||||
manifest_event = events[0]
|
||||
manifest_raw = manifest_event.content()
|
||||
data = json.loads(manifest_raw)
|
||||
manifest = Manifest(
|
||||
ver=data["ver"],
|
||||
algo=data["algo"],
|
||||
chunks=[ChunkMeta(**c) for c in data["chunks"]],
|
||||
delta_since=(
|
||||
int(data["delta_since"])
|
||||
if data.get("delta_since") is not None
|
||||
else None
|
||||
),
|
||||
)
|
||||
|
||||
chunks: list[bytes] = []
|
||||
for meta in manifest.chunks:
|
||||
cf = Filter().author(pubkey).kind(Kind(KIND_SNAPSHOT_CHUNK))
|
||||
if meta.event_id:
|
||||
cf = cf.id(EventId.parse(meta.event_id))
|
||||
else:
|
||||
cf = cf.identifier(meta.id)
|
||||
cf = cf.limit(1)
|
||||
cev = (await self.client.fetch_events(cf, timeout)).to_vec()
|
||||
if not cev:
|
||||
raise ValueError(f"Missing chunk {meta.id}")
|
||||
chunk_bytes = base64.b64decode(cev[0].content().encode("utf-8"))
|
||||
if hashlib.sha256(chunk_bytes).hexdigest() != meta.hash:
|
||||
raise ValueError(f"Checksum mismatch for chunk {meta.id}")
|
||||
chunks.append(chunk_bytes)
|
||||
for manifest_event in events:
|
||||
result = await self._fetch_chunks_with_retry(manifest_event)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
self.current_manifest = manifest
|
||||
man_id = getattr(manifest_event, "id", None)
|
||||
if hasattr(man_id, "to_hex"):
|
||||
man_id = man_id.to_hex()
|
||||
self.current_manifest_id = man_id
|
||||
return manifest, chunks
|
||||
return None
|
||||
|
||||
async def publish_delta(self, delta_bytes: bytes, manifest_id: str) -> str:
|
||||
"""Publish a delta event referencing a manifest."""
|
||||
|
@@ -65,3 +65,43 @@ def test_publish_and_fetch_deltas(dummy_nostr_client):
|
||||
assert relay.manifests[-1].delta_since == second_ts
|
||||
deltas = asyncio.run(client.fetch_deltas_since(0))
|
||||
assert deltas == [d1, d2]
|
||||
|
||||
|
||||
def test_fetch_snapshot_fallback_on_missing_chunk(dummy_nostr_client, monkeypatch):
|
||||
import os
|
||||
import gzip
|
||||
|
||||
client, relay = dummy_nostr_client
|
||||
monkeypatch.setattr("nostr.client.MAX_RETRIES", 3)
|
||||
monkeypatch.setattr("nostr.client.RETRY_DELAY", 0)
|
||||
|
||||
data1 = os.urandom(60000)
|
||||
manifest1, _ = asyncio.run(client.publish_snapshot(data1))
|
||||
|
||||
data2 = os.urandom(60000)
|
||||
manifest2, _ = asyncio.run(client.publish_snapshot(data2))
|
||||
|
||||
missing = manifest2.chunks[0]
|
||||
if missing.event_id:
|
||||
relay.chunks.pop(missing.event_id, None)
|
||||
relay.chunks.pop(missing.id, None)
|
||||
|
||||
relay.filters.clear()
|
||||
|
||||
fetched_manifest, chunk_bytes = asyncio.run(client.fetch_latest_snapshot())
|
||||
|
||||
assert gzip.decompress(b"".join(chunk_bytes)) == data1
|
||||
assert [c.event_id for c in fetched_manifest.chunks] == [
|
||||
c.event_id for c in manifest1.chunks
|
||||
]
|
||||
|
||||
attempts = sum(
|
||||
1
|
||||
for f in relay.filters
|
||||
if getattr(f, "kind_val", None) == KIND_SNAPSHOT_CHUNK
|
||||
and (
|
||||
missing.id in getattr(f, "ids", [])
|
||||
or (missing.event_id and missing.event_id in getattr(f, "ids", []))
|
||||
)
|
||||
)
|
||||
assert attempts == 3
|
||||
|
Reference in New Issue
Block a user