Add modified_ts tracking and merge logic

This commit is contained in:
thePR0M3TH3AN
2025-07-24 20:28:21 -04:00
parent db90d9caf0
commit 747e2be5a9
6 changed files with 90 additions and 10 deletions

View File

@@ -228,6 +228,7 @@ class EncryptionManager:
relative_path: Optional[Path] = None, relative_path: Optional[Path] = None,
*, *,
strict: bool = True, strict: bool = True,
merge: bool = False,
) -> bool: ) -> bool:
"""Decrypts data from Nostr and saves it. """Decrypts data from Nostr and saves it.
@@ -249,6 +250,20 @@ class EncryptionManager:
data = json_lib.loads(decrypted_data) data = json_lib.loads(decrypted_data)
else: else:
data = json_lib.loads(decrypted_data.decode("utf-8")) data = json_lib.loads(decrypted_data.decode("utf-8"))
if merge and (self.fingerprint_dir / relative_path).exists():
current = self.load_json_data(relative_path)
current_entries = current.get("entries", {})
for idx, entry in data.get("entries", {}).items():
cur_ts = current_entries.get(idx, {}).get("modified_ts", 0)
new_ts = entry.get("modified_ts", 0)
if idx not in current_entries or new_ts >= cur_ts:
current_entries[idx] = entry
current["entries"] = current_entries
if "schema_version" in data:
current["schema_version"] = max(
current.get("schema_version", 0), data.get("schema_version", 0)
)
data = current
self.save_json_data(data, relative_path) # This always saves in V2 format self.save_json_data(data, relative_path) # This always saves in V2 format
self.update_checksum(relative_path) self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.") logger.info("Index file from Nostr was processed and saved successfully.")

View File

