From 5019c99c11320bd67d54433300b23050586ea481 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 17 Jul 2025 20:42:23 -0400 Subject: [PATCH] Refactor CLI to use service layer --- src/seedpass/cli.py | 197 +++++++++++------------ src/seedpass/core/api.py | 327 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 414 insertions(+), 110 deletions(-) diff --git a/src/seedpass/cli.py b/src/seedpass/cli.py index 07da8e5..309b3ac 100644 --- a/src/seedpass/cli.py +++ b/src/seedpass/cli.py @@ -10,6 +10,10 @@ from seedpass.core.api import ( VaultService, ProfileService, SyncService, + EntryService, + ConfigService, + UtilityService, + NostrService, VaultExportRequest, VaultImportRequest, ChangePasswordRequest, @@ -73,6 +77,26 @@ def _get_services( return VaultService(pm), ProfileService(pm), SyncService(pm) +def _get_entry_service(ctx: typer.Context) -> EntryService: + pm = _get_pm(ctx) + return EntryService(pm) + + +def _get_config_service(ctx: typer.Context) -> ConfigService: + pm = _get_pm(ctx) + return ConfigService(pm) + + +def _get_util_service(ctx: typer.Context) -> UtilityService: + pm = _get_pm(ctx) + return UtilityService(pm) + + +def _get_nostr_service(ctx: typer.Context) -> NostrService: + pm = _get_pm(ctx) + return NostrService(pm) + + @app.callback(invoke_without_command=True) def main(ctx: typer.Context, fingerprint: Optional[str] = fingerprint_option) -> None: """SeedPass CLI entry point. @@ -95,8 +119,8 @@ def entry_list( archived: bool = typer.Option(False, "--archived", help="Include archived"), ) -> None: """List entries in the vault.""" - pm = _get_pm(ctx) - entries = pm.entry_manager.list_entries( + service = _get_entry_service(ctx) + entries = service.list_entries( sort_by=sort, filter_kind=kind, include_archived=archived ) for idx, label, username, url, is_archived in entries: @@ -113,8 +137,8 @@ def entry_list( @entry_app.command("search") def entry_search(ctx: typer.Context, query: str) -> None: """Search entries.""" - pm = _get_pm(ctx) - results = pm.entry_manager.search_entries(query) + service = _get_entry_service(ctx) + results = service.search_entries(query) if not results: typer.echo("No matching entries found") return @@ -130,8 +154,8 @@ def entry_search(ctx: typer.Context, query: str) -> None: @entry_app.command("get") def entry_get(ctx: typer.Context, query: str) -> None: """Retrieve a single entry's secret.""" - pm = _get_pm(ctx) - matches = pm.entry_manager.search_entries(query) + service = _get_entry_service(ctx) + matches = service.search_entries(query) if len(matches) == 0: typer.echo("No matching entries found") raise typer.Exit(code=1) @@ -145,14 +169,14 @@ def entry_get(ctx: typer.Context, query: str) -> None: raise typer.Exit(code=1) index = matches[0][0] - entry = pm.entry_manager.retrieve_entry(index) + entry = service.retrieve_entry(index) etype = entry.get("type", entry.get("kind")) if etype == EntryType.PASSWORD.value: length = int(entry.get("length", 12)) - password = pm.password_generator.generate_password(length, index) + password = service.generate_password(length, index) typer.echo(password) elif etype == EntryType.TOTP.value: - code = pm.entry_manager.get_totp_code(index, pm.parent_seed) + code = service.get_totp_code(index) typer.echo(code) else: typer.echo("Unsupported entry type") @@ -168,10 +192,9 @@ def entry_add( url: Optional[str] = typer.Option(None, "--url"), ) -> None: """Add a new password entry and output its index.""" - pm = _get_pm(ctx) - index = pm.entry_manager.add_entry(label, length, username, url) + service = _get_entry_service(ctx) + index = service.add_entry(label, length, username, url) typer.echo(str(index)) - pm.sync_vault() @entry_app.command("add-totp") @@ -184,17 +207,15 @@ def entry_add_totp( digits: int = typer.Option(6, "--digits", help="Number of TOTP digits"), ) -> None: """Add a TOTP entry and output the otpauth URI.""" - pm = _get_pm(ctx) - uri = pm.entry_manager.add_totp( + service = _get_entry_service(ctx) + uri = service.add_totp( label, - pm.parent_seed, index=index, secret=secret, period=period, digits=digits, ) typer.echo(uri) - pm.sync_vault() @entry_app.command("add-ssh") @@ -205,15 +226,13 @@ def entry_add_ssh( notes: str = typer.Option("", "--notes", help="Entry notes"), ) -> None: """Add an SSH key entry and output its index.""" - pm = _get_pm(ctx) - idx = pm.entry_manager.add_ssh_key( + service = _get_entry_service(ctx) + idx = service.add_ssh_key( label, - pm.parent_seed, index=index, notes=notes, ) typer.echo(str(idx)) - pm.sync_vault() @entry_app.command("add-pgp") @@ -226,17 +245,15 @@ def entry_add_pgp( notes: str = typer.Option("", "--notes", help="Entry notes"), ) -> None: """Add a PGP key entry and output its index.""" - pm = _get_pm(ctx) - idx = pm.entry_manager.add_pgp_key( + service = _get_entry_service(ctx) + idx = service.add_pgp_key( label, - pm.parent_seed, index=index, key_type=key_type, user_id=user_id, notes=notes, ) typer.echo(str(idx)) - pm.sync_vault() @entry_app.command("add-nostr") @@ -247,14 +264,13 @@ def entry_add_nostr( notes: str = typer.Option("", "--notes", help="Entry notes"), ) -> None: """Add a Nostr key entry and output its index.""" - pm = _get_pm(ctx) - idx = pm.entry_manager.add_nostr_key( + service = _get_entry_service(ctx) + idx = service.add_nostr_key( label, index=index, notes=notes, ) typer.echo(str(idx)) - pm.sync_vault() @entry_app.command("add-seed") @@ -266,16 +282,14 @@ def entry_add_seed( notes: str = typer.Option("", "--notes", help="Entry notes"), ) -> None: """Add a derived seed phrase entry and output its index.""" - pm = _get_pm(ctx) - idx = pm.entry_manager.add_seed( + service = _get_entry_service(ctx) + idx = service.add_seed( label, - pm.parent_seed, index=index, - words_num=words, + words=words, notes=notes, ) typer.echo(str(idx)) - pm.sync_vault() @entry_app.command("add-key-value") @@ -286,10 +300,9 @@ def entry_add_key_value( notes: str = typer.Option("", "--notes", help="Entry notes"), ) -> None: """Add a key/value entry and output its index.""" - pm = _get_pm(ctx) - idx = pm.entry_manager.add_key_value(label, value, notes=notes) + service = _get_entry_service(ctx) + idx = service.add_key_value(label, value, notes=notes) typer.echo(str(idx)) - pm.sync_vault() @entry_app.command("add-managed-account") @@ -300,15 +313,13 @@ def entry_add_managed_account( notes: str = typer.Option("", "--notes", help="Entry notes"), ) -> None: """Add a managed account seed entry and output its index.""" - pm = _get_pm(ctx) - idx = pm.entry_manager.add_managed_account( + service = _get_entry_service(ctx) + idx = service.add_managed_account( label, - pm.parent_seed, index=index, notes=notes, ) typer.echo(str(idx)) - pm.sync_vault() @entry_app.command("modify") @@ -326,9 +337,9 @@ def entry_modify( value: Optional[str] = typer.Option(None, "--value", help="New value"), ) -> None: """Modify an existing entry.""" - pm = _get_pm(ctx) + service = _get_entry_service(ctx) try: - pm.entry_manager.modify_entry( + service.modify_entry( entry_id, username=username, url=url, @@ -341,32 +352,29 @@ def entry_modify( except ValueError as e: typer.echo(str(e)) raise typer.Exit(code=1) - pm.sync_vault() @entry_app.command("archive") def entry_archive(ctx: typer.Context, entry_id: int) -> None: """Archive an entry.""" - pm = _get_pm(ctx) - pm.entry_manager.archive_entry(entry_id) + service = _get_entry_service(ctx) + service.archive_entry(entry_id) typer.echo(str(entry_id)) - pm.sync_vault() @entry_app.command("unarchive") def entry_unarchive(ctx: typer.Context, entry_id: int) -> None: """Restore an archived entry.""" - pm = _get_pm(ctx) - pm.entry_manager.restore_entry(entry_id) + service = _get_entry_service(ctx) + service.restore_entry(entry_id) typer.echo(str(entry_id)) - pm.sync_vault() @entry_app.command("totp-codes") def entry_totp_codes(ctx: typer.Context) -> None: """Display all current TOTP codes.""" - pm = _get_pm(ctx) - pm.handle_display_totp_codes() + service = _get_entry_service(ctx) + service.display_totp_codes() @entry_app.command("export-totp") @@ -374,8 +382,8 @@ def entry_export_totp( ctx: typer.Context, file: str = typer.Option(..., help="Output file") ) -> None: """Export all TOTP secrets to a JSON file.""" - pm = _get_pm(ctx) - data = pm.entry_manager.export_totp_entries(pm.parent_seed) + service = _get_entry_service(ctx) + data = service.export_totp_entries() Path(file).write_text(json.dumps(data, indent=2)) typer.echo(str(file)) @@ -478,16 +486,16 @@ def nostr_sync(ctx: typer.Context) -> None: @nostr_app.command("get-pubkey") def nostr_get_pubkey(ctx: typer.Context) -> None: """Display the active profile's npub.""" - pm = _get_pm(ctx) - npub = pm.nostr_client.key_manager.get_npub() + service = _get_nostr_service(ctx) + npub = service.get_pubkey() typer.echo(npub) @config_app.command("get") def config_get(ctx: typer.Context, key: str) -> None: """Get a configuration value.""" - pm = _get_pm(ctx) - value = pm.config_manager.load_config(require_pin=False).get(key) + service = _get_config_service(ctx) + value = service.get(key) if value is None: typer.echo("Key not found") else: @@ -497,43 +505,18 @@ def config_get(ctx: typer.Context, key: str) -> None: @config_app.command("set") def config_set(ctx: typer.Context, key: str, value: str) -> None: """Set a configuration value.""" - pm = _get_pm(ctx) - cfg = pm.config_manager - - mapping = { - "inactivity_timeout": lambda v: cfg.set_inactivity_timeout(float(v)), - "secret_mode_enabled": lambda v: cfg.set_secret_mode_enabled( - v.lower() in ("1", "true", "yes", "y", "on") - ), - "clipboard_clear_delay": lambda v: cfg.set_clipboard_clear_delay(int(v)), - "additional_backup_path": lambda v: cfg.set_additional_backup_path(v or None), - "relays": lambda v: cfg.set_relays( - [r.strip() for r in v.split(",") if r.strip()], require_pin=False - ), - "kdf_iterations": lambda v: cfg.set_kdf_iterations(int(v)), - "kdf_mode": lambda v: cfg.set_kdf_mode(v), - "backup_interval": lambda v: cfg.set_backup_interval(float(v)), - "nostr_max_retries": lambda v: cfg.set_nostr_max_retries(int(v)), - "nostr_retry_delay": lambda v: cfg.set_nostr_retry_delay(float(v)), - "min_uppercase": lambda v: cfg.set_min_uppercase(int(v)), - "min_lowercase": lambda v: cfg.set_min_lowercase(int(v)), - "min_digits": lambda v: cfg.set_min_digits(int(v)), - "min_special": lambda v: cfg.set_min_special(int(v)), - "quick_unlock": lambda v: cfg.set_quick_unlock( - v.lower() in ("1", "true", "yes", "y", "on") - ), - "verbose_timing": lambda v: cfg.set_verbose_timing( - v.lower() in ("1", "true", "yes", "y", "on") - ), - } - - action = mapping.get(key) - if action is None: - typer.echo("Unknown key") - raise typer.Exit(code=1) + service = _get_config_service(ctx) try: - action(value) + val = ( + [r.strip() for r in value.split(",") if r.strip()] + if key == "relays" + else value + ) + service.set(key, val) + except KeyError: + typer.echo("Unknown key") + raise typer.Exit(code=1) except Exception as exc: # pragma: no cover - pass through errors typer.echo(f"Error: {exc}") raise typer.Exit(code=1) @@ -544,11 +527,10 @@ def config_set(ctx: typer.Context, key: str, value: str) -> None: @config_app.command("toggle-secret-mode") def config_toggle_secret_mode(ctx: typer.Context) -> None: """Interactively enable or disable secret mode.""" - pm = _get_pm(ctx) - cfg = pm.config_manager + service = _get_config_service(ctx) try: - enabled = cfg.get_secret_mode_enabled() - delay = cfg.get_clipboard_clear_delay() + enabled = service.get_secret_mode_enabled() + delay = service.get_clipboard_clear_delay() except Exception as exc: # pragma: no cover - pass through errors typer.echo(f"Error loading settings: {exc}") raise typer.Exit(code=1) @@ -580,10 +562,7 @@ def config_toggle_secret_mode(ctx: typer.Context) -> None: raise typer.Exit(code=1) try: - cfg.set_secret_mode_enabled(enabled) - cfg.set_clipboard_clear_delay(delay) - pm.secret_mode_enabled = enabled - pm.clipboard_clear_delay = delay + service.set_secret_mode(enabled, delay) except Exception as exc: # pragma: no cover - pass through errors typer.echo(f"Error: {exc}") raise typer.Exit(code=1) @@ -595,10 +574,9 @@ def config_toggle_secret_mode(ctx: typer.Context) -> None: @config_app.command("toggle-offline") def config_toggle_offline(ctx: typer.Context) -> None: """Enable or disable offline mode.""" - pm = _get_pm(ctx) - cfg = pm.config_manager + service = _get_config_service(ctx) try: - enabled = cfg.get_offline_mode() + enabled = service.get_offline_mode() except Exception as exc: # pragma: no cover - pass through errors typer.echo(f"Error loading settings: {exc}") raise typer.Exit(code=1) @@ -617,8 +595,7 @@ def config_toggle_offline(ctx: typer.Context) -> None: enabled = False try: - cfg.set_offline_mode(enabled) - pm.offline_mode = enabled + service.set_offline_mode(enabled) except Exception as exc: # pragma: no cover - pass through errors typer.echo(f"Error: {exc}") raise typer.Exit(code=1) @@ -659,23 +636,23 @@ def fingerprint_switch(ctx: typer.Context, fingerprint: str) -> None: @util_app.command("generate-password") def generate_password(ctx: typer.Context, length: int = 24) -> None: """Generate a strong password.""" - pm = _get_pm(ctx) - password = pm.password_generator.generate_password(length) + service = _get_util_service(ctx) + password = service.generate_password(length) typer.echo(password) @util_app.command("verify-checksum") def verify_checksum(ctx: typer.Context) -> None: """Verify the SeedPass script checksum.""" - pm = _get_pm(ctx) - pm.handle_verify_checksum() + service = _get_util_service(ctx) + service.verify_checksum() @util_app.command("update-checksum") def update_checksum(ctx: typer.Context) -> None: """Regenerate the script checksum file.""" - pm = _get_pm(ctx) - pm.handle_update_script_checksum() + service = _get_util_service(ctx) + service.update_checksum() @api_app.command("start") diff --git a/src/seedpass/core/api.py b/src/seedpass/core/api.py index 03609a8..9f6671c 100644 --- a/src/seedpass/core/api.py +++ b/src/seedpass/core/api.py @@ -194,3 +194,330 @@ class SyncService: with self._lock: self._manager.start_background_vault_sync(summary) + + +class EntryService: + """Thread-safe wrapper around entry operations.""" + + def __init__(self, manager: PasswordManager) -> None: + self._manager = manager + self._lock = Lock() + + def list_entries( + self, + sort_by: str = "index", + filter_kind: str | None = None, + include_archived: bool = False, + ): + with self._lock: + return self._manager.entry_manager.list_entries( + sort_by=sort_by, + filter_kind=filter_kind, + include_archived=include_archived, + ) + + def search_entries(self, query: str): + with self._lock: + return self._manager.entry_manager.search_entries(query) + + def retrieve_entry(self, entry_id: int): + with self._lock: + return self._manager.entry_manager.retrieve_entry(entry_id) + + def generate_password(self, length: int, index: int) -> str: + with self._lock: + return self._manager.password_generator.generate_password(length, index) + + def get_totp_code(self, entry_id: int) -> str: + with self._lock: + return self._manager.entry_manager.get_totp_code( + entry_id, self._manager.parent_seed + ) + + def add_entry( + self, + label: str, + length: int, + username: str | None = None, + url: str | None = None, + ) -> int: + with self._lock: + idx = self._manager.entry_manager.add_entry(label, length, username, url) + self._manager.sync_vault() + return idx + + def add_totp( + self, + label: str, + *, + index: int | None = None, + secret: str | None = None, + period: int = 30, + digits: int = 6, + ) -> str: + with self._lock: + uri = self._manager.entry_manager.add_totp( + label, + self._manager.parent_seed, + index=index, + secret=secret, + period=period, + digits=digits, + ) + self._manager.sync_vault() + return uri + + def add_ssh_key( + self, + label: str, + *, + index: int | None = None, + notes: str = "", + ) -> int: + with self._lock: + idx = self._manager.entry_manager.add_ssh_key( + label, + self._manager.parent_seed, + index=index, + notes=notes, + ) + self._manager.sync_vault() + return idx + + def add_pgp_key( + self, + label: str, + *, + index: int | None = None, + key_type: str = "ed25519", + user_id: str = "", + notes: str = "", + ) -> int: + with self._lock: + idx = self._manager.entry_manager.add_pgp_key( + label, + self._manager.parent_seed, + index=index, + key_type=key_type, + user_id=user_id, + notes=notes, + ) + self._manager.sync_vault() + return idx + + def add_nostr_key( + self, + label: str, + *, + index: int | None = None, + notes: str = "", + ) -> int: + with self._lock: + idx = self._manager.entry_manager.add_nostr_key( + label, + index=index, + notes=notes, + ) + self._manager.sync_vault() + return idx + + def add_seed( + self, + label: str, + *, + index: int | None = None, + words: int = 24, + notes: str = "", + ) -> int: + with self._lock: + idx = self._manager.entry_manager.add_seed( + label, + self._manager.parent_seed, + index=index, + words_num=words, + notes=notes, + ) + self._manager.sync_vault() + return idx + + def add_key_value(self, label: str, value: str, *, notes: str = "") -> int: + with self._lock: + idx = self._manager.entry_manager.add_key_value(label, value, notes=notes) + self._manager.sync_vault() + return idx + + def add_managed_account( + self, + label: str, + *, + index: int | None = None, + notes: str = "", + ) -> int: + with self._lock: + idx = self._manager.entry_manager.add_managed_account( + label, + self._manager.parent_seed, + index=index, + notes=notes, + ) + self._manager.sync_vault() + return idx + + def modify_entry( + self, + entry_id: int, + *, + username: str | None = None, + url: str | None = None, + notes: str | None = None, + label: str | None = None, + period: int | None = None, + digits: int | None = None, + value: str | None = None, + ) -> None: + with self._lock: + self._manager.entry_manager.modify_entry( + entry_id, + username=username, + url=url, + notes=notes, + label=label, + period=period, + digits=digits, + value=value, + ) + self._manager.sync_vault() + + def archive_entry(self, entry_id: int) -> None: + with self._lock: + self._manager.entry_manager.archive_entry(entry_id) + self._manager.sync_vault() + + def restore_entry(self, entry_id: int) -> None: + with self._lock: + self._manager.entry_manager.restore_entry(entry_id) + self._manager.sync_vault() + + def export_totp_entries(self) -> dict: + with self._lock: + return self._manager.entry_manager.export_totp_entries( + self._manager.parent_seed + ) + + def display_totp_codes(self) -> None: + with self._lock: + self._manager.handle_display_totp_codes() + + +class ConfigService: + """Thread-safe wrapper around configuration access.""" + + def __init__(self, manager: PasswordManager) -> None: + self._manager = manager + self._lock = Lock() + + def get(self, key: str): + with self._lock: + return self._manager.config_manager.load_config(require_pin=False).get(key) + + def set(self, key: str, value: str) -> None: + cfg = self._manager.config_manager + mapping = { + "inactivity_timeout": ("set_inactivity_timeout", float), + "secret_mode_enabled": ( + "set_secret_mode_enabled", + lambda v: v.lower() in ("1", "true", "yes", "y", "on"), + ), + "clipboard_clear_delay": ("set_clipboard_clear_delay", int), + "additional_backup_path": ( + "set_additional_backup_path", + lambda v: v or None, + ), + "relays": ("set_relays", lambda v: (v, {"require_pin": False})), + "kdf_iterations": ("set_kdf_iterations", int), + "kdf_mode": ("set_kdf_mode", lambda v: v), + "backup_interval": ("set_backup_interval", float), + "nostr_max_retries": ("set_nostr_max_retries", int), + "nostr_retry_delay": ("set_nostr_retry_delay", float), + "min_uppercase": ("set_min_uppercase", int), + "min_lowercase": ("set_min_lowercase", int), + "min_digits": ("set_min_digits", int), + "min_special": ("set_min_special", int), + "quick_unlock": ( + "set_quick_unlock", + lambda v: v.lower() in ("1", "true", "yes", "y", "on"), + ), + } + entry = mapping.get(key) + if entry is None: + raise KeyError(key) + method_name, conv = entry + with self._lock: + result = conv(value) + if ( + isinstance(result, tuple) + and len(result) == 2 + and isinstance(result[1], dict) + ): + arg, kwargs = result + getattr(cfg, method_name)(arg, **kwargs) + else: + getattr(cfg, method_name)(result) + + def get_secret_mode_enabled(self) -> bool: + with self._lock: + return self._manager.config_manager.get_secret_mode_enabled() + + def get_clipboard_clear_delay(self) -> int: + with self._lock: + return self._manager.config_manager.get_clipboard_clear_delay() + + def set_secret_mode(self, enabled: bool, delay: int) -> None: + with self._lock: + cfg = self._manager.config_manager + cfg.set_secret_mode_enabled(enabled) + cfg.set_clipboard_clear_delay(delay) + self._manager.secret_mode_enabled = enabled + self._manager.clipboard_clear_delay = delay + + def get_offline_mode(self) -> bool: + with self._lock: + return self._manager.config_manager.get_offline_mode() + + def set_offline_mode(self, enabled: bool) -> None: + with self._lock: + cfg = self._manager.config_manager + cfg.set_offline_mode(enabled) + self._manager.offline_mode = enabled + + +class UtilityService: + """Miscellaneous helper operations.""" + + def __init__(self, manager: PasswordManager) -> None: + self._manager = manager + self._lock = Lock() + + def generate_password(self, length: int) -> str: + with self._lock: + return self._manager.password_generator.generate_password(length) + + def verify_checksum(self) -> None: + with self._lock: + self._manager.handle_verify_checksum() + + def update_checksum(self) -> None: + with self._lock: + self._manager.handle_update_script_checksum() + + +class NostrService: + """Nostr related helper methods.""" + + def __init__(self, manager: PasswordManager) -> None: + self._manager = manager + self._lock = Lock() + + def get_pubkey(self) -> str: + with self._lock: + return self._manager.nostr_client.key_manager.get_npub()