Merge pull request #588 from PR0M3TH3AN/codex/modify-decryption-method-for-strict-handling

Improve Nostr sync failure handling
This commit is contained in:
thePR0M3TH3AN
2025-07-16 15:24:20 -04:00
committed by GitHub
6 changed files with 85 additions and 29 deletions

View File

@@ -473,7 +473,9 @@ subfolder (or adjust `APP_DIR` in `constants.py`) if you want to load it with
the main application. The fingerprint is printed after creation and the
encrypted index is published to Nostr. Use that same seed phrase to load
SeedPass. The app checks Nostr on startup and pulls any newer snapshot so your
vault stays in sync across machines.
vault stays in sync across machines. If no snapshot exists or the download
cannot be decrypted (for example when using a brand-new seed), SeedPass
automatically initializes an empty index instead of exiting.
### Automatically Updating the Script Checksum

View File

@@ -223,15 +223,28 @@ class EncryptionManager:
return fh.read()
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, relative_path: Optional[Path] = None
) -> None:
"""Decrypts data from Nostr and saves it, automatically using the new format."""
self,
encrypted_data: bytes,
relative_path: Optional[Path] = None,
*,
strict: bool = True,
) -> bool:
"""Decrypts data from Nostr and saves it.
Parameters
----------
encrypted_data:
The payload downloaded from Nostr.
relative_path:
Destination filename under the profile directory.
strict:
When ``True`` (default) re-raise any decryption error. When ``False``
return ``False`` if decryption fails.
"""
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
try:
decrypted_data = self.decrypt_data(
encrypted_data
) # This now handles both formats
decrypted_data = self.decrypt_data(encrypted_data)
if USE_ORJSON:
data = json_lib.loads(decrypted_data)
else:
@@ -240,18 +253,22 @@ class EncryptionManager:
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}",
exc_info=True,
)
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}",
"red",
return True
except Exception as e: # pragma: no cover - error handling
if strict:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}",
exc_info=True,
)
)
raise
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}",
"red",
)
)
raise
logger.warning(f"Failed to decrypt index from Nostr: {e}")
return False
def update_checksum(self, relative_path: Optional[Path] = None) -> None:
"""Updates the checksum file for the specified file."""

View File

@@ -1103,8 +1103,10 @@ class PasswordManager:
encrypted = deltas[-1]
current = self.vault.get_encrypted_index()
if current != encrypted:
self.vault.decrypt_and_save_index_from_nostr(encrypted)
logger.info("Local database synchronized from Nostr.")
if self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False
):
logger.info("Local database synchronized from Nostr.")
except Exception as e:
logger.warning(f"Unable to sync index from Nostr: {e}")
finally:
@@ -1195,14 +1197,12 @@ class PasswordManager:
deltas = asyncio.run(self.nostr_client.fetch_deltas_since(version))
if deltas:
encrypted = deltas[-1]
try:
self.vault.decrypt_and_save_index_from_nostr(encrypted)
success = self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False
)
if success:
logger.info("Initialized local database from Nostr.")
have_data = True
except Exception as err:
logger.warning(
f"Failed to decrypt Nostr data: {err}; treating as new account."
)
except Exception as e:
logger.warning(f"Unable to sync index from Nostr: {e}")

View File

@@ -60,9 +60,13 @@ class Vault:
"""Return the encrypted index bytes if present."""
return self.encryption_manager.get_encrypted_index()
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None:
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, *, strict: bool = True
) -> bool:
"""Decrypt Nostr payload and overwrite the local index."""
self.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data)
return self.encryption_manager.decrypt_and_save_index_from_nostr(
encrypted_data, strict=strict
)
# ----- Config helpers -----
def load_config(self) -> dict:

View File

@@ -63,7 +63,7 @@ def test_index_export_import_round_trip():
},
}
)
vault.decrypt_and_save_index_from_nostr(encrypted)
assert vault.decrypt_and_save_index_from_nostr(encrypted)
loaded = vault.load_index()
assert loaded["entries"] == original["entries"]

View File

@@ -6,6 +6,9 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from utils.fingerprint_manager import FingerprintManager
from password_manager.manager import PasswordManager, EncryptionMode
from helpers import create_vault, dummy_nostr_client
import gzip
from nostr.backup_models import Manifest, ChunkMeta
VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
@@ -51,3 +54,33 @@ def test_add_and_switch_fingerprint(monkeypatch):
assert pm.current_fingerprint == fingerprint
assert fm.current_fingerprint == fingerprint
assert pm.fingerprint_dir == expected_dir
def test_sync_index_missing_bad_data(monkeypatch, dummy_nostr_client):
client, _relay = dummy_nostr_client
with TemporaryDirectory() as tmpdir:
dir_path = Path(tmpdir)
vault, _enc = create_vault(dir_path)
pm = PasswordManager.__new__(PasswordManager)
pm.fingerprint_dir = dir_path
pm.vault = vault
pm.nostr_client = client
pm.sync_vault = lambda *a, **k: None
manifest = Manifest(
ver=1,
algo="aes-gcm",
chunks=[ChunkMeta(id="c0", size=1, hash="00")],
delta_since=None,
)
monkeypatch.setattr(
client,
"fetch_latest_snapshot",
lambda: (manifest, [gzip.compress(b"garbage")]),
)
monkeypatch.setattr(client, "fetch_deltas_since", lambda *_a, **_k: [])
pm.sync_index_from_nostr_if_missing()
data = pm.vault.load_index()
assert data["entries"] == {}