@@ -27,6 +27,7 @@ import logging
import hashlib import hashlib
import sys import sys
import shutil import shutil
import time
from typing import Optional, Tuple, Dict, Any, List from typing import Optional, Tuple, Dict, Any, List
from pathlib import Path from pathlib import Path
@@ -97,6 +98,7 @@ class EntryManager:
entry["word_count"] = entry["words"] entry["word_count"] = entry["words"]
entry.pop("words", None) entry.pop("words", None)
entry.setdefault("tags", []) entry.setdefault("tags", [])
entry.setdefault("modified_ts", entry.get("updated", 0))
logger.debug("Index loaded successfully.") logger.debug("Index loaded successfully.")
self._index_cache = data self._index_cache = data
return data return data
@@ -176,6 +178,7 @@ class EntryManager:
"type": EntryType.PASSWORD.value, "type": EntryType.PASSWORD.value,
"kind": EntryType.PASSWORD.value, "kind": EntryType.PASSWORD.value,
"notes": notes, "notes": notes,
"modified_ts": int(time.time()),
"custom_fields": custom_fields or [], "custom_fields": custom_fields or [],
"tags": tags or [], "tags": tags or [],
} }
@@ -236,6 +239,7 @@ class EntryManager:
"type": EntryType.TOTP.value, "type": EntryType.TOTP.value,
"kind": EntryType.TOTP.value, "kind": EntryType.TOTP.value,
"label": label, "label": label,
"modified_ts": int(time.time()),
"index": index, "index": index,
"period": period, "period": period,
"digits": digits, "digits": digits,
@@ -249,6 +253,7 @@ class EntryManager:
"kind": EntryType.TOTP.value, "kind": EntryType.TOTP.value,
"label": label, "label": label,
"secret": secret, "secret": secret,
"modified_ts": int(time.time()),
"period": period, "period": period,
"digits": digits, "digits": digits,
"archived": archived, "archived": archived,
@@ -294,6 +299,7 @@ class EntryManager:
"kind": EntryType.SSH.value, "kind": EntryType.SSH.value,
"index": index, "index": index,
"label": label, "label": label,
"modified_ts": int(time.time()),
"notes": notes, "notes": notes,
"archived": archived, "archived": archived,
"tags": tags or [], "tags": tags or [],
@@ -340,6 +346,7 @@ class EntryManager:
"kind": EntryType.PGP.value, "kind": EntryType.PGP.value,
"index": index, "index": index,
"label": label, "label": label,
"modified_ts": int(time.time()),
"key_type": key_type, "key_type": key_type,
"user_id": user_id, "user_id": user_id,
"notes": notes, "notes": notes,
@@ -392,6 +399,7 @@ class EntryManager:
"kind": EntryType.NOSTR.value, "kind": EntryType.NOSTR.value,
"index": index, "index": index,
"label": label, "label": label,
"modified_ts": int(time.time()),
"notes": notes, "notes": notes,
"archived": archived, "archived": archived,
"tags": tags or [], "tags": tags or [],
@@ -421,6 +429,7 @@ class EntryManager:
"type": EntryType.KEY_VALUE.value, "type": EntryType.KEY_VALUE.value,
"kind": EntryType.KEY_VALUE.value, "kind": EntryType.KEY_VALUE.value,
"label": label, "label": label,
"modified_ts": int(time.time()),
"value": value, "value": value,
"notes": notes, "notes": notes,
"archived": archived, "archived": archived,
@@ -480,6 +489,7 @@ class EntryManager:
"kind": EntryType.SEED.value, "kind": EntryType.SEED.value,
"index": index, "index": index,
"label": label, "label": label,
"modified_ts": int(time.time()),
"word_count": words_num, "word_count": words_num,
"notes": notes, "notes": notes,
"archived": archived, "archived": archived,
@@ -552,6 +562,7 @@ class EntryManager:
"kind": EntryType.MANAGED_ACCOUNT.value, "kind": EntryType.MANAGED_ACCOUNT.value,
"index": index, "index": index,
"label": label, "label": label,
"modified_ts": int(time.time()),
"word_count": word_count, "word_count": word_count,
"notes": notes, "notes": notes,
"fingerprint": fingerprint, "fingerprint": fingerprint,
@@ -682,7 +693,8 @@ class EntryManager:
): ):
entry.setdefault("custom_fields", []) entry.setdefault("custom_fields", [])
logger.debug(f"Retrieved entry at index {index}: {entry}") logger.debug(f"Retrieved entry at index {index}: {entry}")
return entry clean = {k: v for k, v in entry.items() if k != "modified_ts"}
return clean
else: else:
logger.warning(f"No entry found at index {index}.") logger.warning(f"No entry found at index {index}.")
print(colored(f"Warning: No entry found at index {index}.", "yellow")) print(colored(f"Warning: No entry found at index {index}.", "yellow"))
@@ -887,6 +899,8 @@ class EntryManager:
entry["tags"] = tags entry["tags"] = tags
logger.debug(f"Updated tags for index {index}: {tags}") logger.debug(f"Updated tags for index {index}: {tags}")
entry["modified_ts"] = int(time.time())
data["entries"][str(index)] = entry data["entries"][str(index)] = entry
logger.debug(f"Modified entry at index {index}: {entry}") logger.debug(f"Modified entry at index {index}: {entry}")

View File

@@ -1179,7 +1179,7 @@ class PasswordManager:
updated = False updated = False
if current != encrypted: if current != encrypted:
if self.vault.decrypt_and_save_index_from_nostr( if self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False encrypted, strict=False, merge=False
): ):
updated = True updated = True
current = encrypted current = encrypted
@@ -1189,7 +1189,7 @@ class PasswordManager:
for delta in deltas: for delta in deltas:
if current != delta: if current != delta:
if self.vault.decrypt_and_save_index_from_nostr( if self.vault.decrypt_and_save_index_from_nostr(
delta, strict=False delta, strict=False, merge=True
): ):
updated = True updated = True
current = delta current = delta
@@ -1314,7 +1314,7 @@ class PasswordManager:
manifest, chunks = result manifest, chunks = result
encrypted = gzip.decompress(b"".join(chunks)) encrypted = gzip.decompress(b"".join(chunks))
success = self.vault.decrypt_and_save_index_from_nostr( success = self.vault.decrypt_and_save_index_from_nostr(
encrypted, strict=False encrypted, strict=False, merge=False
) )
if success: if success:
have_data = True have_data = True
@@ -1325,7 +1325,7 @@ class PasswordManager:
for delta in deltas: for delta in deltas:
if current != delta: if current != delta:
if self.vault.decrypt_and_save_index_from_nostr( if self.vault.decrypt_and_save_index_from_nostr(
delta, strict=False delta, strict=False, merge=True
): ):
current = delta current = delta
logger.info("Initialized local database from Nostr.") logger.info("Initialized local database from Nostr.")
@@ -3641,7 +3641,7 @@ class PasswordManager:
:param encrypted_data: The encrypted data retrieved from Nostr. :param encrypted_data: The encrypted data retrieved from Nostr.
""" """
try: try:
self.vault.decrypt_and_save_index_from_nostr(encrypted_data) self.vault.decrypt_and_save_index_from_nostr(encrypted_data, merge=True)
logging.info("Index file updated from Nostr successfully.") logging.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", "green")) print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e: except Exception as e:

