From 06ca51993a6d1f8c5b7233102b3cf472647a2682 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Tue, 19 Aug 2025 09:53:46 -0400 Subject: [PATCH 1/2] Add KDF config with per-file metadata --- src/seedpass/core/encryption.py | 76 +++++++++++++++++++------ src/seedpass/core/manager.py | 12 ++-- src/seedpass/core/vault.py | 38 +++++++++++-- src/tests/test_fuzz_key_derivation.py | 21 ++++++- src/tests/test_kdf_modes.py | 50 +++++++++++++++-- src/tests/test_key_derivation.py | 20 ++++--- src/utils/key_derivation.py | 80 +++++++++++++++------------ 7 files changed, 221 insertions(+), 76 deletions(-) diff --git a/src/seedpass/core/encryption.py b/src/seedpass/core/encryption.py index f685841..3b5e6b6 100644 --- a/src/seedpass/core/encryption.py +++ b/src/seedpass/core/encryption.py @@ -16,8 +16,9 @@ except Exception: # pragma: no cover - fallback for environments without orjson import hashlib import os import base64 +from dataclasses import asdict from pathlib import Path -from typing import Optional +from typing import Optional, Tuple from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.exceptions import InvalidTag @@ -26,6 +27,7 @@ from termcolor import colored from utils.file_lock import exclusive_lock from mnemonic import Mnemonic from utils.password_prompt import prompt_existing_password +from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION # Instantiate the logger logger = logging.getLogger(__name__) @@ -231,40 +233,58 @@ class EncryptionManager: raise ValueError("Invalid path outside fingerprint directory") return candidate - def encrypt_parent_seed(self, parent_seed: str) -> None: + def encrypt_parent_seed( + self, parent_seed: str, kdf: Optional[KdfConfig] = None + ) -> None: """Encrypts and saves the parent seed to 'parent_seed.enc'.""" data = parent_seed.encode("utf-8") - encrypted_data = self.encrypt_data(data) # This now creates V2 format - with exclusive_lock(self.parent_seed_file) as fh: - fh.seek(0) - fh.truncate() - fh.write(encrypted_data) - os.chmod(self.parent_seed_file, 0o600) + self.encrypt_and_save_file(data, self.parent_seed_file, kdf=kdf) logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.") def decrypt_parent_seed(self) -> str: """Decrypts and returns the parent seed, handling migration.""" with exclusive_lock(self.parent_seed_file) as fh: fh.seek(0) - encrypted_data = fh.read() + blob = fh.read() + kdf, encrypted_data = self._deserialize(blob) is_legacy = not encrypted_data.startswith(b"V2:") decrypted_data = self.decrypt_data(encrypted_data, context="seed") if is_legacy: logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.") - self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip()) + self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf) return decrypted_data.decode("utf-8").strip() - def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: + def _serialize(self, kdf: KdfConfig, ciphertext: bytes) -> bytes: + payload = {"kdf": asdict(kdf), "ct": base64.b64encode(ciphertext).decode()} + if USE_ORJSON: + return json_lib.dumps(payload) + return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8") + + def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]: + if USE_ORJSON: + obj = json_lib.loads(blob) + else: + obj = json_lib.loads(blob.decode("utf-8")) + kdf = KdfConfig(**obj.get("kdf", {})) + ct = base64.b64decode(obj.get("ct", "")) + return kdf, ct + + def encrypt_and_save_file( + self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None + ) -> None: + if kdf is None: + kdf = KdfConfig() file_path = self.resolve_relative_path(relative_path) file_path.parent.mkdir(parents=True, exist_ok=True) encrypted_data = self.encrypt_data(data) + payload = self._serialize(kdf, encrypted_data) with exclusive_lock(file_path) as fh: fh.seek(0) fh.truncate() - fh.write(encrypted_data) + fh.write(payload) fh.flush() os.fsync(fh.fileno()) os.chmod(file_path, 0o600) @@ -273,20 +293,37 @@ class EncryptionManager: file_path = self.resolve_relative_path(relative_path) with exclusive_lock(file_path) as fh: fh.seek(0) - encrypted_data = fh.read() + blob = fh.read() + _, encrypted_data = self._deserialize(blob) return self.decrypt_data(encrypted_data, context=str(relative_path)) - def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None: + def get_file_kdf(self, relative_path: Path) -> KdfConfig: + file_path = self.resolve_relative_path(relative_path) + with exclusive_lock(file_path) as fh: + fh.seek(0) + blob = fh.read() + kdf, _ = self._deserialize(blob) + return kdf + + def save_json_data( + self, + data: dict, + relative_path: Optional[Path] = None, + *, + kdf: Optional[KdfConfig] = None, + ) -> None: if relative_path is None: relative_path = Path("seedpass_entries_db.json.enc") if USE_ORJSON: json_data = json_lib.dumps(data) else: json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8") - self.encrypt_and_save_file(json_data, relative_path) + self.encrypt_and_save_file(json_data, relative_path, kdf=kdf) logger.debug(f"JSON data encrypted and saved to '{relative_path}'.") - def load_json_data(self, relative_path: Optional[Path] = None) -> dict: + def load_json_data( + self, relative_path: Optional[Path] = None, *, return_kdf: bool = False + ) -> dict | Tuple[dict, KdfConfig]: """ Loads and decrypts JSON data, automatically migrating and re-saving if it's in the legacy format. @@ -299,8 +336,9 @@ class EncryptionManager: with exclusive_lock(file_path) as fh: fh.seek(0) - encrypted_data = fh.read() + blob = fh.read() + kdf, encrypted_data = self._deserialize(blob) is_legacy = not encrypted_data.startswith(b"V2:") self.last_migration_performed = False @@ -316,10 +354,12 @@ class EncryptionManager: # If it was a legacy file, re-save it in the new format now if is_legacy and self._legacy_migrate_flag: logger.info(f"Migrating and re-saving legacy vault file: {file_path}") - self.save_json_data(data, relative_path) + self.save_json_data(data, relative_path, kdf=kdf) self.update_checksum(relative_path) self.last_migration_performed = True + if return_kdf: + return data, kdf return data except (InvalidToken, InvalidTag, JSONDecodeError) as e: logger.error( diff --git a/src/seedpass/core/manager.py b/src/seedpass/core/manager.py index d692752..4c1873e 100644 --- a/src/seedpass/core/manager.py +++ b/src/seedpass/core/manager.py @@ -15,6 +15,7 @@ import logging import os import hashlib import hmac +import base64 from typing import Optional, Literal, Any import shutil import time @@ -46,6 +47,7 @@ from utils.key_derivation import ( derive_key_from_password_argon2, derive_index_key, EncryptionMode, + KdfConfig, ) from utils.checksum import ( calculate_checksum, @@ -689,9 +691,9 @@ class PasswordManager: for iter_try in dict.fromkeys(iter_candidates): try: if mode == "argon2": - seed_key = derive_key_from_password_argon2( - password, salt_fp - ) + salt = hashlib.sha256(salt_fp.encode()).digest()[:16] + cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode()) + seed_key = derive_key_from_password_argon2(password, cfg) else: seed_key = derive_key_from_password( password, salt_fp, iterations=iter_try @@ -771,7 +773,9 @@ class PasswordManager: ) salt_fp = fingerprint_dir.name if mode == "argon2": - seed_key = derive_key_from_password_argon2(password, salt_fp) + salt = hashlib.sha256(salt_fp.encode()).digest()[:16] + cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode()) + seed_key = derive_key_from_password_argon2(password, cfg) else: seed_key = derive_key_from_password( password, salt_fp, iterations=iterations diff --git a/src/seedpass/core/vault.py b/src/seedpass/core/vault.py index f13d1ce..a0a639d 100644 --- a/src/seedpass/core/vault.py +++ b/src/seedpass/core/vault.py @@ -14,6 +14,7 @@ from .encryption import ( USE_ORJSON, json_lib, ) +from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION from utils.password_prompt import prompt_existing_password @@ -38,6 +39,11 @@ class Vault: """Replace the internal encryption manager.""" self.encryption_manager = manager + def _hkdf_kdf(self) -> KdfConfig: + return KdfConfig( + name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64="" + ) + # ----- Password index helpers ----- def load_index(self, *, return_migration_flags: bool = False): """Return decrypted password index data, applying migrations. @@ -102,10 +108,24 @@ class Vault: ) try: - data = self.encryption_manager.load_json_data(self.index_file) + data, kdf = self.encryption_manager.load_json_data( + self.index_file, return_kdf=True + ) migration_performed = getattr( self.encryption_manager, "last_migration_performed", False ) + if kdf.version < CURRENT_KDF_VERSION: + new_kdf = KdfConfig( + name=kdf.name, + version=CURRENT_KDF_VERSION, + params=kdf.params, + salt_b64=kdf.salt_b64, + ) + self.encryption_manager.save_json_data( + data, self.index_file, kdf=new_kdf + ) + self.encryption_manager.update_checksum(self.index_file) + migration_performed = True except LegacyFormatRequiresMigrationError: print( colored( @@ -142,7 +162,9 @@ class Vault: else: data = json_lib.loads(decrypted.decode("utf-8")) if self.encryption_manager._legacy_migrate_flag: - self.encryption_manager.save_json_data(data, self.index_file) + self.encryption_manager.save_json_data( + data, self.index_file, kdf=self._hkdf_kdf() + ) self.encryption_manager.update_checksum(self.index_file) migration_performed = getattr( self.encryption_manager, "last_migration_performed", False @@ -181,7 +203,9 @@ class Vault: try: data = apply_migrations(data) if schema_migrated: - self.encryption_manager.save_json_data(data, self.index_file) + self.encryption_manager.save_json_data( + data, self.index_file, kdf=self._hkdf_kdf() + ) self.encryption_manager.update_checksum(self.index_file) except Exception as exc: # noqa: BLE001 - surface clear error and restore if legacy_detected and backup_dir is not None: @@ -214,7 +238,9 @@ class Vault: def save_index(self, data: dict) -> None: """Encrypt and write password index.""" - self.encryption_manager.save_json_data(data, self.index_file) + self.encryption_manager.save_json_data( + data, self.index_file, kdf=self._hkdf_kdf() + ) def get_encrypted_index(self) -> Optional[bytes]: """Return the encrypted index bytes if present.""" @@ -252,4 +278,6 @@ class Vault: def save_config(self, config: dict) -> None: """Encrypt and persist configuration.""" - self.encryption_manager.save_json_data(config, self.config_file) + self.encryption_manager.save_json_data( + config, self.config_file, kdf=self._hkdf_kdf() + ) diff --git a/src/tests/test_fuzz_key_derivation.py b/src/tests/test_fuzz_key_derivation.py index a15b760..ac85e41 100644 --- a/src/tests/test_fuzz_key_derivation.py +++ b/src/tests/test_fuzz_key_derivation.py @@ -3,11 +3,15 @@ from pathlib import Path from hypothesis import given, strategies as st, settings, HealthCheck from mnemonic import Mnemonic +import hashlib +import base64 +import os from utils.key_derivation import ( derive_key_from_password, derive_key_from_password_argon2, derive_index_key, + KdfConfig, ) from utils.fingerprint import generate_fingerprint from seedpass.core.encryption import EncryptionManager @@ -36,16 +40,27 @@ def test_fuzz_key_round_trip(password, seed_bytes, config, mode, tmp_path: Path) seed_phrase = Mnemonic("english").to_mnemonic(seed_bytes) fp = generate_fingerprint(seed_phrase) if mode == "argon2": - key = derive_key_from_password_argon2( - password, fp, time_cost=1, memory_cost=8, parallelism=1 + cfg = KdfConfig( + params={"time_cost": 1, "memory_cost": 8, "parallelism": 1}, + salt_b64=base64.b64encode( + hashlib.sha256(fp.encode()).digest()[:16] + ).decode(), ) + key = derive_key_from_password_argon2(password, cfg) else: key = derive_key_from_password(password, fp, iterations=1) + cfg = KdfConfig( + name="pbkdf2", + params={"iterations": 1}, + salt_b64=base64.b64encode( + hashlib.sha256(fp.encode()).digest()[:16] + ).decode(), + ) enc_mgr = EncryptionManager(key, tmp_path) # Parent seed round trip - enc_mgr.encrypt_parent_seed(seed_phrase) + enc_mgr.encrypt_parent_seed(seed_phrase, kdf=cfg) assert enc_mgr.decrypt_parent_seed() == seed_phrase # JSON data round trip diff --git a/src/tests/test_kdf_modes.py b/src/tests/test_kdf_modes.py index 2cb0212..96ceebf 100644 --- a/src/tests/test_kdf_modes.py +++ b/src/tests/test_kdf_modes.py @@ -1,4 +1,6 @@ import bcrypt +import hashlib +import base64 from pathlib import Path from tempfile import TemporaryDirectory from types import SimpleNamespace @@ -7,6 +9,7 @@ from utils.key_derivation import ( derive_key_from_password, derive_key_from_password_argon2, derive_index_key, + KdfConfig, ) from seedpass.core.encryption import EncryptionManager from seedpass.core.vault import Vault @@ -21,10 +24,24 @@ def _setup_profile(tmp: Path, mode: str): argon_kwargs = dict(time_cost=1, memory_cost=8, parallelism=1) fp = tmp.name if mode == "argon2": - seed_key = derive_key_from_password_argon2(TEST_PASSWORD, fp, **argon_kwargs) + cfg = KdfConfig( + params=argon_kwargs, + salt_b64=base64.b64encode( + hashlib.sha256(fp.encode()).digest()[:16] + ).decode(), + ) + seed_key = derive_key_from_password_argon2(TEST_PASSWORD, cfg) + EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg) else: seed_key = derive_key_from_password(TEST_PASSWORD, fp, iterations=1) - EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED) + cfg = KdfConfig( + name="pbkdf2", + params={"iterations": 1}, + salt_b64=base64.b64encode( + hashlib.sha256(fp.encode()).digest()[:16] + ).decode(), + ) + EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg) index_key = derive_index_key(TEST_SEED) enc_mgr = EncryptionManager(index_key, tmp) @@ -65,9 +82,9 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch): ) if mode == "argon2": monkeypatch.setattr( - "seedpass.core.manager.derive_key_from_password_argon2", - lambda pw, fp: derive_key_from_password_argon2( - pw, fp, **argon_kwargs + "seedpass.core.manager.KdfConfig", + lambda salt_b64, **_: KdfConfig( + params=argon_kwargs, salt_b64=salt_b64 ), ) monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None) @@ -76,3 +93,26 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch): ) assert pm.setup_encryption_manager(path, exit_on_fail=False) assert pm.parent_seed == TEST_SEED + + +def test_kdf_param_round_trip(tmp_path): + cfg = KdfConfig( + params={"time_cost": 3, "memory_cost": 32, "parallelism": 1}, + salt_b64=base64.b64encode(b"static-salt-1234").decode(), + ) + key = derive_key_from_password_argon2(TEST_PASSWORD, cfg) + mgr = EncryptionManager(key, tmp_path) + mgr.encrypt_parent_seed(TEST_SEED, kdf=cfg) + stored = mgr.get_file_kdf(Path("parent_seed.enc")) + assert stored.params == cfg.params + + +def test_vault_kdf_migration(tmp_path): + index_key = derive_index_key(TEST_SEED) + mgr = EncryptionManager(index_key, tmp_path) + vault = Vault(mgr, tmp_path) + old_kdf = KdfConfig(name="hkdf", version=0, params={}, salt_b64="") + mgr.save_json_data({"entries": {}}, vault.index_file, kdf=old_kdf) + vault.load_index() + new_kdf = mgr.get_file_kdf(vault.index_file) + assert new_kdf.version == KdfConfig().version diff --git a/src/tests/test_key_derivation.py b/src/tests/test_key_derivation.py index 433ff94..c5bec43 100644 --- a/src/tests/test_key_derivation.py +++ b/src/tests/test_key_derivation.py @@ -1,11 +1,15 @@ import logging import pytest +import logging +import hashlib +import base64 from utils.fingerprint import generate_fingerprint from utils.key_derivation import ( derive_key_from_password, derive_key_from_password_argon2, derive_index_key_seed_only, derive_index_key, + KdfConfig, ) @@ -48,15 +52,17 @@ def test_argon2_fingerprint_affects_key(): fp1 = generate_fingerprint("seed one") fp2 = generate_fingerprint("seed two") - k1 = derive_key_from_password_argon2( - password, fp1, time_cost=1, memory_cost=8, parallelism=1 + cfg1 = KdfConfig( + params={"time_cost": 1, "memory_cost": 8, "parallelism": 1}, + salt_b64=base64.b64encode(hashlib.sha256(fp1.encode()).digest()[:16]).decode(), ) - k2 = derive_key_from_password_argon2( - password, fp1, time_cost=1, memory_cost=8, parallelism=1 - ) - k3 = derive_key_from_password_argon2( - password, fp2, time_cost=1, memory_cost=8, parallelism=1 + cfg2 = KdfConfig( + params={"time_cost": 1, "memory_cost": 8, "parallelism": 1}, + salt_b64=base64.b64encode(hashlib.sha256(fp2.encode()).digest()[:16]).decode(), ) + k1 = derive_key_from_password_argon2(password, cfg1) + k2 = derive_key_from_password_argon2(password, cfg1) + k3 = derive_key_from_password_argon2(password, cfg2) assert k1 == k2 assert k1 != k3 diff --git a/src/utils/key_derivation.py b/src/utils/key_derivation.py index aed72cf..cc82e5e 100644 --- a/src/utils/key_derivation.py +++ b/src/utils/key_derivation.py @@ -3,15 +3,13 @@ """ Key Derivation Module -Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. -This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. +This module provides functions to derive cryptographic keys from user-provided +passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet +for symmetric encryption purposes. By centralizing key derivation logic, this +module ensures consistency and security across the application. -This module provides functions to derive cryptographic keys from user-provided passwords -and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption -purposes. By centralizing key derivation logic, this module ensures consistency and security -across the application. - -Ensure that all dependencies are installed and properly configured in your environment. +Ensure that all dependencies are installed and properly configured in your +environment. """ import os @@ -21,8 +19,9 @@ import unicodedata import logging import hmac import time +from dataclasses import dataclass, field from enum import Enum -from typing import Optional, Union +from typing import Optional, Union, Dict, Any from bip_utils import Bip39SeedGenerator from local_bip85 import BIP85 @@ -47,6 +46,27 @@ DEFAULT_ENCRYPTION_MODE = EncryptionMode.SEED_ONLY TOTP_PURPOSE = 39 +@dataclass +class KdfConfig: + """Configuration block describing how a key was derived.""" + + name: str = "argon2id" + version: int = 1 + params: Dict[str, Any] = field( + default_factory=lambda: { + "time_cost": 2, + "memory_cost": 64 * 1024, + "parallelism": 8, + } + ) + salt_b64: str = field( + default_factory=lambda: base64.b64encode(os.urandom(16)).decode() + ) + + +CURRENT_KDF_VERSION = 1 + + def derive_key_from_password( password: str, fingerprint: Union[str, bytes], iterations: int = 100_000 ) -> bytes: @@ -109,18 +129,15 @@ def derive_key_from_password( raise -def derive_key_from_password_argon2( - password: str, - fingerprint: Union[str, bytes], - *, - time_cost: int = 2, - memory_cost: int = 64 * 1024, - parallelism: int = 8, -) -> bytes: +def derive_key_from_password_argon2(password: str, kdf: KdfConfig) -> bytes: """Derive an encryption key from a password using Argon2id. - The defaults follow recommended parameters but omit a salt for deterministic - output. Smaller values may be supplied for testing. + Parameters + ---------- + password: + The user's password. + kdf: + :class:`KdfConfig` instance describing salt and tuning parameters. """ if not password: @@ -131,17 +148,14 @@ def derive_key_from_password_argon2( try: from argon2.low_level import hash_secret_raw, Type - if isinstance(fingerprint, bytes): - salt = fingerprint - else: - salt = hashlib.sha256(fingerprint.encode()).digest()[:16] - + params = kdf.params or {} + salt = base64.b64decode(kdf.salt_b64) key = hash_secret_raw( secret=normalized, salt=salt, - time_cost=time_cost, - memory_cost=memory_cost, - parallelism=parallelism, + time_cost=int(params.get("time_cost", 2)), + memory_cost=int(params.get("memory_cost", 64 * 1024)), + parallelism=int(params.get("parallelism", 8)), hash_len=32, type=Type.ID, ) @@ -267,18 +281,16 @@ def calibrate_argon2_time_cost( """ password = "benchmark" - fingerprint = b"argon2-calibration" + salt = base64.b64encode(b"argon2-calibration").decode() time_cost = 1 elapsed_ms = 0.0 while time_cost <= max_time_cost: start = time.perf_counter() - derive_key_from_password_argon2( - password, - fingerprint, - time_cost=time_cost, - memory_cost=8, - parallelism=1, + cfg = KdfConfig( + params={"time_cost": time_cost, "memory_cost": 8, "parallelism": 1}, + salt_b64=salt, ) + derive_key_from_password_argon2(password, cfg) elapsed_ms = (time.perf_counter() - start) * 1000 if elapsed_ms >= target_ms: break From 26632c0e707167faa98a4c338c10ee34073a0cc4 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Wed, 20 Aug 2025 17:23:10 -0400 Subject: [PATCH 2/2] Fix KDF metadata handling and headless password prompts --- src/seedpass/core/encryption.py | 52 +++++++++++++------ src/tests/test_legacy_migration.py | 4 +- src/tests/test_legacy_migration_iterations.py | 4 +- src/tests/test_legacy_migration_prompt.py | 4 +- .../test_legacy_migration_second_session.py | 7 ++- src/tests/test_seed_migration.py | 5 +- src/utils/password_prompt.py | 17 ++++++ src/utils/seed_prompt.py | 8 +-- 8 files changed, 75 insertions(+), 26 deletions(-) diff --git a/src/seedpass/core/encryption.py b/src/seedpass/core/encryption.py index 3b5e6b6..8218c88 100644 --- a/src/seedpass/core/encryption.py +++ b/src/seedpass/core/encryption.py @@ -264,13 +264,31 @@ class EncryptionManager: return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8") def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]: - if USE_ORJSON: - obj = json_lib.loads(blob) - else: - obj = json_lib.loads(blob.decode("utf-8")) - kdf = KdfConfig(**obj.get("kdf", {})) - ct = base64.b64decode(obj.get("ct", "")) - return kdf, ct + """Return ``(KdfConfig, ciphertext)`` from serialized *blob*. + + Legacy files stored the raw ciphertext without a JSON wrapper. If + decoding the wrapper fails, treat ``blob`` as the ciphertext and return + a default HKDF configuration. + """ + + try: + if USE_ORJSON: + obj = json_lib.loads(blob) + else: + obj = json_lib.loads(blob.decode("utf-8")) + kdf = KdfConfig(**obj.get("kdf", {})) + ct_b64 = obj.get("ct", "") + ciphertext = base64.b64decode(ct_b64) + if ciphertext: + return kdf, ciphertext + except Exception: # pragma: no cover - fall back to legacy path + pass + + # Legacy format: ``blob`` already contains the ciphertext + return ( + KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""), + blob, + ) def encrypt_and_save_file( self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None @@ -332,7 +350,12 @@ class EncryptionManager: relative_path = Path("seedpass_entries_db.json.enc") file_path = self.resolve_relative_path(relative_path) if not file_path.exists(): - return {"entries": {}} + empty: dict = {"entries": {}} + if return_kdf: + return empty, KdfConfig( + name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64="" + ) + return empty with exclusive_lock(file_path) as fh: fh.seek(0) @@ -400,7 +423,8 @@ class EncryptionManager: if relative_path is None: relative_path = Path("seedpass_entries_db.json.enc") - is_legacy = not encrypted_data.startswith(b"V2:") + kdf, ciphertext = self._deserialize(encrypted_data) + is_legacy = not ciphertext.startswith(b"V2:") self.last_migration_performed = False def _process(decrypted: bytes) -> dict: @@ -426,11 +450,9 @@ class EncryptionManager: return data try: - decrypted_data = self.decrypt_data( - encrypted_data, context=str(relative_path) - ) + decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path)) data = _process(decrypted_data) - self.save_json_data(data, relative_path) # This always saves in V2 format + self.save_json_data(data, relative_path, kdf=kdf) self.update_checksum(relative_path) logger.info("Index file from Nostr was processed and saved successfully.") self.last_migration_performed = is_legacy @@ -441,10 +463,10 @@ class EncryptionManager: "Enter your master password for legacy decryption: " ) decrypted_data = self.decrypt_legacy( - encrypted_data, password, context=str(relative_path) + ciphertext, password, context=str(relative_path) ) data = _process(decrypted_data) - self.save_json_data(data, relative_path) + self.save_json_data(data, relative_path, kdf=kdf) self.update_checksum(relative_path) logger.warning( "Index decrypted using legacy password-only key derivation." diff --git a/src/tests/test_legacy_migration.py b/src/tests/test_legacy_migration.py index a135039..fa2b25f 100644 --- a/src/tests/test_legacy_migration.py +++ b/src/tests/test_legacy_migration.py @@ -1,4 +1,5 @@ import json +import base64 import hashlib from pathlib import Path @@ -99,7 +100,8 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path): vault.load_index() new_file = tmp_path / "seedpass_entries_db.json.enc" - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") assert vault.migrated_from_legacy diff --git a/src/tests/test_legacy_migration_iterations.py b/src/tests/test_legacy_migration_iterations.py index ad6087f..e5f791a 100644 --- a/src/tests/test_legacy_migration_iterations.py +++ b/src/tests/test_legacy_migration_iterations.py @@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations): cfg = ConfigManager(vault, tmp_path) assert cfg.get_kdf_iterations() == iterations - content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes() - assert content.startswith(b"V2:") + payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") diff --git a/src/tests/test_legacy_migration_prompt.py b/src/tests/test_legacy_migration_prompt.py index e7ac7ef..bbc2b43 100644 --- a/src/tests/test_legacy_migration_prompt.py +++ b/src/tests/test_legacy_migration_prompt.py @@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch): monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password) monkeypatch.setattr("builtins.input", lambda _: "2") vault.load_index() - content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes() - assert content.startswith(b"V2:") + payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") assert vault.encryption_manager.last_migration_performed is True diff --git a/src/tests/test_legacy_migration_second_session.py b/src/tests/test_legacy_migration_second_session.py index e83db3a..99654cd 100644 --- a/src/tests/test_legacy_migration_second_session.py +++ b/src/tests/test_legacy_migration_second_session.py @@ -1,4 +1,5 @@ import json +import base64 import hashlib from pathlib import Path from types import SimpleNamespace @@ -34,7 +35,8 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None: monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y") vault.load_index() new_file = fp_dir / "seedpass_entries_db.json.enc" - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") new_enc_mgr = EncryptionManager(key, fp_dir) new_vault = Vault(new_enc_mgr, fp_dir) @@ -59,4 +61,5 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None: ) pm.initialize_managers() - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") diff --git a/src/tests/test_seed_migration.py b/src/tests/test_seed_migration.py index a552187..5f5b28b 100644 --- a/src/tests/test_seed_migration.py +++ b/src/tests/test_seed_migration.py @@ -1,4 +1,6 @@ import sys +import json +import base64 from pathlib import Path from cryptography.fernet import Fernet @@ -28,4 +30,5 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None: assert new_file.exists() assert new_file.read_bytes() != encrypted - assert new_file.read_bytes().startswith(b"V2:") + payload = json.loads(new_file.read_text()) + assert base64.b64decode(payload["ct"]).startswith(b"V2:") diff --git a/src/utils/password_prompt.py b/src/utils/password_prompt.py index 09cc86c..40f1231 100644 --- a/src/utils/password_prompt.py +++ b/src/utils/password_prompt.py @@ -33,6 +33,12 @@ logger = logging.getLogger(__name__) DEFAULT_MAX_ATTEMPTS = 5 +def _env_password() -> str | None: + """Return a password supplied via environment for non-interactive use.""" + + return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD") + + def _get_max_attempts(override: int | None = None) -> int: """Return the configured maximum number of prompt attempts.""" @@ -80,6 +86,13 @@ def prompt_new_password(max_retries: int | None = None) -> str: Raises: PasswordPromptError: If the user fails to provide a valid password after multiple attempts. """ + env_pw = _env_password() + if env_pw: + normalized = unicodedata.normalize("NFKD", env_pw) + if len(normalized) < MIN_PASSWORD_LENGTH: + raise PasswordPromptError("Environment password too short") + return normalized + max_retries = _get_max_attempts(max_retries) attempts = 0 @@ -164,6 +177,10 @@ def prompt_existing_password( PasswordPromptError: If the user interrupts the operation or exceeds ``max_retries`` attempts. """ + env_pw = _env_password() + if env_pw: + return unicodedata.normalize("NFKD", env_pw) + max_retries = _get_max_attempts(max_retries) attempts = 0 while max_retries == 0 or attempts < max_retries: diff --git a/src/utils/seed_prompt.py b/src/utils/seed_prompt.py index af806ca..7ff0130 100644 --- a/src/utils/seed_prompt.py +++ b/src/utils/seed_prompt.py @@ -102,9 +102,11 @@ def _masked_input_posix(prompt: str) -> str: def masked_input(prompt: str) -> str: """Return input from the user while masking typed characters.""" - if sys.platform == "win32": - return _masked_input_windows(prompt) - return _masked_input_posix(prompt) + func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix + try: + return func(prompt) + except Exception: # pragma: no cover - fallback when TTY operations fail + return input(prompt) def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str: