Add thread-safe state access in NostrClient

This commit is contained in:
thePR0M3TH3AN
2025-07-24 19:05:31 -04:00
parent aaf7b79e59
commit 64a84c59d7
2 changed files with 49 additions and 28 deletions

View File

@@ -8,6 +8,7 @@ from typing import List, Optional, Tuple, TYPE_CHECKING
import hashlib import hashlib
import asyncio import asyncio
import gzip import gzip
import threading
import websockets import websockets
# Imports from the nostr-sdk library # Imports from the nostr-sdk library
@@ -138,6 +139,7 @@ class NostrClient:
self.last_error: Optional[str] = None self.last_error: Optional[str] = None
self.delta_threshold = 100 self.delta_threshold = 100
self._state_lock = threading.Lock()
self.current_manifest: Manifest | None = None self.current_manifest: Manifest | None = None
self.current_manifest_id: str | None = None self.current_manifest_id: str | None = None
self._delta_events: list[str] = [] self._delta_events: list[str] = []
@@ -399,6 +401,7 @@ class NostrClient:
.sign_with_keys(self.keys) .sign_with_keys(self.keys)
) )
await self.client.send_event(manifest_event) await self.client.send_event(manifest_event)
with self._state_lock:
self.current_manifest = manifest self.current_manifest = manifest
self.current_manifest_id = manifest_identifier self.current_manifest_id = manifest_identifier
# Record when this snapshot was published for future delta events # Record when this snapshot was published for future delta events
@@ -486,6 +489,7 @@ class NostrClient:
ident = tag[1] ident = tag[1]
elif isinstance(tag, str): elif isinstance(tag, str):
ident = tag ident = tag
with self._state_lock:
self.current_manifest = manifest self.current_manifest = manifest
self.current_manifest_id = ident self.current_manifest_id = ident
return manifest, chunks return manifest, chunks
@@ -551,12 +555,15 @@ class NostrClient:
if hasattr(created_at, "secs"): if hasattr(created_at, "secs"):
created_at = created_at.secs created_at = created_at.secs
if self.current_manifest is not None: if self.current_manifest is not None:
with self._state_lock:
self.current_manifest.delta_since = int(created_at) self.current_manifest.delta_since = int(created_at)
manifest_json = json.dumps( manifest_json = json.dumps(
{ {
"ver": self.current_manifest.ver, "ver": self.current_manifest.ver,
"algo": self.current_manifest.algo, "algo": self.current_manifest.algo,
"chunks": [meta.__dict__ for meta in self.current_manifest.chunks], "chunks": [
meta.__dict__ for meta in self.current_manifest.chunks
],
"delta_since": self.current_manifest.delta_since, "delta_since": self.current_manifest.delta_since,
} }
) )
@@ -567,6 +574,7 @@ class NostrClient:
.sign_with_keys(self.keys) .sign_with_keys(self.keys)
) )
await self.client.send_event(manifest_event) await self.client.send_event(manifest_event)
with self._state_lock:
self._delta_events.append(delta_id) self._delta_events.append(delta_id)
return delta_id return delta_id
@@ -609,6 +617,21 @@ class NostrClient:
await self.client.send_event(exp_event) await self.client.send_event(exp_event)
return deltas return deltas
def get_current_manifest(self) -> Manifest | None:
"""Thread-safe access to ``current_manifest``."""
with self._state_lock:
return self.current_manifest
def get_current_manifest_id(self) -> str | None:
"""Thread-safe access to ``current_manifest_id``."""
with self._state_lock:
return self.current_manifest_id
def get_delta_events(self) -> list[str]:
"""Thread-safe snapshot of pending delta event IDs."""
with self._state_lock:
return list(self._delta_events)
def close_client_pool(self) -> None: def close_client_pool(self) -> None:
"""Disconnects the client from all relays.""" """Disconnects the client from all relays."""
try: try:

View File

@@ -3669,7 +3669,7 @@ class PasswordManager:
chunk_ids: list[str] = [] chunk_ids: list[str] = []
if manifest is not None: if manifest is not None:
chunk_ids = [c.event_id for c in manifest.chunks if c.event_id] chunk_ids = [c.event_id for c in manifest.chunks if c.event_id]
delta_ids = getattr(self.nostr_client, "_delta_events", []) delta_ids = self.nostr_client.get_delta_events()
return { return {
"manifest_id": event_id, "manifest_id": event_id,
"chunk_ids": chunk_ids, "chunk_ids": chunk_ids,
@@ -4121,13 +4121,11 @@ class PasswordManager:
) )
# Nostr sync info # Nostr sync info
manifest = getattr(self.nostr_client, "current_manifest", None) manifest = self.nostr_client.get_current_manifest()
if manifest is not None: if manifest is not None:
stats["chunk_count"] = len(manifest.chunks) stats["chunk_count"] = len(manifest.chunks)
stats["delta_since"] = manifest.delta_since stats["delta_since"] = manifest.delta_since
stats["pending_deltas"] = len( stats["pending_deltas"] = len(self.nostr_client.get_delta_events())
getattr(self.nostr_client, "_delta_events", [])
)
else: else:
stats["chunk_count"] = 0 stats["chunk_count"] = 0
stats["delta_since"] = None stats["delta_since"] = None