From 5b2c239c21e396ec6e26748453b47b9242b5dcc1 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Tue, 1 Jul 2025 21:19:58 -0400 Subject: [PATCH] Add schema migrations for index --- src/password_manager/entry_management.py | 5 +-- src/password_manager/migrations.py | 43 ++++++++++++++++++++++++ src/password_manager/vault.py | 13 +++++-- src/tests/test_backup_restore.py | 4 +-- src/tests/test_migrations.py | 34 +++++++++++++++++++ src/tests/test_portable_backup.py | 5 +-- 6 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 src/password_manager/migrations.py create mode 100644 src/tests/test_migrations.py diff --git a/src/password_manager/entry_management.py b/src/password_manager/entry_management.py index 346696f..26f4008 100644 --- a/src/password_manager/entry_management.py +++ b/src/password_manager/entry_management.py @@ -27,6 +27,7 @@ from typing import Optional, Tuple, Dict, Any, List from pathlib import Path from termcolor import colored +from password_manager.migrations import LATEST_VERSION from password_manager.vault import Vault from utils.file_lock import exclusive_lock @@ -61,12 +62,12 @@ class EntryManager: return data except Exception as e: logger.error(f"Failed to load index: {e}") - return {"passwords": {}} + return {"schema_version": LATEST_VERSION, "passwords": {}} else: logger.info( f"Index file '{self.index_file}' not found. Initializing new password database." ) - return {"passwords": {}} + return {"schema_version": LATEST_VERSION, "passwords": {}} def _save_index(self, data: Dict[str, Any]) -> None: try: diff --git a/src/password_manager/migrations.py b/src/password_manager/migrations.py new file mode 100644 index 0000000..e1e6843 --- /dev/null +++ b/src/password_manager/migrations.py @@ -0,0 +1,43 @@ +"""Schema migration helpers for password index files.""" + +from __future__ import annotations + +from typing import Callable, Dict + +MIGRATIONS: Dict[int, Callable[[dict], dict]] = {} + + +def migration(from_ver: int) -> Callable[[Callable[[dict], dict]], Callable[[dict], dict]]: + """Register a migration function from *from_ver* to *from_ver* + 1.""" + + def decorator(func: Callable[[dict], dict]) -> Callable[[dict], dict]: + MIGRATIONS[from_ver] = func + return func + + return decorator + + +@migration(0) +def _v0_to_v1(data: dict) -> dict: + """Inject schema_version field for initial upgrade.""" + data["schema_version"] = 1 + return data + + +LATEST_VERSION = 1 + + +def apply_migrations(data: dict) -> dict: + """Upgrade *data* in-place to the latest schema version.""" + current = data.get("schema_version", 0) + if current > LATEST_VERSION: + raise ValueError(f"Unsupported schema version {current}") + + while current < LATEST_VERSION: + migrate = MIGRATIONS.get(current) + if migrate is None: + raise ValueError(f"No migration available from version {current}") + data = migrate(data) + current = data.get("schema_version", current + 1) + + return data diff --git a/src/password_manager/vault.py b/src/password_manager/vault.py index 08561de..a3cfeef 100644 --- a/src/password_manager/vault.py +++ b/src/password_manager/vault.py @@ -29,8 +29,17 @@ class Vault: # ----- Password index helpers ----- def load_index(self) -> dict: - """Return decrypted password index data as a dict.""" - return self.encryption_manager.load_json_data(self.index_file) + """Return decrypted password index data as a dict, applying migrations.""" + data = self.encryption_manager.load_json_data(self.index_file) + from .migrations import apply_migrations, LATEST_VERSION + + version = data.get("schema_version", 0) + if version > LATEST_VERSION: + raise ValueError( + f"File schema version {version} is newer than supported {LATEST_VERSION}" + ) + data = apply_migrations(data) + return data def save_index(self, data: dict) -> None: """Encrypt and write password index.""" diff --git a/src/tests/test_backup_restore.py b/src/tests/test_backup_restore.py index 56d9329..3abe76a 100644 --- a/src/tests/test_backup_restore.py +++ b/src/tests/test_backup_restore.py @@ -45,11 +45,11 @@ def test_backup_restore_workflow(monkeypatch): vault.save_index({"passwords": {"temp": {}}}) backup_mgr.restore_latest_backup() - assert vault.load_index() == data2 + assert vault.load_index()["passwords"] == data2["passwords"] vault.save_index({"passwords": {}}) backup_mgr.restore_backup_by_timestamp(1111) - assert vault.load_index() == data1 + assert vault.load_index()["passwords"] == data1["passwords"] backup1.unlink() current = vault.load_index() diff --git a/src/tests/test_migrations.py b/src/tests/test_migrations.py new file mode 100644 index 0000000..fa14ef2 --- /dev/null +++ b/src/tests/test_migrations.py @@ -0,0 +1,34 @@ +import sys +from pathlib import Path +import pytest +from cryptography.fernet import Fernet + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.encryption import EncryptionManager +from password_manager.vault import Vault +from password_manager.migrations import LATEST_VERSION + + +def setup(tmp_path: Path): + key = Fernet.generate_key() + enc_mgr = EncryptionManager(key, tmp_path) + vault = Vault(enc_mgr, tmp_path) + return enc_mgr, vault + + +def test_migrate_v0_to_v1(tmp_path: Path): + enc_mgr, vault = setup(tmp_path) + legacy = {"passwords": {"0": {"website": "a", "length": 8}}} + enc_mgr.save_json_data(legacy) + data = vault.load_index() + assert data["schema_version"] == LATEST_VERSION + assert data["passwords"] == legacy["passwords"] + + +def test_error_on_future_version(tmp_path: Path): + enc_mgr, vault = setup(tmp_path) + future = {"schema_version": LATEST_VERSION + 1, "passwords": {}} + enc_mgr.save_json_data(future) + with pytest.raises(ValueError): + vault.load_index() diff --git a/src/tests/test_portable_backup.py b/src/tests/test_portable_backup.py index 835b981..6b9c665 100644 --- a/src/tests/test_portable_backup.py +++ b/src/tests/test_portable_backup.py @@ -54,7 +54,7 @@ def test_round_trip_across_modes(monkeypatch): vault.save_index({"pw": 0}) import_backup(vault, backup, path) - assert vault.load_index() == data + assert vault.load_index()["pw"] == data["pw"] def test_corruption_detection(monkeypatch): @@ -113,4 +113,5 @@ def test_import_over_existing(monkeypatch): vault.save_index({"v": 2}) import_backup(vault, backup, path) - assert vault.load_index() == {"v": 1} + loaded = vault.load_index() + assert loaded["v"] == 1