mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
587 lines
17 KiB
Python
587 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
"""Service layer wrapping :class:`PasswordManager` operations.
|
|
|
|
These services provide thread-safe methods for common operations used by the CLI
|
|
and API. Request and response payloads are represented using Pydantic models to
|
|
allow easy validation and documentation.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from threading import Lock
|
|
from typing import List, Optional, Dict
|
|
import json
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from .manager import PasswordManager
|
|
from .pubsub import bus
|
|
|
|
|
|
class VaultExportRequest(BaseModel):
|
|
"""Parameters required to export the vault."""
|
|
|
|
path: Path
|
|
|
|
|
|
class VaultExportResponse(BaseModel):
|
|
"""Result of a vault export operation."""
|
|
|
|
path: Path
|
|
|
|
|
|
class VaultImportRequest(BaseModel):
|
|
"""Parameters required to import a vault."""
|
|
|
|
path: Path
|
|
|
|
|
|
class ChangePasswordRequest(BaseModel):
|
|
"""Payload for :meth:`VaultService.change_password`."""
|
|
|
|
old_password: str
|
|
new_password: str
|
|
|
|
|
|
class UnlockRequest(BaseModel):
|
|
"""Payload for unlocking the vault."""
|
|
|
|
password: str
|
|
|
|
|
|
class UnlockResponse(BaseModel):
|
|
"""Duration taken to unlock the vault."""
|
|
|
|
duration: float
|
|
|
|
|
|
class BackupParentSeedRequest(BaseModel):
|
|
"""Optional path to write the encrypted seed backup."""
|
|
|
|
path: Optional[Path] = None
|
|
password: Optional[str] = None
|
|
|
|
|
|
class ProfileSwitchRequest(BaseModel):
|
|
"""Select a different seed profile."""
|
|
|
|
fingerprint: str
|
|
password: Optional[str] = None
|
|
|
|
|
|
class ProfileRemoveRequest(BaseModel):
|
|
"""Remove a seed profile."""
|
|
|
|
fingerprint: str
|
|
|
|
|
|
class SyncResponse(BaseModel):
|
|
"""Information about uploaded events after syncing."""
|
|
|
|
manifest_id: str
|
|
chunk_ids: List[str] = []
|
|
delta_ids: List[str] = []
|
|
|
|
|
|
class VaultService:
|
|
"""Thread-safe wrapper around vault operations."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def export_vault(self, req: VaultExportRequest) -> VaultExportResponse:
|
|
"""Export the vault to ``req.path``."""
|
|
|
|
with self._lock:
|
|
self._manager.handle_export_database(req.path)
|
|
return VaultExportResponse(path=req.path)
|
|
|
|
def import_vault(self, req: VaultImportRequest) -> None:
|
|
"""Import the vault from ``req.path`` and sync."""
|
|
|
|
with self._lock:
|
|
self._manager.handle_import_database(req.path)
|
|
self._manager.sync_vault()
|
|
|
|
def export_profile(self) -> bytes:
|
|
"""Return encrypted profile data for backup."""
|
|
|
|
with self._lock:
|
|
data = self._manager.vault.load_index()
|
|
payload = json.dumps(data, sort_keys=True, separators=(",", ":")).encode(
|
|
"utf-8"
|
|
)
|
|
return self._manager.vault.encryption_manager.encrypt_data(payload)
|
|
|
|
def import_profile(self, data: bytes) -> None:
|
|
"""Restore a profile from ``data`` and sync."""
|
|
|
|
with self._lock:
|
|
decrypted = self._manager.vault.encryption_manager.decrypt_data(data)
|
|
index = json.loads(decrypted.decode("utf-8"))
|
|
self._manager.vault.save_index(index)
|
|
self._manager.sync_vault()
|
|
|
|
def change_password(self, req: ChangePasswordRequest) -> None:
|
|
"""Change the master password."""
|
|
|
|
with self._lock:
|
|
self._manager.change_password(req.old_password, req.new_password)
|
|
|
|
def unlock(self, req: UnlockRequest) -> UnlockResponse:
|
|
"""Unlock the vault and return the duration."""
|
|
|
|
with self._lock:
|
|
duration = self._manager.unlock_vault(req.password)
|
|
return UnlockResponse(duration=duration)
|
|
|
|
def lock(self) -> None:
|
|
"""Lock the vault and clear sensitive data."""
|
|
|
|
with self._lock:
|
|
self._manager.lock_vault()
|
|
|
|
def backup_parent_seed(self, req: BackupParentSeedRequest) -> None:
|
|
"""Backup and reveal the parent seed."""
|
|
|
|
with self._lock:
|
|
self._manager.handle_backup_reveal_parent_seed(
|
|
req.path, password=req.password
|
|
)
|
|
|
|
def stats(self) -> Dict:
|
|
"""Return statistics about the current profile."""
|
|
|
|
with self._lock:
|
|
return self._manager.get_profile_stats()
|
|
|
|
|
|
class ProfileService:
|
|
"""Thread-safe wrapper around profile management operations."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def list_profiles(self) -> List[str]:
|
|
"""List available seed profiles."""
|
|
|
|
with self._lock:
|
|
return list(self._manager.fingerprint_manager.list_fingerprints())
|
|
|
|
def add_profile(self) -> Optional[str]:
|
|
"""Create a new seed profile and return its fingerprint if available."""
|
|
|
|
with self._lock:
|
|
self._manager.add_new_fingerprint()
|
|
return getattr(
|
|
self._manager.fingerprint_manager, "current_fingerprint", None
|
|
)
|
|
|
|
def remove_profile(self, req: ProfileRemoveRequest) -> None:
|
|
"""Remove the specified seed profile."""
|
|
|
|
with self._lock:
|
|
self._manager.fingerprint_manager.remove_fingerprint(req.fingerprint)
|
|
|
|
def switch_profile(self, req: ProfileSwitchRequest) -> None:
|
|
"""Switch to ``req.fingerprint``."""
|
|
|
|
with self._lock:
|
|
self._manager.select_fingerprint(req.fingerprint, password=req.password)
|
|
|
|
|
|
class SyncService:
|
|
"""Thread-safe wrapper around vault synchronization."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def sync(self) -> Optional[SyncResponse]:
|
|
"""Publish the vault to Nostr and return event info."""
|
|
|
|
with self._lock:
|
|
bus.publish("sync_started")
|
|
result = self._manager.sync_vault()
|
|
bus.publish("sync_finished", result)
|
|
if not result:
|
|
return None
|
|
return SyncResponse(**result)
|
|
|
|
def start_background_sync(self) -> None:
|
|
"""Begin background synchronization if possible."""
|
|
|
|
with self._lock:
|
|
self._manager.start_background_sync()
|
|
|
|
def start_background_vault_sync(self, summary: Optional[str] = None) -> None:
|
|
"""Publish the vault in a background thread."""
|
|
|
|
with self._lock:
|
|
self._manager.start_background_vault_sync(summary)
|
|
|
|
|
|
class EntryService:
|
|
"""Thread-safe wrapper around entry operations."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def list_entries(
|
|
self,
|
|
sort_by: str = "index",
|
|
filter_kind: str | None = None,
|
|
include_archived: bool = False,
|
|
):
|
|
with self._lock:
|
|
return self._manager.entry_manager.list_entries(
|
|
sort_by=sort_by,
|
|
filter_kind=filter_kind,
|
|
include_archived=include_archived,
|
|
)
|
|
|
|
def search_entries(
|
|
self, query: str, kinds: list[str] | None = None
|
|
) -> list[tuple[int, str, str | None, str | None, bool]]:
|
|
"""Search entries optionally filtering by ``kinds``.
|
|
|
|
Parameters
|
|
----------
|
|
query:
|
|
Search string to match against entry metadata.
|
|
kinds:
|
|
Optional list of entry kinds to restrict the search.
|
|
"""
|
|
|
|
with self._lock:
|
|
return self._manager.entry_manager.search_entries(query, kinds=kinds)
|
|
|
|
def retrieve_entry(self, entry_id: int):
|
|
with self._lock:
|
|
return self._manager.entry_manager.retrieve_entry(entry_id)
|
|
|
|
def generate_password(self, length: int, index: int) -> str:
|
|
with self._lock:
|
|
return self._manager.password_generator.generate_password(length, index)
|
|
|
|
def get_totp_code(self, entry_id: int) -> str:
|
|
with self._lock:
|
|
return self._manager.entry_manager.get_totp_code(
|
|
entry_id, self._manager.parent_seed
|
|
)
|
|
|
|
def add_entry(
|
|
self,
|
|
label: str,
|
|
length: int,
|
|
username: str | None = None,
|
|
url: str | None = None,
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_entry(label, length, username, url)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def add_totp(
|
|
self,
|
|
label: str,
|
|
*,
|
|
index: int | None = None,
|
|
secret: str | None = None,
|
|
period: int = 30,
|
|
digits: int = 6,
|
|
) -> str:
|
|
with self._lock:
|
|
uri = self._manager.entry_manager.add_totp(
|
|
label,
|
|
self._manager.parent_seed,
|
|
index=index,
|
|
secret=secret,
|
|
period=period,
|
|
digits=digits,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return uri
|
|
|
|
def add_ssh_key(
|
|
self,
|
|
label: str,
|
|
*,
|
|
index: int | None = None,
|
|
notes: str = "",
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_ssh_key(
|
|
label,
|
|
self._manager.parent_seed,
|
|
index=index,
|
|
notes=notes,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def add_pgp_key(
|
|
self,
|
|
label: str,
|
|
*,
|
|
index: int | None = None,
|
|
key_type: str = "ed25519",
|
|
user_id: str = "",
|
|
notes: str = "",
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_pgp_key(
|
|
label,
|
|
self._manager.parent_seed,
|
|
index=index,
|
|
key_type=key_type,
|
|
user_id=user_id,
|
|
notes=notes,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def add_nostr_key(
|
|
self,
|
|
label: str,
|
|
*,
|
|
index: int | None = None,
|
|
notes: str = "",
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_nostr_key(
|
|
label,
|
|
index=index,
|
|
notes=notes,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def add_seed(
|
|
self,
|
|
label: str,
|
|
*,
|
|
index: int | None = None,
|
|
words: int = 24,
|
|
notes: str = "",
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_seed(
|
|
label,
|
|
self._manager.parent_seed,
|
|
index=index,
|
|
words_num=words,
|
|
notes=notes,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def add_key_value(
|
|
self, label: str, key: str, value: str, *, notes: str = ""
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_key_value(
|
|
label, key, value, notes=notes
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def add_managed_account(
|
|
self,
|
|
label: str,
|
|
*,
|
|
index: int | None = None,
|
|
notes: str = "",
|
|
) -> int:
|
|
with self._lock:
|
|
idx = self._manager.entry_manager.add_managed_account(
|
|
label,
|
|
self._manager.parent_seed,
|
|
index=index,
|
|
notes=notes,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
return idx
|
|
|
|
def modify_entry(
|
|
self,
|
|
entry_id: int,
|
|
*,
|
|
username: str | None = None,
|
|
url: str | None = None,
|
|
notes: str | None = None,
|
|
label: str | None = None,
|
|
period: int | None = None,
|
|
digits: int | None = None,
|
|
key: str | None = None,
|
|
value: str | None = None,
|
|
) -> None:
|
|
with self._lock:
|
|
self._manager.entry_manager.modify_entry(
|
|
entry_id,
|
|
username=username,
|
|
url=url,
|
|
notes=notes,
|
|
label=label,
|
|
period=period,
|
|
digits=digits,
|
|
key=key,
|
|
value=value,
|
|
)
|
|
self._manager.start_background_vault_sync()
|
|
|
|
def archive_entry(self, entry_id: int) -> None:
|
|
with self._lock:
|
|
self._manager.entry_manager.archive_entry(entry_id)
|
|
self._manager.start_background_vault_sync()
|
|
|
|
def restore_entry(self, entry_id: int) -> None:
|
|
with self._lock:
|
|
self._manager.entry_manager.restore_entry(entry_id)
|
|
self._manager.start_background_vault_sync()
|
|
|
|
def export_totp_entries(self) -> dict:
|
|
with self._lock:
|
|
return self._manager.entry_manager.export_totp_entries(
|
|
self._manager.parent_seed
|
|
)
|
|
|
|
def display_totp_codes(self) -> None:
|
|
with self._lock:
|
|
self._manager.handle_display_totp_codes()
|
|
|
|
|
|
class ConfigService:
|
|
"""Thread-safe wrapper around configuration access."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def get(self, key: str):
|
|
with self._lock:
|
|
return self._manager.config_manager.load_config(require_pin=False).get(key)
|
|
|
|
def set(self, key: str, value: str) -> None:
|
|
cfg = self._manager.config_manager
|
|
mapping = {
|
|
"inactivity_timeout": ("set_inactivity_timeout", float),
|
|
"secret_mode_enabled": (
|
|
"set_secret_mode_enabled",
|
|
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
|
|
),
|
|
"clipboard_clear_delay": ("set_clipboard_clear_delay", int),
|
|
"additional_backup_path": (
|
|
"set_additional_backup_path",
|
|
lambda v: v or None,
|
|
),
|
|
"relays": ("set_relays", lambda v: (v, {"require_pin": False})),
|
|
"kdf_iterations": ("set_kdf_iterations", int),
|
|
"kdf_mode": ("set_kdf_mode", lambda v: v),
|
|
"backup_interval": ("set_backup_interval", float),
|
|
"nostr_max_retries": ("set_nostr_max_retries", int),
|
|
"nostr_retry_delay": ("set_nostr_retry_delay", float),
|
|
"min_uppercase": ("set_min_uppercase", int),
|
|
"min_lowercase": ("set_min_lowercase", int),
|
|
"min_digits": ("set_min_digits", int),
|
|
"min_special": ("set_min_special", int),
|
|
"quick_unlock": (
|
|
"set_quick_unlock",
|
|
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
|
|
),
|
|
}
|
|
entry = mapping.get(key)
|
|
if entry is None:
|
|
raise KeyError(key)
|
|
method_name, conv = entry
|
|
with self._lock:
|
|
result = conv(value)
|
|
if (
|
|
isinstance(result, tuple)
|
|
and len(result) == 2
|
|
and isinstance(result[1], dict)
|
|
):
|
|
arg, kwargs = result
|
|
getattr(cfg, method_name)(arg, **kwargs)
|
|
else:
|
|
getattr(cfg, method_name)(result)
|
|
|
|
def get_secret_mode_enabled(self) -> bool:
|
|
with self._lock:
|
|
return self._manager.config_manager.get_secret_mode_enabled()
|
|
|
|
def get_clipboard_clear_delay(self) -> int:
|
|
with self._lock:
|
|
return self._manager.config_manager.get_clipboard_clear_delay()
|
|
|
|
def set_secret_mode(self, enabled: bool, delay: int) -> None:
|
|
with self._lock:
|
|
cfg = self._manager.config_manager
|
|
cfg.set_secret_mode_enabled(enabled)
|
|
cfg.set_clipboard_clear_delay(delay)
|
|
self._manager.secret_mode_enabled = enabled
|
|
self._manager.clipboard_clear_delay = delay
|
|
|
|
def get_offline_mode(self) -> bool:
|
|
with self._lock:
|
|
return self._manager.config_manager.get_offline_mode()
|
|
|
|
def set_offline_mode(self, enabled: bool) -> None:
|
|
with self._lock:
|
|
cfg = self._manager.config_manager
|
|
cfg.set_offline_mode(enabled)
|
|
self._manager.offline_mode = enabled
|
|
|
|
|
|
class UtilityService:
|
|
"""Miscellaneous helper operations."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def generate_password(self, length: int) -> str:
|
|
with self._lock:
|
|
return self._manager.password_generator.generate_password(length)
|
|
|
|
def verify_checksum(self) -> None:
|
|
with self._lock:
|
|
self._manager.handle_verify_checksum()
|
|
|
|
def update_checksum(self) -> None:
|
|
with self._lock:
|
|
self._manager.handle_update_script_checksum()
|
|
|
|
|
|
class NostrService:
|
|
"""Nostr related helper methods."""
|
|
|
|
def __init__(self, manager: PasswordManager) -> None:
|
|
self._manager = manager
|
|
self._lock = Lock()
|
|
|
|
def get_pubkey(self) -> str:
|
|
with self._lock:
|
|
return self._manager.nostr_client.key_manager.get_npub()
|
|
|
|
def list_relays(self) -> list[str]:
|
|
with self._lock:
|
|
return self._manager.state_manager.list_relays()
|
|
|
|
def add_relay(self, url: str) -> None:
|
|
with self._lock:
|
|
self._manager.state_manager.add_relay(url)
|
|
self._manager.nostr_client.relays = (
|
|
self._manager.state_manager.list_relays()
|
|
)
|
|
|
|
def remove_relay(self, idx: int) -> None:
|
|
with self._lock:
|
|
self._manager.state_manager.remove_relay(idx)
|
|
self._manager.nostr_client.relays = (
|
|
self._manager.state_manager.list_relays()
|
|
)
|