diff --git a/src/seedpass/core/encryption.py b/src/seedpass/core/encryption.py index e90ac11..d315e75 100644 --- a/src/seedpass/core/encryption.py +++ b/src/seedpass/core/encryption.py @@ -38,6 +38,17 @@ def _derive_legacy_key_from_password(password: str, iterations: int = 100_000) - return base64.urlsafe_b64encode(key) +class LegacyFormatRequiresMigrationError(Exception): + """Raised when legacy-encrypted data needs user-guided migration.""" + + def __init__(self, context: Optional[str] = None) -> None: + msg = ( + f"Legacy data detected for {context}" if context else "Legacy data detected" + ) + super().__init__(msg) + self.context = context + + class EncryptionManager: """ Manages encryption and decryption, handling migration from legacy Fernet @@ -153,63 +164,46 @@ class EncryptionManager: if not self._legacy_migrate_flag: raise logger.debug(f"Could not decrypt data{ctx}: {e}") - print( - colored( - f"Failed to decrypt{ctx} with current key. This may be a legacy index.", - "red", - ) - ) - resp = input( - "\nChoose an option:\n" - "1. Open legacy index without migrating\n" - "2. Migrate to new format.\n" - "Selection [1/2]: " - ).strip() - if resp == "1": - self._legacy_migrate_flag = False - self.last_migration_performed = False - elif resp == "2": - self._legacy_migrate_flag = True - self.last_migration_performed = True - else: - raise InvalidToken( - "User declined legacy decryption or provided invalid choice." - ) from e - password = prompt_existing_password( - "Enter your master password for legacy decryption: " - ) - last_exc: Optional[Exception] = None - for iter_count in [50_000, 100_000]: - try: - legacy_key = _derive_legacy_key_from_password( - password, iterations=iter_count - ) - legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir) - legacy_mgr._legacy_migrate_flag = False - result = legacy_mgr.decrypt_data(encrypted_data, context=context) - try: # record iteration count for future runs - from .vault import Vault - from .config_manager import ConfigManager + raise LegacyFormatRequiresMigrationError(context) - cfg_mgr = ConfigManager( - Vault(self, self.fingerprint_dir), self.fingerprint_dir - ) - cfg_mgr.set_kdf_iterations(iter_count) - except Exception: # pragma: no cover - best effort - logger.error( - "Failed to record PBKDF2 iteration count in config", - exc_info=True, - ) - logger.warning( - "Data decrypted using legacy password-only key derivation." + def decrypt_legacy( + self, encrypted_data: bytes, password: str, context: Optional[str] = None + ) -> bytes: + """Decrypt ``encrypted_data`` using legacy password-only key derivation.""" + + ctx = f" {context}" if context else "" + last_exc: Optional[Exception] = None + for iter_count in [50_000, 100_000]: + try: + legacy_key = _derive_legacy_key_from_password( + password, iterations=iter_count + ) + legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir) + legacy_mgr._legacy_migrate_flag = False + result = legacy_mgr.decrypt_data(encrypted_data, context=context) + try: # record iteration count for future runs + from .vault import Vault + from .config_manager import ConfigManager + + cfg_mgr = ConfigManager( + Vault(self, self.fingerprint_dir), self.fingerprint_dir ) - return result - except Exception as e2: # pragma: no cover - try next iteration - last_exc = e2 - logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True) - raise InvalidToken( - f"Could not decrypt{ctx} with any available method." - ) from e + cfg_mgr.set_kdf_iterations(iter_count) + except Exception: # pragma: no cover - best effort + logger.error( + "Failed to record PBKDF2 iteration count in config", + exc_info=True, + ) + logger.warning( + "Data decrypted using legacy password-only key derivation." + ) + return result + except Exception as e2: # pragma: no cover - try next iteration + last_exc = e2 + logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True) + raise InvalidToken( + f"Could not decrypt{ctx} with any available method." + ) from last_exc # --- All functions below this point now use the smart `decrypt_data` method --- diff --git a/src/seedpass/core/vault.py b/src/seedpass/core/vault.py index f146220..f13d1ce 100644 --- a/src/seedpass/core/vault.py +++ b/src/seedpass/core/vault.py @@ -6,8 +6,15 @@ from os import PathLike import shutil from termcolor import colored +from cryptography.fernet import InvalidToken -from .encryption import EncryptionManager +from .encryption import ( + EncryptionManager, + LegacyFormatRequiresMigrationError, + USE_ORJSON, + json_lib, +) +from utils.password_prompt import prompt_existing_password class Vault: @@ -99,6 +106,47 @@ class Vault: migration_performed = getattr( self.encryption_manager, "last_migration_performed", False ) + except LegacyFormatRequiresMigrationError: + print( + colored( + "Failed to decrypt index with current key. This may be a legacy index.", + "red", + ) + ) + resp = input( + "\nChoose an option:\n" + "1. Open legacy index without migrating\n" + "2. Migrate to new format.\n" + "Selection [1/2]: " + ).strip() + if resp == "1": + self.encryption_manager._legacy_migrate_flag = False + self.encryption_manager.last_migration_performed = False + elif resp == "2": + self.encryption_manager._legacy_migrate_flag = True + self.encryption_manager.last_migration_performed = True + else: + raise InvalidToken( + "User declined legacy decryption or provided invalid choice." + ) + password = prompt_existing_password( + "Enter your master password for legacy decryption: " + ) + with self.index_file.open("rb") as fh: + encrypted_data = fh.read() + decrypted = self.encryption_manager.decrypt_legacy( + encrypted_data, password, context=str(self.index_file) + ) + if USE_ORJSON: + data = json_lib.loads(decrypted) + else: + data = json_lib.loads(decrypted.decode("utf-8")) + if self.encryption_manager._legacy_migrate_flag: + self.encryption_manager.save_json_data(data, self.index_file) + self.encryption_manager.update_checksum(self.index_file) + migration_performed = getattr( + self.encryption_manager, "last_migration_performed", False + ) except Exception as exc: # noqa: BLE001 - surface clear error and restore if legacy_detected and backup_dir is not None: backup_file = backup_dir / legacy_file.name diff --git a/src/tests/test_legacy_format_exception.py b/src/tests/test_legacy_format_exception.py new file mode 100644 index 0000000..6ac5507 --- /dev/null +++ b/src/tests/test_legacy_format_exception.py @@ -0,0 +1,43 @@ +import json +from pathlib import Path + +import pytest +from cryptography.fernet import Fernet + +from seedpass.core.encryption import ( + EncryptionManager, + LegacyFormatRequiresMigrationError, + _derive_legacy_key_from_password, +) +from seedpass.core.vault import Vault + + +def test_decrypt_data_raises_legacy_exception(tmp_path: Path) -> None: + key = Fernet.generate_key() + mgr = EncryptionManager(key, tmp_path) + with pytest.raises(LegacyFormatRequiresMigrationError): + mgr.decrypt_data(b"not a valid token") + + +def test_vault_handles_legacy_exception(tmp_path: Path, monkeypatch) -> None: + password = "secret" + legacy_key = _derive_legacy_key_from_password(password) + legacy_mgr = EncryptionManager(legacy_key, tmp_path) + payload = json.dumps( + {"schema_version": 3, "entries": {"1": {"kind": "password", "password": "x"}}} + ).encode("utf-8") + legacy_bytes = legacy_mgr.fernet.encrypt(payload) + index_file = tmp_path / Vault.INDEX_FILENAME + index_file.write_bytes(legacy_bytes) + + new_mgr = EncryptionManager(Fernet.generate_key(), tmp_path) + vault = Vault(new_mgr, tmp_path) + + monkeypatch.setattr("builtins.input", lambda *args, **kwargs: "1") + monkeypatch.setattr( + "seedpass.core.vault.prompt_existing_password", lambda *args, **kwargs: password + ) + + data, migrated, last = vault.load_index(return_migration_flags=True) + assert "1" in data["entries"] + assert last is False