View File

@@ -61,11 +61,11 @@ class Vault:
return self.encryption_manager.get_encrypted_index() return self.encryption_manager.get_encrypted_index()
def decrypt_and_save_index_from_nostr( def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, *, strict: bool = True self, encrypted_data: bytes, *, strict: bool = True, merge: bool = False
) -> bool: ) -> bool:
"""Decrypt Nostr payload and overwrite the local index.""" """Decrypt Nostr payload and update the local index."""
return self.encryption_manager.decrypt_and_save_index_from_nostr( return self.encryption_manager.decrypt_and_save_index_from_nostr(
encrypted_data, strict=strict encrypted_data, strict=strict, merge=merge
) )
# ----- Config helpers ----- # ----- Config helpers -----

View File

@@ -0,0 +1,49 @@
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from helpers import create_vault
from seedpass.core.entry_management import EntryManager
from seedpass.core.backup import BackupManager
from seedpass.core.config_manager import ConfigManager
def _setup_mgr(path: Path):
vault, _ = create_vault(path)
cfg = ConfigManager(vault, path)
backup = BackupManager(path, cfg)
return vault, EntryManager(vault, backup)
def test_merge_modified_ts():
with TemporaryDirectory() as tmpdir:
base = Path(tmpdir)
va, ema = _setup_mgr(base / "A")
vb, emb = _setup_mgr(base / "B")
idx0 = ema.add_entry("a", 8)
idx1 = ema.add_entry("b", 8)
# B starts from A's snapshot
enc = va.get_encrypted_index() or b""
vb.decrypt_and_save_index_from_nostr(enc, merge=False)
emb.clear_cache()
assert emb.retrieve_entry(idx0)["username"] == ""
ema.modify_entry(idx0, username="ua")
delta_a = va.get_encrypted_index() or b""
vb.decrypt_and_save_index_from_nostr(delta_a, merge=True)
emb.clear_cache()
assert emb.retrieve_entry(idx0)["username"] == "ua"
emb.modify_entry(idx1, username="ub")
delta_b = vb.get_encrypted_index() or b""
va.decrypt_and_save_index_from_nostr(delta_b, merge=True)
ema.clear_cache()
assert ema.retrieve_entry(idx1)["username"] == "ub"
assert ema.retrieve_entry(idx0)["username"] == "ua"
assert ema.retrieve_entry(idx1)["username"] == "ub"
assert emb.retrieve_entry(idx0)["username"] == "ua"
assert emb.retrieve_entry(idx1)["username"] == "ub"

View File

@@ -44,7 +44,9 @@ def test_add_and_retrieve_entry():
data = enc_mgr.load_json_data(entry_mgr.index_file) data = enc_mgr.load_json_data(entry_mgr.index_file)
assert str(index) in data.get("entries", {}) assert str(index) in data.get("entries", {})
assert data["entries"][str(index)] == entry stored = data["entries"][str(index)]
stored.pop("modified_ts", None)
assert stored == entry
@pytest.mark.parametrize( @pytest.mark.parametrize(