mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Add StateManager and relay CLI
This commit is contained in:
@@ -492,6 +492,39 @@ def nostr_get_pubkey(ctx: typer.Context) -> None:
|
||||
typer.echo(npub)
|
||||
|
||||
|
||||
@nostr_app.command("list-relays")
|
||||
def nostr_list_relays(ctx: typer.Context) -> None:
|
||||
"""Display configured Nostr relays."""
|
||||
service = _get_nostr_service(ctx)
|
||||
relays = service.list_relays()
|
||||
for i, r in enumerate(relays, 1):
|
||||
typer.echo(f"{i}: {r}")
|
||||
|
||||
|
||||
@nostr_app.command("add-relay")
|
||||
def nostr_add_relay(ctx: typer.Context, url: str) -> None:
|
||||
"""Add a relay URL."""
|
||||
service = _get_nostr_service(ctx)
|
||||
try:
|
||||
service.add_relay(url)
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
typer.echo("Added")
|
||||
|
||||
|
||||
@nostr_app.command("remove-relay")
|
||||
def nostr_remove_relay(ctx: typer.Context, idx: int) -> None:
|
||||
"""Remove a relay by index (1-based)."""
|
||||
service = _get_nostr_service(ctx)
|
||||
try:
|
||||
service.remove_relay(idx)
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
typer.echo("Removed")
|
||||
|
||||
|
||||
@config_app.command("get")
|
||||
def config_get(ctx: typer.Context, key: str) -> None:
|
||||
"""Get a configuration value."""
|
||||
|
@@ -4,7 +4,7 @@
|
||||
|
||||
from importlib import import_module
|
||||
|
||||
__all__ = ["PasswordManager", "ConfigManager", "Vault", "EntryType"]
|
||||
__all__ = ["PasswordManager", "ConfigManager", "Vault", "EntryType", "StateManager"]
|
||||
|
||||
|
||||
def __getattr__(name: str):
|
||||
@@ -16,4 +16,6 @@ def __getattr__(name: str):
|
||||
return import_module(".vault", __name__).Vault
|
||||
if name == "EntryType":
|
||||
return import_module(".entry_types", __name__).EntryType
|
||||
if name == "StateManager":
|
||||
return import_module(".state_manager", __name__).StateManager
|
||||
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
|
||||
|
@@ -525,3 +525,21 @@ class NostrService:
|
||||
def get_pubkey(self) -> str:
|
||||
with self._lock:
|
||||
return self._manager.nostr_client.key_manager.get_npub()
|
||||
|
||||
def list_relays(self) -> list[str]:
|
||||
with self._lock:
|
||||
return self._manager.state_manager.list_relays()
|
||||
|
||||
def add_relay(self, url: str) -> None:
|
||||
with self._lock:
|
||||
self._manager.state_manager.add_relay(url)
|
||||
self._manager.nostr_client.relays = (
|
||||
self._manager.state_manager.list_relays()
|
||||
)
|
||||
|
||||
def remove_relay(self, idx: int) -> None:
|
||||
with self._lock:
|
||||
self._manager.state_manager.remove_relay(idx)
|
||||
self._manager.nostr_client.relays = (
|
||||
self._manager.state_manager.list_relays()
|
||||
)
|
||||
|
@@ -95,6 +95,7 @@ from utils.fingerprint_manager import FingerprintManager
|
||||
# Import NostrClient
|
||||
from nostr.client import NostrClient, DEFAULT_RELAYS
|
||||
from .config_manager import ConfigManager
|
||||
from .state_manager import StateManager
|
||||
|
||||
# Instantiate the logger
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -140,6 +141,7 @@ class PasswordManager:
|
||||
self.bip85: Optional[BIP85] = None
|
||||
self.nostr_client: Optional[NostrClient] = None
|
||||
self.config_manager: Optional[ConfigManager] = None
|
||||
self.state_manager: Optional[StateManager] = None
|
||||
self.notifications: queue.Queue[Notification] = queue.Queue()
|
||||
self._current_notification: Optional[Notification] = None
|
||||
self._notification_expiry: float = 0.0
|
||||
@@ -157,6 +159,8 @@ class PasswordManager:
|
||||
self.last_unlock_duration: float | None = None
|
||||
self.verbose_timing: bool = False
|
||||
self._suppress_entry_actions_menu: bool = False
|
||||
self.last_bip85_idx: int = 0
|
||||
self.last_sync_ts: int = 0
|
||||
|
||||
# Initialize the fingerprint manager first
|
||||
self.initialize_fingerprint_manager()
|
||||
@@ -1073,6 +1077,7 @@ class PasswordManager:
|
||||
vault=self.vault,
|
||||
fingerprint_dir=self.fingerprint_dir,
|
||||
)
|
||||
self.state_manager = StateManager(self.fingerprint_dir)
|
||||
self.backup_manager = BackupManager(
|
||||
fingerprint_dir=self.fingerprint_dir,
|
||||
config_manager=self.config_manager,
|
||||
@@ -1091,7 +1096,15 @@ class PasswordManager:
|
||||
|
||||
# Load relay configuration and initialize NostrClient
|
||||
config = self.config_manager.load_config()
|
||||
relay_list = config.get("relays", list(DEFAULT_RELAYS))
|
||||
if getattr(self, "state_manager", None) is not None:
|
||||
state = self.state_manager.state
|
||||
relay_list = state.get("relays", list(DEFAULT_RELAYS))
|
||||
self.last_bip85_idx = state.get("last_bip85_idx", 0)
|
||||
self.last_sync_ts = state.get("last_sync_ts", 0)
|
||||
else:
|
||||
relay_list = list(DEFAULT_RELAYS)
|
||||
self.last_bip85_idx = 0
|
||||
self.last_sync_ts = 0
|
||||
self.offline_mode = bool(config.get("offline_mode", False))
|
||||
self.inactivity_timeout = config.get(
|
||||
"inactivity_timeout", INACTIVITY_TIMEOUT
|
||||
@@ -3966,7 +3979,11 @@ class PasswordManager:
|
||||
self.password_generator.encryption_manager = new_enc_mgr
|
||||
self.store_hashed_password(new_password)
|
||||
|
||||
relay_list = config_data.get("relays", list(DEFAULT_RELAYS))
|
||||
if getattr(self, "state_manager", None) is not None:
|
||||
state = self.state_manager.state
|
||||
relay_list = state.get("relays", list(DEFAULT_RELAYS))
|
||||
else:
|
||||
relay_list = list(DEFAULT_RELAYS)
|
||||
self.nostr_client = NostrClient(
|
||||
encryption_manager=self.encryption_manager,
|
||||
fingerprint=self.current_fingerprint,
|
||||
|
85
src/seedpass/core/state_manager.py
Normal file
85
src/seedpass/core/state_manager.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from utils.file_lock import exclusive_lock, shared_lock
|
||||
from nostr.client import DEFAULT_RELAYS
|
||||
|
||||
|
||||
class StateManager:
|
||||
"""Persist simple state values per profile."""
|
||||
|
||||
STATE_FILENAME = "seedpass_state.json"
|
||||
|
||||
def __init__(self, fingerprint_dir: Path) -> None:
|
||||
self.fingerprint_dir = Path(fingerprint_dir)
|
||||
self.state_path = self.fingerprint_dir / self.STATE_FILENAME
|
||||
|
||||
def _load(self) -> dict:
|
||||
if not self.state_path.exists():
|
||||
return {
|
||||
"last_bip85_idx": 0,
|
||||
"last_sync_ts": 0,
|
||||
"relays": list(DEFAULT_RELAYS),
|
||||
}
|
||||
with shared_lock(self.state_path) as fh:
|
||||
fh.seek(0)
|
||||
data = fh.read()
|
||||
if not data:
|
||||
return {
|
||||
"last_bip85_idx": 0,
|
||||
"last_sync_ts": 0,
|
||||
"relays": list(DEFAULT_RELAYS),
|
||||
}
|
||||
try:
|
||||
obj = json.loads(data.decode())
|
||||
except Exception:
|
||||
obj = {}
|
||||
obj.setdefault("last_bip85_idx", 0)
|
||||
obj.setdefault("last_sync_ts", 0)
|
||||
obj.setdefault("relays", list(DEFAULT_RELAYS))
|
||||
return obj
|
||||
|
||||
def _save(self, data: dict) -> None:
|
||||
with exclusive_lock(self.state_path) as fh:
|
||||
fh.seek(0)
|
||||
fh.truncate()
|
||||
fh.write(json.dumps(data, separators=(",", ":")).encode())
|
||||
fh.flush()
|
||||
os.fsync(fh.fileno())
|
||||
|
||||
@property
|
||||
def state(self) -> dict:
|
||||
return self._load()
|
||||
|
||||
def update_state(self, **kwargs) -> None:
|
||||
data = self._load()
|
||||
data.update(kwargs)
|
||||
self._save(data)
|
||||
|
||||
# Relay helpers
|
||||
def list_relays(self) -> List[str]:
|
||||
return self._load().get("relays", [])
|
||||
|
||||
def add_relay(self, url: str) -> None:
|
||||
data = self._load()
|
||||
relays = data.get("relays", [])
|
||||
if url in relays:
|
||||
raise ValueError("Relay already present")
|
||||
relays.append(url)
|
||||
data["relays"] = relays
|
||||
self._save(data)
|
||||
|
||||
def remove_relay(self, idx: int) -> None:
|
||||
data = self._load()
|
||||
relays = data.get("relays", [])
|
||||
if not 1 <= idx <= len(relays):
|
||||
raise ValueError("Invalid index")
|
||||
if len(relays) == 1:
|
||||
raise ValueError("At least one relay required")
|
||||
relays.pop(idx - 1)
|
||||
data["relays"] = relays
|
||||
self._save(data)
|
53
src/tests/test_cli_relays.py
Normal file
53
src/tests/test_cli_relays.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from types import SimpleNamespace
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from seedpass.cli import app
|
||||
from seedpass import cli
|
||||
|
||||
|
||||
class DummyService:
|
||||
def __init__(self, relays):
|
||||
self.relays = relays
|
||||
|
||||
def get_pubkey(self):
|
||||
return "npub"
|
||||
|
||||
def list_relays(self):
|
||||
return self.relays
|
||||
|
||||
def add_relay(self, url):
|
||||
if url in self.relays:
|
||||
raise ValueError("exists")
|
||||
self.relays.append(url)
|
||||
|
||||
def remove_relay(self, idx):
|
||||
if not 1 <= idx <= len(self.relays):
|
||||
raise ValueError("bad")
|
||||
if len(self.relays) == 1:
|
||||
raise ValueError("min")
|
||||
self.relays.pop(idx - 1)
|
||||
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
def test_cli_relay_crud(monkeypatch):
|
||||
relays = ["wss://a"]
|
||||
|
||||
def pm_factory(*a, **k):
|
||||
return SimpleNamespace()
|
||||
|
||||
monkeypatch.setattr(cli, "PasswordManager", pm_factory)
|
||||
monkeypatch.setattr(cli, "NostrService", lambda pm: DummyService(relays))
|
||||
|
||||
result = runner.invoke(app, ["nostr", "list-relays"])
|
||||
assert "1: wss://a" in result.stdout
|
||||
|
||||
result = runner.invoke(app, ["nostr", "add-relay", "wss://b"])
|
||||
assert result.exit_code == 0
|
||||
assert "Added" in result.stdout
|
||||
assert relays == ["wss://a", "wss://b"]
|
||||
|
||||
result = runner.invoke(app, ["nostr", "remove-relay", "1"])
|
||||
assert result.exit_code == 0
|
||||
assert relays == ["wss://b"]
|
26
src/tests/test_state_manager.py
Normal file
26
src/tests/test_state_manager.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from tempfile import TemporaryDirectory
|
||||
from pathlib import Path
|
||||
|
||||
from seedpass.core.state_manager import StateManager
|
||||
from nostr.client import DEFAULT_RELAYS
|
||||
|
||||
|
||||
def test_state_manager_round_trip():
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
sm = StateManager(Path(tmpdir))
|
||||
state = sm.state
|
||||
assert state["relays"] == list(DEFAULT_RELAYS)
|
||||
assert state["last_bip85_idx"] == 0
|
||||
assert state["last_sync_ts"] == 0
|
||||
|
||||
sm.add_relay("wss://example.com")
|
||||
sm.update_state(last_bip85_idx=5, last_sync_ts=123)
|
||||
|
||||
sm2 = StateManager(Path(tmpdir))
|
||||
state2 = sm2.state
|
||||
assert "wss://example.com" in state2["relays"]
|
||||
assert state2["last_bip85_idx"] == 5
|
||||
assert state2["last_sync_ts"] == 123
|
||||
|
||||
sm2.remove_relay(1) # remove first default relay
|
||||
assert len(sm2.list_relays()) == len(DEFAULT_RELAYS)
|
Reference in New Issue
Block a user