Merge pull request #677 from PR0M3TH3AN/codex/add-modified_ts-management-for-vault-entries

Add per-entry timestamps and merge logic
This commit is contained in:
thePR0M3TH3AN
2025-07-24 20:36:18 -04:00
committed by GitHub
6 changed files with 90 additions and 10 deletions

View File

@@ -228,6 +228,7 @@ class EncryptionManager:
relative_path: Optional[Path] = None,
*,
strict: bool = True,
merge: bool = False,
) -> bool:
"""Decrypts data from Nostr and saves it.
@@ -249,6 +250,20 @@ class EncryptionManager:
data = json_lib.loads(decrypted_data)
else:
data = json_lib.loads(decrypted_data.decode("utf-8"))
if merge and (self.fingerprint_dir / relative_path).exists():
current = self.load_json_data(relative_path)
current_entries = current.get("entries", {})
for idx, entry in data.get("entries", {}).items():
cur_ts = current_entries.get(idx, {}).get("modified_ts", 0)
new_ts = entry.get("modified_ts", 0)
if idx not in current_entries or new_ts >= cur_ts:
current_entries[idx] = entry
current["entries"] = current_entries
if "schema_version" in data:
current["schema_version"] = max(
current.get("schema_version", 0), data.get("schema_version", 0)
)
data = current
self.save_json_data(data, relative_path) # This always saves in V2 format
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")

View File

@@ -27,6 +27,7 @@ import logging
import hashlib
import sys
import shutil
import time
from typing import Optional, Tuple, Dict, Any, List
from pathlib import Path
@@ -97,6 +98,7 @@ class EntryManager:
entry["word_count"] = entry["words"]
entry.pop("words", None)
entry.setdefault("tags", [])
entry.setdefault("modified_ts", entry.get("updated", 0))
logger.debug("Index loaded successfully.")
self._index_cache = data
return data
@@ -176,6 +178,7 @@ class EntryManager:
"type": EntryType.PASSWORD.value,
"kind": EntryType.PASSWORD.value,
"notes": notes,
"modified_ts": int(time.time()),
"custom_fields": custom_fields or [],
"tags": tags or [],
}
@@ -236,6 +239,7 @@ class EntryManager:
"type": EntryType.TOTP.value,
"kind": EntryType.TOTP.value,
"label": label,
"modified_ts": int(time.time()),
"index": index,
"period": period,
"digits": digits,
@@ -249,6 +253,7 @@ class EntryManager:
"kind": EntryType.TOTP.value,
"label": label,
"secret": secret,
"modified_ts": int(time.time()),
"period": period,
"digits": digits,
"archived": archived,
@@ -294,6 +299,7 @@ class EntryManager:
"kind": EntryType.SSH.value,
"index": index,
"label": label,
"modified_ts": int(time.time()),
"notes": notes,
"archived": archived,
"tags": tags or [],
@@ -340,6 +346,7 @@ class EntryManager:
"kind": EntryType.PGP.value,
"index": index,
"label": label,
"modified_ts": int(time.time()),
"key_type": key_type,
"user_id": user_id,
"notes": notes,
@@ -392,6 +399,7 @@ class EntryManager:
"kind": EntryType.NOSTR.value,
"index": index,
"label": label,
"modified_ts": int(time.time()),
"notes": notes,
"archived": archived,
"tags": tags or [],
@@ -421,6 +429,7 @@ class EntryManager:
"type": EntryType.KEY_VALUE.value,
"kind": EntryType.KEY_VALUE.value,
"label": label,
"modified_ts": int(time.time()),
"value": value,
"notes": notes,
"archived": archived,
@@ -480,6 +489,7 @@ class EntryManager:
"kind": EntryType.SEED.value,
"index": index,
"label": label,
"modified_ts": int(time.time()),
"word_count": words_num,
"notes": notes,
"archived": archived,
@@ -552,6 +562,7 @@ class EntryManager:
"kind": EntryType.MANAGED_ACCOUNT.value,
"index": index,
"label": label,
"modified_ts": int(time.time()),
"word_count": word_count,
"notes": notes,
"fingerprint": fingerprint,
@@ -682,7 +693,8 @@ class EntryManager:
):
entry.setdefault("custom_fields", [])
logger.debug(f"Retrieved entry at index {index}: {entry}")
return entry
clean = {k: v for k, v in entry.items() if k != "modified_ts"}
return clean
else:
logger.warning(f"No entry found at index {index}.")
print(colored(f"Warning: No entry found at index {index}.", "yellow"))
@@ -887,6 +899,8 @@ class EntryManager:
entry["tags"] = tags
logger.debug(f"Updated tags for index {index}: {tags}")
entry["modified_ts"] = int(time.time())
data["entries"][str(index)] = entry
logger.debug(f"Modified entry at index {index}: {entry}")

View File

@@ -1179,7 +1179,7 @@ class PasswordManager:
updated = False
if current != encrypted:
if self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False
encrypted, strict=False, merge=False
):
updated = True
current = encrypted
@@ -1189,7 +1189,7 @@ class PasswordManager:
for delta in deltas:
if current != delta:
if self.vault.decrypt_and_save_index_from_nostr(
delta, strict=False
delta, strict=False, merge=True
):
updated = True
current = delta
@@ -1314,7 +1314,7 @@ class PasswordManager:
manifest, chunks = result
encrypted = gzip.decompress(b"".join(chunks))
success = self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False
encrypted, strict=False, merge=False
)
if success:
have_data = True
@@ -1325,7 +1325,7 @@ class PasswordManager:
for delta in deltas:
if current != delta:
if self.vault.decrypt_and_save_index_from_nostr(
delta, strict=False
delta, strict=False, merge=True
):
current = delta
logger.info("Initialized local database from Nostr.")
@@ -3641,7 +3641,7 @@ class PasswordManager:
:param encrypted_data: The encrypted data retrieved from Nostr.
"""
try:
self.vault.decrypt_and_save_index_from_nostr(encrypted_data)
self.vault.decrypt_and_save_index_from_nostr(encrypted_data, merge=True)
logging.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:

