mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 23:38:49 +00:00
test: cover legacy migration error handling
This commit is contained in:
@@ -38,6 +38,17 @@ def _derive_legacy_key_from_password(password: str, iterations: int = 100_000) -
|
|||||||
return base64.urlsafe_b64encode(key)
|
return base64.urlsafe_b64encode(key)
|
||||||
|
|
||||||
|
|
||||||
|
class LegacyFormatRequiresMigrationError(Exception):
|
||||||
|
"""Raised when legacy-encrypted data needs user-guided migration."""
|
||||||
|
|
||||||
|
def __init__(self, context: Optional[str] = None) -> None:
|
||||||
|
msg = (
|
||||||
|
f"Legacy data detected for {context}" if context else "Legacy data detected"
|
||||||
|
)
|
||||||
|
super().__init__(msg)
|
||||||
|
self.context = context
|
||||||
|
|
||||||
|
|
||||||
class EncryptionManager:
|
class EncryptionManager:
|
||||||
"""
|
"""
|
||||||
Manages encryption and decryption, handling migration from legacy Fernet
|
Manages encryption and decryption, handling migration from legacy Fernet
|
||||||
@@ -153,63 +164,46 @@ class EncryptionManager:
|
|||||||
if not self._legacy_migrate_flag:
|
if not self._legacy_migrate_flag:
|
||||||
raise
|
raise
|
||||||
logger.debug(f"Could not decrypt data{ctx}: {e}")
|
logger.debug(f"Could not decrypt data{ctx}: {e}")
|
||||||
print(
|
raise LegacyFormatRequiresMigrationError(context)
|
||||||
colored(
|
|
||||||
f"Failed to decrypt{ctx} with current key. This may be a legacy index.",
|
|
||||||
"red",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
resp = input(
|
|
||||||
"\nChoose an option:\n"
|
|
||||||
"1. Open legacy index without migrating\n"
|
|
||||||
"2. Migrate to new format.\n"
|
|
||||||
"Selection [1/2]: "
|
|
||||||
).strip()
|
|
||||||
if resp == "1":
|
|
||||||
self._legacy_migrate_flag = False
|
|
||||||
self.last_migration_performed = False
|
|
||||||
elif resp == "2":
|
|
||||||
self._legacy_migrate_flag = True
|
|
||||||
self.last_migration_performed = True
|
|
||||||
else:
|
|
||||||
raise InvalidToken(
|
|
||||||
"User declined legacy decryption or provided invalid choice."
|
|
||||||
) from e
|
|
||||||
password = prompt_existing_password(
|
|
||||||
"Enter your master password for legacy decryption: "
|
|
||||||
)
|
|
||||||
last_exc: Optional[Exception] = None
|
|
||||||
for iter_count in [50_000, 100_000]:
|
|
||||||
try:
|
|
||||||
legacy_key = _derive_legacy_key_from_password(
|
|
||||||
password, iterations=iter_count
|
|
||||||
)
|
|
||||||
legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir)
|
|
||||||
legacy_mgr._legacy_migrate_flag = False
|
|
||||||
result = legacy_mgr.decrypt_data(encrypted_data, context=context)
|
|
||||||
try: # record iteration count for future runs
|
|
||||||
from .vault import Vault
|
|
||||||
from .config_manager import ConfigManager
|
|
||||||
|
|
||||||
cfg_mgr = ConfigManager(
|
def decrypt_legacy(
|
||||||
Vault(self, self.fingerprint_dir), self.fingerprint_dir
|
self, encrypted_data: bytes, password: str, context: Optional[str] = None
|
||||||
)
|
) -> bytes:
|
||||||
cfg_mgr.set_kdf_iterations(iter_count)
|
"""Decrypt ``encrypted_data`` using legacy password-only key derivation."""
|
||||||
except Exception: # pragma: no cover - best effort
|
|
||||||
logger.error(
|
ctx = f" {context}" if context else ""
|
||||||
"Failed to record PBKDF2 iteration count in config",
|
last_exc: Optional[Exception] = None
|
||||||
exc_info=True,
|
for iter_count in [50_000, 100_000]:
|
||||||
)
|
try:
|
||||||
logger.warning(
|
legacy_key = _derive_legacy_key_from_password(
|
||||||
"Data decrypted using legacy password-only key derivation."
|
password, iterations=iter_count
|
||||||
|
)
|
||||||
|
legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir)
|
||||||
|
legacy_mgr._legacy_migrate_flag = False
|
||||||
|
result = legacy_mgr.decrypt_data(encrypted_data, context=context)
|
||||||
|
try: # record iteration count for future runs
|
||||||
|
from .vault import Vault
|
||||||
|
from .config_manager import ConfigManager
|
||||||
|
|
||||||
|
cfg_mgr = ConfigManager(
|
||||||
|
Vault(self, self.fingerprint_dir), self.fingerprint_dir
|
||||||
)
|
)
|
||||||
return result
|
cfg_mgr.set_kdf_iterations(iter_count)
|
||||||
except Exception as e2: # pragma: no cover - try next iteration
|
except Exception: # pragma: no cover - best effort
|
||||||
last_exc = e2
|
logger.error(
|
||||||
logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True)
|
"Failed to record PBKDF2 iteration count in config",
|
||||||
raise InvalidToken(
|
exc_info=True,
|
||||||
f"Could not decrypt{ctx} with any available method."
|
)
|
||||||
) from e
|
logger.warning(
|
||||||
|
"Data decrypted using legacy password-only key derivation."
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
except Exception as e2: # pragma: no cover - try next iteration
|
||||||
|
last_exc = e2
|
||||||
|
logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True)
|
||||||
|
raise InvalidToken(
|
||||||
|
f"Could not decrypt{ctx} with any available method."
|
||||||
|
) from last_exc
|
||||||
|
|
||||||
# --- All functions below this point now use the smart `decrypt_data` method ---
|
# --- All functions below this point now use the smart `decrypt_data` method ---
|
||||||
|
|
||||||
|
@@ -6,8 +6,15 @@ from os import PathLike
|
|||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from termcolor import colored
|
from termcolor import colored
|
||||||
|
from cryptography.fernet import InvalidToken
|
||||||
|
|
||||||
from .encryption import EncryptionManager
|
from .encryption import (
|
||||||
|
EncryptionManager,
|
||||||
|
LegacyFormatRequiresMigrationError,
|
||||||
|
USE_ORJSON,
|
||||||
|
json_lib,
|
||||||
|
)
|
||||||
|
from utils.password_prompt import prompt_existing_password
|
||||||
|
|
||||||
|
|
||||||
class Vault:
|
class Vault:
|
||||||
@@ -99,6 +106,47 @@ class Vault:
|
|||||||
migration_performed = getattr(
|
migration_performed = getattr(
|
||||||
self.encryption_manager, "last_migration_performed", False
|
self.encryption_manager, "last_migration_performed", False
|
||||||
)
|
)
|
||||||
|
except LegacyFormatRequiresMigrationError:
|
||||||
|
print(
|
||||||
|
colored(
|
||||||
|
"Failed to decrypt index with current key. This may be a legacy index.",
|
||||||
|
"red",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
resp = input(
|
||||||
|
"\nChoose an option:\n"
|
||||||
|
"1. Open legacy index without migrating\n"
|
||||||
|
"2. Migrate to new format.\n"
|
||||||
|
"Selection [1/2]: "
|
||||||
|
).strip()
|
||||||
|
if resp == "1":
|
||||||
|
self.encryption_manager._legacy_migrate_flag = False
|
||||||
|
self.encryption_manager.last_migration_performed = False
|
||||||
|
elif resp == "2":
|
||||||
|
self.encryption_manager._legacy_migrate_flag = True
|
||||||
|
self.encryption_manager.last_migration_performed = True
|
||||||
|
else:
|
||||||
|
raise InvalidToken(
|
||||||
|
"User declined legacy decryption or provided invalid choice."
|
||||||
|
)
|
||||||
|
password = prompt_existing_password(
|
||||||
|
"Enter your master password for legacy decryption: "
|
||||||
|
)
|
||||||
|
with self.index_file.open("rb") as fh:
|
||||||
|
encrypted_data = fh.read()
|
||||||
|
decrypted = self.encryption_manager.decrypt_legacy(
|
||||||
|
encrypted_data, password, context=str(self.index_file)
|
||||||
|
)
|
||||||
|
if USE_ORJSON:
|
||||||
|
data = json_lib.loads(decrypted)
|
||||||
|
else:
|
||||||
|
data = json_lib.loads(decrypted.decode("utf-8"))
|
||||||
|
if self.encryption_manager._legacy_migrate_flag:
|
||||||
|
self.encryption_manager.save_json_data(data, self.index_file)
|
||||||
|
self.encryption_manager.update_checksum(self.index_file)
|
||||||
|
migration_performed = getattr(
|
||||||
|
self.encryption_manager, "last_migration_performed", False
|
||||||
|
)
|
||||||
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
||||||
if legacy_detected and backup_dir is not None:
|
if legacy_detected and backup_dir is not None:
|
||||||
backup_file = backup_dir / legacy_file.name
|
backup_file = backup_dir / legacy_file.name
|
||||||
|
43
src/tests/test_legacy_format_exception.py
Normal file
43
src/tests/test_legacy_format_exception.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
|
from seedpass.core.encryption import (
|
||||||
|
EncryptionManager,
|
||||||
|
LegacyFormatRequiresMigrationError,
|
||||||
|
_derive_legacy_key_from_password,
|
||||||
|
)
|
||||||
|
from seedpass.core.vault import Vault
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_data_raises_legacy_exception(tmp_path: Path) -> None:
|
||||||
|
key = Fernet.generate_key()
|
||||||
|
mgr = EncryptionManager(key, tmp_path)
|
||||||
|
with pytest.raises(LegacyFormatRequiresMigrationError):
|
||||||
|
mgr.decrypt_data(b"not a valid token")
|
||||||
|
|
||||||
|
|
||||||
|
def test_vault_handles_legacy_exception(tmp_path: Path, monkeypatch) -> None:
|
||||||
|
password = "secret"
|
||||||
|
legacy_key = _derive_legacy_key_from_password(password)
|
||||||
|
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
||||||
|
payload = json.dumps(
|
||||||
|
{"schema_version": 3, "entries": {"1": {"kind": "password", "password": "x"}}}
|
||||||
|
).encode("utf-8")
|
||||||
|
legacy_bytes = legacy_mgr.fernet.encrypt(payload)
|
||||||
|
index_file = tmp_path / Vault.INDEX_FILENAME
|
||||||
|
index_file.write_bytes(legacy_bytes)
|
||||||
|
|
||||||
|
new_mgr = EncryptionManager(Fernet.generate_key(), tmp_path)
|
||||||
|
vault = Vault(new_mgr, tmp_path)
|
||||||
|
|
||||||
|
monkeypatch.setattr("builtins.input", lambda *args, **kwargs: "1")
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"seedpass.core.vault.prompt_existing_password", lambda *args, **kwargs: password
|
||||||
|
)
|
||||||
|
|
||||||
|
data, migrated, last = vault.load_index(return_migration_flags=True)
|
||||||
|
assert "1" in data["entries"]
|
||||||
|
assert last is False
|
Reference in New Issue
Block a user