mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Merge pull request #118 from PR0M3TH3AN/codex/implement-password-manager-migrations
Add schema migration support
This commit is contained in:
@@ -27,6 +27,7 @@ from typing import Optional, Tuple, Dict, Any, List
|
||||
from pathlib import Path
|
||||
|
||||
from termcolor import colored
|
||||
from password_manager.migrations import LATEST_VERSION
|
||||
|
||||
from password_manager.vault import Vault
|
||||
from utils.file_lock import exclusive_lock
|
||||
@@ -61,12 +62,12 @@ class EntryManager:
|
||||
return data
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load index: {e}")
|
||||
return {"passwords": {}}
|
||||
return {"schema_version": LATEST_VERSION, "passwords": {}}
|
||||
else:
|
||||
logger.info(
|
||||
f"Index file '{self.index_file}' not found. Initializing new password database."
|
||||
)
|
||||
return {"passwords": {}}
|
||||
return {"schema_version": LATEST_VERSION, "passwords": {}}
|
||||
|
||||
def _save_index(self, data: Dict[str, Any]) -> None:
|
||||
try:
|
||||
|
43
src/password_manager/migrations.py
Normal file
43
src/password_manager/migrations.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Schema migration helpers for password index files."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Callable, Dict
|
||||
|
||||
MIGRATIONS: Dict[int, Callable[[dict], dict]] = {}
|
||||
|
||||
|
||||
def migration(from_ver: int) -> Callable[[Callable[[dict], dict]], Callable[[dict], dict]]:
|
||||
"""Register a migration function from *from_ver* to *from_ver* + 1."""
|
||||
|
||||
def decorator(func: Callable[[dict], dict]) -> Callable[[dict], dict]:
|
||||
MIGRATIONS[from_ver] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@migration(0)
|
||||
def _v0_to_v1(data: dict) -> dict:
|
||||
"""Inject schema_version field for initial upgrade."""
|
||||
data["schema_version"] = 1
|
||||
return data
|
||||
|
||||
|
||||
LATEST_VERSION = 1
|
||||
|
||||
|
||||
def apply_migrations(data: dict) -> dict:
|
||||
"""Upgrade *data* in-place to the latest schema version."""
|
||||
current = data.get("schema_version", 0)
|
||||
if current > LATEST_VERSION:
|
||||
raise ValueError(f"Unsupported schema version {current}")
|
||||
|
||||
while current < LATEST_VERSION:
|
||||
migrate = MIGRATIONS.get(current)
|
||||
if migrate is None:
|
||||
raise ValueError(f"No migration available from version {current}")
|
||||
data = migrate(data)
|
||||
current = data.get("schema_version", current + 1)
|
||||
|
||||
return data
|
@@ -29,8 +29,17 @@ class Vault:
|
||||
|
||||
# ----- Password index helpers -----
|
||||
def load_index(self) -> dict:
|
||||
"""Return decrypted password index data as a dict."""
|
||||
return self.encryption_manager.load_json_data(self.index_file)
|
||||
"""Return decrypted password index data as a dict, applying migrations."""
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
from .migrations import apply_migrations, LATEST_VERSION
|
||||
|
||||
version = data.get("schema_version", 0)
|
||||
if version > LATEST_VERSION:
|
||||
raise ValueError(
|
||||
f"File schema version {version} is newer than supported {LATEST_VERSION}"
|
||||
)
|
||||
data = apply_migrations(data)
|
||||
return data
|
||||
|
||||
def save_index(self, data: dict) -> None:
|
||||
"""Encrypt and write password index."""
|
||||
|
@@ -45,11 +45,11 @@ def test_backup_restore_workflow(monkeypatch):
|
||||
|
||||
vault.save_index({"passwords": {"temp": {}}})
|
||||
backup_mgr.restore_latest_backup()
|
||||
assert vault.load_index() == data2
|
||||
assert vault.load_index()["passwords"] == data2["passwords"]
|
||||
|
||||
vault.save_index({"passwords": {}})
|
||||
backup_mgr.restore_backup_by_timestamp(1111)
|
||||
assert vault.load_index() == data1
|
||||
assert vault.load_index()["passwords"] == data1["passwords"]
|
||||
|
||||
backup1.unlink()
|
||||
current = vault.load_index()
|
||||
|
34
src/tests/test_migrations.py
Normal file
34
src/tests/test_migrations.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
from password_manager.encryption import EncryptionManager
|
||||
from password_manager.vault import Vault
|
||||
from password_manager.migrations import LATEST_VERSION
|
||||
|
||||
|
||||
def setup(tmp_path: Path):
|
||||
key = Fernet.generate_key()
|
||||
enc_mgr = EncryptionManager(key, tmp_path)
|
||||
vault = Vault(enc_mgr, tmp_path)
|
||||
return enc_mgr, vault
|
||||
|
||||
|
||||
def test_migrate_v0_to_v1(tmp_path: Path):
|
||||
enc_mgr, vault = setup(tmp_path)
|
||||
legacy = {"passwords": {"0": {"website": "a", "length": 8}}}
|
||||
enc_mgr.save_json_data(legacy)
|
||||
data = vault.load_index()
|
||||
assert data["schema_version"] == LATEST_VERSION
|
||||
assert data["passwords"] == legacy["passwords"]
|
||||
|
||||
|
||||
def test_error_on_future_version(tmp_path: Path):
|
||||
enc_mgr, vault = setup(tmp_path)
|
||||
future = {"schema_version": LATEST_VERSION + 1, "passwords": {}}
|
||||
enc_mgr.save_json_data(future)
|
||||
with pytest.raises(ValueError):
|
||||
vault.load_index()
|
@@ -54,7 +54,7 @@ def test_round_trip_across_modes(monkeypatch):
|
||||
|
||||
vault.save_index({"pw": 0})
|
||||
import_backup(vault, backup, path)
|
||||
assert vault.load_index() == data
|
||||
assert vault.load_index()["pw"] == data["pw"]
|
||||
|
||||
|
||||
def test_corruption_detection(monkeypatch):
|
||||
@@ -113,4 +113,5 @@ def test_import_over_existing(monkeypatch):
|
||||
|
||||
vault.save_index({"v": 2})
|
||||
import_backup(vault, backup, path)
|
||||
assert vault.load_index() == {"v": 1}
|
||||
loaded = vault.load_index()
|
||||
assert loaded["v"] == 1
|
||||
|
Reference in New Issue
Block a user