Merge pull request #732 from PR0M3TH3AN/codex/update-agents.md-with-migration-plan

docs: add migration guidance
This commit is contained in:
thePR0M3TH3AN
2025-08-03 11:46:24 -04:00
committed by GitHub
4 changed files with 160 additions and 3 deletions

View File

@@ -39,6 +39,14 @@ This project is written in **Python**. Follow these instructions when working wi
Following these practices helps keep the code base consistent and secure.
## Legacy Index Migration
- Always provide a migration path for index archives and import/export routines.
- Support older SeedPass versions whose indexes lacked salts or password-based encryption by detecting legacy formats and upgrading them to the current schema (see the sketch below).
- Ensure migrations unlock older account indexes and allow Nostr synchronization.
- Add regression tests covering these migrations whenever the index format or encryption changes.
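
Taken together, these points reduce to a small detection-and-backup routine. The following is a minimal sketch, assuming the legacy filename and `legacy_backups` directory used later in this PR; `NEW_NAME` stands in for `Vault.INDEX_FILENAME`, the interactive prompt and checksum handling are omitted, and the helper name is illustrative rather than part of SeedPass:

```python
import shutil
from pathlib import Path

LEGACY_NAME = "seedpass_passwords_db.json.enc"
NEW_NAME = "seedpass_entries_db.json.enc"  # assumed value of Vault.INDEX_FILENAME


def migrate_legacy_index(fingerprint_dir: Path) -> bool:
    """Back up and rename a legacy index; return True when a migration ran."""
    legacy = fingerprint_dir / LEGACY_NAME
    new = fingerprint_dir / NEW_NAME
    if not legacy.exists() or new.exists():
        return False                                # nothing to migrate
    backup_dir = fingerprint_dir / "legacy_backups"
    backup_dir.mkdir(exist_ok=True)
    shutil.copy2(legacy, backup_dir / legacy.name)  # keep an untouched copy
    legacy.rename(new)                              # adopt the current filename
    return True
```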
## Integrating New Entry Types
SeedPass supports multiple `kind` values in its JSON entry files. When adding a

View File

@@ -1154,6 +1154,15 @@ class PasswordManager:
fingerprint_dir=self.fingerprint_dir,
config_manager=self.config_manager,
)
migrated = False
try:
self.vault.load_index()
migrated = getattr(self.vault, "migrated_from_legacy", False)
except RuntimeError as exc:
print(colored(str(exc), "red"))
sys.exit(1)
self.entry_manager = EntryManager(
vault=self.vault,
backup_manager=self.backup_manager,
@@ -1213,6 +1222,9 @@ class PasswordManager:
delta_since=self.delta_since or None,
)
if migrated and not self.offline_mode:
self.start_background_vault_sync()
logger.debug("Managers re-initialized for the new fingerprint.")
except Exception as e:
@@ -1243,12 +1255,14 @@ class PasswordManager:
encrypted = gzip.decompress(b"".join(chunks))
current = self.vault.get_encrypted_index()
updated = False
migrated = False
if current != encrypted:
if self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False, merge=False
):
updated = True
current = encrypted
migrated = migrated or self.vault.migrated_from_legacy
if manifest.delta_since:
version = int(manifest.delta_since)
deltas = await self.nostr_client.fetch_deltas_since(version)
@@ -1259,6 +1273,9 @@ class PasswordManager:
):
updated = True
current = delta
migrated = migrated or self.vault.migrated_from_legacy
if migrated and not getattr(self, "offline_mode", False):
self.start_background_vault_sync()
if updated:
logger.info("Local database synchronized from Nostr.")
except Exception as e:
@@ -1391,11 +1408,13 @@ class PasswordManager:
if result:
manifest, chunks = result
encrypted = gzip.decompress(b"".join(chunks))
migrated = False
success = self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False, merge=False
)
if success:
have_data = True
migrated = migrated or self.vault.migrated_from_legacy
current = encrypted
if manifest.delta_since:
version = int(manifest.delta_since)
@@ -1406,6 +1425,11 @@ class PasswordManager:
delta, strict=False, merge=True
):
current = delta
migrated = (
migrated or self.vault.migrated_from_legacy
)
if migrated and not getattr(self, "offline_mode", False):
self.start_background_vault_sync()
logger.info("Initialized local database from Nostr.")
except Exception as e:  # pragma: no cover - network errors
logger.warning(f"Unable to sync index from Nostr: {e}")
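
Stripped of the surrounding manager plumbing, the three hunks above follow one pattern: record whether loading or restoring the index performed a legacy migration, then schedule a re-sync unless the profile is offline. A condensed sketch, where `pm` stands in for a `PasswordManager` instance and the free function is illustrative only:

```python
def reload_index_and_resync(pm) -> None:
    """Load the vault index and push a fresh snapshot if it was just migrated."""
    migrated = False
    try:
        pm.vault.load_index()
        migrated = getattr(pm.vault, "migrated_from_legacy", False)
    except RuntimeError as exc:           # e.g. the user declined the migration prompt
        print(f"Error: {exc}")
        raise SystemExit(1)
    if migrated and not getattr(pm, "offline_mode", False):
        pm.start_background_vault_sync()  # publish the upgraded index to Nostr
```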

View File

@@ -3,6 +3,9 @@
from pathlib import Path
from typing import Optional, Union
from os import PathLike
import shutil
from termcolor import colored
from .encryption import EncryptionManager
@@ -22,6 +25,7 @@ class Vault:
self.fingerprint_dir = Path(fingerprint_dir)
self.index_file = self.fingerprint_dir / self.INDEX_FILENAME
self.config_file = self.fingerprint_dir / self.CONFIG_FILENAME
self.migrated_from_legacy = False
def set_encryption_manager(self, manager: EncryptionManager) -> None:
"""Replace the internal encryption manager."""
@@ -29,17 +33,47 @@ class Vault:
# ----- Password index helpers -----
def load_index(self) -> dict:
"""Return decrypted password index data as a dict, applying migrations.
If a legacy ``seedpass_passwords_db.json.enc`` file is detected, the
user is prompted to migrate it. A backup copy of the legacy file (and
its checksum) is saved under ``legacy_backups`` within the fingerprint
directory before renaming to the new filename.
"""
legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
self.migrated_from_legacy = False
if legacy_file.exists() and not self.index_file.exists():
print(colored("Legacy index detected.", "yellow"))
resp = (
input("Would you like to migrate this to the new index format? [y/N]: ")
.strip()
.lower()
)
if resp != "y":
raise RuntimeError("Migration declined by user")
legacy_checksum = (
self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
)
backup_dir = self.fingerprint_dir / "legacy_backups"
backup_dir.mkdir(exist_ok=True)
shutil.copy2(legacy_file, backup_dir / legacy_file.name)
if legacy_checksum.exists():
shutil.copy2(legacy_checksum, backup_dir / legacy_checksum.name)
legacy_file.rename(self.index_file)
if legacy_checksum.exists():
legacy_checksum.rename(
self.fingerprint_dir / "seedpass_entries_db_checksum.txt"
)
self.migrated_from_legacy = True
print(
colored(
"Migration complete. Original index backed up to 'legacy_backups'",
"green",
)
)
data = self.encryption_manager.load_json_data(self.index_file)
from .migrations import apply_migrations, LATEST_VERSION
@@ -64,9 +98,13 @@ class Vault:
self, encrypted_data: bytes, *, strict: bool = True, merge: bool = False
) -> bool:
"""Decrypt Nostr payload and update the local index."""
self.migrated_from_legacy = not encrypted_data.startswith(b"V2:")
result = self.encryption_manager.decrypt_and_save_index_from_nostr(
encrypted_data, strict=strict, merge=merge
)
if not result:
self.migrated_from_legacy = False
return result
# ----- Config helpers -----
def load_config(self) -> dict:
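
A short usage sketch of the new flag, assuming a `Vault` already bound to an `EncryptionManager` and fingerprint directory as in the constructor above; the import path matches the test file below:

```python
from seedpass.core.vault import Vault


def load_and_report(vault: Vault) -> dict:
    """Load the index and report whether a legacy migration just happened."""
    data = vault.load_index()  # may prompt before migrating a legacy file
    if vault.migrated_from_legacy:
        # The remote Nostr snapshot is now stale; manager.py reacts to this
        # flag by calling start_background_vault_sync().
        print("Legacy index migrated; re-sync to Nostr recommended.")
    return data
```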

View File

@@ -5,9 +5,15 @@ from pathlib import Path
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
from utils.key_derivation import derive_index_key
from cryptography.fernet import Fernet
from types import SimpleNamespace
import asyncio
import gzip
from seedpass.core.manager import PasswordManager, EncryptionMode
from seedpass.core.vault import Vault
def test_legacy_index_migrates(monkeypatch, tmp_path: Path):
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
@@ -33,6 +39,8 @@ def test_legacy_index_migrates(tmp_path: Path):
hashlib.sha256(enc).hexdigest()
)
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
loaded = vault.load_index()
assert loaded == data
@@ -40,3 +48,82 @@ def test_legacy_index_migrates(tmp_path: Path):
assert new_file.exists()
assert not legacy_file.exists()
assert not (tmp_path / "seedpass_passwords_db_checksum.txt").exists()
backup = tmp_path / "legacy_backups" / "seedpass_passwords_db.json.enc"
assert backup.exists()
def test_migration_triggers_sync(monkeypatch, tmp_path: Path):
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
data = {"schema_version": 4, "entries": {}}
enc = Fernet(key).encrypt(json.dumps(data).encode())
legacy_file = tmp_path / "seedpass_passwords_db.json.enc"
legacy_file.write_bytes(enc)
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = Vault(enc_mgr, tmp_path)
pm.parent_seed = TEST_SEED
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.bip85 = SimpleNamespace()
calls = {"sync": 0}
pm.start_background_vault_sync = lambda *a, **k: calls.__setitem__(
"sync", calls["sync"] + 1
)
monkeypatch.setattr(
"seedpass.core.manager.NostrClient", lambda *a, **k: SimpleNamespace()
)
pm.initialize_managers()
assert calls["sync"] == 1
def test_legacy_nostr_payload_triggers_sync(monkeypatch, tmp_path: Path):
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
data = {"schema_version": 4, "entries": {}}
legacy_enc = Fernet(key).encrypt(json.dumps(data).encode())
compressed = gzip.compress(legacy_enc)
class DummyClient:
def __init__(self):
self.relays = []
self.last_error = None
self.fingerprint = None
async def fetch_latest_snapshot(self):
from nostr.backup_models import Manifest
return Manifest(ver=1, algo="gzip", chunks=[], delta_since=None), [
compressed
]
async def fetch_deltas_since(self, version):
return []
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = Vault(enc_mgr, tmp_path)
pm.parent_seed = TEST_SEED
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.nostr_client = DummyClient()
pm.offline_mode = False
calls = {"sync": 0}
pm.start_background_vault_sync = lambda *a, **k: calls.__setitem__(
"sync", calls["sync"] + 1
)
asyncio.run(pm.sync_index_from_nostr_async())
assert calls["sync"] == 1
assert pm.vault.load_index() == data