Files
seedPass/src/seedpass/core/api.py
2025-08-01 10:38:40 -04:00

705 lines
22 KiB
Python

from __future__ import annotations
"""Service layer wrapping :class:`PasswordManager` operations.
These services provide thread-safe methods for common operations used by the CLI
and API. Request and response payloads are represented using Pydantic models to
allow easy validation and documentation.
"""
from pathlib import Path
from threading import Lock
from typing import List, Optional, Dict, Any
import dataclasses
import json
from pydantic import BaseModel
from .manager import PasswordManager
from .pubsub import bus
class VaultExportRequest(BaseModel):
"""Parameters required to export the vault."""
path: Path
class VaultExportResponse(BaseModel):
"""Result of a vault export operation."""
path: Path
class VaultImportRequest(BaseModel):
"""Parameters required to import a vault."""
path: Path
class ChangePasswordRequest(BaseModel):
"""Payload for :meth:`VaultService.change_password`."""
old_password: str
new_password: str
class UnlockRequest(BaseModel):
"""Payload for unlocking the vault."""
password: str
class UnlockResponse(BaseModel):
"""Duration taken to unlock the vault."""
duration: float
class BackupParentSeedRequest(BaseModel):
"""Optional path to write the encrypted seed backup."""
path: Optional[Path] = None
password: Optional[str] = None
class ProfileSwitchRequest(BaseModel):
"""Select a different seed profile."""
fingerprint: str
password: Optional[str] = None
class ProfileRemoveRequest(BaseModel):
"""Remove a seed profile."""
fingerprint: str
class SyncResponse(BaseModel):
"""Information about uploaded events after syncing."""
manifest_id: str
chunk_ids: List[str] = []
delta_ids: List[str] = []
class PasswordPolicyOptions(BaseModel):
"""Optional password policy overrides."""
include_special_chars: bool | None = None
allowed_special_chars: str | None = None
special_mode: str | None = None
exclude_ambiguous: bool | None = None
min_uppercase: int | None = None
min_lowercase: int | None = None
min_digits: int | None = None
min_special: int | None = None
class AddPasswordEntryRequest(PasswordPolicyOptions):
label: str
length: int
username: str | None = None
url: str | None = None
class GeneratePasswordRequest(PasswordPolicyOptions):
length: int
class GeneratePasswordResponse(BaseModel):
password: str
class VaultService:
"""Thread-safe wrapper around vault operations."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def export_vault(self, req: VaultExportRequest) -> VaultExportResponse:
"""Export the vault to ``req.path``."""
with self._lock:
self._manager.handle_export_database(req.path)
return VaultExportResponse(path=req.path)
def import_vault(self, req: VaultImportRequest) -> None:
"""Import the vault from ``req.path`` and sync."""
with self._lock:
self._manager.handle_import_database(req.path)
self._manager.sync_vault()
def export_profile(self) -> bytes:
"""Return encrypted profile data for backup."""
with self._lock:
data = self._manager.vault.load_index()
payload = json.dumps(data, sort_keys=True, separators=(",", ":")).encode(
"utf-8"
)
return self._manager.vault.encryption_manager.encrypt_data(payload)
def import_profile(self, data: bytes) -> None:
"""Restore a profile from ``data`` and sync."""
with self._lock:
decrypted = self._manager.vault.encryption_manager.decrypt_data(data)
index = json.loads(decrypted.decode("utf-8"))
self._manager.vault.save_index(index)
self._manager.sync_vault()
def change_password(self, req: ChangePasswordRequest) -> None:
"""Change the master password."""
with self._lock:
self._manager.change_password(req.old_password, req.new_password)
def unlock(self, req: UnlockRequest) -> UnlockResponse:
"""Unlock the vault and return the duration."""
with self._lock:
duration = self._manager.unlock_vault(req.password)
return UnlockResponse(duration=duration)
def lock(self) -> None:
"""Lock the vault and clear sensitive data."""
with self._lock:
self._manager.lock_vault()
def backup_parent_seed(self, req: BackupParentSeedRequest) -> None:
"""Backup and reveal the parent seed."""
with self._lock:
self._manager.handle_backup_reveal_parent_seed(
req.path, password=req.password
)
def stats(self) -> Dict:
"""Return statistics about the current profile."""
with self._lock:
return self._manager.get_profile_stats()
class ProfileService:
"""Thread-safe wrapper around profile management operations."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def list_profiles(self) -> List[str]:
"""List available seed profiles."""
with self._lock:
return list(self._manager.fingerprint_manager.list_fingerprints())
def add_profile(self) -> Optional[str]:
"""Create a new seed profile and return its fingerprint if available."""
with self._lock:
self._manager.add_new_fingerprint()
return getattr(
self._manager.fingerprint_manager, "current_fingerprint", None
)
def remove_profile(self, req: ProfileRemoveRequest) -> None:
"""Remove the specified seed profile."""
with self._lock:
self._manager.fingerprint_manager.remove_fingerprint(req.fingerprint)
def switch_profile(self, req: ProfileSwitchRequest) -> None:
"""Switch to ``req.fingerprint``."""
with self._lock:
self._manager.select_fingerprint(req.fingerprint, password=req.password)
class SyncService:
"""Thread-safe wrapper around vault synchronization."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def sync(self) -> Optional[SyncResponse]:
"""Publish the vault to Nostr and return event info."""
with self._lock:
bus.publish("sync_started")
result = self._manager.sync_vault()
bus.publish("sync_finished", result)
if not result:
return None
return SyncResponse(**result)
def start_background_sync(self) -> None:
"""Begin background synchronization if possible."""
with self._lock:
self._manager.start_background_sync()
def start_background_vault_sync(self, summary: Optional[str] = None) -> None:
"""Publish the vault in a background thread."""
with self._lock:
self._manager.start_background_vault_sync(summary)
class EntryService:
"""Thread-safe wrapper around entry operations."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def list_entries(
self,
sort_by: str = "index",
filter_kind: str | None = None,
include_archived: bool = False,
):
with self._lock:
return self._manager.entry_manager.list_entries(
sort_by=sort_by,
filter_kind=filter_kind,
include_archived=include_archived,
)
def search_entries(
self, query: str, kinds: list[str] | None = None
) -> list[tuple[int, str, str | None, str | None, bool]]:
"""Search entries optionally filtering by ``kinds``.
Parameters
----------
query:
Search string to match against entry metadata.
kinds:
Optional list of entry kinds to restrict the search.
"""
with self._lock:
return self._manager.entry_manager.search_entries(query, kinds=kinds)
def retrieve_entry(self, entry_id: int):
with self._lock:
return self._manager.entry_manager.retrieve_entry(entry_id)
def generate_password(self, length: int, index: int) -> str:
with self._lock:
entry = self._manager.entry_manager.retrieve_entry(index)
gen_fn = getattr(self._manager, "_generate_password_for_entry", None)
if gen_fn is None:
return self._manager.password_generator.generate_password(length, index)
return gen_fn(entry, index, length)
def get_totp_code(self, entry_id: int) -> str:
with self._lock:
return self._manager.entry_manager.get_totp_code(
entry_id, self._manager.parent_seed
)
def add_entry(
self,
label: str,
length: int,
username: str | None = None,
url: str | None = None,
*,
include_special_chars: bool | None = None,
allowed_special_chars: str | None = None,
special_mode: str | None = None,
exclude_ambiguous: bool | None = None,
min_uppercase: int | None = None,
min_lowercase: int | None = None,
min_digits: int | None = None,
min_special: int | None = None,
) -> int:
with self._lock:
kwargs: dict[str, Any] = {}
if include_special_chars is not None:
kwargs["include_special_chars"] = include_special_chars
if allowed_special_chars is not None:
kwargs["allowed_special_chars"] = allowed_special_chars
if special_mode is not None:
kwargs["special_mode"] = special_mode
if exclude_ambiguous is not None:
kwargs["exclude_ambiguous"] = exclude_ambiguous
if min_uppercase is not None:
kwargs["min_uppercase"] = min_uppercase
if min_lowercase is not None:
kwargs["min_lowercase"] = min_lowercase
if min_digits is not None:
kwargs["min_digits"] = min_digits
if min_special is not None:
kwargs["min_special"] = min_special
idx = self._manager.entry_manager.add_entry(
label,
length,
username,
url,
**kwargs,
)
self._manager.start_background_vault_sync()
return idx
def add_totp(
self,
label: str,
*,
index: int | None = None,
secret: str | None = None,
period: int = 30,
digits: int = 6,
) -> str:
with self._lock:
uri = self._manager.entry_manager.add_totp(
label,
self._manager.parent_seed,
index=index,
secret=secret,
period=period,
digits=digits,
)
self._manager.start_background_vault_sync()
return uri
def add_ssh_key(
self,
label: str,
*,
index: int | None = None,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_ssh_key(
label,
self._manager.parent_seed,
index=index,
notes=notes,
)
self._manager.start_background_vault_sync()
return idx
def add_pgp_key(
self,
label: str,
*,
index: int | None = None,
key_type: str = "ed25519",
user_id: str = "",
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_pgp_key(
label,
self._manager.parent_seed,
index=index,
key_type=key_type,
user_id=user_id,
notes=notes,
)
self._manager.start_background_vault_sync()
return idx
def add_nostr_key(
self,
label: str,
*,
index: int | None = None,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_nostr_key(
label,
self._manager.parent_seed,
index=index,
notes=notes,
)
self._manager.start_background_vault_sync()
return idx
def add_seed(
self,
label: str,
*,
index: int | None = None,
words: int = 24,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_seed(
label,
self._manager.parent_seed,
index=index,
words_num=words,
notes=notes,
)
self._manager.start_background_vault_sync()
return idx
def add_key_value(
self, label: str, key: str, value: str, *, notes: str = ""
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_key_value(
label, key, value, notes=notes
)
self._manager.start_background_vault_sync()
return idx
def add_managed_account(
self,
label: str,
*,
index: int | None = None,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_managed_account(
label,
self._manager.parent_seed,
index=index,
notes=notes,
)
self._manager.start_background_vault_sync()
return idx
def modify_entry(
self,
entry_id: int,
*,
username: str | None = None,
url: str | None = None,
notes: str | None = None,
label: str | None = None,
period: int | None = None,
digits: int | None = None,
key: str | None = None,
value: str | None = None,
) -> None:
with self._lock:
self._manager.entry_manager.modify_entry(
entry_id,
username=username,
url=url,
notes=notes,
label=label,
period=period,
digits=digits,
key=key,
value=value,
)
self._manager.start_background_vault_sync()
def archive_entry(self, entry_id: int) -> None:
with self._lock:
self._manager.entry_manager.archive_entry(entry_id)
self._manager.start_background_vault_sync()
def restore_entry(self, entry_id: int) -> None:
with self._lock:
self._manager.entry_manager.restore_entry(entry_id)
self._manager.start_background_vault_sync()
def export_totp_entries(self) -> dict:
with self._lock:
return self._manager.entry_manager.export_totp_entries(
self._manager.parent_seed
)
def display_totp_codes(self) -> None:
with self._lock:
self._manager.handle_display_totp_codes()
class ConfigService:
"""Thread-safe wrapper around configuration access."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def get(self, key: str):
with self._lock:
return self._manager.config_manager.load_config(require_pin=False).get(key)
def set(self, key: str, value: str) -> None:
cfg = self._manager.config_manager
mapping = {
"inactivity_timeout": ("set_inactivity_timeout", float),
"secret_mode_enabled": (
"set_secret_mode_enabled",
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
),
"clipboard_clear_delay": ("set_clipboard_clear_delay", int),
"additional_backup_path": (
"set_additional_backup_path",
lambda v: v or None,
),
"relays": ("set_relays", lambda v: (v, {"require_pin": False})),
"kdf_iterations": ("set_kdf_iterations", int),
"kdf_mode": ("set_kdf_mode", lambda v: v),
"backup_interval": ("set_backup_interval", float),
"nostr_max_retries": ("set_nostr_max_retries", int),
"nostr_retry_delay": ("set_nostr_retry_delay", float),
"min_uppercase": ("set_min_uppercase", int),
"min_lowercase": ("set_min_lowercase", int),
"min_digits": ("set_min_digits", int),
"min_special": ("set_min_special", int),
"include_special_chars": (
"set_include_special_chars",
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
),
"allowed_special_chars": ("set_allowed_special_chars", lambda v: v),
"special_mode": ("set_special_mode", lambda v: v),
"exclude_ambiguous": (
"set_exclude_ambiguous",
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
),
"quick_unlock": (
"set_quick_unlock",
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
),
}
entry = mapping.get(key)
if entry is None:
raise KeyError(key)
method_name, conv = entry
with self._lock:
result = conv(value)
if (
isinstance(result, tuple)
and len(result) == 2
and isinstance(result[1], dict)
):
arg, kwargs = result
getattr(cfg, method_name)(arg, **kwargs)
else:
getattr(cfg, method_name)(result)
def get_secret_mode_enabled(self) -> bool:
with self._lock:
return self._manager.config_manager.get_secret_mode_enabled()
def get_clipboard_clear_delay(self) -> int:
with self._lock:
return self._manager.config_manager.get_clipboard_clear_delay()
def set_secret_mode(self, enabled: bool, delay: int) -> None:
with self._lock:
cfg = self._manager.config_manager
cfg.set_secret_mode_enabled(enabled)
cfg.set_clipboard_clear_delay(delay)
self._manager.secret_mode_enabled = enabled
self._manager.clipboard_clear_delay = delay
def get_offline_mode(self) -> bool:
with self._lock:
return self._manager.config_manager.get_offline_mode()
def set_offline_mode(self, enabled: bool) -> None:
with self._lock:
cfg = self._manager.config_manager
cfg.set_offline_mode(enabled)
self._manager.offline_mode = enabled
class UtilityService:
"""Miscellaneous helper operations."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def generate_password(
self,
length: int,
*,
include_special_chars: bool | None = None,
allowed_special_chars: str | None = None,
special_mode: str | None = None,
exclude_ambiguous: bool | None = None,
min_uppercase: int | None = None,
min_lowercase: int | None = None,
min_digits: int | None = None,
min_special: int | None = None,
) -> str:
with self._lock:
pg = self._manager.password_generator
base_policy = getattr(pg, "policy", None)
overrides: dict[str, Any] = {}
if include_special_chars is not None:
overrides["include_special_chars"] = include_special_chars
if allowed_special_chars is not None:
overrides["allowed_special_chars"] = allowed_special_chars
if special_mode is not None:
overrides["special_mode"] = special_mode
if exclude_ambiguous is not None:
overrides["exclude_ambiguous"] = exclude_ambiguous
if min_uppercase is not None:
overrides["min_uppercase"] = int(min_uppercase)
if min_lowercase is not None:
overrides["min_lowercase"] = int(min_lowercase)
if min_digits is not None:
overrides["min_digits"] = int(min_digits)
if min_special is not None:
overrides["min_special"] = int(min_special)
if base_policy is not None and overrides:
pg.policy = dataclasses.replace(
base_policy,
**{k: overrides[k] for k in overrides if hasattr(base_policy, k)},
)
try:
return pg.generate_password(length)
finally:
pg.policy = base_policy
return pg.generate_password(length)
def verify_checksum(self) -> None:
with self._lock:
self._manager.handle_verify_checksum()
def update_checksum(self) -> None:
with self._lock:
self._manager.handle_update_script_checksum()
class NostrService:
"""Nostr related helper methods."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def get_pubkey(self) -> str:
with self._lock:
return self._manager.nostr_client.key_manager.get_npub()
def list_relays(self) -> list[str]:
with self._lock:
return self._manager.state_manager.list_relays()
def add_relay(self, url: str) -> None:
with self._lock:
self._manager.state_manager.add_relay(url)
self._manager.nostr_client.relays = (
self._manager.state_manager.list_relays()
)
def remove_relay(self, idx: int) -> None:
with self._lock:
self._manager.state_manager.remove_relay(idx)
self._manager.nostr_client.relays = (
self._manager.state_manager.list_relays()
)