Use cached index in EntryManager

This commit is contained in:
thePR0M3TH3AN
2025-07-12 22:09:03 -04:00
parent f350d44460
commit 10f447c930
2 changed files with 53 additions and 19 deletions

View File

@@ -119,7 +119,7 @@ class EntryManager:
:return: The next index number as an integer.
"""
try:
data = self.vault.load_index()
data = self._load_index()
if "entries" in data and isinstance(data["entries"], dict):
indices = [int(idx) for idx in data["entries"].keys()]
next_index = max(indices) + 1 if indices else 0
@@ -156,7 +156,7 @@ class EntryManager:
"""
try:
index = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
@@ -190,7 +190,7 @@ class EntryManager:
def get_next_totp_index(self) -> int:
"""Return the next available derivation index for TOTP secrets."""
data = self.vault.load_index()
data = self._load_index()
entries = data.get("entries", {})
indices = [
int(v.get("index", 0))
@@ -217,7 +217,7 @@ class EntryManager:
) -> str:
"""Add a new TOTP entry and return the provisioning URI."""
entry_id = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
if secret is None:
@@ -279,7 +279,7 @@ class EntryManager:
if index is None:
index = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
"type": EntryType.SSH.value,
@@ -325,7 +325,7 @@ class EntryManager:
if index is None:
index = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
"type": EntryType.PGP.value,
@@ -377,7 +377,7 @@ class EntryManager:
if index is None:
index = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
"type": EntryType.NOSTR.value,
@@ -407,7 +407,7 @@ class EntryManager:
index = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
"type": EntryType.KEY_VALUE.value,
@@ -465,7 +465,7 @@ class EntryManager:
if index is None:
index = self.get_next_index()
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
"type": EntryType.SEED.value,
@@ -537,7 +537,7 @@ class EntryManager:
account_dir = self.fingerprint_dir / "accounts" / fingerprint
account_dir.mkdir(parents=True, exist_ok=True)
data = self.vault.load_index()
data = self._load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {
"type": EntryType.MANAGED_ACCOUNT.value,
@@ -612,7 +612,7 @@ class EntryManager:
def export_totp_entries(self, parent_seed: str) -> dict[str, list[dict[str, Any]]]:
"""Return all TOTP secrets and metadata for external use."""
data = self.vault.load_index()
data = self._load_index()
entries = data.get("entries", {})
exported: list[dict[str, Any]] = []
for entry in entries.values():
@@ -662,7 +662,7 @@ class EntryManager:
:return: A dictionary containing the entry details or None if not found.
"""
try:
data = self.vault.load_index()
data = self._load_index()
entry = data.get("entries", {}).get(str(index))
if entry:
@@ -719,7 +719,7 @@ class EntryManager:
:param value: (Optional) New value for key/value entries.
"""
try:
data = self.vault.load_index()
data = self._load_index()
entry = data.get("entries", {}).get(str(index))
if not entry:
@@ -919,7 +919,7 @@ class EntryManager:
``True``.
"""
try:
data = self.vault.load_index()
data = self._load_index()
entries_data = data.get("entries", {})
if not entries_data:
@@ -1030,7 +1030,7 @@ class EntryManager:
self, query: str
) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]:
"""Return entries matching the query across common fields."""
data = self.vault.load_index()
data = self._load_index()
entries_data = data.get("entries", {})
if not entries_data:
@@ -1119,11 +1119,11 @@ class EntryManager:
:param index: The index number of the entry to delete.
"""
try:
data = self.vault.load_index()
data = self._load_index()
if "entries" in data and str(index) in data["entries"]:
del data["entries"][str(index)]
logger.debug(f"Deleted entry at index {index}.")
self.vault.save_index(data)
self._save_index(data)
self.update_checksum()
self.backup_manager.create_backup()
logger.info(f"Entry at index {index} deleted successfully.")
@@ -1154,7 +1154,7 @@ class EntryManager:
Updates the checksum file for the password database to ensure data integrity.
"""
try:
data = self.vault.load_index()
data = self._load_index()
json_content = json.dumps(data, indent=4)
checksum = hashlib.sha256(json_content.encode("utf-8")).hexdigest()
@@ -1200,6 +1200,7 @@ class EntryManager:
)
)
self.clear_cache()
self.update_checksum()
except Exception as e:
@@ -1253,7 +1254,7 @@ class EntryManager:
) -> list[tuple[int, str, str]]:
"""Return a list of entry index, type, and display labels."""
try:
data = self.vault.load_index()
data = self._load_index()
entries_data = data.get("entries", {})
summaries: list[tuple[int, str, str]] = []

View File

@@ -0,0 +1,33 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
def test_index_caching():
with TemporaryDirectory() as tmpdir:
vault, _ = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
# create initial entry so the index file exists
entry_mgr.add_entry("init", 8)
entry_mgr.clear_cache()
with patch.object(vault, "load_index", wraps=vault.load_index) as mocked:
idx = entry_mgr.add_entry("example.com", 8)
assert mocked.call_count == 1
entry = entry_mgr.retrieve_entry(idx)
assert entry["label"] == "example.com"
assert mocked.call_count == 1
entry_mgr.clear_cache()
entry = entry_mgr.retrieve_entry(idx)
assert entry["label"] == "example.com"
assert mocked.call_count == 2