Add schema migrations for index

This commit is contained in:
thePR0M3TH3AN
2025-07-01 21:19:58 -04:00
parent d90b8e572e
commit 5b2c239c21
6 changed files with 96 additions and 8 deletions

View File

@@ -27,6 +27,7 @@ from typing import Optional, Tuple, Dict, Any, List
from pathlib import Path
from termcolor import colored
from password_manager.migrations import LATEST_VERSION
from password_manager.vault import Vault
from utils.file_lock import exclusive_lock
@@ -61,12 +62,12 @@ class EntryManager:
return data
except Exception as e:
logger.error(f"Failed to load index: {e}")
return {"passwords": {}}
return {"schema_version": LATEST_VERSION, "passwords": {}}
else:
logger.info(
f"Index file '{self.index_file}' not found. Initializing new password database."
)
return {"passwords": {}}
return {"schema_version": LATEST_VERSION, "passwords": {}}
def _save_index(self, data: Dict[str, Any]) -> None:
try:

View File

@@ -0,0 +1,43 @@
"""Schema migration helpers for password index files."""
from __future__ import annotations
from typing import Callable, Dict
MIGRATIONS: Dict[int, Callable[[dict], dict]] = {}
def migration(from_ver: int) -> Callable[[Callable[[dict], dict]], Callable[[dict], dict]]:
"""Register a migration function from *from_ver* to *from_ver* + 1."""
def decorator(func: Callable[[dict], dict]) -> Callable[[dict], dict]:
MIGRATIONS[from_ver] = func
return func
return decorator
@migration(0)
def _v0_to_v1(data: dict) -> dict:
"""Inject schema_version field for initial upgrade."""
data["schema_version"] = 1
return data
LATEST_VERSION = 1
def apply_migrations(data: dict) -> dict:
"""Upgrade *data* in-place to the latest schema version."""
current = data.get("schema_version", 0)
if current > LATEST_VERSION:
raise ValueError(f"Unsupported schema version {current}")
while current < LATEST_VERSION:
migrate = MIGRATIONS.get(current)
if migrate is None:
raise ValueError(f"No migration available from version {current}")
data = migrate(data)
current = data.get("schema_version", current + 1)
return data

View File

@@ -29,8 +29,17 @@ class Vault:
# ----- Password index helpers -----
def load_index(self) -> dict:
"""Return decrypted password index data as a dict."""
return self.encryption_manager.load_json_data(self.index_file)
"""Return decrypted password index data as a dict, applying migrations."""
data = self.encryption_manager.load_json_data(self.index_file)
from .migrations import apply_migrations, LATEST_VERSION
version = data.get("schema_version", 0)
if version > LATEST_VERSION:
raise ValueError(
f"File schema version {version} is newer than supported {LATEST_VERSION}"
)
data = apply_migrations(data)
return data
def save_index(self, data: dict) -> None:
"""Encrypt and write password index."""

View File

@@ -45,11 +45,11 @@ def test_backup_restore_workflow(monkeypatch):
vault.save_index({"passwords": {"temp": {}}})
backup_mgr.restore_latest_backup()
assert vault.load_index() == data2
assert vault.load_index()["passwords"] == data2["passwords"]
vault.save_index({"passwords": {}})
backup_mgr.restore_backup_by_timestamp(1111)
assert vault.load_index() == data1
assert vault.load_index()["passwords"] == data1["passwords"]
backup1.unlink()
current = vault.load_index()

View File

@@ -0,0 +1,34 @@
import sys
from pathlib import Path
import pytest
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.vault import Vault
from password_manager.migrations import LATEST_VERSION
def setup(tmp_path: Path):
key = Fernet.generate_key()
enc_mgr = EncryptionManager(key, tmp_path)
vault = Vault(enc_mgr, tmp_path)
return enc_mgr, vault
def test_migrate_v0_to_v1(tmp_path: Path):
enc_mgr, vault = setup(tmp_path)
legacy = {"passwords": {"0": {"website": "a", "length": 8}}}
enc_mgr.save_json_data(legacy)
data = vault.load_index()
assert data["schema_version"] == LATEST_VERSION
assert data["passwords"] == legacy["passwords"]
def test_error_on_future_version(tmp_path: Path):
enc_mgr, vault = setup(tmp_path)
future = {"schema_version": LATEST_VERSION + 1, "passwords": {}}
enc_mgr.save_json_data(future)
with pytest.raises(ValueError):
vault.load_index()

View File

@@ -54,7 +54,7 @@ def test_round_trip_across_modes(monkeypatch):
vault.save_index({"pw": 0})
import_backup(vault, backup, path)
assert vault.load_index() == data
assert vault.load_index()["pw"] == data["pw"]
def test_corruption_detection(monkeypatch):
@@ -113,4 +113,5 @@ def test_import_over_existing(monkeypatch):
vault.save_index({"v": 2})
import_backup(vault, backup, path)
assert vault.load_index() == {"v": 1}
loaded = vault.load_index()
assert loaded["v"] == 1