mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-10 00:09:04 +00:00
Merge pull request #745 from PR0M3TH3AN/codex/improve-fallback-error-handling
feat: add legacy migration prompt
This commit is contained in:
@@ -76,6 +76,10 @@ class EncryptionManager:
|
|||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
# Track user preference for handling legacy indexes
|
||||||
|
self._legacy_migrate_flag = True
|
||||||
|
self.last_migration_performed = False
|
||||||
|
|
||||||
def encrypt_data(self, data: bytes) -> bytes:
|
def encrypt_data(self, data: bytes) -> bytes:
|
||||||
"""
|
"""
|
||||||
(2) Encrypts data using the NEW AES-GCM format, prepending a version
|
(2) Encrypts data using the NEW AES-GCM format, prepending a version
|
||||||
@@ -134,6 +138,26 @@ class EncryptionManager:
|
|||||||
if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short":
|
if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short":
|
||||||
raise
|
raise
|
||||||
logger.error(f"FATAL: Could not decrypt data: {e}", exc_info=True)
|
logger.error(f"FATAL: Could not decrypt data: {e}", exc_info=True)
|
||||||
|
print(
|
||||||
|
colored(
|
||||||
|
"Failed to decrypt with current key. This may be a legacy index.",
|
||||||
|
"red",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
resp = input(
|
||||||
|
"\nChoose an option:\n"
|
||||||
|
"1. Open legacy index without migrating\n"
|
||||||
|
"2. Migrate to new format and sync to Nostr\n"
|
||||||
|
"Selection [1/2]: "
|
||||||
|
).strip()
|
||||||
|
if resp == "1":
|
||||||
|
self._legacy_migrate_flag = False
|
||||||
|
elif resp == "2":
|
||||||
|
self._legacy_migrate_flag = True
|
||||||
|
else:
|
||||||
|
raise InvalidToken(
|
||||||
|
"User declined legacy decryption or provided invalid choice."
|
||||||
|
) from e
|
||||||
try:
|
try:
|
||||||
password = prompt_existing_password(
|
password = prompt_existing_password(
|
||||||
"Enter your master password for legacy decryption: "
|
"Enter your master password for legacy decryption: "
|
||||||
@@ -250,6 +274,7 @@ class EncryptionManager:
|
|||||||
encrypted_data = fh.read()
|
encrypted_data = fh.read()
|
||||||
|
|
||||||
is_legacy = not encrypted_data.startswith(b"V2:")
|
is_legacy = not encrypted_data.startswith(b"V2:")
|
||||||
|
self.last_migration_performed = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
decrypted_data = self.decrypt_data(encrypted_data)
|
decrypted_data = self.decrypt_data(encrypted_data)
|
||||||
@@ -259,10 +284,11 @@ class EncryptionManager:
|
|||||||
data = json_lib.loads(decrypted_data.decode("utf-8"))
|
data = json_lib.loads(decrypted_data.decode("utf-8"))
|
||||||
|
|
||||||
# If it was a legacy file, re-save it in the new format now
|
# If it was a legacy file, re-save it in the new format now
|
||||||
if is_legacy:
|
if is_legacy and self._legacy_migrate_flag:
|
||||||
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
|
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
|
||||||
self.save_json_data(data, relative_path)
|
self.save_json_data(data, relative_path)
|
||||||
self.update_checksum(relative_path)
|
self.update_checksum(relative_path)
|
||||||
|
self.last_migration_performed = True
|
||||||
|
|
||||||
return data
|
return data
|
||||||
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
|
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
|
||||||
|
@@ -76,6 +76,9 @@ class Vault:
|
|||||||
)
|
)
|
||||||
|
|
||||||
data = self.encryption_manager.load_json_data(self.index_file)
|
data = self.encryption_manager.load_json_data(self.index_file)
|
||||||
|
self.migrated_from_legacy = self.migrated_from_legacy or getattr(
|
||||||
|
self.encryption_manager, "last_migration_performed", False
|
||||||
|
)
|
||||||
from .migrations import apply_migrations, LATEST_VERSION
|
from .migrations import apply_migrations, LATEST_VERSION
|
||||||
|
|
||||||
version = data.get("schema_version", 0)
|
version = data.get("schema_version", 0)
|
||||||
|
@@ -23,6 +23,7 @@ def test_decrypt_data_password_fallback(tmp_path, monkeypatch):
|
|||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
enc_module, "prompt_existing_password", lambda *_a, **_k: TEST_PASSWORD
|
enc_module, "prompt_existing_password", lambda *_a, **_k: TEST_PASSWORD
|
||||||
)
|
)
|
||||||
|
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "1")
|
||||||
|
|
||||||
legacy_key = _fast_legacy_key(TEST_PASSWORD, iterations=50_000)
|
legacy_key = _fast_legacy_key(TEST_PASSWORD, iterations=50_000)
|
||||||
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
||||||
|
49
src/tests/test_legacy_migration_prompt.py
Normal file
49
src/tests/test_legacy_migration_prompt.py
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import base64
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from seedpass.core.encryption import (
|
||||||
|
EncryptionManager,
|
||||||
|
_derive_legacy_key_from_password,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_legacy_file(tmp_path: Path, password: str) -> Path:
|
||||||
|
legacy_key = _derive_legacy_key_from_password(password, iterations=50_000)
|
||||||
|
legacy_mgr = EncryptionManager(legacy_key, tmp_path)
|
||||||
|
data = {"entries": {"0": {"kind": "test"}}}
|
||||||
|
json_bytes = json.dumps(data, separators=(",", ":")).encode("utf-8")
|
||||||
|
legacy_encrypted = legacy_mgr.fernet.encrypt(json_bytes)
|
||||||
|
file_path = tmp_path / "seedpass_entries_db.json.enc"
|
||||||
|
file_path.write_bytes(legacy_encrypted)
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_legacy_without_migrating(tmp_path, monkeypatch):
|
||||||
|
password = "secret"
|
||||||
|
_setup_legacy_file(tmp_path, password)
|
||||||
|
new_key = base64.urlsafe_b64encode(b"A" * 32)
|
||||||
|
mgr = EncryptionManager(new_key, tmp_path)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"seedpass.core.encryption.prompt_existing_password", lambda _: password
|
||||||
|
)
|
||||||
|
monkeypatch.setattr("builtins.input", lambda _: "1")
|
||||||
|
mgr.load_json_data()
|
||||||
|
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||||
|
assert not content.startswith(b"V2:")
|
||||||
|
assert mgr.last_migration_performed is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_migrate_legacy_and_sync(tmp_path, monkeypatch):
|
||||||
|
password = "secret"
|
||||||
|
_setup_legacy_file(tmp_path, password)
|
||||||
|
new_key = base64.urlsafe_b64encode(b"B" * 32)
|
||||||
|
mgr = EncryptionManager(new_key, tmp_path)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"seedpass.core.encryption.prompt_existing_password", lambda _: password
|
||||||
|
)
|
||||||
|
monkeypatch.setattr("builtins.input", lambda _: "2")
|
||||||
|
mgr.load_json_data()
|
||||||
|
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||||
|
assert content.startswith(b"V2:")
|
||||||
|
assert mgr.last_migration_performed is True
|
@@ -25,6 +25,7 @@ def test_legacy_password_only_fallback(monkeypatch, tmp_path, caplog):
|
|||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
enc_module, "prompt_existing_password", lambda *_a, **_k: TEST_PASSWORD
|
enc_module, "prompt_existing_password", lambda *_a, **_k: TEST_PASSWORD
|
||||||
)
|
)
|
||||||
|
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "2")
|
||||||
|
|
||||||
vault, enc_mgr = create_vault(tmp_path)
|
vault, enc_mgr = create_vault(tmp_path)
|
||||||
data = {"schema_version": 4, "entries": {}}
|
data = {"schema_version": 4, "entries": {}}
|
||||||
|
@@ -9,6 +9,7 @@ import sys
|
|||||||
|
|
||||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||||
|
|
||||||
|
import seedpass.core.encryption as enc_module
|
||||||
from seedpass.core.encryption import EncryptionManager
|
from seedpass.core.encryption import EncryptionManager
|
||||||
from seedpass.core.vault import Vault
|
from seedpass.core.vault import Vault
|
||||||
from seedpass.core.backup import BackupManager
|
from seedpass.core.backup import BackupManager
|
||||||
@@ -71,6 +72,17 @@ def test_corruption_detection(monkeypatch):
|
|||||||
content["payload"] = base64.b64encode(payload).decode()
|
content["payload"] = base64.b64encode(payload).decode()
|
||||||
path.write_text(json.dumps(content))
|
path.write_text(json.dumps(content))
|
||||||
|
|
||||||
|
def _fast_legacy_key(password: str, iterations: int = 100_000) -> bytes:
|
||||||
|
return base64.urlsafe_b64encode(b"0" * 32)
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
enc_module, "_derive_legacy_key_from_password", _fast_legacy_key
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
enc_module, "prompt_existing_password", lambda *_a, **_k: PASSWORD
|
||||||
|
)
|
||||||
|
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "1")
|
||||||
|
|
||||||
with pytest.raises(InvalidToken):
|
with pytest.raises(InvalidToken):
|
||||||
import_backup(vault, backup, path, parent_seed=SEED)
|
import_backup(vault, backup, path, parent_seed=SEED)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user