Merge pull request #96 from PR0M3TH3AN/codex/add-concurrency-stress-test-for-vault-and-backupmanager

Add concurrency stress test
This commit is contained in:
thePR0M3TH3AN
2025-07-01 14:11:52 -04:00
committed by GitHub
3 changed files with 84 additions and 6 deletions

View File

@@ -1,7 +1,8 @@
"""Vault utilities for reading and writing encrypted files.""" """Vault utilities for reading and writing encrypted files."""
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional, Union
from os import PathLike
from .encryption import EncryptionManager from .encryption import EncryptionManager
@@ -12,9 +13,13 @@ class Vault:
INDEX_FILENAME = "seedpass_passwords_db.json.enc" INDEX_FILENAME = "seedpass_passwords_db.json.enc"
CONFIG_FILENAME = "seedpass_config.json.enc" CONFIG_FILENAME = "seedpass_config.json.enc"
def __init__(self, encryption_manager: EncryptionManager, fingerprint_dir: Path): def __init__(
self,
encryption_manager: EncryptionManager,
fingerprint_dir: Union[str, PathLike[str], Path],
):
self.encryption_manager = encryption_manager self.encryption_manager = encryption_manager
self.fingerprint_dir = fingerprint_dir self.fingerprint_dir = Path(fingerprint_dir)
self.index_file = self.fingerprint_dir / self.INDEX_FILENAME self.index_file = self.fingerprint_dir / self.INDEX_FILENAME
self.config_file = self.fingerprint_dir / self.CONFIG_FILENAME self.config_file = self.fingerprint_dir / self.CONFIG_FILENAME

View File

@@ -0,0 +1,72 @@
import sys
from pathlib import Path
from multiprocessing import Process, Queue
from cryptography.fernet import Fernet
import pytest
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.vault import Vault
from password_manager.backup import BackupManager
def _writer(key: bytes, dir_path: Path, loops: int, out: Queue) -> None:
try:
enc = EncryptionManager(key, dir_path)
vault = Vault(enc, dir_path)
for _ in range(loops):
data = vault.load_index()
data["counter"] = data.get("counter", 0) + 1
vault.save_index(data)
except Exception as e: # pragma: no cover - capture for assertion
out.put(repr(e))
def _reader(key: bytes, dir_path: Path, loops: int, out: Queue) -> None:
try:
enc = EncryptionManager(key, dir_path)
vault = Vault(enc, dir_path)
for _ in range(loops):
vault.load_index()
except Exception as e: # pragma: no cover - capture
out.put(repr(e))
def _backup(dir_path: Path, loops: int, out: Queue) -> None:
try:
bm = BackupManager(dir_path)
for _ in range(loops):
bm.create_backup()
except Exception as e: # pragma: no cover - capture
out.put(repr(e))
@pytest.mark.parametrize("_", range(3))
def test_concurrency_stress(tmp_path: Path, _):
key = Fernet.generate_key()
enc = EncryptionManager(key, tmp_path)
Vault(enc, tmp_path).save_index({"counter": 0})
q: Queue = Queue()
procs = [
Process(target=_writer, args=(key, tmp_path, 20, q)),
Process(target=_writer, args=(key, tmp_path, 20, q)),
Process(target=_reader, args=(key, tmp_path, 20, q)),
Process(target=_reader, args=(key, tmp_path, 20, q)),
Process(target=_backup, args=(tmp_path, 20, q)),
]
for p in procs:
p.start()
for p in procs:
p.join()
errors = []
while not q.empty():
errors.append(q.get())
assert not errors
vault = Vault(EncryptionManager(key, tmp_path), tmp_path)
assert isinstance(vault.load_index(), dict)

View File

@@ -1,14 +1,15 @@
"""File-based locking utilities using portalocker for cross-platform support.""" """File-based locking utilities using portalocker for cross-platform support."""
from contextlib import contextmanager from contextlib import contextmanager
from typing import Generator, Optional from typing import Generator, Optional, Union
from os import PathLike
from pathlib import Path from pathlib import Path
import portalocker import portalocker
@contextmanager @contextmanager
def exclusive_lock( def exclusive_lock(
path: Path, timeout: Optional[float] = None path: Union[str, PathLike[str], Path], timeout: Optional[float] = None
) -> Generator[None, None, None]: ) -> Generator[None, None, None]:
"""Context manager that locks *path* exclusively. """Context manager that locks *path* exclusively.
@@ -29,7 +30,7 @@ def exclusive_lock(
@contextmanager @contextmanager
def shared_lock( def shared_lock(
path: Path, timeout: Optional[float] = None path: Union[str, PathLike[str], Path], timeout: Optional[float] = None
) -> Generator[None, None, None]: ) -> Generator[None, None, None]:
"""Context manager that locks *path* with a shared lock. """Context manager that locks *path* with a shared lock.