Merge pull request #148 from PR0M3TH3AN/codex/add-methods-for-delta-publishing-and-fetching

Implement delta event support in Nostr client
This commit is contained in:
thePR0M3TH3AN
2025-07-02 16:12:36 -04:00
committed by GitHub

View File

@@ -21,6 +21,7 @@ from nostr_sdk import (
Tag,
)
from datetime import timedelta
from nostr_sdk import EventId, Timestamp
from .key_manager import KeyManager as SeedPassKeyManager
from .backup_models import Manifest, ChunkMeta, KIND_MANIFEST, KIND_SNAPSHOT_CHUNK
@@ -113,6 +114,10 @@ class NostrClient:
# store the last error encountered during network operations
self.last_error: Optional[str] = None
self.delta_threshold = 100
self.current_manifest: Manifest | None = None
self._delta_events: list[str] = []
# Configure and initialize the nostr-sdk Client
signer = NostrSigner.keys(self.keys)
self.client = Client(signer)
@@ -272,6 +277,8 @@ class NostrClient:
.sign_with_keys(self.keys)
)
await self.client.send_event(manifest_event)
self.current_manifest = manifest
self._delta_events = []
return manifest
async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None:
@@ -309,8 +316,59 @@ class NostrClient:
raise ValueError(f"Checksum mismatch for chunk {meta.id}")
chunks.append(chunk_bytes)
self.current_manifest = manifest
return manifest, chunks
async def publish_delta(self, delta_bytes: bytes, manifest_id: str) -> str:
"""Publish a delta event referencing a manifest."""
content = base64.b64encode(delta_bytes).decode("utf-8")
tag = Tag.event(EventId.parse(manifest_id))
builder = EventBuilder(Kind(KIND_DELTA), content).tags([tag])
event = builder.build(self.keys.public_key()).sign_with_keys(self.keys)
result = await self.client.send_event(event)
delta_id = result.id.to_hex() if hasattr(result, "id") else str(result)
if self.current_manifest is not None:
self.current_manifest.delta_since = delta_id
self._delta_events.append(delta_id)
return delta_id
async def fetch_deltas_since(self, version: int) -> list[bytes]:
"""Retrieve delta events newer than the given version."""
pubkey = self.keys.public_key()
f = (
Filter()
.author(pubkey)
.kind(Kind(KIND_DELTA))
.since(Timestamp.from_secs(version))
)
timeout = timedelta(seconds=10)
events = (await self.client.fetch_events(f, timeout)).to_vec()
deltas: list[bytes] = []
for ev in events:
deltas.append(base64.b64decode(ev.content().encode("utf-8")))
if self.current_manifest is not None:
snap_size = sum(c.size for c in self.current_manifest.chunks)
if (
len(deltas) >= self.delta_threshold
or sum(len(d) for d in deltas) > snap_size
):
# Publish a new snapshot to consolidate deltas
joined = b"".join(deltas)
await self.publish_snapshot(joined)
exp = Timestamp.from_secs(int(time.time()))
for ev in events:
exp_builder = EventBuilder(Kind(KIND_DELTA), ev.content()).tags(
[Tag.expiration(exp)]
)
exp_event = exp_builder.build(
self.keys.public_key()
).sign_with_keys(self.keys)
await self.client.send_event(exp_event)
return deltas
def close_client_pool(self) -> None:
"""Disconnects the client from all relays."""
try: