Handle decryption failures when syncing index

This commit is contained in:
thePR0M3TH3AN
2025-07-16 15:19:48 -04:00
parent 387bfad220
commit b80abff895
6 changed files with 85 additions and 29 deletions

View File

@@ -223,15 +223,28 @@ class EncryptionManager:
return fh.read()
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, relative_path: Optional[Path] = None
) -> None:
"""Decrypts data from Nostr and saves it, automatically using the new format."""
self,
encrypted_data: bytes,
relative_path: Optional[Path] = None,
*,
strict: bool = True,
) -> bool:
"""Decrypts data from Nostr and saves it.
Parameters
----------
encrypted_data:
The payload downloaded from Nostr.
relative_path:
Destination filename under the profile directory.
strict:
When ``True`` (default) re-raise any decryption error. When ``False``
return ``False`` if decryption fails.
"""
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
try:
decrypted_data = self.decrypt_data(
encrypted_data
) # This now handles both formats
decrypted_data = self.decrypt_data(encrypted_data)
if USE_ORJSON:
data = json_lib.loads(decrypted_data)
else:
@@ -240,18 +253,22 @@ class EncryptionManager:
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}",
exc_info=True,
)
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}",
"red",
return True
except Exception as e: # pragma: no cover - error handling
if strict:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}",
exc_info=True,
)
)
raise
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}",
"red",
)
)
raise
logger.warning(f"Failed to decrypt index from Nostr: {e}")
return False
def update_checksum(self, relative_path: Optional[Path] = None) -> None:
"""Updates the checksum file for the specified file."""

View File

@@ -1103,8 +1103,10 @@ class PasswordManager:
encrypted = deltas[-1]
current = self.vault.get_encrypted_index()
if current != encrypted:
self.vault.decrypt_and_save_index_from_nostr(encrypted)
logger.info("Local database synchronized from Nostr.")
if self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False
):
logger.info("Local database synchronized from Nostr.")
except Exception as e:
logger.warning(f"Unable to sync index from Nostr: {e}")
finally:
@@ -1195,14 +1197,12 @@ class PasswordManager:
deltas = asyncio.run(self.nostr_client.fetch_deltas_since(version))
if deltas:
encrypted = deltas[-1]
try:
self.vault.decrypt_and_save_index_from_nostr(encrypted)
success = self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False
)
if success:
logger.info("Initialized local database from Nostr.")
have_data = True
except Exception as err:
logger.warning(
f"Failed to decrypt Nostr data: {err}; treating as new account."
)
except Exception as e:
logger.warning(f"Unable to sync index from Nostr: {e}")

View File

@@ -60,9 +60,13 @@ class Vault:
"""Return the encrypted index bytes if present."""
return self.encryption_manager.get_encrypted_index()
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None:
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, *, strict: bool = True
) -> bool:
"""Decrypt Nostr payload and overwrite the local index."""
self.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data)
return self.encryption_manager.decrypt_and_save_index_from_nostr(
encrypted_data, strict=strict
)
# ----- Config helpers -----
def load_config(self) -> dict:

View File

@@ -63,7 +63,7 @@ def test_index_export_import_round_trip():
},
}
)
vault.decrypt_and_save_index_from_nostr(encrypted)
assert vault.decrypt_and_save_index_from_nostr(encrypted)
loaded = vault.load_index()
assert loaded["entries"] == original["entries"]

View File

@@ -6,6 +6,9 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from utils.fingerprint_manager import FingerprintManager
from password_manager.manager import PasswordManager, EncryptionMode
from helpers import create_vault, dummy_nostr_client
import gzip
from nostr.backup_models import Manifest, ChunkMeta
VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
@@ -51,3 +54,33 @@ def test_add_and_switch_fingerprint(monkeypatch):
assert pm.current_fingerprint == fingerprint
assert fm.current_fingerprint == fingerprint
assert pm.fingerprint_dir == expected_dir
def test_sync_index_missing_bad_data(monkeypatch, dummy_nostr_client):
client, _relay = dummy_nostr_client
with TemporaryDirectory() as tmpdir:
dir_path = Path(tmpdir)
vault, _enc = create_vault(dir_path)
pm = PasswordManager.__new__(PasswordManager)
pm.fingerprint_dir = dir_path
pm.vault = vault
pm.nostr_client = client
pm.sync_vault = lambda *a, **k: None
manifest = Manifest(
ver=1,
algo="aes-gcm",
chunks=[ChunkMeta(id="c0", size=1, hash="00")],
delta_since=None,
)
monkeypatch.setattr(
client,
"fetch_latest_snapshot",
lambda: (manifest, [gzip.compress(b"garbage")]),
)
monkeypatch.setattr(client, "fetch_deltas_since", lambda *_a, **_k: [])
pm.sync_index_from_nostr_if_missing()
data = pm.vault.load_index()
assert data["entries"] == {}