From 1a194aec04177cc64da57e7b2e71732158221a85 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Mon, 4 Aug 2025 13:36:44 -0400 Subject: [PATCH] Handle legacy index migration errors --- src/seedpass/core/vault.py | 69 +++++++++++++++++++++++++----- src/tests/test_legacy_migration.py | 51 ++++++++++++++++++++++ 2 files changed, 110 insertions(+), 10 deletions(-) diff --git a/src/seedpass/core/vault.py b/src/seedpass/core/vault.py index 11433ac..6c557cd 100644 --- a/src/seedpass/core/vault.py +++ b/src/seedpass/core/vault.py @@ -43,6 +43,8 @@ class Vault: legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc" self.migrated_from_legacy = False + legacy_detected = False + backup_dir = None if legacy_file.exists() and not self.index_file.exists(): print(colored("Legacy index detected.", "yellow")) resp = ( @@ -78,7 +80,7 @@ class Vault: if stray_checksum.exists(): stray_checksum.unlink() - self.migrated_from_legacy = True + legacy_detected = True print( colored( "Migration complete. Original index backed up to 'legacy_backups'", @@ -86,10 +88,33 @@ class Vault: ) ) - data = self.encryption_manager.load_json_data(self.index_file) - migration_performed = getattr( - self.encryption_manager, "last_migration_performed", False - ) + try: + data = self.encryption_manager.load_json_data(self.index_file) + migration_performed = getattr( + self.encryption_manager, "last_migration_performed", False + ) + except Exception as exc: # noqa: BLE001 - surface clear error and restore + if legacy_detected and backup_dir is not None: + backup_file = backup_dir / legacy_file.name + legacy_checksum_path = ( + self.fingerprint_dir / "seedpass_passwords_db_checksum.txt" + ) + backup_checksum = backup_dir / legacy_checksum_path.name + try: + if self.index_file.exists(): + self.index_file.unlink() + shutil.copy2(backup_file, legacy_file) + checksum_new = ( + self.fingerprint_dir / "seedpass_entries_db_checksum.txt" + ) + if checksum_new.exists(): + checksum_new.unlink() + if backup_checksum.exists(): + shutil.copy2(backup_checksum, legacy_checksum_path) + finally: + self.migrated_from_legacy = False + raise RuntimeError(f"Migration failed: {exc}") from exc + from .migrations import apply_migrations, LATEST_VERSION version = data.get("schema_version", 0) @@ -98,12 +123,36 @@ class Vault: f"File schema version {version} is newer than supported {LATEST_VERSION}" ) schema_migrated = version < LATEST_VERSION - data = apply_migrations(data) - if schema_migrated: - self.encryption_manager.save_json_data(data, self.index_file) - self.encryption_manager.update_checksum(self.index_file) + + try: + data = apply_migrations(data) + if schema_migrated: + self.encryption_manager.save_json_data(data, self.index_file) + self.encryption_manager.update_checksum(self.index_file) + except Exception as exc: # noqa: BLE001 - surface clear error and restore + if legacy_detected and backup_dir is not None: + backup_file = backup_dir / legacy_file.name + legacy_checksum_path = ( + self.fingerprint_dir / "seedpass_passwords_db_checksum.txt" + ) + backup_checksum = backup_dir / legacy_checksum_path.name + try: + if self.index_file.exists(): + self.index_file.unlink() + shutil.copy2(backup_file, legacy_file) + checksum_new = ( + self.fingerprint_dir / "seedpass_entries_db_checksum.txt" + ) + if checksum_new.exists(): + checksum_new.unlink() + if backup_checksum.exists(): + shutil.copy2(backup_checksum, legacy_checksum_path) + finally: + self.migrated_from_legacy = False + raise RuntimeError(f"Migration failed: {exc}") from exc + self.migrated_from_legacy = ( - self.migrated_from_legacy or migration_performed or schema_migrated + legacy_detected or migration_performed or schema_migrated ) if return_migration_flag: return data, self.migrated_from_legacy diff --git a/src/tests/test_legacy_migration.py b/src/tests/test_legacy_migration.py index 06baefe..9ceb5d2 100644 --- a/src/tests/test_legacy_migration.py +++ b/src/tests/test_legacy_migration.py @@ -2,6 +2,8 @@ import json import hashlib from pathlib import Path +import pytest + from helpers import create_vault, TEST_SEED, TEST_PASSWORD from utils.key_derivation import derive_index_key from cryptography.fernet import Fernet @@ -52,6 +54,55 @@ def test_legacy_index_migrates(monkeypatch, tmp_path: Path): assert backup.exists() +def test_failed_migration_restores_legacy(monkeypatch, tmp_path: Path): + vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD) + + key = derive_index_key(TEST_SEED) + data = {"schema_version": 4, "entries": {}} + enc = Fernet(key).encrypt(json.dumps(data).encode()) + legacy_file = tmp_path / "seedpass_passwords_db.json.enc" + legacy_file.write_bytes(enc) + checksum = hashlib.sha256(enc).hexdigest() + (tmp_path / "seedpass_passwords_db_checksum.txt").write_text(checksum) + + monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y") + + def bad_load_json(_path): + raise ValueError("boom") + + monkeypatch.setattr(enc_mgr, "load_json_data", bad_load_json) + + with pytest.raises(RuntimeError, match="Migration failed:"): + vault.load_index() + + # Legacy file restored and new index removed + assert legacy_file.exists() + assert not (tmp_path / "seedpass_entries_db.json.enc").exists() + assert (tmp_path / "seedpass_passwords_db_checksum.txt").read_text() == checksum + assert not vault.migrated_from_legacy + + +def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path): + vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD) + + key = derive_index_key(TEST_SEED) + data = {"schema_version": 4, "entries": {}} + enc = Fernet(key).encrypt(json.dumps(data).encode()) + legacy_file = tmp_path / "seedpass_passwords_db.json.enc" + legacy_file.write_bytes(enc) + (tmp_path / "seedpass_passwords_db_checksum.txt").write_text( + hashlib.sha256(enc).hexdigest() + ) + + monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y") + + vault.load_index() + + new_file = tmp_path / "seedpass_entries_db.json.enc" + assert new_file.read_bytes().startswith(b"V2:") + assert vault.migrated_from_legacy + + def test_legacy_index_migration_removes_strays(monkeypatch, tmp_path: Path): vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)