mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-05 05:48:42 +00:00
Merge pull request #810 from PR0M3TH3AN/codex/remove-input-calls-from-decrypt_data
feat: raise legacy migration error
This commit is contained in:
@@ -38,6 +38,17 @@ def _derive_legacy_key_from_password(password: str, iterations: int = 100_000) -
|
||||
return base64.urlsafe_b64encode(key)
|
||||
|
||||
|
||||
class LegacyFormatRequiresMigrationError(Exception):
|
||||
"""Raised when legacy-encrypted data needs user-guided migration."""
|
||||
|
||||
def __init__(self, context: Optional[str] = None) -> None:
|
||||
msg = (
|
||||
f"Legacy data detected for {context}" if context else "Legacy data detected"
|
||||
)
|
||||
super().__init__(msg)
|
||||
self.context = context
|
||||
|
||||
|
||||
class EncryptionManager:
|
||||
"""
|
||||
Manages encryption and decryption, handling migration from legacy Fernet
|
||||
@@ -153,63 +164,46 @@ class EncryptionManager:
|
||||
if not self._legacy_migrate_flag:
|
||||
raise
|
||||
logger.debug(f"Could not decrypt data{ctx}: {e}")
|
||||
print(
|
||||
colored(
|
||||
f"Failed to decrypt{ctx} with current key. This may be a legacy index.",
|
||||
"red",
|
||||
)
|
||||
)
|
||||
resp = input(
|
||||
"\nChoose an option:\n"
|
||||
"1. Open legacy index without migrating\n"
|
||||
"2. Migrate to new format.\n"
|
||||
"Selection [1/2]: "
|
||||
).strip()
|
||||
if resp == "1":
|
||||
self._legacy_migrate_flag = False
|
||||
self.last_migration_performed = False
|
||||
elif resp == "2":
|
||||
self._legacy_migrate_flag = True
|
||||
self.last_migration_performed = True
|
||||
else:
|
||||
raise InvalidToken(
|
||||
"User declined legacy decryption or provided invalid choice."
|
||||
) from e
|
||||
password = prompt_existing_password(
|
||||
"Enter your master password for legacy decryption: "
|
||||
)
|
||||
last_exc: Optional[Exception] = None
|
||||
for iter_count in [50_000, 100_000]:
|
||||
try:
|
||||
legacy_key = _derive_legacy_key_from_password(
|
||||
password, iterations=iter_count
|
||||
)
|
||||
legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir)
|
||||
legacy_mgr._legacy_migrate_flag = False
|
||||
result = legacy_mgr.decrypt_data(encrypted_data, context=context)
|
||||
try: # record iteration count for future runs
|
||||
from .vault import Vault
|
||||
from .config_manager import ConfigManager
|
||||
raise LegacyFormatRequiresMigrationError(context)
|
||||
|
||||
cfg_mgr = ConfigManager(
|
||||
Vault(self, self.fingerprint_dir), self.fingerprint_dir
|
||||
)
|
||||
cfg_mgr.set_kdf_iterations(iter_count)
|
||||
except Exception: # pragma: no cover - best effort
|
||||
logger.error(
|
||||
"Failed to record PBKDF2 iteration count in config",
|
||||
exc_info=True,
|
||||
)
|
||||
logger.warning(
|
||||
"Data decrypted using legacy password-only key derivation."
|
||||
def decrypt_legacy(
|
||||
self, encrypted_data: bytes, password: str, context: Optional[str] = None
|
||||
) -> bytes:
|
||||
"""Decrypt ``encrypted_data`` using legacy password-only key derivation."""
|
||||
|
||||
ctx = f" {context}" if context else ""
|
||||
last_exc: Optional[Exception] = None
|
||||
for iter_count in [50_000, 100_000]:
|
||||
try:
|
||||
legacy_key = _derive_legacy_key_from_password(
|
||||
password, iterations=iter_count
|
||||
)
|
||||
legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir)
|
||||
legacy_mgr._legacy_migrate_flag = False
|
||||
result = legacy_mgr.decrypt_data(encrypted_data, context=context)
|
||||
try: # record iteration count for future runs
|
||||
from .vault import Vault
|
||||
from .config_manager import ConfigManager
|
||||
|
||||
cfg_mgr = ConfigManager(
|
||||
Vault(self, self.fingerprint_dir), self.fingerprint_dir
|
||||
)
|
||||
return result
|
||||
except Exception as e2: # pragma: no cover - try next iteration
|
||||
last_exc = e2
|
||||
logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True)
|
||||
raise InvalidToken(
|
||||
f"Could not decrypt{ctx} with any available method."
|
||||
) from e
|
||||
cfg_mgr.set_kdf_iterations(iter_count)
|
||||
except Exception: # pragma: no cover - best effort
|
||||
logger.error(
|
||||
"Failed to record PBKDF2 iteration count in config",
|
||||
exc_info=True,
|
||||
)
|
||||
logger.warning(
|
||||
"Data decrypted using legacy password-only key derivation."
|
||||
)
|
||||
return result
|
||||
except Exception as e2: # pragma: no cover - try next iteration
|
||||
last_exc = e2
|
||||
logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True)
|
||||
raise InvalidToken(
|
||||
f"Could not decrypt{ctx} with any available method."
|
||||
) from last_exc
|
||||
|
||||
# --- All functions below this point now use the smart `decrypt_data` method ---
|
||||
|
||||
@@ -401,15 +395,13 @@ class EncryptionManager:
|
||||
logger.info("Index file from Nostr was processed and saved successfully.")
|
||||
self.last_migration_performed = is_legacy
|
||||
return True
|
||||
except InvalidToken as e:
|
||||
except (InvalidToken, LegacyFormatRequiresMigrationError):
|
||||
try:
|
||||
password = prompt_existing_password(
|
||||
"Enter your master password for legacy decryption: "
|
||||
)
|
||||
legacy_key = _derive_legacy_key_from_password(password)
|
||||
legacy_mgr = EncryptionManager(legacy_key, self.fingerprint_dir)
|
||||
decrypted_data = legacy_mgr.decrypt_data(
|
||||
encrypted_data, context=str(relative_path)
|
||||
decrypted_data = self.decrypt_legacy(
|
||||
encrypted_data, password, context=str(relative_path)
|
||||
)
|
||||
data = _process(decrypted_data)
|
||||
self.save_json_data(data, relative_path)
|
||||
|
@@ -129,6 +129,7 @@ def import_backup(
|
||||
)
|
||||
key = _derive_export_key(seed)
|
||||
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
|
||||
enc_mgr._legacy_migrate_flag = False
|
||||
index_bytes = enc_mgr.decrypt_data(payload, context="backup payload")
|
||||
index = json.loads(index_bytes.decode("utf-8"))
|
||||
|
||||
|
@@ -6,8 +6,15 @@ from os import PathLike
|
||||
import shutil
|
||||
|
||||
from termcolor import colored
|
||||
from cryptography.fernet import InvalidToken
|
||||
|
||||
from .encryption import EncryptionManager
|
||||
from .encryption import (
|
||||
EncryptionManager,
|
||||
LegacyFormatRequiresMigrationError,
|
||||
USE_ORJSON,
|
||||
json_lib,
|
||||
)
|
||||
from utils.password_prompt import prompt_existing_password
|
||||
|
||||
|
||||
class Vault:
|
||||
@@ -99,6 +106,47 @@ class Vault:
|
||||
migration_performed = getattr(
|
||||
self.encryption_manager, "last_migration_performed", False
|
||||
)
|
||||
except LegacyFormatRequiresMigrationError:
|
||||
print(
|
||||
colored(
|
||||
"Failed to decrypt index with current key. This may be a legacy index.",
|
||||
"red",
|
||||
)
|
||||
)
|
||||
resp = input(
|
||||
"\nChoose an option:\n"
|
||||
"1. Open legacy index without migrating\n"
|
||||
"2. Migrate to new format.\n"
|
||||
"Selection [1/2]: "
|
||||
).strip()
|
||||
if resp == "1":
|
||||
self.encryption_manager._legacy_migrate_flag = False
|
||||
self.encryption_manager.last_migration_performed = False
|
||||
elif resp == "2":
|
||||
self.encryption_manager._legacy_migrate_flag = True
|
||||
self.encryption_manager.last_migration_performed = True
|
||||
else:
|
||||
raise InvalidToken(
|
||||
"User declined legacy decryption or provided invalid choice."
|
||||
)
|
||||
password = prompt_existing_password(
|
||||
"Enter your master password for legacy decryption: "
|
||||
)
|
||||
with self.index_file.open("rb") as fh:
|
||||
encrypted_data = fh.read()
|
||||
decrypted = self.encryption_manager.decrypt_legacy(
|
||||
encrypted_data, password, context=str(self.index_file)
|
||||
)
|
||||
if USE_ORJSON:
|
||||
data = json_lib.loads(decrypted)
|
||||
else:
|
||||
data = json_lib.loads(decrypted.decode("utf-8"))
|
||||
if self.encryption_manager._legacy_migrate_flag:
|
||||
self.encryption_manager.save_json_data(data, self.index_file)
|
||||
self.encryption_manager.update_checksum(self.index_file)
|
||||
migration_performed = getattr(
|
||||
self.encryption_manager, "last_migration_performed", False
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
||||
if legacy_detected and backup_dir is not None:
|
||||
backup_file = backup_dir / legacy_file.name
|
||||
|
@@ -3,7 +3,7 @@ import shutil
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
from tests.helpers import TEST_PASSWORD, TEST_SEED
|
||||
from helpers import TEST_PASSWORD, TEST_SEED
|
||||
|
||||
import colorama
|
||||
import constants
|
||||
|
@@ -6,7 +6,10 @@ import pytest
|
||||
from cryptography.fernet import InvalidToken
|
||||
|
||||
from helpers import TEST_PASSWORD, TEST_SEED
|
||||
from seedpass.core.encryption import EncryptionManager
|
||||
from seedpass.core.encryption import (
|
||||
EncryptionManager,
|
||||
LegacyFormatRequiresMigrationError,
|
||||
)
|
||||
from utils.key_derivation import derive_index_key
|
||||
|
||||
|
||||
@@ -24,7 +27,7 @@ def test_wrong_password_message(tmp_path):
|
||||
assert "index" in str(exc.value)
|
||||
|
||||
|
||||
def test_legacy_file_requires_migration_message(tmp_path, monkeypatch, capsys):
|
||||
def test_legacy_file_requires_migration_message(tmp_path, monkeypatch):
|
||||
def _fast_legacy_key(password: str, iterations: int = 100_000) -> bytes:
|
||||
normalized = unicodedata.normalize("NFKD", password).strip().encode("utf-8")
|
||||
key = hashlib.pbkdf2_hmac("sha256", normalized, b"", 1, dklen=32)
|
||||
@@ -33,22 +36,15 @@ def test_legacy_file_requires_migration_message(tmp_path, monkeypatch, capsys):
|
||||
monkeypatch.setattr(
|
||||
"seedpass.core.encryption._derive_legacy_key_from_password", _fast_legacy_key
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"seedpass.core.encryption.prompt_existing_password",
|
||||
lambda *_a, **_k: TEST_PASSWORD,
|
||||
)
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "1")
|
||||
|
||||
legacy_key = _fast_legacy_key(TEST_PASSWORD)
|
||||
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
||||
token = legacy_mgr.fernet.encrypt(b"secret")
|
||||
|
||||
new_mgr = EncryptionManager(derive_index_key(TEST_SEED), tmp_path)
|
||||
assert new_mgr.decrypt_data(token, context="index") == b"secret"
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "Failed to decrypt index" in out
|
||||
assert "legacy index" in out
|
||||
with pytest.raises(LegacyFormatRequiresMigrationError, match="index") as exc:
|
||||
new_mgr.decrypt_data(token, context="index")
|
||||
assert "index" in str(exc.value)
|
||||
|
||||
|
||||
def test_corrupted_data_message(tmp_path):
|
||||
|
43
src/tests/test_legacy_format_exception.py
Normal file
43
src/tests/test_legacy_format_exception.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
from seedpass.core.encryption import (
|
||||
EncryptionManager,
|
||||
LegacyFormatRequiresMigrationError,
|
||||
_derive_legacy_key_from_password,
|
||||
)
|
||||
from seedpass.core.vault import Vault
|
||||
|
||||
|
||||
def test_decrypt_data_raises_legacy_exception(tmp_path: Path) -> None:
|
||||
key = Fernet.generate_key()
|
||||
mgr = EncryptionManager(key, tmp_path)
|
||||
with pytest.raises(LegacyFormatRequiresMigrationError):
|
||||
mgr.decrypt_data(b"not a valid token")
|
||||
|
||||
|
||||
def test_vault_handles_legacy_exception(tmp_path: Path, monkeypatch) -> None:
|
||||
password = "secret"
|
||||
legacy_key = _derive_legacy_key_from_password(password)
|
||||
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
||||
payload = json.dumps(
|
||||
{"schema_version": 3, "entries": {"1": {"kind": "password", "password": "x"}}}
|
||||
).encode("utf-8")
|
||||
legacy_bytes = legacy_mgr.fernet.encrypt(payload)
|
||||
index_file = tmp_path / Vault.INDEX_FILENAME
|
||||
index_file.write_bytes(legacy_bytes)
|
||||
|
||||
new_mgr = EncryptionManager(Fernet.generate_key(), tmp_path)
|
||||
vault = Vault(new_mgr, tmp_path)
|
||||
|
||||
monkeypatch.setattr("builtins.input", lambda *args, **kwargs: "1")
|
||||
monkeypatch.setattr(
|
||||
"seedpass.core.vault.prompt_existing_password", lambda *args, **kwargs: password
|
||||
)
|
||||
|
||||
data, migrated, last = vault.load_index(return_migration_flags=True)
|
||||
assert "1" in data["entries"]
|
||||
assert last is False
|
@@ -8,6 +8,7 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||
import pytest
|
||||
|
||||
import seedpass.core.encryption as enc_module
|
||||
import seedpass.core.vault as vault_module
|
||||
from helpers import TEST_PASSWORD
|
||||
from seedpass.core.encryption import (
|
||||
EncryptionManager,
|
||||
@@ -15,12 +16,13 @@ from seedpass.core.encryption import (
|
||||
)
|
||||
from seedpass.core.config_manager import ConfigManager
|
||||
from seedpass.core.vault import Vault
|
||||
from seedpass.core.migrations import LATEST_VERSION
|
||||
|
||||
|
||||
def _setup_legacy_file(tmp_path: Path, iterations: int) -> None:
|
||||
legacy_key = _derive_legacy_key_from_password(TEST_PASSWORD, iterations=iterations)
|
||||
mgr = EncryptionManager(legacy_key, tmp_path)
|
||||
data = {"entries": {"0": {"kind": "test"}}}
|
||||
data = {"schema_version": LATEST_VERSION, "entries": {"0": {"kind": "test"}}}
|
||||
json_bytes = json.dumps(data, separators=(",", ":")).encode("utf-8")
|
||||
legacy_encrypted = mgr.fernet.encrypt(json_bytes)
|
||||
(tmp_path / "seedpass_entries_db.json.enc").write_bytes(legacy_encrypted)
|
||||
@@ -32,6 +34,7 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
|
||||
|
||||
new_key = base64.urlsafe_b64encode(b"B" * 32)
|
||||
mgr = EncryptionManager(new_key, tmp_path)
|
||||
vault = Vault(mgr, tmp_path)
|
||||
|
||||
prompts: list[int] = []
|
||||
|
||||
@@ -40,6 +43,7 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
|
||||
return TEST_PASSWORD
|
||||
|
||||
monkeypatch.setattr(enc_module, "prompt_existing_password", fake_prompt)
|
||||
monkeypatch.setattr(vault_module, "prompt_existing_password", fake_prompt)
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "2")
|
||||
|
||||
calls: list[int] = []
|
||||
@@ -51,15 +55,15 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
|
||||
|
||||
monkeypatch.setattr(enc_module, "_derive_legacy_key_from_password", tracking_derive)
|
||||
|
||||
mgr.load_json_data()
|
||||
vault.load_index()
|
||||
# Loading again should not prompt for password or attempt legacy counts
|
||||
mgr.load_json_data()
|
||||
vault.load_index()
|
||||
|
||||
assert prompts == [1]
|
||||
expected = [50_000] if iterations == 50_000 else [50_000, 100_000]
|
||||
assert calls == expected
|
||||
|
||||
cfg = ConfigManager(Vault(mgr, tmp_path), tmp_path)
|
||||
cfg = ConfigManager(vault, tmp_path)
|
||||
assert cfg.get_kdf_iterations() == iterations
|
||||
|
||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||
|
@@ -6,12 +6,15 @@ from seedpass.core.encryption import (
|
||||
EncryptionManager,
|
||||
_derive_legacy_key_from_password,
|
||||
)
|
||||
from seedpass.core.vault import Vault
|
||||
import seedpass.core.vault as vault_module
|
||||
from seedpass.core.migrations import LATEST_VERSION
|
||||
|
||||
|
||||
def _setup_legacy_file(tmp_path: Path, password: str) -> Path:
|
||||
legacy_key = _derive_legacy_key_from_password(password, iterations=50_000)
|
||||
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
||||
data = {"entries": {"0": {"kind": "test"}}}
|
||||
data = {"schema_version": LATEST_VERSION, "entries": {"0": {"kind": "test"}}}
|
||||
json_bytes = json.dumps(data, separators=(",", ":")).encode("utf-8")
|
||||
legacy_encrypted = legacy_mgr.fernet.encrypt(json_bytes)
|
||||
file_path = tmp_path / "seedpass_entries_db.json.enc"
|
||||
@@ -24,14 +27,15 @@ def test_open_legacy_without_migrating(tmp_path, monkeypatch):
|
||||
_setup_legacy_file(tmp_path, password)
|
||||
new_key = base64.urlsafe_b64encode(b"A" * 32)
|
||||
mgr = EncryptionManager(new_key, tmp_path)
|
||||
vault = Vault(mgr, tmp_path)
|
||||
monkeypatch.setattr(
|
||||
"seedpass.core.encryption.prompt_existing_password", lambda _: password
|
||||
)
|
||||
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
|
||||
monkeypatch.setattr("builtins.input", lambda _: "1")
|
||||
mgr.load_json_data()
|
||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||
assert not content.startswith(b"V2:")
|
||||
assert mgr.last_migration_performed is False
|
||||
vault.load_index()
|
||||
assert vault.encryption_manager.last_migration_performed is False
|
||||
assert vault.migrated_from_legacy is False
|
||||
|
||||
|
||||
def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
|
||||
@@ -39,11 +43,13 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
|
||||
_setup_legacy_file(tmp_path, password)
|
||||
new_key = base64.urlsafe_b64encode(b"B" * 32)
|
||||
mgr = EncryptionManager(new_key, tmp_path)
|
||||
vault = Vault(mgr, tmp_path)
|
||||
monkeypatch.setattr(
|
||||
"seedpass.core.encryption.prompt_existing_password", lambda _: password
|
||||
)
|
||||
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
|
||||
monkeypatch.setattr("builtins.input", lambda _: "2")
|
||||
mgr.load_json_data()
|
||||
vault.load_index()
|
||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||
assert content.startswith(b"V2:")
|
||||
assert mgr.last_migration_performed is True
|
||||
assert vault.encryption_manager.last_migration_performed is True
|
||||
|
@@ -7,7 +7,7 @@ import constants
|
||||
import seedpass.core.manager as manager_module
|
||||
from utils.fingerprint_manager import FingerprintManager
|
||||
from seedpass.core.config_manager import ConfigManager
|
||||
from tests.helpers import TEST_SEED, TEST_PASSWORD, create_vault
|
||||
from helpers import TEST_SEED, TEST_PASSWORD, create_vault
|
||||
|
||||
|
||||
def test_init_with_password(monkeypatch):
|
||||
|
@@ -25,7 +25,6 @@ def test_legacy_password_only_fallback(monkeypatch, tmp_path, caplog):
|
||||
monkeypatch.setattr(
|
||||
enc_module, "prompt_existing_password", lambda *_a, **_k: TEST_PASSWORD
|
||||
)
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "2")
|
||||
|
||||
vault, enc_mgr = create_vault(tmp_path)
|
||||
data = {"schema_version": 4, "entries": {}}
|
||||
|
@@ -81,7 +81,6 @@ def test_corruption_detection(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
enc_module, "prompt_existing_password", lambda *_a, **_k: PASSWORD
|
||||
)
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "1")
|
||||
|
||||
with pytest.raises(InvalidToken):
|
||||
import_backup(vault, backup, path, parent_seed=SEED)
|
||||
|
@@ -8,7 +8,7 @@ from pathlib import Path
|
||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||
import main
|
||||
from utils.fingerprint_manager import FingerprintManager
|
||||
from tests.helpers import TEST_SEED
|
||||
from helpers import TEST_SEED
|
||||
|
||||
|
||||
def test_profile_deletion_stops_sync(monkeypatch, tmp_path):
|
||||
|
Reference in New Issue
Block a user