mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Merge pull request #614 from PR0M3TH3AN/codex/create-service-layer-for-password-manager
Add service layer for vault operations
This commit is contained in:
@@ -6,6 +6,18 @@ import typer
|
||||
|
||||
from seedpass.core.manager import PasswordManager
|
||||
from seedpass.core.entry_types import EntryType
|
||||
from seedpass.core.api import (
|
||||
VaultService,
|
||||
ProfileService,
|
||||
SyncService,
|
||||
VaultExportRequest,
|
||||
VaultImportRequest,
|
||||
ChangePasswordRequest,
|
||||
UnlockRequest,
|
||||
BackupParentSeedRequest,
|
||||
ProfileSwitchRequest,
|
||||
ProfileRemoveRequest,
|
||||
)
|
||||
import uvicorn
|
||||
from . import api as api_module
|
||||
|
||||
@@ -52,6 +64,15 @@ def _get_pm(ctx: typer.Context) -> PasswordManager:
|
||||
return pm
|
||||
|
||||
|
||||
def _get_services(
|
||||
ctx: typer.Context,
|
||||
) -> tuple[VaultService, ProfileService, SyncService]:
|
||||
"""Return service layer instances for the current context."""
|
||||
|
||||
pm = _get_pm(ctx)
|
||||
return VaultService(pm), ProfileService(pm), SyncService(pm)
|
||||
|
||||
|
||||
@app.callback(invoke_without_command=True)
|
||||
def main(ctx: typer.Context, fingerprint: Optional[str] = fingerprint_option) -> None:
|
||||
"""SeedPass CLI entry point.
|
||||
@@ -364,8 +385,8 @@ def vault_export(
|
||||
ctx: typer.Context, file: str = typer.Option(..., help="Output file")
|
||||
) -> None:
|
||||
"""Export the vault."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.handle_export_database(Path(file))
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
vault_service.export_vault(VaultExportRequest(path=Path(file)))
|
||||
typer.echo(str(file))
|
||||
|
||||
|
||||
@@ -374,20 +395,21 @@ def vault_import(
|
||||
ctx: typer.Context, file: str = typer.Option(..., help="Input file")
|
||||
) -> None:
|
||||
"""Import a vault from an encrypted JSON file."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.handle_import_database(Path(file))
|
||||
pm.sync_vault()
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
vault_service.import_vault(VaultImportRequest(path=Path(file)))
|
||||
typer.echo(str(file))
|
||||
|
||||
|
||||
@vault_app.command("change-password")
|
||||
def vault_change_password(ctx: typer.Context) -> None:
|
||||
"""Change the master password used for encryption."""
|
||||
pm = _get_pm(ctx)
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
old_pw = typer.prompt("Current password", hide_input=True)
|
||||
new_pw = typer.prompt("New password", hide_input=True, confirmation_prompt=True)
|
||||
try:
|
||||
pm.change_password(old_pw, new_pw)
|
||||
vault_service.change_password(
|
||||
ChangePasswordRequest(old_password=old_pw, new_password=new_pw)
|
||||
)
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -397,29 +419,29 @@ def vault_change_password(ctx: typer.Context) -> None:
|
||||
@vault_app.command("unlock")
|
||||
def vault_unlock(ctx: typer.Context) -> None:
|
||||
"""Unlock the vault for the active profile."""
|
||||
pm = _get_pm(ctx)
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
password = typer.prompt("Master password", hide_input=True)
|
||||
try:
|
||||
duration = pm.unlock_vault(password)
|
||||
resp = vault_service.unlock(UnlockRequest(password=password))
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
typer.echo(f"Unlocked in {duration:.2f}s")
|
||||
typer.echo(f"Unlocked in {resp.duration:.2f}s")
|
||||
|
||||
|
||||
@vault_app.command("lock")
|
||||
def vault_lock(ctx: typer.Context) -> None:
|
||||
"""Lock the vault and clear sensitive data from memory."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.lock_vault()
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
vault_service.lock()
|
||||
typer.echo("locked")
|
||||
|
||||
|
||||
@vault_app.command("stats")
|
||||
def vault_stats(ctx: typer.Context) -> None:
|
||||
"""Display statistics about the current seed profile."""
|
||||
pm = _get_pm(ctx)
|
||||
stats = pm.get_profile_stats()
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
stats = vault_service.stats()
|
||||
typer.echo(json.dumps(stats, indent=2))
|
||||
|
||||
|
||||
@@ -431,21 +453,23 @@ def vault_reveal_parent_seed(
|
||||
),
|
||||
) -> None:
|
||||
"""Display the parent seed and optionally write an encrypted backup file."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.handle_backup_reveal_parent_seed(Path(file) if file else None)
|
||||
vault_service, _profile, _sync = _get_services(ctx)
|
||||
vault_service.backup_parent_seed(
|
||||
BackupParentSeedRequest(path=Path(file) if file else None)
|
||||
)
|
||||
|
||||
|
||||
@nostr_app.command("sync")
|
||||
def nostr_sync(ctx: typer.Context) -> None:
|
||||
"""Sync with configured Nostr relays."""
|
||||
pm = _get_pm(ctx)
|
||||
result = pm.sync_vault()
|
||||
if result:
|
||||
_vault, _profile, sync_service = _get_services(ctx)
|
||||
model = sync_service.sync()
|
||||
if model:
|
||||
typer.echo("Event IDs:")
|
||||
typer.echo(f"- manifest: {result['manifest_id']}")
|
||||
for cid in result["chunk_ids"]:
|
||||
typer.echo(f"- manifest: {model.manifest_id}")
|
||||
for cid in model.chunk_ids:
|
||||
typer.echo(f"- chunk: {cid}")
|
||||
for did in result["delta_ids"]:
|
||||
for did in model.delta_ids:
|
||||
typer.echo(f"- delta: {did}")
|
||||
else:
|
||||
typer.echo("Error: Failed to sync vault")
|
||||
@@ -606,30 +630,30 @@ def config_toggle_offline(ctx: typer.Context) -> None:
|
||||
@fingerprint_app.command("list")
|
||||
def fingerprint_list(ctx: typer.Context) -> None:
|
||||
"""List available seed profiles."""
|
||||
pm = _get_pm(ctx)
|
||||
for fp in pm.fingerprint_manager.list_fingerprints():
|
||||
_vault, profile_service, _sync = _get_services(ctx)
|
||||
for fp in profile_service.list_profiles():
|
||||
typer.echo(fp)
|
||||
|
||||
|
||||
@fingerprint_app.command("add")
|
||||
def fingerprint_add(ctx: typer.Context) -> None:
|
||||
"""Create a new seed profile."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.add_new_fingerprint()
|
||||
_vault, profile_service, _sync = _get_services(ctx)
|
||||
profile_service.add_profile()
|
||||
|
||||
|
||||
@fingerprint_app.command("remove")
|
||||
def fingerprint_remove(ctx: typer.Context, fingerprint: str) -> None:
|
||||
"""Remove a seed profile."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.fingerprint_manager.remove_fingerprint(fingerprint)
|
||||
_vault, profile_service, _sync = _get_services(ctx)
|
||||
profile_service.remove_profile(ProfileRemoveRequest(fingerprint=fingerprint))
|
||||
|
||||
|
||||
@fingerprint_app.command("switch")
|
||||
def fingerprint_switch(ctx: typer.Context, fingerprint: str) -> None:
|
||||
"""Switch to another seed profile."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.select_fingerprint(fingerprint)
|
||||
_vault, profile_service, _sync = _get_services(ctx)
|
||||
profile_service.switch_profile(ProfileSwitchRequest(fingerprint=fingerprint))
|
||||
|
||||
|
||||
@util_app.command("generate-password")
|
||||
|
196
src/seedpass/core/api.py
Normal file
196
src/seedpass/core/api.py
Normal file
@@ -0,0 +1,196 @@
|
||||
from __future__ import annotations
|
||||
|
||||
"""Service layer wrapping :class:`PasswordManager` operations.
|
||||
|
||||
These services provide thread-safe methods for common operations used by the CLI
|
||||
and API. Request and response payloads are represented using Pydantic models to
|
||||
allow easy validation and documentation.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .manager import PasswordManager
|
||||
|
||||
|
||||
class VaultExportRequest(BaseModel):
|
||||
"""Parameters required to export the vault."""
|
||||
|
||||
path: Path
|
||||
|
||||
|
||||
class VaultExportResponse(BaseModel):
|
||||
"""Result of a vault export operation."""
|
||||
|
||||
path: Path
|
||||
|
||||
|
||||
class VaultImportRequest(BaseModel):
|
||||
"""Parameters required to import a vault."""
|
||||
|
||||
path: Path
|
||||
|
||||
|
||||
class ChangePasswordRequest(BaseModel):
|
||||
"""Payload for :meth:`VaultService.change_password`."""
|
||||
|
||||
old_password: str
|
||||
new_password: str
|
||||
|
||||
|
||||
class UnlockRequest(BaseModel):
|
||||
"""Payload for unlocking the vault."""
|
||||
|
||||
password: str
|
||||
|
||||
|
||||
class UnlockResponse(BaseModel):
|
||||
"""Duration taken to unlock the vault."""
|
||||
|
||||
duration: float
|
||||
|
||||
|
||||
class BackupParentSeedRequest(BaseModel):
|
||||
"""Optional path to write the encrypted seed backup."""
|
||||
|
||||
path: Optional[Path] = None
|
||||
|
||||
|
||||
class ProfileSwitchRequest(BaseModel):
|
||||
"""Select a different seed profile."""
|
||||
|
||||
fingerprint: str
|
||||
|
||||
|
||||
class ProfileRemoveRequest(BaseModel):
|
||||
"""Remove a seed profile."""
|
||||
|
||||
fingerprint: str
|
||||
|
||||
|
||||
class SyncResponse(BaseModel):
|
||||
"""Information about uploaded events after syncing."""
|
||||
|
||||
manifest_id: str
|
||||
chunk_ids: List[str] = []
|
||||
delta_ids: List[str] = []
|
||||
|
||||
|
||||
class VaultService:
|
||||
"""Thread-safe wrapper around vault operations."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def export_vault(self, req: VaultExportRequest) -> VaultExportResponse:
|
||||
"""Export the vault to ``req.path``."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.handle_export_database(req.path)
|
||||
return VaultExportResponse(path=req.path)
|
||||
|
||||
def import_vault(self, req: VaultImportRequest) -> None:
|
||||
"""Import the vault from ``req.path`` and sync."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.handle_import_database(req.path)
|
||||
self._manager.sync_vault()
|
||||
|
||||
def change_password(self, req: ChangePasswordRequest) -> None:
|
||||
"""Change the master password."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.change_password(req.old_password, req.new_password)
|
||||
|
||||
def unlock(self, req: UnlockRequest) -> UnlockResponse:
|
||||
"""Unlock the vault and return the duration."""
|
||||
|
||||
with self._lock:
|
||||
duration = self._manager.unlock_vault(req.password)
|
||||
return UnlockResponse(duration=duration)
|
||||
|
||||
def lock(self) -> None:
|
||||
"""Lock the vault and clear sensitive data."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.lock_vault()
|
||||
|
||||
def backup_parent_seed(self, req: BackupParentSeedRequest) -> None:
|
||||
"""Backup and reveal the parent seed."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.handle_backup_reveal_parent_seed(req.path)
|
||||
|
||||
def stats(self) -> Dict:
|
||||
"""Return statistics about the current profile."""
|
||||
|
||||
with self._lock:
|
||||
return self._manager.get_profile_stats()
|
||||
|
||||
|
||||
class ProfileService:
|
||||
"""Thread-safe wrapper around profile management operations."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def list_profiles(self) -> List[str]:
|
||||
"""List available seed profiles."""
|
||||
|
||||
with self._lock:
|
||||
return list(self._manager.fingerprint_manager.list_fingerprints())
|
||||
|
||||
def add_profile(self) -> Optional[str]:
|
||||
"""Create a new seed profile and return its fingerprint if available."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.add_new_fingerprint()
|
||||
return getattr(
|
||||
self._manager.fingerprint_manager, "current_fingerprint", None
|
||||
)
|
||||
|
||||
def remove_profile(self, req: ProfileRemoveRequest) -> None:
|
||||
"""Remove the specified seed profile."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.fingerprint_manager.remove_fingerprint(req.fingerprint)
|
||||
|
||||
def switch_profile(self, req: ProfileSwitchRequest) -> None:
|
||||
"""Switch to ``req.fingerprint``."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.select_fingerprint(req.fingerprint)
|
||||
|
||||
|
||||
class SyncService:
|
||||
"""Thread-safe wrapper around vault synchronization."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def sync(self) -> Optional[SyncResponse]:
|
||||
"""Publish the vault to Nostr and return event info."""
|
||||
|
||||
with self._lock:
|
||||
result = self._manager.sync_vault()
|
||||
if not result:
|
||||
return None
|
||||
return SyncResponse(**result)
|
||||
|
||||
def start_background_sync(self) -> None:
|
||||
"""Begin background synchronization if possible."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.start_background_sync()
|
||||
|
||||
def start_background_vault_sync(self, summary: Optional[str] = None) -> None:
|
||||
"""Publish the vault in a background thread."""
|
||||
|
||||
with self._lock:
|
||||
self._manager.start_background_vault_sync(summary)
|
Reference in New Issue
Block a user