Merge pull request #673 from PR0M3TH3AN/codex/add-thread-safety-to-nostrclient-states

Add thread-safe state to NostrClient
This commit is contained in:
thePR0M3TH3AN
2025-07-24 19:10:31 -04:00
committed by GitHub
2 changed files with 49 additions and 28 deletions

View File

@@ -8,6 +8,7 @@ from typing import List, Optional, Tuple, TYPE_CHECKING
import hashlib
import asyncio
import gzip
import threading
import websockets
# Imports from the nostr-sdk library
@@ -138,6 +139,7 @@ class NostrClient:
self.last_error: Optional[str] = None
self.delta_threshold = 100
self._state_lock = threading.Lock()
self.current_manifest: Manifest | None = None
self.current_manifest_id: str | None = None
self._delta_events: list[str] = []
@@ -399,11 +401,12 @@ class NostrClient:
.sign_with_keys(self.keys)
)
await self.client.send_event(manifest_event)
self.current_manifest = manifest
self.current_manifest_id = manifest_identifier
# Record when this snapshot was published for future delta events
self.current_manifest.delta_since = int(time.time())
self._delta_events = []
with self._state_lock:
self.current_manifest = manifest
self.current_manifest_id = manifest_identifier
# Record when this snapshot was published for future delta events
self.current_manifest.delta_since = int(time.time())
self._delta_events = []
if getattr(self, "verbose_timing", False):
duration = time.perf_counter() - start
logger.info("publish_snapshot completed in %.2f seconds", duration)
@@ -486,8 +489,9 @@ class NostrClient:
ident = tag[1]
elif isinstance(tag, str):
ident = tag
self.current_manifest = manifest
self.current_manifest_id = ident
with self._state_lock:
self.current_manifest = manifest
self.current_manifest_id = ident
return manifest, chunks
async def fetch_latest_snapshot(self) -> Tuple[Manifest, list[bytes]] | None:
@@ -551,23 +555,27 @@ class NostrClient:
if hasattr(created_at, "secs"):
created_at = created_at.secs
if self.current_manifest is not None:
self.current_manifest.delta_since = int(created_at)
manifest_json = json.dumps(
{
"ver": self.current_manifest.ver,
"algo": self.current_manifest.algo,
"chunks": [meta.__dict__ for meta in self.current_manifest.chunks],
"delta_since": self.current_manifest.delta_since,
}
)
manifest_event = (
EventBuilder(Kind(KIND_MANIFEST), manifest_json)
.tags([Tag.identifier(self.current_manifest_id)])
.build(self.keys.public_key())
.sign_with_keys(self.keys)
)
with self._state_lock:
self.current_manifest.delta_since = int(created_at)
manifest_json = json.dumps(
{
"ver": self.current_manifest.ver,
"algo": self.current_manifest.algo,
"chunks": [
meta.__dict__ for meta in self.current_manifest.chunks
],
"delta_since": self.current_manifest.delta_since,
}
)
manifest_event = (
EventBuilder(Kind(KIND_MANIFEST), manifest_json)
.tags([Tag.identifier(self.current_manifest_id)])
.build(self.keys.public_key())
.sign_with_keys(self.keys)
)
await self.client.send_event(manifest_event)
self._delta_events.append(delta_id)
with self._state_lock:
self._delta_events.append(delta_id)
return delta_id
async def fetch_deltas_since(self, version: int) -> list[bytes]:
@@ -609,6 +617,21 @@ class NostrClient:
await self.client.send_event(exp_event)
return deltas
def get_current_manifest(self) -> Manifest | None:
"""Thread-safe access to ``current_manifest``."""
with self._state_lock:
return self.current_manifest
def get_current_manifest_id(self) -> str | None:
"""Thread-safe access to ``current_manifest_id``."""
with self._state_lock:
return self.current_manifest_id
def get_delta_events(self) -> list[str]:
"""Thread-safe snapshot of pending delta event IDs."""
with self._state_lock:
return list(self._delta_events)
def close_client_pool(self) -> None:
"""Disconnects the client from all relays."""
try:

View File

@@ -3669,7 +3669,7 @@ class PasswordManager:
chunk_ids: list[str] = []
if manifest is not None:
chunk_ids = [c.event_id for c in manifest.chunks if c.event_id]
delta_ids = getattr(self.nostr_client, "_delta_events", [])
delta_ids = self.nostr_client.get_delta_events()
return {
"manifest_id": event_id,
"chunk_ids": chunk_ids,
@@ -4121,13 +4121,11 @@ class PasswordManager:
)
# Nostr sync info
manifest = getattr(self.nostr_client, "current_manifest", None)
manifest = self.nostr_client.get_current_manifest()
if manifest is not None:
stats["chunk_count"] = len(manifest.chunks)
stats["delta_since"] = manifest.delta_since
stats["pending_deltas"] = len(
getattr(self.nostr_client, "_delta_events", [])
)
stats["pending_deltas"] = len(self.nostr_client.get_delta_events())
else:
stats["chunk_count"] = 0
stats["delta_since"] = None