mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Merge pull request #759 from PR0M3TH3AN/codex/modify-vault.load_index-migration-logic
Handle legacy index migration errors
This commit is contained in:
@@ -43,6 +43,8 @@ class Vault:
|
||||
|
||||
legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
|
||||
self.migrated_from_legacy = False
|
||||
legacy_detected = False
|
||||
backup_dir = None
|
||||
if legacy_file.exists() and not self.index_file.exists():
|
||||
print(colored("Legacy index detected.", "yellow"))
|
||||
resp = (
|
||||
@@ -78,7 +80,7 @@ class Vault:
|
||||
if stray_checksum.exists():
|
||||
stray_checksum.unlink()
|
||||
|
||||
self.migrated_from_legacy = True
|
||||
legacy_detected = True
|
||||
print(
|
||||
colored(
|
||||
"Migration complete. Original index backed up to 'legacy_backups'",
|
||||
@@ -86,10 +88,33 @@ class Vault:
|
||||
)
|
||||
)
|
||||
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
migration_performed = getattr(
|
||||
self.encryption_manager, "last_migration_performed", False
|
||||
)
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
migration_performed = getattr(
|
||||
self.encryption_manager, "last_migration_performed", False
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
||||
if legacy_detected and backup_dir is not None:
|
||||
backup_file = backup_dir / legacy_file.name
|
||||
legacy_checksum_path = (
|
||||
self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
|
||||
)
|
||||
backup_checksum = backup_dir / legacy_checksum_path.name
|
||||
try:
|
||||
if self.index_file.exists():
|
||||
self.index_file.unlink()
|
||||
shutil.copy2(backup_file, legacy_file)
|
||||
checksum_new = (
|
||||
self.fingerprint_dir / "seedpass_entries_db_checksum.txt"
|
||||
)
|
||||
if checksum_new.exists():
|
||||
checksum_new.unlink()
|
||||
if backup_checksum.exists():
|
||||
shutil.copy2(backup_checksum, legacy_checksum_path)
|
||||
finally:
|
||||
self.migrated_from_legacy = False
|
||||
raise RuntimeError(f"Migration failed: {exc}") from exc
|
||||
|
||||
from .migrations import apply_migrations, LATEST_VERSION
|
||||
|
||||
version = data.get("schema_version", 0)
|
||||
@@ -98,12 +123,36 @@ class Vault:
|
||||
f"File schema version {version} is newer than supported {LATEST_VERSION}"
|
||||
)
|
||||
schema_migrated = version < LATEST_VERSION
|
||||
data = apply_migrations(data)
|
||||
if schema_migrated:
|
||||
self.encryption_manager.save_json_data(data, self.index_file)
|
||||
self.encryption_manager.update_checksum(self.index_file)
|
||||
|
||||
try:
|
||||
data = apply_migrations(data)
|
||||
if schema_migrated:
|
||||
self.encryption_manager.save_json_data(data, self.index_file)
|
||||
self.encryption_manager.update_checksum(self.index_file)
|
||||
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
||||
if legacy_detected and backup_dir is not None:
|
||||
backup_file = backup_dir / legacy_file.name
|
||||
legacy_checksum_path = (
|
||||
self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
|
||||
)
|
||||
backup_checksum = backup_dir / legacy_checksum_path.name
|
||||
try:
|
||||
if self.index_file.exists():
|
||||
self.index_file.unlink()
|
||||
shutil.copy2(backup_file, legacy_file)
|
||||
checksum_new = (
|
||||
self.fingerprint_dir / "seedpass_entries_db_checksum.txt"
|
||||
)
|
||||
if checksum_new.exists():
|
||||
checksum_new.unlink()
|
||||
if backup_checksum.exists():
|
||||
shutil.copy2(backup_checksum, legacy_checksum_path)
|
||||
finally:
|
||||
self.migrated_from_legacy = False
|
||||
raise RuntimeError(f"Migration failed: {exc}") from exc
|
||||
|
||||
self.migrated_from_legacy = (
|
||||
self.migrated_from_legacy or migration_performed or schema_migrated
|
||||
legacy_detected or migration_performed or schema_migrated
|
||||
)
|
||||
if return_migration_flag:
|
||||
return data, self.migrated_from_legacy
|
||||
|
@@ -2,6 +2,8 @@ import json
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
|
||||
from utils.key_derivation import derive_index_key
|
||||
from cryptography.fernet import Fernet
|
||||
@@ -52,6 +54,55 @@ def test_legacy_index_migrates(monkeypatch, tmp_path: Path):
|
||||
assert backup.exists()
|
||||
|
||||
|
||||
def test_failed_migration_restores_legacy(monkeypatch, tmp_path: Path):
|
||||
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
|
||||
|
||||
key = derive_index_key(TEST_SEED)
|
||||
data = {"schema_version": 4, "entries": {}}
|
||||
enc = Fernet(key).encrypt(json.dumps(data).encode())
|
||||
legacy_file = tmp_path / "seedpass_passwords_db.json.enc"
|
||||
legacy_file.write_bytes(enc)
|
||||
checksum = hashlib.sha256(enc).hexdigest()
|
||||
(tmp_path / "seedpass_passwords_db_checksum.txt").write_text(checksum)
|
||||
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
|
||||
|
||||
def bad_load_json(_path):
|
||||
raise ValueError("boom")
|
||||
|
||||
monkeypatch.setattr(enc_mgr, "load_json_data", bad_load_json)
|
||||
|
||||
with pytest.raises(RuntimeError, match="Migration failed:"):
|
||||
vault.load_index()
|
||||
|
||||
# Legacy file restored and new index removed
|
||||
assert legacy_file.exists()
|
||||
assert not (tmp_path / "seedpass_entries_db.json.enc").exists()
|
||||
assert (tmp_path / "seedpass_passwords_db_checksum.txt").read_text() == checksum
|
||||
assert not vault.migrated_from_legacy
|
||||
|
||||
|
||||
def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
|
||||
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
|
||||
|
||||
key = derive_index_key(TEST_SEED)
|
||||
data = {"schema_version": 4, "entries": {}}
|
||||
enc = Fernet(key).encrypt(json.dumps(data).encode())
|
||||
legacy_file = tmp_path / "seedpass_passwords_db.json.enc"
|
||||
legacy_file.write_bytes(enc)
|
||||
(tmp_path / "seedpass_passwords_db_checksum.txt").write_text(
|
||||
hashlib.sha256(enc).hexdigest()
|
||||
)
|
||||
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
|
||||
|
||||
vault.load_index()
|
||||
|
||||
new_file = tmp_path / "seedpass_entries_db.json.enc"
|
||||
assert new_file.read_bytes().startswith(b"V2:")
|
||||
assert vault.migrated_from_legacy
|
||||
|
||||
|
||||
def test_legacy_index_migration_removes_strays(monkeypatch, tmp_path: Path):
|
||||
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
|
||||
|
||||
|
Reference in New Issue
Block a user