View File

@@ -61,11 +61,11 @@ class Vault:
return self.encryption_manager.get_encrypted_index()
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, *, strict: bool = True
self, encrypted_data: bytes, *, strict: bool = True, merge: bool = False
) -> bool:
"""Decrypt Nostr payload and overwrite the local index."""
"""Decrypt Nostr payload and update the local index."""
return self.encryption_manager.decrypt_and_save_index_from_nostr(
encrypted_data, strict=strict
encrypted_data, strict=strict, merge=merge
)
# ----- Config helpers -----

View File

@@ -0,0 +1,49 @@
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from helpers import create_vault
from seedpass.core.entry_management import EntryManager
from seedpass.core.backup import BackupManager
from seedpass.core.config_manager import ConfigManager
def _setup_mgr(path: Path):
vault, _ = create_vault(path)
cfg = ConfigManager(vault, path)
backup = BackupManager(path, cfg)
return vault, EntryManager(vault, backup)
def test_merge_modified_ts():
with TemporaryDirectory() as tmpdir:
base = Path(tmpdir)
va, ema = _setup_mgr(base / "A")
vb, emb = _setup_mgr(base / "B")
idx0 = ema.add_entry("a", 8)
idx1 = ema.add_entry("b", 8)
# B starts from A's snapshot
enc = va.get_encrypted_index() or b""
vb.decrypt_and_save_index_from_nostr(enc, merge=False)
emb.clear_cache()
assert emb.retrieve_entry(idx0)["username"] == ""
ema.modify_entry(idx0, username="ua")
delta_a = va.get_encrypted_index() or b""
vb.decrypt_and_save_index_from_nostr(delta_a, merge=True)
emb.clear_cache()
assert emb.retrieve_entry(idx0)["username"] == "ua"
emb.modify_entry(idx1, username="ub")
delta_b = vb.get_encrypted_index() or b""
va.decrypt_and_save_index_from_nostr(delta_b, merge=True)
ema.clear_cache()
assert ema.retrieve_entry(idx1)["username"] == "ub"
assert ema.retrieve_entry(idx0)["username"] == "ua"
assert ema.retrieve_entry(idx1)["username"] == "ub"
assert emb.retrieve_entry(idx0)["username"] == "ua"
assert emb.retrieve_entry(idx1)["username"] == "ub"

View File

@@ -44,7 +44,9 @@ def test_add_and_retrieve_entry():
data = enc_mgr.load_json_data(entry_mgr.index_file)
assert str(index) in data.get("entries", {})
assert data["entries"][str(index)] == entry
stored = data["entries"][str(index)]
stored.pop("modified_ts", None)
assert stored == entry
@pytest.mark.parametrize(