From 456de50ff17630bea49a5e873afc7b4d1e7bc7be Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Wed, 2 Jul 2025 16:11:05 -0400 Subject: [PATCH] Add delta publishing and fetching --- src/nostr/client.py | 58 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/nostr/client.py b/src/nostr/client.py index 2a19a72..c2aee6e 100644 --- a/src/nostr/client.py +++ b/src/nostr/client.py @@ -21,6 +21,7 @@ from nostr_sdk import ( Tag, ) from datetime import timedelta +from nostr_sdk import EventId, Timestamp from .key_manager import KeyManager as SeedPassKeyManager from .backup_models import Manifest, ChunkMeta, KIND_MANIFEST, KIND_SNAPSHOT_CHUNK @@ -113,6 +114,10 @@ class NostrClient: # store the last error encountered during network operations self.last_error: Optional[str] = None + self.delta_threshold = 100 + self.current_manifest: Manifest | None = None + self._delta_events: list[str] = [] + # Configure and initialize the nostr-sdk Client signer = NostrSigner.keys(self.keys) self.client = Client(signer) @@ -272,6 +277,8 @@ class NostrClient: .sign_with_keys(self.keys) ) await self.client.send_event(manifest_event) + self.current_manifest = manifest + self._delta_events = [] return manifest async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None: @@ -309,8 +316,59 @@ class NostrClient: raise ValueError(f"Checksum mismatch for chunk {meta.id}") chunks.append(chunk_bytes) + self.current_manifest = manifest return manifest, chunks + async def publish_delta(self, delta_bytes: bytes, manifest_id: str) -> str: + """Publish a delta event referencing a manifest.""" + + content = base64.b64encode(delta_bytes).decode("utf-8") + tag = Tag.event(EventId.parse(manifest_id)) + builder = EventBuilder(Kind(KIND_DELTA), content).tags([tag]) + event = builder.build(self.keys.public_key()).sign_with_keys(self.keys) + result = await self.client.send_event(event) + delta_id = result.id.to_hex() if hasattr(result, "id") else str(result) + if self.current_manifest is not None: + self.current_manifest.delta_since = delta_id + self._delta_events.append(delta_id) + return delta_id + + async def fetch_deltas_since(self, version: int) -> list[bytes]: + """Retrieve delta events newer than the given version.""" + + pubkey = self.keys.public_key() + f = ( + Filter() + .author(pubkey) + .kind(Kind(KIND_DELTA)) + .since(Timestamp.from_secs(version)) + ) + timeout = timedelta(seconds=10) + events = (await self.client.fetch_events(f, timeout)).to_vec() + deltas: list[bytes] = [] + for ev in events: + deltas.append(base64.b64decode(ev.content().encode("utf-8"))) + + if self.current_manifest is not None: + snap_size = sum(c.size for c in self.current_manifest.chunks) + if ( + len(deltas) >= self.delta_threshold + or sum(len(d) for d in deltas) > snap_size + ): + # Publish a new snapshot to consolidate deltas + joined = b"".join(deltas) + await self.publish_snapshot(joined) + exp = Timestamp.from_secs(int(time.time())) + for ev in events: + exp_builder = EventBuilder(Kind(KIND_DELTA), ev.content()).tags( + [Tag.expiration(exp)] + ) + exp_event = exp_builder.build( + self.keys.public_key() + ).sign_with_keys(self.keys) + await self.client.send_event(exp_event) + return deltas + def close_client_pool(self) -> None: """Disconnects the client from all relays.""" try: