Handle migration flags for sync prompt

This commit is contained in:
thePR0M3TH3AN
2025-08-04 14:46:21 -04:00
parent f16a771a6c
commit 3823603712
4 changed files with 121 additions and 26 deletions

View File

@@ -1157,6 +1157,7 @@ class PasswordManager:
) )
migrated = False migrated = False
last_migration_performed = False
index_exists = ( index_exists = (
self.vault.index_file.exists() self.vault.index_file.exists()
or ( or (
@@ -1164,7 +1165,9 @@ class PasswordManager:
).exists() ).exists()
) )
try: try:
_, migrated = self.vault.load_index(return_migration_flag=True) _, migrated, last_migration_performed = self.vault.load_index(
return_migration_flags=True
)
except RuntimeError as exc: except RuntimeError as exc:
print(colored(str(exc), "red")) print(colored(str(exc), "red"))
sys.exit(1) sys.exit(1)
@@ -1232,26 +1235,32 @@ class PasswordManager:
print(colored("Local database migration successful.", "green")) print(colored("Local database migration successful.", "green"))
if self.encryption_manager is not None: if self.encryption_manager is not None:
self.encryption_manager.last_migration_performed = False self.encryption_manager.last_migration_performed = False
if not self.offline_mode and confirm_action( if last_migration_performed and not self.offline_mode:
"Do you want to sync the migrated profile to Nostr now?" if confirm_action(
): "Do you want to sync the migrated profile to Nostr now?"
result = self.sync_vault() ):
if result: result = self.sync_vault()
if result:
print(
colored(
"Profile synchronized to Nostr successfully.",
"green",
)
)
else:
print(
colored(
"Error: Failed to sync profile to Nostr.",
"red",
)
)
else:
print( print(
colored( colored(
"Profile synchronized to Nostr successfully.", "You can sync the migrated profile later from the main menu.",
"green", "yellow",
) )
) )
else:
print(colored("Error: Failed to sync profile to Nostr.", "red"))
elif not self.offline_mode:
print(
colored(
"You can sync the migrated profile later from the main menu.",
"yellow",
)
)
logger.debug("Managers re-initialized for the new fingerprint.") logger.debug("Managers re-initialized for the new fingerprint.")

View File

@@ -32,13 +32,19 @@ class Vault:
self.encryption_manager = manager self.encryption_manager = manager
# ----- Password index helpers ----- # ----- Password index helpers -----
def load_index(self, *, return_migration_flag: bool = False): def load_index(self, *, return_migration_flags: bool = False):
"""Return decrypted password index data, applying migrations. """Return decrypted password index data, applying migrations.
If a legacy ``seedpass_passwords_db.json.enc`` file is detected, the If a legacy ``seedpass_passwords_db.json.enc`` file is detected, the
user is prompted to migrate it. A backup copy of the legacy file (and user is prompted to migrate it. A backup copy of the legacy file (and
its checksum) is saved under ``legacy_backups`` within the fingerprint its checksum) is saved under ``legacy_backups`` within the fingerprint
directory before renaming to the new filename. directory before renaming to the new filename.
When ``return_migration_flags`` is ``True`` the tuple
``(data, migrated, last_migration_performed)`` is returned where
``migrated`` indicates whether any migration occurred and
``last_migration_performed`` reflects whether the underlying
:class:`EncryptionManager` reported a conversion.
""" """
legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc" legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
@@ -154,8 +160,8 @@ class Vault:
self.migrated_from_legacy = ( self.migrated_from_legacy = (
legacy_detected or migration_performed or schema_migrated legacy_detected or migration_performed or schema_migrated
) )
if return_migration_flag: if return_migration_flags:
return data, self.migrated_from_legacy return data, self.migrated_from_legacy, migration_performed
return data return data
def save_index(self, data: dict) -> None: def save_index(self, data: dict) -> None:

View File

@@ -298,7 +298,7 @@ def test_legacy_index_reinit_syncs_once_when_confirmed(monkeypatch, tmp_path: Pa
assert enc_mgr.last_migration_performed is False assert enc_mgr.last_migration_performed is False
def test_schema_migration_triggers_sync(monkeypatch, tmp_path: Path): def test_schema_migration_no_sync_prompt(monkeypatch, tmp_path: Path):
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD) vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
data = {"schema_version": 3, "entries": {}} data = {"schema_version": 3, "entries": {}}
@@ -316,7 +316,8 @@ def test_schema_migration_triggers_sync(monkeypatch, tmp_path: Path):
pm.bip85 = SimpleNamespace() pm.bip85 = SimpleNamespace()
pm.offline_mode = False pm.offline_mode = False
calls = {"sync": 0} calls = {"sync": 0, "confirm": 0}
pm.sync_vault = lambda *a, **k: calls.__setitem__("sync", calls["sync"] + 1) or { pm.sync_vault = lambda *a, **k: calls.__setitem__("sync", calls["sync"] + 1) or {
"manifest_id": "m", "manifest_id": "m",
"chunk_ids": [], "chunk_ids": [],
@@ -326,8 +327,87 @@ def test_schema_migration_triggers_sync(monkeypatch, tmp_path: Path):
monkeypatch.setattr( monkeypatch.setattr(
"seedpass.core.manager.NostrClient", lambda *a, **k: SimpleNamespace() "seedpass.core.manager.NostrClient", lambda *a, **k: SimpleNamespace()
) )
monkeypatch.setattr("seedpass.core.manager.confirm_action", lambda *_a, **_k: True)
def fake_confirm(*_a, **_k):
calls["confirm"] += 1
return True
monkeypatch.setattr("seedpass.core.manager.confirm_action", fake_confirm)
pm.initialize_managers() pm.initialize_managers()
assert calls["sync"] == 1 assert calls["sync"] == 0
assert calls["confirm"] == 0
assert enc_mgr.last_migration_performed is False assert enc_mgr.last_migration_performed is False
def test_declined_migration_no_sync_prompt(monkeypatch, tmp_path: Path):
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
data = {"schema_version": 4, "entries": {}}
enc = Fernet(key).encrypt(json.dumps(data).encode())
legacy_file = tmp_path / "seedpass_passwords_db.json.enc"
legacy_file.write_bytes(enc)
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "n")
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = Vault(enc_mgr, tmp_path)
pm.parent_seed = TEST_SEED
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.bip85 = SimpleNamespace()
calls = {"confirm": 0}
def fake_confirm(*_a, **_k):
calls["confirm"] += 1
return True
monkeypatch.setattr("seedpass.core.manager.confirm_action", fake_confirm)
with pytest.raises(SystemExit):
pm.initialize_managers()
assert calls["confirm"] == 0
def test_failed_migration_no_sync_prompt(monkeypatch, tmp_path: Path):
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
data = {"schema_version": 4, "entries": {}}
enc = Fernet(key).encrypt(json.dumps(data).encode())
legacy_file = tmp_path / "seedpass_passwords_db.json.enc"
legacy_file.write_bytes(enc)
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
def fail(*_a, **_k):
raise ValueError("boom")
monkeypatch.setattr(enc_mgr, "load_json_data", fail)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = Vault(enc_mgr, tmp_path)
pm.parent_seed = TEST_SEED
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.bip85 = SimpleNamespace()
calls = {"confirm": 0}
def fake_confirm(*_a, **_k):
calls["confirm"] += 1
return True
monkeypatch.setattr("seedpass.core.manager.confirm_action", fake_confirm)
with pytest.raises(SystemExit):
pm.initialize_managers()
assert calls["confirm"] == 0

View File

@@ -96,11 +96,11 @@ def test_schema_migration_persisted_once(tmp_path: Path):
}, },
} }
enc_mgr.save_json_data(legacy) enc_mgr.save_json_data(legacy)
data, migrated = vault.load_index(return_migration_flag=True) data, migrated, _ = vault.load_index(return_migration_flags=True)
assert migrated is True assert migrated is True
assert data["schema_version"] == LATEST_VERSION assert data["schema_version"] == LATEST_VERSION
assert data["entries"]["0"]["tags"] == [] assert data["entries"]["0"]["tags"] == []
data_again, migrated_again = vault.load_index(return_migration_flag=True) data_again, migrated_again, _ = vault.load_index(return_migration_flags=True)
assert migrated_again is False assert migrated_again is False
assert data_again == data assert data_again == data