mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Merge pull request #616 from PR0M3TH3AN/codex/refactor-cli.py-to-use-api-methods
Refactor CLI to call core API services
This commit is contained in:
@@ -10,6 +10,10 @@ from seedpass.core.api import (
|
||||
VaultService,
|
||||
ProfileService,
|
||||
SyncService,
|
||||
EntryService,
|
||||
ConfigService,
|
||||
UtilityService,
|
||||
NostrService,
|
||||
VaultExportRequest,
|
||||
VaultImportRequest,
|
||||
ChangePasswordRequest,
|
||||
@@ -73,6 +77,26 @@ def _get_services(
|
||||
return VaultService(pm), ProfileService(pm), SyncService(pm)
|
||||
|
||||
|
||||
def _get_entry_service(ctx: typer.Context) -> EntryService:
|
||||
pm = _get_pm(ctx)
|
||||
return EntryService(pm)
|
||||
|
||||
|
||||
def _get_config_service(ctx: typer.Context) -> ConfigService:
|
||||
pm = _get_pm(ctx)
|
||||
return ConfigService(pm)
|
||||
|
||||
|
||||
def _get_util_service(ctx: typer.Context) -> UtilityService:
|
||||
pm = _get_pm(ctx)
|
||||
return UtilityService(pm)
|
||||
|
||||
|
||||
def _get_nostr_service(ctx: typer.Context) -> NostrService:
|
||||
pm = _get_pm(ctx)
|
||||
return NostrService(pm)
|
||||
|
||||
|
||||
@app.callback(invoke_without_command=True)
|
||||
def main(ctx: typer.Context, fingerprint: Optional[str] = fingerprint_option) -> None:
|
||||
"""SeedPass CLI entry point.
|
||||
@@ -95,8 +119,8 @@ def entry_list(
|
||||
archived: bool = typer.Option(False, "--archived", help="Include archived"),
|
||||
) -> None:
|
||||
"""List entries in the vault."""
|
||||
pm = _get_pm(ctx)
|
||||
entries = pm.entry_manager.list_entries(
|
||||
service = _get_entry_service(ctx)
|
||||
entries = service.list_entries(
|
||||
sort_by=sort, filter_kind=kind, include_archived=archived
|
||||
)
|
||||
for idx, label, username, url, is_archived in entries:
|
||||
@@ -113,8 +137,8 @@ def entry_list(
|
||||
@entry_app.command("search")
|
||||
def entry_search(ctx: typer.Context, query: str) -> None:
|
||||
"""Search entries."""
|
||||
pm = _get_pm(ctx)
|
||||
results = pm.entry_manager.search_entries(query)
|
||||
service = _get_entry_service(ctx)
|
||||
results = service.search_entries(query)
|
||||
if not results:
|
||||
typer.echo("No matching entries found")
|
||||
return
|
||||
@@ -130,8 +154,8 @@ def entry_search(ctx: typer.Context, query: str) -> None:
|
||||
@entry_app.command("get")
|
||||
def entry_get(ctx: typer.Context, query: str) -> None:
|
||||
"""Retrieve a single entry's secret."""
|
||||
pm = _get_pm(ctx)
|
||||
matches = pm.entry_manager.search_entries(query)
|
||||
service = _get_entry_service(ctx)
|
||||
matches = service.search_entries(query)
|
||||
if len(matches) == 0:
|
||||
typer.echo("No matching entries found")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -145,14 +169,14 @@ def entry_get(ctx: typer.Context, query: str) -> None:
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
index = matches[0][0]
|
||||
entry = pm.entry_manager.retrieve_entry(index)
|
||||
entry = service.retrieve_entry(index)
|
||||
etype = entry.get("type", entry.get("kind"))
|
||||
if etype == EntryType.PASSWORD.value:
|
||||
length = int(entry.get("length", 12))
|
||||
password = pm.password_generator.generate_password(length, index)
|
||||
password = service.generate_password(length, index)
|
||||
typer.echo(password)
|
||||
elif etype == EntryType.TOTP.value:
|
||||
code = pm.entry_manager.get_totp_code(index, pm.parent_seed)
|
||||
code = service.get_totp_code(index)
|
||||
typer.echo(code)
|
||||
else:
|
||||
typer.echo("Unsupported entry type")
|
||||
@@ -168,10 +192,9 @@ def entry_add(
|
||||
url: Optional[str] = typer.Option(None, "--url"),
|
||||
) -> None:
|
||||
"""Add a new password entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
index = pm.entry_manager.add_entry(label, length, username, url)
|
||||
service = _get_entry_service(ctx)
|
||||
index = service.add_entry(label, length, username, url)
|
||||
typer.echo(str(index))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-totp")
|
||||
@@ -184,17 +207,15 @@ def entry_add_totp(
|
||||
digits: int = typer.Option(6, "--digits", help="Number of TOTP digits"),
|
||||
) -> None:
|
||||
"""Add a TOTP entry and output the otpauth URI."""
|
||||
pm = _get_pm(ctx)
|
||||
uri = pm.entry_manager.add_totp(
|
||||
service = _get_entry_service(ctx)
|
||||
uri = service.add_totp(
|
||||
label,
|
||||
pm.parent_seed,
|
||||
index=index,
|
||||
secret=secret,
|
||||
period=period,
|
||||
digits=digits,
|
||||
)
|
||||
typer.echo(uri)
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-ssh")
|
||||
@@ -205,15 +226,13 @@ def entry_add_ssh(
|
||||
notes: str = typer.Option("", "--notes", help="Entry notes"),
|
||||
) -> None:
|
||||
"""Add an SSH key entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
idx = pm.entry_manager.add_ssh_key(
|
||||
service = _get_entry_service(ctx)
|
||||
idx = service.add_ssh_key(
|
||||
label,
|
||||
pm.parent_seed,
|
||||
index=index,
|
||||
notes=notes,
|
||||
)
|
||||
typer.echo(str(idx))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-pgp")
|
||||
@@ -226,17 +245,15 @@ def entry_add_pgp(
|
||||
notes: str = typer.Option("", "--notes", help="Entry notes"),
|
||||
) -> None:
|
||||
"""Add a PGP key entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
idx = pm.entry_manager.add_pgp_key(
|
||||
service = _get_entry_service(ctx)
|
||||
idx = service.add_pgp_key(
|
||||
label,
|
||||
pm.parent_seed,
|
||||
index=index,
|
||||
key_type=key_type,
|
||||
user_id=user_id,
|
||||
notes=notes,
|
||||
)
|
||||
typer.echo(str(idx))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-nostr")
|
||||
@@ -247,14 +264,13 @@ def entry_add_nostr(
|
||||
notes: str = typer.Option("", "--notes", help="Entry notes"),
|
||||
) -> None:
|
||||
"""Add a Nostr key entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
idx = pm.entry_manager.add_nostr_key(
|
||||
service = _get_entry_service(ctx)
|
||||
idx = service.add_nostr_key(
|
||||
label,
|
||||
index=index,
|
||||
notes=notes,
|
||||
)
|
||||
typer.echo(str(idx))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-seed")
|
||||
@@ -266,16 +282,14 @@ def entry_add_seed(
|
||||
notes: str = typer.Option("", "--notes", help="Entry notes"),
|
||||
) -> None:
|
||||
"""Add a derived seed phrase entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
idx = pm.entry_manager.add_seed(
|
||||
service = _get_entry_service(ctx)
|
||||
idx = service.add_seed(
|
||||
label,
|
||||
pm.parent_seed,
|
||||
index=index,
|
||||
words_num=words,
|
||||
words=words,
|
||||
notes=notes,
|
||||
)
|
||||
typer.echo(str(idx))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-key-value")
|
||||
@@ -286,10 +300,9 @@ def entry_add_key_value(
|
||||
notes: str = typer.Option("", "--notes", help="Entry notes"),
|
||||
) -> None:
|
||||
"""Add a key/value entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
idx = pm.entry_manager.add_key_value(label, value, notes=notes)
|
||||
service = _get_entry_service(ctx)
|
||||
idx = service.add_key_value(label, value, notes=notes)
|
||||
typer.echo(str(idx))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("add-managed-account")
|
||||
@@ -300,15 +313,13 @@ def entry_add_managed_account(
|
||||
notes: str = typer.Option("", "--notes", help="Entry notes"),
|
||||
) -> None:
|
||||
"""Add a managed account seed entry and output its index."""
|
||||
pm = _get_pm(ctx)
|
||||
idx = pm.entry_manager.add_managed_account(
|
||||
service = _get_entry_service(ctx)
|
||||
idx = service.add_managed_account(
|
||||
label,
|
||||
pm.parent_seed,
|
||||
index=index,
|
||||
notes=notes,
|
||||
)
|
||||
typer.echo(str(idx))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("modify")
|
||||
@@ -326,9 +337,9 @@ def entry_modify(
|
||||
value: Optional[str] = typer.Option(None, "--value", help="New value"),
|
||||
) -> None:
|
||||
"""Modify an existing entry."""
|
||||
pm = _get_pm(ctx)
|
||||
service = _get_entry_service(ctx)
|
||||
try:
|
||||
pm.entry_manager.modify_entry(
|
||||
service.modify_entry(
|
||||
entry_id,
|
||||
username=username,
|
||||
url=url,
|
||||
@@ -341,32 +352,29 @@ def entry_modify(
|
||||
except ValueError as e:
|
||||
typer.echo(str(e))
|
||||
raise typer.Exit(code=1)
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("archive")
|
||||
def entry_archive(ctx: typer.Context, entry_id: int) -> None:
|
||||
"""Archive an entry."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.entry_manager.archive_entry(entry_id)
|
||||
service = _get_entry_service(ctx)
|
||||
service.archive_entry(entry_id)
|
||||
typer.echo(str(entry_id))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("unarchive")
|
||||
def entry_unarchive(ctx: typer.Context, entry_id: int) -> None:
|
||||
"""Restore an archived entry."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.entry_manager.restore_entry(entry_id)
|
||||
service = _get_entry_service(ctx)
|
||||
service.restore_entry(entry_id)
|
||||
typer.echo(str(entry_id))
|
||||
pm.sync_vault()
|
||||
|
||||
|
||||
@entry_app.command("totp-codes")
|
||||
def entry_totp_codes(ctx: typer.Context) -> None:
|
||||
"""Display all current TOTP codes."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.handle_display_totp_codes()
|
||||
service = _get_entry_service(ctx)
|
||||
service.display_totp_codes()
|
||||
|
||||
|
||||
@entry_app.command("export-totp")
|
||||
@@ -374,8 +382,8 @@ def entry_export_totp(
|
||||
ctx: typer.Context, file: str = typer.Option(..., help="Output file")
|
||||
) -> None:
|
||||
"""Export all TOTP secrets to a JSON file."""
|
||||
pm = _get_pm(ctx)
|
||||
data = pm.entry_manager.export_totp_entries(pm.parent_seed)
|
||||
service = _get_entry_service(ctx)
|
||||
data = service.export_totp_entries()
|
||||
Path(file).write_text(json.dumps(data, indent=2))
|
||||
typer.echo(str(file))
|
||||
|
||||
@@ -478,16 +486,16 @@ def nostr_sync(ctx: typer.Context) -> None:
|
||||
@nostr_app.command("get-pubkey")
|
||||
def nostr_get_pubkey(ctx: typer.Context) -> None:
|
||||
"""Display the active profile's npub."""
|
||||
pm = _get_pm(ctx)
|
||||
npub = pm.nostr_client.key_manager.get_npub()
|
||||
service = _get_nostr_service(ctx)
|
||||
npub = service.get_pubkey()
|
||||
typer.echo(npub)
|
||||
|
||||
|
||||
@config_app.command("get")
|
||||
def config_get(ctx: typer.Context, key: str) -> None:
|
||||
"""Get a configuration value."""
|
||||
pm = _get_pm(ctx)
|
||||
value = pm.config_manager.load_config(require_pin=False).get(key)
|
||||
service = _get_config_service(ctx)
|
||||
value = service.get(key)
|
||||
if value is None:
|
||||
typer.echo("Key not found")
|
||||
else:
|
||||
@@ -497,43 +505,18 @@ def config_get(ctx: typer.Context, key: str) -> None:
|
||||
@config_app.command("set")
|
||||
def config_set(ctx: typer.Context, key: str, value: str) -> None:
|
||||
"""Set a configuration value."""
|
||||
pm = _get_pm(ctx)
|
||||
cfg = pm.config_manager
|
||||
|
||||
mapping = {
|
||||
"inactivity_timeout": lambda v: cfg.set_inactivity_timeout(float(v)),
|
||||
"secret_mode_enabled": lambda v: cfg.set_secret_mode_enabled(
|
||||
v.lower() in ("1", "true", "yes", "y", "on")
|
||||
),
|
||||
"clipboard_clear_delay": lambda v: cfg.set_clipboard_clear_delay(int(v)),
|
||||
"additional_backup_path": lambda v: cfg.set_additional_backup_path(v or None),
|
||||
"relays": lambda v: cfg.set_relays(
|
||||
[r.strip() for r in v.split(",") if r.strip()], require_pin=False
|
||||
),
|
||||
"kdf_iterations": lambda v: cfg.set_kdf_iterations(int(v)),
|
||||
"kdf_mode": lambda v: cfg.set_kdf_mode(v),
|
||||
"backup_interval": lambda v: cfg.set_backup_interval(float(v)),
|
||||
"nostr_max_retries": lambda v: cfg.set_nostr_max_retries(int(v)),
|
||||
"nostr_retry_delay": lambda v: cfg.set_nostr_retry_delay(float(v)),
|
||||
"min_uppercase": lambda v: cfg.set_min_uppercase(int(v)),
|
||||
"min_lowercase": lambda v: cfg.set_min_lowercase(int(v)),
|
||||
"min_digits": lambda v: cfg.set_min_digits(int(v)),
|
||||
"min_special": lambda v: cfg.set_min_special(int(v)),
|
||||
"quick_unlock": lambda v: cfg.set_quick_unlock(
|
||||
v.lower() in ("1", "true", "yes", "y", "on")
|
||||
),
|
||||
"verbose_timing": lambda v: cfg.set_verbose_timing(
|
||||
v.lower() in ("1", "true", "yes", "y", "on")
|
||||
),
|
||||
}
|
||||
|
||||
action = mapping.get(key)
|
||||
if action is None:
|
||||
typer.echo("Unknown key")
|
||||
raise typer.Exit(code=1)
|
||||
service = _get_config_service(ctx)
|
||||
|
||||
try:
|
||||
action(value)
|
||||
val = (
|
||||
[r.strip() for r in value.split(",") if r.strip()]
|
||||
if key == "relays"
|
||||
else value
|
||||
)
|
||||
service.set(key, val)
|
||||
except KeyError:
|
||||
typer.echo("Unknown key")
|
||||
raise typer.Exit(code=1)
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -544,11 +527,10 @@ def config_set(ctx: typer.Context, key: str, value: str) -> None:
|
||||
@config_app.command("toggle-secret-mode")
|
||||
def config_toggle_secret_mode(ctx: typer.Context) -> None:
|
||||
"""Interactively enable or disable secret mode."""
|
||||
pm = _get_pm(ctx)
|
||||
cfg = pm.config_manager
|
||||
service = _get_config_service(ctx)
|
||||
try:
|
||||
enabled = cfg.get_secret_mode_enabled()
|
||||
delay = cfg.get_clipboard_clear_delay()
|
||||
enabled = service.get_secret_mode_enabled()
|
||||
delay = service.get_clipboard_clear_delay()
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error loading settings: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -580,10 +562,7 @@ def config_toggle_secret_mode(ctx: typer.Context) -> None:
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
try:
|
||||
cfg.set_secret_mode_enabled(enabled)
|
||||
cfg.set_clipboard_clear_delay(delay)
|
||||
pm.secret_mode_enabled = enabled
|
||||
pm.clipboard_clear_delay = delay
|
||||
service.set_secret_mode(enabled, delay)
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -595,10 +574,9 @@ def config_toggle_secret_mode(ctx: typer.Context) -> None:
|
||||
@config_app.command("toggle-offline")
|
||||
def config_toggle_offline(ctx: typer.Context) -> None:
|
||||
"""Enable or disable offline mode."""
|
||||
pm = _get_pm(ctx)
|
||||
cfg = pm.config_manager
|
||||
service = _get_config_service(ctx)
|
||||
try:
|
||||
enabled = cfg.get_offline_mode()
|
||||
enabled = service.get_offline_mode()
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error loading settings: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -617,8 +595,7 @@ def config_toggle_offline(ctx: typer.Context) -> None:
|
||||
enabled = False
|
||||
|
||||
try:
|
||||
cfg.set_offline_mode(enabled)
|
||||
pm.offline_mode = enabled
|
||||
service.set_offline_mode(enabled)
|
||||
except Exception as exc: # pragma: no cover - pass through errors
|
||||
typer.echo(f"Error: {exc}")
|
||||
raise typer.Exit(code=1)
|
||||
@@ -659,23 +636,23 @@ def fingerprint_switch(ctx: typer.Context, fingerprint: str) -> None:
|
||||
@util_app.command("generate-password")
|
||||
def generate_password(ctx: typer.Context, length: int = 24) -> None:
|
||||
"""Generate a strong password."""
|
||||
pm = _get_pm(ctx)
|
||||
password = pm.password_generator.generate_password(length)
|
||||
service = _get_util_service(ctx)
|
||||
password = service.generate_password(length)
|
||||
typer.echo(password)
|
||||
|
||||
|
||||
@util_app.command("verify-checksum")
|
||||
def verify_checksum(ctx: typer.Context) -> None:
|
||||
"""Verify the SeedPass script checksum."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.handle_verify_checksum()
|
||||
service = _get_util_service(ctx)
|
||||
service.verify_checksum()
|
||||
|
||||
|
||||
@util_app.command("update-checksum")
|
||||
def update_checksum(ctx: typer.Context) -> None:
|
||||
"""Regenerate the script checksum file."""
|
||||
pm = _get_pm(ctx)
|
||||
pm.handle_update_script_checksum()
|
||||
service = _get_util_service(ctx)
|
||||
service.update_checksum()
|
||||
|
||||
|
||||
@api_app.command("start")
|
||||
|
@@ -194,3 +194,330 @@ class SyncService:
|
||||
|
||||
with self._lock:
|
||||
self._manager.start_background_vault_sync(summary)
|
||||
|
||||
|
||||
class EntryService:
|
||||
"""Thread-safe wrapper around entry operations."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def list_entries(
|
||||
self,
|
||||
sort_by: str = "index",
|
||||
filter_kind: str | None = None,
|
||||
include_archived: bool = False,
|
||||
):
|
||||
with self._lock:
|
||||
return self._manager.entry_manager.list_entries(
|
||||
sort_by=sort_by,
|
||||
filter_kind=filter_kind,
|
||||
include_archived=include_archived,
|
||||
)
|
||||
|
||||
def search_entries(self, query: str):
|
||||
with self._lock:
|
||||
return self._manager.entry_manager.search_entries(query)
|
||||
|
||||
def retrieve_entry(self, entry_id: int):
|
||||
with self._lock:
|
||||
return self._manager.entry_manager.retrieve_entry(entry_id)
|
||||
|
||||
def generate_password(self, length: int, index: int) -> str:
|
||||
with self._lock:
|
||||
return self._manager.password_generator.generate_password(length, index)
|
||||
|
||||
def get_totp_code(self, entry_id: int) -> str:
|
||||
with self._lock:
|
||||
return self._manager.entry_manager.get_totp_code(
|
||||
entry_id, self._manager.parent_seed
|
||||
)
|
||||
|
||||
def add_entry(
|
||||
self,
|
||||
label: str,
|
||||
length: int,
|
||||
username: str | None = None,
|
||||
url: str | None = None,
|
||||
) -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_entry(label, length, username, url)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def add_totp(
|
||||
self,
|
||||
label: str,
|
||||
*,
|
||||
index: int | None = None,
|
||||
secret: str | None = None,
|
||||
period: int = 30,
|
||||
digits: int = 6,
|
||||
) -> str:
|
||||
with self._lock:
|
||||
uri = self._manager.entry_manager.add_totp(
|
||||
label,
|
||||
self._manager.parent_seed,
|
||||
index=index,
|
||||
secret=secret,
|
||||
period=period,
|
||||
digits=digits,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
return uri
|
||||
|
||||
def add_ssh_key(
|
||||
self,
|
||||
label: str,
|
||||
*,
|
||||
index: int | None = None,
|
||||
notes: str = "",
|
||||
) -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_ssh_key(
|
||||
label,
|
||||
self._manager.parent_seed,
|
||||
index=index,
|
||||
notes=notes,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def add_pgp_key(
|
||||
self,
|
||||
label: str,
|
||||
*,
|
||||
index: int | None = None,
|
||||
key_type: str = "ed25519",
|
||||
user_id: str = "",
|
||||
notes: str = "",
|
||||
) -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_pgp_key(
|
||||
label,
|
||||
self._manager.parent_seed,
|
||||
index=index,
|
||||
key_type=key_type,
|
||||
user_id=user_id,
|
||||
notes=notes,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def add_nostr_key(
|
||||
self,
|
||||
label: str,
|
||||
*,
|
||||
index: int | None = None,
|
||||
notes: str = "",
|
||||
) -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_nostr_key(
|
||||
label,
|
||||
index=index,
|
||||
notes=notes,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def add_seed(
|
||||
self,
|
||||
label: str,
|
||||
*,
|
||||
index: int | None = None,
|
||||
words: int = 24,
|
||||
notes: str = "",
|
||||
) -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_seed(
|
||||
label,
|
||||
self._manager.parent_seed,
|
||||
index=index,
|
||||
words_num=words,
|
||||
notes=notes,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def add_key_value(self, label: str, value: str, *, notes: str = "") -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_key_value(label, value, notes=notes)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def add_managed_account(
|
||||
self,
|
||||
label: str,
|
||||
*,
|
||||
index: int | None = None,
|
||||
notes: str = "",
|
||||
) -> int:
|
||||
with self._lock:
|
||||
idx = self._manager.entry_manager.add_managed_account(
|
||||
label,
|
||||
self._manager.parent_seed,
|
||||
index=index,
|
||||
notes=notes,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
return idx
|
||||
|
||||
def modify_entry(
|
||||
self,
|
||||
entry_id: int,
|
||||
*,
|
||||
username: str | None = None,
|
||||
url: str | None = None,
|
||||
notes: str | None = None,
|
||||
label: str | None = None,
|
||||
period: int | None = None,
|
||||
digits: int | None = None,
|
||||
value: str | None = None,
|
||||
) -> None:
|
||||
with self._lock:
|
||||
self._manager.entry_manager.modify_entry(
|
||||
entry_id,
|
||||
username=username,
|
||||
url=url,
|
||||
notes=notes,
|
||||
label=label,
|
||||
period=period,
|
||||
digits=digits,
|
||||
value=value,
|
||||
)
|
||||
self._manager.sync_vault()
|
||||
|
||||
def archive_entry(self, entry_id: int) -> None:
|
||||
with self._lock:
|
||||
self._manager.entry_manager.archive_entry(entry_id)
|
||||
self._manager.sync_vault()
|
||||
|
||||
def restore_entry(self, entry_id: int) -> None:
|
||||
with self._lock:
|
||||
self._manager.entry_manager.restore_entry(entry_id)
|
||||
self._manager.sync_vault()
|
||||
|
||||
def export_totp_entries(self) -> dict:
|
||||
with self._lock:
|
||||
return self._manager.entry_manager.export_totp_entries(
|
||||
self._manager.parent_seed
|
||||
)
|
||||
|
||||
def display_totp_codes(self) -> None:
|
||||
with self._lock:
|
||||
self._manager.handle_display_totp_codes()
|
||||
|
||||
|
||||
class ConfigService:
|
||||
"""Thread-safe wrapper around configuration access."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def get(self, key: str):
|
||||
with self._lock:
|
||||
return self._manager.config_manager.load_config(require_pin=False).get(key)
|
||||
|
||||
def set(self, key: str, value: str) -> None:
|
||||
cfg = self._manager.config_manager
|
||||
mapping = {
|
||||
"inactivity_timeout": ("set_inactivity_timeout", float),
|
||||
"secret_mode_enabled": (
|
||||
"set_secret_mode_enabled",
|
||||
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
|
||||
),
|
||||
"clipboard_clear_delay": ("set_clipboard_clear_delay", int),
|
||||
"additional_backup_path": (
|
||||
"set_additional_backup_path",
|
||||
lambda v: v or None,
|
||||
),
|
||||
"relays": ("set_relays", lambda v: (v, {"require_pin": False})),
|
||||
"kdf_iterations": ("set_kdf_iterations", int),
|
||||
"kdf_mode": ("set_kdf_mode", lambda v: v),
|
||||
"backup_interval": ("set_backup_interval", float),
|
||||
"nostr_max_retries": ("set_nostr_max_retries", int),
|
||||
"nostr_retry_delay": ("set_nostr_retry_delay", float),
|
||||
"min_uppercase": ("set_min_uppercase", int),
|
||||
"min_lowercase": ("set_min_lowercase", int),
|
||||
"min_digits": ("set_min_digits", int),
|
||||
"min_special": ("set_min_special", int),
|
||||
"quick_unlock": (
|
||||
"set_quick_unlock",
|
||||
lambda v: v.lower() in ("1", "true", "yes", "y", "on"),
|
||||
),
|
||||
}
|
||||
entry = mapping.get(key)
|
||||
if entry is None:
|
||||
raise KeyError(key)
|
||||
method_name, conv = entry
|
||||
with self._lock:
|
||||
result = conv(value)
|
||||
if (
|
||||
isinstance(result, tuple)
|
||||
and len(result) == 2
|
||||
and isinstance(result[1], dict)
|
||||
):
|
||||
arg, kwargs = result
|
||||
getattr(cfg, method_name)(arg, **kwargs)
|
||||
else:
|
||||
getattr(cfg, method_name)(result)
|
||||
|
||||
def get_secret_mode_enabled(self) -> bool:
|
||||
with self._lock:
|
||||
return self._manager.config_manager.get_secret_mode_enabled()
|
||||
|
||||
def get_clipboard_clear_delay(self) -> int:
|
||||
with self._lock:
|
||||
return self._manager.config_manager.get_clipboard_clear_delay()
|
||||
|
||||
def set_secret_mode(self, enabled: bool, delay: int) -> None:
|
||||
with self._lock:
|
||||
cfg = self._manager.config_manager
|
||||
cfg.set_secret_mode_enabled(enabled)
|
||||
cfg.set_clipboard_clear_delay(delay)
|
||||
self._manager.secret_mode_enabled = enabled
|
||||
self._manager.clipboard_clear_delay = delay
|
||||
|
||||
def get_offline_mode(self) -> bool:
|
||||
with self._lock:
|
||||
return self._manager.config_manager.get_offline_mode()
|
||||
|
||||
def set_offline_mode(self, enabled: bool) -> None:
|
||||
with self._lock:
|
||||
cfg = self._manager.config_manager
|
||||
cfg.set_offline_mode(enabled)
|
||||
self._manager.offline_mode = enabled
|
||||
|
||||
|
||||
class UtilityService:
|
||||
"""Miscellaneous helper operations."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def generate_password(self, length: int) -> str:
|
||||
with self._lock:
|
||||
return self._manager.password_generator.generate_password(length)
|
||||
|
||||
def verify_checksum(self) -> None:
|
||||
with self._lock:
|
||||
self._manager.handle_verify_checksum()
|
||||
|
||||
def update_checksum(self) -> None:
|
||||
with self._lock:
|
||||
self._manager.handle_update_script_checksum()
|
||||
|
||||
|
||||
class NostrService:
|
||||
"""Nostr related helper methods."""
|
||||
|
||||
def __init__(self, manager: PasswordManager) -> None:
|
||||
self._manager = manager
|
||||
self._lock = Lock()
|
||||
|
||||
def get_pubkey(self) -> str:
|
||||
with self._lock:
|
||||
return self._manager.nostr_client.key_manager.get_npub()
|
||||
|
Reference in New Issue
Block a user