Refactor CLI to use service layer

This commit is contained in:
thePR0M3TH3AN
2025-07-17 20:42:23 -04:00
parent e7021f480f
commit 5019c99c11
2 changed files with 414 additions and 110 deletions

View File

@@ -10,6 +10,10 @@ from seedpass.core.api import (
VaultService,
ProfileService,
SyncService,
EntryService,
ConfigService,
UtilityService,
NostrService,
VaultExportRequest,
VaultImportRequest,
ChangePasswordRequest,
@@ -73,6 +77,26 @@ def _get_services(
return VaultService(pm), ProfileService(pm), SyncService(pm)
def _get_entry_service(ctx: typer.Context) -> EntryService:
pm = _get_pm(ctx)
return EntryService(pm)
def _get_config_service(ctx: typer.Context) -> ConfigService:
pm = _get_pm(ctx)
return ConfigService(pm)
def _get_util_service(ctx: typer.Context) -> UtilityService:
pm = _get_pm(ctx)
return UtilityService(pm)
def _get_nostr_service(ctx: typer.Context) -> NostrService:
pm = _get_pm(ctx)
return NostrService(pm)
@app.callback(invoke_without_command=True)
def main(ctx: typer.Context, fingerprint: Optional[str] = fingerprint_option) -> None:
"""SeedPass CLI entry point.
@@ -95,8 +119,8 @@ def entry_list(
archived: bool = typer.Option(False, "--archived", help="Include archived"),
) -> None:
"""List entries in the vault."""
pm = _get_pm(ctx)
entries = pm.entry_manager.list_entries(
service = _get_entry_service(ctx)
entries = service.list_entries(
sort_by=sort, filter_kind=kind, include_archived=archived
)
for idx, label, username, url, is_archived in entries:
@@ -113,8 +137,8 @@ def entry_list(
@entry_app.command("search")
def entry_search(ctx: typer.Context, query: str) -> None:
"""Search entries."""
pm = _get_pm(ctx)
results = pm.entry_manager.search_entries(query)
service = _get_entry_service(ctx)
results = service.search_entries(query)
if not results:
typer.echo("No matching entries found")
return
@@ -130,8 +154,8 @@ def entry_search(ctx: typer.Context, query: str) -> None:
@entry_app.command("get")
def entry_get(ctx: typer.Context, query: str) -> None:
"""Retrieve a single entry's secret."""
pm = _get_pm(ctx)
matches = pm.entry_manager.search_entries(query)
service = _get_entry_service(ctx)
matches = service.search_entries(query)
if len(matches) == 0:
typer.echo("No matching entries found")
raise typer.Exit(code=1)
@@ -145,14 +169,14 @@ def entry_get(ctx: typer.Context, query: str) -> None:
raise typer.Exit(code=1)
index = matches[0][0]
entry = pm.entry_manager.retrieve_entry(index)
entry = service.retrieve_entry(index)
etype = entry.get("type", entry.get("kind"))
if etype == EntryType.PASSWORD.value:
length = int(entry.get("length", 12))
password = pm.password_generator.generate_password(length, index)
password = service.generate_password(length, index)
typer.echo(password)
elif etype == EntryType.TOTP.value:
code = pm.entry_manager.get_totp_code(index, pm.parent_seed)
code = service.get_totp_code(index)
typer.echo(code)
else:
typer.echo("Unsupported entry type")
@@ -168,10 +192,9 @@ def entry_add(
url: Optional[str] = typer.Option(None, "--url"),
) -> None:
"""Add a new password entry and output its index."""
pm = _get_pm(ctx)
index = pm.entry_manager.add_entry(label, length, username, url)
service = _get_entry_service(ctx)
index = service.add_entry(label, length, username, url)
typer.echo(str(index))
pm.sync_vault()
@entry_app.command("add-totp")
@@ -184,17 +207,15 @@ def entry_add_totp(
digits: int = typer.Option(6, "--digits", help="Number of TOTP digits"),
) -> None:
"""Add a TOTP entry and output the otpauth URI."""
pm = _get_pm(ctx)
uri = pm.entry_manager.add_totp(
service = _get_entry_service(ctx)
uri = service.add_totp(
label,
pm.parent_seed,
index=index,
secret=secret,
period=period,
digits=digits,
)
typer.echo(uri)
pm.sync_vault()
@entry_app.command("add-ssh")
@@ -205,15 +226,13 @@ def entry_add_ssh(
notes: str = typer.Option("", "--notes", help="Entry notes"),
) -> None:
"""Add an SSH key entry and output its index."""
pm = _get_pm(ctx)
idx = pm.entry_manager.add_ssh_key(
service = _get_entry_service(ctx)
idx = service.add_ssh_key(
label,
pm.parent_seed,
index=index,
notes=notes,
)
typer.echo(str(idx))
pm.sync_vault()
@entry_app.command("add-pgp")
@@ -226,17 +245,15 @@ def entry_add_pgp(
notes: str = typer.Option("", "--notes", help="Entry notes"),
) -> None:
"""Add a PGP key entry and output its index."""
pm = _get_pm(ctx)
idx = pm.entry_manager.add_pgp_key(
service = _get_entry_service(ctx)
idx = service.add_pgp_key(
label,
pm.parent_seed,
index=index,
key_type=key_type,
user_id=user_id,
notes=notes,
)
typer.echo(str(idx))
pm.sync_vault()
@entry_app.command("add-nostr")
@@ -247,14 +264,13 @@ def entry_add_nostr(
notes: str = typer.Option("", "--notes", help="Entry notes"),
) -> None:
"""Add a Nostr key entry and output its index."""
pm = _get_pm(ctx)
idx = pm.entry_manager.add_nostr_key(
service = _get_entry_service(ctx)
idx = service.add_nostr_key(
label,
index=index,
notes=notes,
)
typer.echo(str(idx))
pm.sync_vault()
@entry_app.command("add-seed")
@@ -266,16 +282,14 @@ def entry_add_seed(
notes: str = typer.Option("", "--notes", help="Entry notes"),
) -> None:
"""Add a derived seed phrase entry and output its index."""
pm = _get_pm(ctx)
idx = pm.entry_manager.add_seed(
service = _get_entry_service(ctx)
idx = service.add_seed(
label,
pm.parent_seed,
index=index,
words_num=words,
words=words,
notes=notes,
)
typer.echo(str(idx))
pm.sync_vault()
@entry_app.command("add-key-value")
@@ -286,10 +300,9 @@ def entry_add_key_value(
notes: str = typer.Option("", "--notes", help="Entry notes"),
) -> None:
"""Add a key/value entry and output its index."""
pm = _get_pm(ctx)
idx = pm.entry_manager.add_key_value(label, value, notes=notes)
service = _get_entry_service(ctx)
idx = service.add_key_value(label, value, notes=notes)
typer.echo(str(idx))
pm.sync_vault()
@entry_app.command("add-managed-account")
@@ -300,15 +313,13 @@ def entry_add_managed_account(
notes: str = typer.Option("", "--notes", help="Entry notes"),
) -> None:
"""Add a managed account seed entry and output its index."""
pm = _get_pm(ctx)
idx = pm.entry_manager.add_managed_account(
service = _get_entry_service(ctx)
idx = service.add_managed_account(
label,
pm.parent_seed,
index=index,
notes=notes,
)
typer.echo(str(idx))
pm.sync_vault()
@entry_app.command("modify")
@@ -326,9 +337,9 @@ def entry_modify(
value: Optional[str] = typer.Option(None, "--value", help="New value"),
) -> None:
"""Modify an existing entry."""
pm = _get_pm(ctx)
service = _get_entry_service(ctx)
try:
pm.entry_manager.modify_entry(
service.modify_entry(
entry_id,
username=username,
url=url,
@@ -341,32 +352,29 @@ def entry_modify(
except ValueError as e:
typer.echo(str(e))
raise typer.Exit(code=1)
pm.sync_vault()
@entry_app.command("archive")
def entry_archive(ctx: typer.Context, entry_id: int) -> None:
"""Archive an entry."""
pm = _get_pm(ctx)
pm.entry_manager.archive_entry(entry_id)
service = _get_entry_service(ctx)
service.archive_entry(entry_id)
typer.echo(str(entry_id))
pm.sync_vault()
@entry_app.command("unarchive")
def entry_unarchive(ctx: typer.Context, entry_id: int) -> None:
"""Restore an archived entry."""
pm = _get_pm(ctx)
pm.entry_manager.restore_entry(entry_id)
service = _get_entry_service(ctx)
service.restore_entry(entry_id)
typer.echo(str(entry_id))
pm.sync_vault()
@entry_app.command("totp-codes")
def entry_totp_codes(ctx: typer.Context) -> None:
"""Display all current TOTP codes."""
pm = _get_pm(ctx)
pm.handle_display_totp_codes()
service = _get_entry_service(ctx)
service.display_totp_codes()
@entry_app.command("export-totp")
@@ -374,8 +382,8 @@ def entry_export_totp(
ctx: typer.Context, file: str = typer.Option(..., help="Output file")
) -> None:
"""Export all TOTP secrets to a JSON file."""
pm = _get_pm(ctx)
data = pm.entry_manager.export_totp_entries(pm.parent_seed)
service = _get_entry_service(ctx)
data = service.export_totp_entries()
Path(file).write_text(json.dumps(data, indent=2))
typer.echo(str(file))
@@ -478,16 +486,16 @@ def nostr_sync(ctx: typer.Context) -> None:
@nostr_app.command("get-pubkey")
def nostr_get_pubkey(ctx: typer.Context) -> None:
"""Display the active profile's npub."""
pm = _get_pm(ctx)
npub = pm.nostr_client.key_manager.get_npub()
service = _get_nostr_service(ctx)
npub = service.get_pubkey()
typer.echo(npub)
@config_app.command("get")
def config_get(ctx: typer.Context, key: str) -> None:
"""Get a configuration value."""
pm = _get_pm(ctx)
value = pm.config_manager.load_config(require_pin=False).get(key)
service = _get_config_service(ctx)
value = service.get(key)
if value is None:
typer.echo("Key not found")
else:
@@ -497,43 +505,18 @@ def config_get(ctx: typer.Context, key: str) -> None:
@config_app.command("set")
def config_set(ctx: typer.Context, key: str, value: str) -> None:
"""Set a configuration value."""
pm = _get_pm(ctx)
cfg = pm.config_manager
mapping = {
"inactivity_timeout": lambda v: cfg.set_inactivity_timeout(float(v)),
"secret_mode_enabled": lambda v: cfg.set_secret_mode_enabled(
v.lower() in ("1", "true", "yes", "y", "on")
),
"clipboard_clear_delay": lambda v: cfg.set_clipboard_clear_delay(int(v)),
"additional_backup_path": lambda v: cfg.set_additional_backup_path(v or None),
"relays": lambda v: cfg.set_relays(
[r.strip() for r in v.split(",") if r.strip()], require_pin=False
),
"kdf_iterations": lambda v: cfg.set_kdf_iterations(int(v)),
"kdf_mode": lambda v: cfg.set_kdf_mode(v),
"backup_interval": lambda v: cfg.set_backup_interval(float(v)),
"nostr_max_retries": lambda v: cfg.set_nostr_max_retries(int(v)),
"nostr_retry_delay": lambda v: cfg.set_nostr_retry_delay(float(v)),
"min_uppercase": lambda v: cfg.set_min_uppercase(int(v)),
"min_lowercase": lambda v: cfg.set_min_lowercase(int(v)),
"min_digits": lambda v: cfg.set_min_digits(int(v)),
"min_special": lambda v: cfg.set_min_special(int(v)),
"quick_unlock": lambda v: cfg.set_quick_unlock(
v.lower() in ("1", "true", "yes", "y", "on")
),
"verbose_timing": lambda v: cfg.set_verbose_timing(
v.lower() in ("1", "true", "yes", "y", "on")
),
}
action = mapping.get(key)
if action is None:
typer.echo("Unknown key")
raise typer.Exit(code=1)
service = _get_config_service(ctx)
try:
action(value)
val = (
[r.strip() for r in value.split(",") if r.strip()]
if key == "relays"
else value
)
service.set(key, val)
except KeyError:
typer.echo("Unknown key")
raise typer.Exit(code=1)
except Exception as exc: # pragma: no cover - pass through errors
typer.echo(f"Error: {exc}")
raise typer.Exit(code=1)
@@ -544,11 +527,10 @@ def config_set(ctx: typer.Context, key: str, value: str) -> None:
@config_app.command("toggle-secret-mode")
def config_toggle_secret_mode(ctx: typer.Context) -> None:
"""Interactively enable or disable secret mode."""
pm = _get_pm(ctx)
cfg = pm.config_manager
service = _get_config_service(ctx)
try:
enabled = cfg.get_secret_mode_enabled()
delay = cfg.get_clipboard_clear_delay()
enabled = service.get_secret_mode_enabled()
delay = service.get_clipboard_clear_delay()
except Exception as exc: # pragma: no cover - pass through errors
typer.echo(f"Error loading settings: {exc}")
raise typer.Exit(code=1)
@@ -580,10 +562,7 @@ def config_toggle_secret_mode(ctx: typer.Context) -> None:
raise typer.Exit(code=1)
try:
cfg.set_secret_mode_enabled(enabled)
cfg.set_clipboard_clear_delay(delay)
pm.secret_mode_enabled = enabled
pm.clipboard_clear_delay = delay
service.set_secret_mode(enabled, delay)
except Exception as exc: # pragma: no cover - pass through errors
typer.echo(f"Error: {exc}")
raise typer.Exit(code=1)
@@ -595,10 +574,9 @@ def config_toggle_secret_mode(ctx: typer.Context) -> None:
@config_app.command("toggle-offline")
def config_toggle_offline(ctx: typer.Context) -> None:
"""Enable or disable offline mode."""
pm = _get_pm(ctx)
cfg = pm.config_manager
service = _get_config_service(ctx)
try:
enabled = cfg.get_offline_mode()
enabled = service.get_offline_mode()
except Exception as exc: # pragma: no cover - pass through errors
typer.echo(f"Error loading settings: {exc}")
raise typer.Exit(code=1)
@@ -617,8 +595,7 @@ def config_toggle_offline(ctx: typer.Context) -> None:
enabled = False
try:
cfg.set_offline_mode(enabled)
pm.offline_mode = enabled
service.set_offline_mode(enabled)
except Exception as exc: # pragma: no cover - pass through errors
typer.echo(f"Error: {exc}")
raise typer.Exit(code=1)
@@ -659,23 +636,23 @@ def fingerprint_switch(ctx: typer.Context, fingerprint: str) -> None:
@util_app.command("generate-password")
def generate_password(ctx: typer.Context, length: int = 24) -> None:
"""Generate a strong password."""
pm = _get_pm(ctx)
password = pm.password_generator.generate_password(length)
service = _get_util_service(ctx)
password = service.generate_password(length)
typer.echo(password)
@util_app.command("verify-checksum")
def verify_checksum(ctx: typer.Context) -> None:
"""Verify the SeedPass script checksum."""
pm = _get_pm(ctx)
pm.handle_verify_checksum()
service = _get_util_service(ctx)
service.verify_checksum()
@util_app.command("update-checksum")
def update_checksum(ctx: typer.Context) -> None:
"""Regenerate the script checksum file."""
pm = _get_pm(ctx)
pm.handle_update_script_checksum()
service = _get_util_service(ctx)
service.update_checksum()
@api_app.command("start")

View File

@@ -194,3 +194,330 @@ class SyncService:
with self._lock:
self._manager.start_background_vault_sync(summary)
class EntryService:
"""Thread-safe wrapper around entry operations."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def list_entries(
self,
sort_by: str = "index",
filter_kind: str | None = None,
include_archived: bool = False,
):
with self._lock:
return self._manager.entry_manager.list_entries(
sort_by=sort_by,
filter_kind=filter_kind,
include_archived=include_archived,
)
def search_entries(self, query: str):
with self._lock:
return self._manager.entry_manager.search_entries(query)
def retrieve_entry(self, entry_id: int):
with self._lock:
return self._manager.entry_manager.retrieve_entry(entry_id)
def generate_password(self, length: int, index: int) -> str:
with self._lock:
return self._manager.password_generator.generate_password(length, index)
def get_totp_code(self, entry_id: int) -> str:
with self._lock:
return self._manager.entry_manager.get_totp_code(
entry_id, self._manager.parent_seed
)
def add_entry(
self,
label: str,
length: int,
username: str | None = None,
url: str | None = None,
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_entry(label, length, username, url)
self._manager.sync_vault()
return idx
def add_totp(
self,
label: str,
*,
index: int | None = None,
secret: str | None = None,
period: int = 30,
digits: int = 6,
) -> str:
with self._lock:
uri = self._manager.entry_manager.add_totp(
label,
self._manager.parent_seed,
index=index,
secret=secret,
period=period,
digits=digits,
)
self._manager.sync_vault()
return uri
def add_ssh_key(
self,
label: str,
*,
index: int | None = None,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_ssh_key(
label,
self._manager.parent_seed,
index=index,
notes=notes,
)
self._manager.sync_vault()
return idx
def add_pgp_key(
self,
label: str,
*,
index: int | None = None,
key_type: str = "ed25519",
user_id: str = "",
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_pgp_key(
label,
self._manager.parent_seed,
index=index,
key_type=key_type,
user_id=user_id,
notes=notes,
)
self._manager.sync_vault()
return idx
def add_nostr_key(
self,
label: str,
*,
index: int | None = None,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_nostr_key(
label,
index=index,
notes=notes,
)
self._manager.sync_vault()
return idx
def add_seed(
self,
label: str,
*,
index: int | None = None,
words: int = 24,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_seed(
label,
self._manager.parent_seed,
index=index,
words_num=words,
notes=notes,
)
self._manager.sync_vault()
return idx
def add_key_value(self, label: str, value: str, *, notes: str = "") -> int:
with self._lock:
idx = self._manager.entry_manager.add_key_value(label, value, notes=notes)
self._manager.sync_vault()
return idx
def add_managed_account(
self,
label: str,
*,
index: int | None = None,
notes: str = "",
) -> int:
with self._lock:
idx = self._manager.entry_manager.add_managed_account(
label,
self._manager.parent_seed,
index=index,
notes=notes,
)
self._manager.sync_vault()
return idx
def modify_entry(
self,
entry_id: int,
*,
username: str | None = None,
url: str | None = None,
notes: str | None = None,
label: str | None = None,
period: int | None = None,
digits: int | None = None,
value: str | None = None,
) -> None:
with self._lock:
self._manager.entry_manager.modify_entry(
entry_id,
username=username,
url=url,
notes=notes,
label=label,
period=period,
digits=digits,
value=value,
)
self._manager.sync_vault()
def archive_entry(self, entry_id: int) -> None:
with self._lock:
self._manager.entry_manager.archive_entry(entry_id)
self._manager.sync_vault()
def restore_entry(self, entry_id: int) -> None:
with self._lock:
self._manager.entry_manager.restore_entry(entry_id)
self._manager.sync_vault()
def export_totp_entries(self) -> dict:
with self._lock:
return self._manager.entry_manager.export_totp_entries(
self._manager.parent_seed
)
def display_totp_codes(self) -> None:
with self._lock:
self._manager.handle_display_totp_codes()
class ConfigService:
"""Thread-safe wrapper around configuration access."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def get(self, key: str):
with self._lock:
return self._manager.config_manager.load_config(require_pin=False).get(key)
def set(self, key: str, value: str) -> None:
cfg = self._manager.config_manager
mapping = {
"inactivity_timeout": ("set_inactivity_timeout", float),
"secret_mode_enabled": (
"set_secret_mode_enabled",
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
),
"clipboard_clear_delay": ("set_clipboard_clear_delay", int),
"additional_backup_path": (
"set_additional_backup_path",
lambda v: v or None,
),
"relays": ("set_relays", lambda v: (v, {"require_pin": False})),
"kdf_iterations": ("set_kdf_iterations", int),
"kdf_mode": ("set_kdf_mode", lambda v: v),
"backup_interval": ("set_backup_interval", float),
"nostr_max_retries": ("set_nostr_max_retries", int),
"nostr_retry_delay": ("set_nostr_retry_delay", float),
"min_uppercase": ("set_min_uppercase", int),
"min_lowercase": ("set_min_lowercase", int),
"min_digits": ("set_min_digits", int),
"min_special": ("set_min_special", int),
"quick_unlock": (
"set_quick_unlock",
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
),
}
entry = mapping.get(key)
if entry is None:
raise KeyError(key)
method_name, conv = entry
with self._lock:
result = conv(value)
if (
isinstance(result, tuple)
and len(result) == 2
and isinstance(result[1], dict)
):
arg, kwargs = result
getattr(cfg, method_name)(arg, **kwargs)
else:
getattr(cfg, method_name)(result)
def get_secret_mode_enabled(self) -> bool:
with self._lock:
return self._manager.config_manager.get_secret_mode_enabled()
def get_clipboard_clear_delay(self) -> int:
with self._lock:
return self._manager.config_manager.get_clipboard_clear_delay()
def set_secret_mode(self, enabled: bool, delay: int) -> None:
with self._lock:
cfg = self._manager.config_manager
cfg.set_secret_mode_enabled(enabled)
cfg.set_clipboard_clear_delay(delay)
self._manager.secret_mode_enabled = enabled
self._manager.clipboard_clear_delay = delay
def get_offline_mode(self) -> bool:
with self._lock:
return self._manager.config_manager.get_offline_mode()
def set_offline_mode(self, enabled: bool) -> None:
with self._lock:
cfg = self._manager.config_manager
cfg.set_offline_mode(enabled)
self._manager.offline_mode = enabled
class UtilityService:
"""Miscellaneous helper operations."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def generate_password(self, length: int) -> str:
with self._lock:
return self._manager.password_generator.generate_password(length)
def verify_checksum(self) -> None:
with self._lock:
self._manager.handle_verify_checksum()
def update_checksum(self) -> None:
with self._lock:
self._manager.handle_update_script_checksum()
class NostrService:
"""Nostr related helper methods."""
def __init__(self, manager: PasswordManager) -> None:
self._manager = manager
self._lock = Lock()
def get_pubkey(self) -> str:
with self._lock:
return self._manager.nostr_client.key_manager.get_npub()