mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Merge pull request #50 from PR0M3TH3AN/codex/create-test-for-file-locking-with-threading
Add shared file lock utility and concurrency test
This commit is contained in:
42
src/tests/test_file_locking.py
Normal file
42
src/tests/test_file_locking.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
from utils.file_lock import exclusive_lock, shared_lock
|
||||
|
||||
|
||||
def _writer(path: Path, content: str, exceptions: list[str]) -> None:
|
||||
try:
|
||||
with exclusive_lock(path):
|
||||
path.write_text(content)
|
||||
except Exception as e: # pragma: no cover - just capture
|
||||
exceptions.append(repr(e))
|
||||
|
||||
|
||||
def _reader(path: Path, results: list[str], exceptions: list[str]) -> None:
|
||||
try:
|
||||
with shared_lock(path):
|
||||
results.append(path.read_text())
|
||||
except Exception as e: # pragma: no cover
|
||||
exceptions.append(repr(e))
|
||||
|
||||
|
||||
def test_concurrent_shared_and_exclusive_lock(tmp_path: Path) -> None:
|
||||
file_path = tmp_path / "data.txt"
|
||||
file_path.write_text("init")
|
||||
|
||||
exceptions: list[str] = []
|
||||
reads: list[str] = []
|
||||
for i in range(5):
|
||||
writer = threading.Thread(
|
||||
target=_writer, args=(file_path, f"value{i}", exceptions)
|
||||
)
|
||||
reader = threading.Thread(target=_reader, args=(file_path, reads, exceptions))
|
||||
|
||||
writer.start()
|
||||
reader.start()
|
||||
writer.join()
|
||||
reader.join()
|
||||
|
||||
assert not exceptions
|
||||
assert file_path.read_text() == "value4"
|
||||
assert len(reads) == 5
|
@@ -4,7 +4,7 @@ import logging
|
||||
import traceback
|
||||
|
||||
try:
|
||||
from .file_lock import exclusive_lock
|
||||
from .file_lock import exclusive_lock, shared_lock
|
||||
from .key_derivation import derive_key_from_password, derive_key_from_parent_seed
|
||||
from .checksum import calculate_checksum, verify_checksum
|
||||
from .password_prompt import prompt_for_password
|
||||
@@ -20,5 +20,6 @@ __all__ = [
|
||||
"calculate_checksum",
|
||||
"verify_checksum",
|
||||
"exclusive_lock",
|
||||
"shared_lock",
|
||||
"prompt_for_password",
|
||||
]
|
||||
|
@@ -22,3 +22,25 @@ def exclusive_lock(
|
||||
lock = portalocker.Lock(str(path), mode="a+b", timeout=timeout)
|
||||
with lock as fh:
|
||||
yield fh
|
||||
|
||||
|
||||
@contextmanager
|
||||
def shared_lock(
|
||||
path: Path, timeout: Optional[float] = None
|
||||
) -> Generator[None, None, None]:
|
||||
"""Context manager that locks *path* with a shared lock.
|
||||
|
||||
The function opens the file in binary read/write mode and obtains a
|
||||
shared lock using ``portalocker``. If ``timeout`` is provided, acquiring
|
||||
the lock will wait for at most that many seconds before raising
|
||||
``portalocker.exceptions.LockException``.
|
||||
"""
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.touch(exist_ok=True)
|
||||
lock = portalocker.Lock(
|
||||
str(path), mode="r+b", timeout=timeout, flags=portalocker.LockFlags.SHARED
|
||||
)
|
||||
with lock as fh:
|
||||
fh.seek(0)
|
||||
yield fh
|
||||
|
Reference in New Issue
Block a user