From 26632c0e707167faa98a4c338c10ee34073a0cc4 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Wed, 20 Aug 2025 17:23:10 -0400 Subject: [PATCH] Fix KDF metadata handling and headless password prompts --- src/seedpass/core/encryption.py | 52 +++++++++++++------ src/tests/test_legacy_migration.py | 4 +- src/tests/test_legacy_migration_iterations.py | 4 +- src/tests/test_legacy_migration_prompt.py | 4 +- .../test_legacy_migration_second_session.py | 7 ++- src/tests/test_seed_migration.py | 5 +- src/utils/password_prompt.py | 17 ++++++ src/utils/seed_prompt.py | 8 +-- 8 files changed, 75 insertions(+), 26 deletions(-) diff --git a/src/seedpass/core/encryption.py b/src/seedpass/core/encryption.py index 3b5e6b6..8218c88 100644 --- a/src/seedpass/core/encryption.py +++ b/src/seedpass/core/encryption.py @@ -264,13 +264,31 @@ class EncryptionManager: return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8") def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]: - if USE_ORJSON: - obj = json_lib.loads(blob) - else: - obj = json_lib.loads(blob.decode("utf-8")) - kdf = KdfConfig(**obj.get("kdf", {})) - ct = base64.b64decode(obj.get("ct", "")) - return kdf, ct + """Return ``(KdfConfig, ciphertext)`` from serialized *blob*. + + Legacy files stored the raw ciphertext without a JSON wrapper. If + decoding the wrapper fails, treat ``blob`` as the ciphertext and return + a default HKDF configuration. + """ + + try: + if USE_ORJSON: + obj = json_lib.loads(blob) + else: + obj = json_lib.loads(blob.decode("utf-8")) + kdf = KdfConfig(**obj.get("kdf", {})) + ct_b64 = obj.get("ct", "") + ciphertext = base64.b64decode(ct_b64) + if ciphertext: + return kdf, ciphertext + except Exception: # pragma: no cover - fall back to legacy path + pass + + # Legacy format: ``blob`` already contains the ciphertext + return ( + KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""), + blob, + ) def encrypt_and_save_file( self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None @@ -332,7 +350,12 @@ class EncryptionManager: relative_path = Path("seedpass_entries_db.json.enc") file_path = self.resolve_relative_path(relative_path) if not file_path.exists(): - return {"entries": {}} + empty: dict = {"entries": {}} + if return_kdf: + return empty, KdfConfig( + name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64="" + ) + return empty with exclusive_lock(file_path) as fh: fh.seek(0) @@ -400,7 +423,8 @@ class EncryptionManager: if relative_path is None: relative_path = Path("seedpass_entries_db.json.enc") - is_legacy = not encrypted_data.startswith(b"V2:") + kdf, ciphertext = self._deserialize(encrypted_data) + is_legacy = not ciphertext.startswith(b"V2:") self.last_migration_performed = False def _process(decrypted: bytes) -> dict: @@ -426,11 +450,9 @@ class EncryptionManager: return data try: - decrypted_data = self.decrypt_data( - encrypted_data, context=str(relative_path) - ) + decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path)) data = _process(decrypted_data) - self.save_json_data(data, relative_path) # This always saves in V2 format + self.save_json_data(data, relative_path, kdf=kdf) self.update_checksum(relative_path) logger.info("Index file from Nostr was processed and saved successfully.") self.last_migration_performed = is_legacy @@ -441,10 +463,10 @@ class EncryptionManager: "Enter your master password for legacy decryption: " ) decrypted_data = self.decrypt_legacy( - encrypted_data, password, context=str(relative_path) + ciphertext, password, context=str(relative_path) ) data = _process(decrypted_data) - self.save_json_data(data, relative_path) + self.save_json_data(data, relative_path, kdf=kdf) self.update_checksum(relative_path) logger.warning( "Index decrypted using legacy password-only key derivation." diff --git a/src/tests/test_legacy_migration.py b/src/tests/test_legacy_migration.py index a135039..fa2b25f 100644 --- a/src/tests/test_legacy_migration.py +++ b/src/tests/test_legacy_migration.py @@ -1,4 +1,5 @@ import json +import base64 import hashlib from pathlib import Path @@ -99,7 +100,8 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path): vault.load_index() new_file = tmp_path / "seedpass_entries_db.json.enc" - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") assert vault.migrated_from_legacy diff --git a/src/tests/test_legacy_migration_iterations.py b/src/tests/test_legacy_migration_iterations.py index ad6087f..e5f791a 100644 --- a/src/tests/test_legacy_migration_iterations.py +++ b/src/tests/test_legacy_migration_iterations.py @@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations): cfg = ConfigManager(vault, tmp_path) assert cfg.get_kdf_iterations() == iterations - content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes() - assert content.startswith(b"V2:") + payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") diff --git a/src/tests/test_legacy_migration_prompt.py b/src/tests/test_legacy_migration_prompt.py index e7ac7ef..bbc2b43 100644 --- a/src/tests/test_legacy_migration_prompt.py +++ b/src/tests/test_legacy_migration_prompt.py @@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch): monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password) monkeypatch.setattr("builtins.input", lambda _: "2") vault.load_index() - content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes() - assert content.startswith(b"V2:") + payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") assert vault.encryption_manager.last_migration_performed is True diff --git a/src/tests/test_legacy_migration_second_session.py b/src/tests/test_legacy_migration_second_session.py index e83db3a..99654cd 100644 --- a/src/tests/test_legacy_migration_second_session.py +++ b/src/tests/test_legacy_migration_second_session.py @@ -1,4 +1,5 @@ import json +import base64 import hashlib from pathlib import Path from types import SimpleNamespace @@ -34,7 +35,8 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None: monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y") vault.load_index() new_file = fp_dir / "seedpass_entries_db.json.enc" - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") new_enc_mgr = EncryptionManager(key, fp_dir) new_vault = Vault(new_enc_mgr, fp_dir) @@ -59,4 +61,5 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None: ) pm.initialize_managers() - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") diff --git a/src/tests/test_seed_migration.py b/src/tests/test_seed_migration.py index a552187..5f5b28b 100644 --- a/src/tests/test_seed_migration.py +++ b/src/tests/test_seed_migration.py @@ -1,4 +1,6 @@ import sys +import json +import base64 from pathlib import Path from cryptography.fernet import Fernet @@ -28,4 +30,5 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None: assert new_file.exists() assert new_file.read_bytes() != encrypted - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") diff --git a/src/utils/password_prompt.py b/src/utils/password_prompt.py index 09cc86c..40f1231 100644 --- a/src/utils/password_prompt.py +++ b/src/utils/password_prompt.py @@ -33,6 +33,12 @@ logger = logging.getLogger(__name__) DEFAULT_MAX_ATTEMPTS = 5 +def _env_password() -> str | None: + """Return a password supplied via environment for non-interactive use.""" + + return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD") + + def _get_max_attempts(override: int | None = None) -> int: """Return the configured maximum number of prompt attempts.""" @@ -80,6 +86,13 @@ def prompt_new_password(max_retries: int | None = None) -> str: Raises: PasswordPromptError: If the user fails to provide a valid password after multiple attempts. """ + env_pw = _env_password() + if env_pw: + normalized = unicodedata.normalize("NFKD", env_pw) + if len(normalized) < MIN_PASSWORD_LENGTH: + raise PasswordPromptError("Environment password too short") + return normalized + max_retries = _get_max_attempts(max_retries) attempts = 0 @@ -164,6 +177,10 @@ def prompt_existing_password( PasswordPromptError: If the user interrupts the operation or exceeds ``max_retries`` attempts. """ + env_pw = _env_password() + if env_pw: + return unicodedata.normalize("NFKD", env_pw) + max_retries = _get_max_attempts(max_retries) attempts = 0 while max_retries == 0 or attempts < max_retries: diff --git a/src/utils/seed_prompt.py b/src/utils/seed_prompt.py index af806ca..7ff0130 100644 --- a/src/utils/seed_prompt.py +++ b/src/utils/seed_prompt.py @@ -102,9 +102,11 @@ def _masked_input_posix(prompt: str) -> str: def masked_input(prompt: str) -> str: """Return input from the user while masking typed characters.""" - if sys.platform == "win32": - return _masked_input_windows(prompt) - return _masked_input_posix(prompt) + func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix + try: + return func(prompt) + except Exception: # pragma: no cover - fallback when TTY operations fail + return input(prompt) def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str: