Merge pull request #53 from PR0M3TH3AN/beta

Beta
This commit is contained in:
thePR0M3TH3AN
2025-06-29 23:39:08 -04:00
committed by GitHub
21 changed files with 420 additions and 169 deletions

11
.gitattributes vendored Normal file
View File

@@ -0,0 +1,11 @@
* text=auto eol=lf
*.png binary
*.jpg binary
*.jpeg binary
*.gif binary
*.svg binary
*.ico binary
*.pdf binary
*.eps binary
*.ai binary
*.penpot binary

View File

@@ -8,12 +8,49 @@ on:
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.11"]
exclude:
- os: windows-latest
python-version: "3.11"
include:
- os: windows-latest
python-version: "3.10"
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
python-version: ${{ matrix.python-version }}
- name: Install build tools (Linux/macOS)
if: runner.os != 'Windows'
shell: bash
run: |
if [ "$RUNNER_OS" = "Linux" ]; then
sudo apt-get update
sudo apt-get install -y pkg-config build-essential
else
brew install pkg-config
fi
- name: Install MSYS2 toolchain
if: runner.os == 'Windows'
id: msys
uses: msys2/setup-msys2@v2
with:
update: true
install: >-
mingw-w64-x86_64-toolchain
base-devel
- name: Set PKG_CONFIG_PATH
if: runner.os == 'Windows'
shell: bash
run: echo "PKG_CONFIG_PATH=${{ steps.msys.outputs.msys2-location }}/mingw64/lib/pkgconfig" >> $GITHUB_ENV
- name: Add MSYS2 tools to PATH
if: runner.os == 'Windows'
shell: bash
run: echo "${{ steps.msys.outputs.msys2-location }}/mingw64/bin" >> $GITHUB_PATH
- name: Cache pip
uses: actions/cache@v3
with:
@@ -27,11 +64,12 @@ jobs:
python -m pip install --upgrade pip
pip install -r src/requirements.txt
- name: Run tests with coverage
shell: bash
run: |
pytest --cov=src --cov-report=xml --cov-report=term-missing \
--cov-fail-under=20 src/tests
- name: Upload coverage report
uses: actions/upload-artifact@v4
with:
name: coverage-xml
name: coverage-xml-${{ matrix.os }}
path: coverage.xml

View File

@@ -13,6 +13,11 @@
This software was not developed by an experienced security expert and should be used with caution. There may be bugs and missing features. For instance, the maximum size of the index before the Nostr backup starts to have problems is unknown. Additionally, the security of the program's memory management and logs has not been evaluated and may leak sensitive information.
---
### Supported OS
✔ Windows 10/11 • macOS 12+ • Any modern Linux
SeedPass now uses the `portalocker` library for cross-platform file locking. No WSL or Cygwin required.
## Table of Contents
@@ -81,7 +86,7 @@ Activate the virtual environment using the appropriate command for your operatin
source venv/bin/activate
```
- **On Windows:** (Note: SeedPass currently does not support Windows)
- **On Windows:**
```bash
venv\Scripts\activate
@@ -178,11 +183,11 @@ You can manage the relay list or change the PIN through the **Settings** menu:
## Running Tests
SeedPass includes a small suite of unit tests. After activating your virtual environment and installing dependencies, run the tests with **pytest**:
SeedPass includes a small suite of unit tests. After activating your virtual environment and installing dependencies, run the tests with **pytest**. Use `-vv` to see INFO-level log messages from each passing test:
```bash
pip install -r src/requirements.txt
pytest
pytest -vv
```
## Security Considerations

3
pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
log_cli = true
log_cli_level = INFO

View File

@@ -1,5 +1,6 @@
# main.py
import os
from pathlib import Path
import sys
import logging
import signal
@@ -23,13 +24,13 @@ def configure_logging():
logger.removeHandler(handler)
# Ensure the 'logs' directory exists
log_directory = "logs"
if not os.path.exists(log_directory):
os.makedirs(log_directory)
log_directory = Path("logs")
if not log_directory.exists():
log_directory.mkdir(parents=True, exist_ok=True)
# Create handlers
c_handler = logging.StreamHandler(sys.stdout)
f_handler = logging.FileHandler(os.path.join(log_directory, "main.log"))
f_handler = logging.FileHandler(log_directory / "main.log")
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR)

View File

@@ -11,19 +11,41 @@ import concurrent.futures
from typing import List, Optional, Callable
from pathlib import Path
from monstr.client.client import ClientPool
from monstr.encrypt import Keys, NIP4Encrypt
from monstr.event.event import Event
try:
from monstr.client.client import ClientPool
from monstr.encrypt import Keys, NIP4Encrypt
from monstr.event.event import Event
except ImportError: # Fallback placeholders when monstr is unavailable
NIP4Encrypt = None
Event = None
class ClientPool: # minimal stub for tests when monstr is absent
def __init__(self, relays):
self.relays = relays
self.connected = True
async def run(self):
pass
def publish(self, event):
pass
def subscribe(self, handlers=None, filters=None, sub_id=None):
pass
def unsubscribe(self, sub_id):
pass
from .coincurve_keys import Keys
import threading
import uuid
import fcntl
from .key_manager import KeyManager
from .encryption_manager import EncryptionManager
from .event_handler import EventHandler
from constants import APP_DIR
from utils.file_lock import lock_file
from utils.file_lock import exclusive_lock
# Get the logger for this module
logger = logging.getLogger(__name__)
@@ -103,6 +125,8 @@ class NostrClient:
"""
try:
logger.debug("Initializing ClientPool with relays.")
if ClientPool is None:
raise ImportError("monstr library is required for ClientPool")
self.client_pool = ClientPool(self.relays)
# Start the ClientPool in a separate thread
@@ -257,6 +281,8 @@ class NostrClient:
content_base64 = event.content
if event.kind == Event.KIND_ENCRYPT:
if NIP4Encrypt is None:
raise ImportError("monstr library required for NIP4 encryption")
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
content_base64 = nip4_encrypt.decrypt_message(
event.content, event.pub_key
@@ -416,7 +442,7 @@ class NostrClient:
json.dumps(data).encode("utf-8")
)
index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
with lock_file(index_file_path, fcntl.LOCK_EX):
with exclusive_lock(index_file_path):
with open(index_file_path, "wb") as f:
f.write(encrypted_data)
logger.debug(f"Encrypted data saved to {index_file_path}.")
@@ -442,7 +468,7 @@ class NostrClient:
checksum_file = self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
with lock_file(checksum_file, fcntl.LOCK_EX):
with exclusive_lock(checksum_file):
with open(checksum_file, "w") as f:
f.write(checksum)
@@ -465,7 +491,7 @@ class NostrClient:
:return: Decrypted data as bytes.
"""
try:
with lock_file(file_path, fcntl.LOCK_SH):
with exclusive_lock(file_path):
with open(file_path, "rb") as f:
encrypted_data = f.read()
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
@@ -501,6 +527,8 @@ class NostrClient:
event.created_at = int(time.time())
if to_pubkey:
if NIP4Encrypt is None:
raise ImportError("monstr library required for NIP4 encryption")
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
event.content = nip4_encrypt.encrypt_message(event.content, to_pubkey)
event.kind = Event.KIND_ENCRYPT

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from bech32 import bech32_encode, bech32_decode, convertbits
from coincurve import PrivateKey, PublicKey
class Keys:
"""Minimal replacement for monstr.encrypt.Keys using coincurve."""
def __init__(self, priv_k: str | None = None, pub_k: str | None = None):
if priv_k is not None:
if priv_k.startswith("nsec"):
priv_k = self.bech32_to_hex(priv_k)
self._priv_k = priv_k
priv = PrivateKey(bytes.fromhex(priv_k))
else:
priv = PrivateKey()
self._priv_k = priv.to_hex()
pub = priv.public_key.format(compressed=True).hex()[2:]
if pub_k:
if pub_k.startswith("npub"):
pub_k = self.bech32_to_hex(pub_k)
self._pub_k = pub_k
else:
self._pub_k = pub
@staticmethod
def hex_to_bech32(key_str: str, prefix: str = "npub") -> str:
data = convertbits(bytes.fromhex(key_str), 8, 5)
return bech32_encode(prefix, data)
@staticmethod
def bech32_to_hex(key: str) -> str:
hrp, data = bech32_decode(key)
if data is None:
raise ValueError("Invalid bech32 key")
decoded = convertbits(data, 5, 8, False)
return bytes(decoded).hex()
def private_key_hex(self) -> str:
return self._priv_k
def public_key_hex(self) -> str:
return self._pub_k

View File

@@ -3,7 +3,16 @@
import time
import logging
import traceback
from monstr.event.event import Event
try:
from monstr.event.event import Event
except ImportError: # pragma: no cover - optional dependency
class Event: # minimal placeholder for type hints when monstr is absent
id: str
created_at: int
content: str
# Instantiate the logger
logger = logging.getLogger(__name__)

View File

@@ -7,7 +7,7 @@ from bech32 import bech32_encode, convertbits
from local_bip85.bip85 import BIP85
from bip_utils import Bip39SeedGenerator
from monstr.encrypt import Keys
from .coincurve_keys import Keys
logger = logging.getLogger(__name__)

View File

@@ -16,11 +16,10 @@ import os
import shutil
import time
import traceback
import fcntl
from pathlib import Path
from termcolor import colored
from utils.file_lock import lock_file
from utils.file_lock import exclusive_lock
from constants import APP_DIR
# Instantiate the logger
@@ -144,7 +143,7 @@ class BackupManager:
return
try:
with lock_file(backup_file, lock_type=fcntl.LOCK_SH):
with exclusive_lock(backup_file):
shutil.copy2(backup_file, self.index_file)
logger.info(f"Restored the index file from backup '{backup_file}'.")
print(

View File

@@ -24,8 +24,9 @@ from typing import Optional
from cryptography.fernet import Fernet, InvalidToken
from termcolor import colored
from utils.file_lock import lock_file # Ensure this utility is correctly implemented
import fcntl # For file locking
from utils.file_lock import (
exclusive_lock,
) # Ensure this utility is correctly implemented
# Instantiate the logger
logger = logging.getLogger(__name__)
@@ -77,7 +78,7 @@ class EncryptionManager:
encrypted_data = self.encrypt_data(data)
# Write the encrypted data to the file with locking
with lock_file(self.parent_seed_file, fcntl.LOCK_EX):
with exclusive_lock(self.parent_seed_file):
with open(self.parent_seed_file, "wb") as f:
f.write(encrypted_data)
@@ -107,7 +108,7 @@ class EncryptionManager:
"""
try:
parent_seed_path = self.fingerprint_dir / "parent_seed.enc"
with lock_file(parent_seed_path, fcntl.LOCK_SH):
with exclusive_lock(parent_seed_path):
with open(parent_seed_path, "rb") as f:
encrypted_data = f.read()
@@ -188,7 +189,7 @@ class EncryptionManager:
encrypted_data = self.encrypt_data(data)
# Write the encrypted data to the file with locking
with lock_file(file_path, fcntl.LOCK_EX):
with exclusive_lock(file_path):
with open(file_path, "wb") as f:
f.write(encrypted_data)
@@ -220,7 +221,7 @@ class EncryptionManager:
file_path = self.fingerprint_dir / relative_path
# Read the encrypted data with locking
with lock_file(file_path, fcntl.LOCK_SH):
with exclusive_lock(file_path):
with open(file_path, "rb") as f:
encrypted_data = f.read()
@@ -351,7 +352,7 @@ class EncryptionManager:
checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt"
# Write the checksum to the file with locking
with lock_file(checksum_file, fcntl.LOCK_EX):
with exclusive_lock(checksum_file):
with open(checksum_file, "w") as f:
f.write(checksum)
@@ -392,7 +393,7 @@ class EncryptionManager:
)
return None
with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH):
with exclusive_lock(self.fingerprint_dir / relative_path):
with open(self.fingerprint_dir / relative_path, "rb") as file:
encrypted_data = file.read()

View File

@@ -29,9 +29,8 @@ from pathlib import Path
from termcolor import colored
from password_manager.encryption import EncryptionManager
from utils.file_lock import lock_file
from utils.file_lock import exclusive_lock
import fcntl
# Instantiate the logger
logger = logging.getLogger(__name__)
@@ -407,7 +406,8 @@ class EntryManager:
:param backup_path: The file path of the backup to restore from.
"""
try:
if not os.path.exists(backup_path):
backup_path = Path(backup_path)
if not backup_path.exists():
logger.error(f"Backup file '{backup_path}' does not exist.")
print(
colored(

View File

@@ -908,11 +908,59 @@ class PasswordManager:
print(colored(f"Entry updated successfully for index {index}.", "green"))
# Push the updated index to Nostr so changes are backed up.
try:
encrypted_data = self.get_encrypted_data()
if encrypted_data:
self.nostr_client.publish_json_to_nostr(encrypted_data)
logging.info(
"Encrypted index posted to Nostr after entry modification."
)
except Exception as nostr_error:
logging.error(f"Failed to post updated index to Nostr: {nostr_error}")
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f"Error during modifying entry: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to modify entry: {e}", "red"))
def delete_entry(self) -> None:
"""Deletes an entry from the password index."""
try:
index_input = input(
"Enter the index number of the entry to delete: "
).strip()
if not index_input.isdigit():
print(colored("Error: Index must be a number.", "red"))
return
index_to_delete = int(index_input)
if not confirm_action(
f"Are you sure you want to delete entry {index_to_delete}? (Y/N): "
):
print(colored("Deletion cancelled.", "yellow"))
return
self.entry_manager.delete_entry(index_to_delete)
# Push updated index to Nostr after deletion
try:
encrypted_data = self.get_encrypted_data()
if encrypted_data:
self.nostr_client.publish_json_to_nostr(encrypted_data)
logging.info(
"Encrypted index posted to Nostr after entry deletion."
)
except Exception as nostr_error:
logging.error(f"Failed to post updated index to Nostr: {nostr_error}")
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f"Error during entry deletion: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to delete entry: {e}", "red"))
def handle_verify_checksum(self) -> None:
"""
Handles verifying the script's checksum against the stored checksum to ensure integrity.
@@ -1202,6 +1250,10 @@ class PasswordManager:
)
print(colored("Master password changed successfully.", "green"))
# All data has been re-encrypted with the new password. Since no
# entries changed, avoid pushing the database to Nostr here.
# Subsequent entry modifications will trigger a push when needed.
except Exception as e:
logging.error(f"Failed to change password: {e}")
logging.error(traceback.format_exc())

View File

@@ -3,11 +3,12 @@ termcolor>=1.1.0
cryptography>=40.0.2
bip-utils>=2.5.0
bech32==1.2.0
monstr @ git+https://github.com/monty888/monstr.git@master#egg=monstr
coincurve>=18.0.0
mnemonic
aiohttp
bcrypt
bip85
pytest>=7.0
pytest-cov
portalocker>=2.8

View File

@@ -0,0 +1,43 @@
import multiprocessing as mp
import time
from pathlib import Path
import pytest
from utils.file_lock import exclusive_lock
def _hold_lock(path: Path, hold_time: float, started: mp.Event):
with exclusive_lock(path):
started.set()
time.sleep(hold_time)
def _try_lock(path: Path, wait_time: mp.Value):
t0 = time.perf_counter()
with exclusive_lock(path):
wait_time.value = time.perf_counter() - t0
def test_exclusive_lock_blocks_until_released(tmp_path: Path):
file_path = tmp_path / "locktest.txt"
started = mp.Event()
wait_time = mp.Value("d", 0.0)
p1 = mp.Process(target=_hold_lock, args=(file_path, 1.0, started))
p2 = mp.Process(target=_try_lock, args=(file_path, wait_time))
p1.start()
started.wait()
time.sleep(0.1)
p2.start()
p1.join()
p2.join()
# CI runners can be jittery; allow generous slack around the 1s lock hold time
# Different operating systems spawn processes at slightly different speeds
# which can shift the measured wait time by a few hundred milliseconds. A
# wider tolerance keeps the test stable across platforms.
assert wait_time.value == pytest.approx(1.0, abs=0.5)

View File

@@ -0,0 +1,42 @@
import threading
from pathlib import Path
from utils.file_lock import exclusive_lock, shared_lock
def _writer(path: Path, content: str, exceptions: list[str]) -> None:
try:
with exclusive_lock(path):
path.write_text(content)
except Exception as e: # pragma: no cover - just capture
exceptions.append(repr(e))
def _reader(path: Path, results: list[str], exceptions: list[str]) -> None:
try:
with shared_lock(path):
results.append(path.read_text())
except Exception as e: # pragma: no cover
exceptions.append(repr(e))
def test_concurrent_shared_and_exclusive_lock(tmp_path: Path) -> None:
file_path = tmp_path / "data.txt"
file_path.write_text("init")
exceptions: list[str] = []
reads: list[str] = []
for i in range(5):
writer = threading.Thread(
target=_writer, args=(file_path, f"value{i}", exceptions)
)
reader = threading.Thread(target=_reader, args=(file_path, reads, exceptions))
writer.start()
reader.start()
writer.join()
reader.join()
assert not exceptions
assert file_path.read_text() == "value4"
assert len(reads) == 5

View File

@@ -0,0 +1,18 @@
import logging
import pytest
from utils.key_derivation import derive_key_from_password
def test_derive_key_deterministic():
password = "correct horse battery staple"
key1 = derive_key_from_password(password, iterations=1)
key2 = derive_key_from_password(password, iterations=1)
assert key1 == key2
assert len(key1) == 44
logging.info("Deterministic key derivation succeeded")
def test_derive_key_empty_password_error():
with pytest.raises(ValueError):
derive_key_from_password("")
logging.info("Empty password correctly raised ValueError")

View File

@@ -0,0 +1,46 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from unittest.mock import patch
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.entry_management import EntryManager
from password_manager.config_manager import ConfigManager
from password_manager.manager import PasswordManager
def test_change_password_does_not_trigger_nostr_backup(monkeypatch):
with TemporaryDirectory() as tmpdir:
fp = Path(tmpdir)
enc_mgr = EncryptionManager(Fernet.generate_key(), fp)
entry_mgr = EntryManager(enc_mgr, fp)
cfg_mgr = ConfigManager(enc_mgr, fp)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_manager = enc_mgr
pm.entry_manager = entry_mgr
pm.config_manager = cfg_mgr
pm.password_generator = SimpleNamespace(encryption_manager=enc_mgr)
pm.fingerprint_dir = fp
pm.current_fingerprint = "fp"
pm.parent_seed = "seed"
pm.store_hashed_password = lambda pw: None
pm.verify_password = lambda pw: True
monkeypatch.setattr(
"password_manager.manager.prompt_existing_password", lambda *_: "old"
)
monkeypatch.setattr(
"password_manager.manager.prompt_for_password", lambda: "new"
)
with patch("password_manager.manager.NostrClient") as MockClient:
mock_instance = MockClient.return_value
pm.nostr_client = mock_instance
pm.change_password()
mock_instance.publish_json_to_nostr.assert_not_called()

View File

@@ -4,7 +4,7 @@ import logging
import traceback
try:
from .file_lock import lock_file
from .file_lock import exclusive_lock, shared_lock
from .key_derivation import derive_key_from_password, derive_key_from_parent_seed
from .checksum import calculate_checksum, verify_checksum
from .password_prompt import prompt_for_password
@@ -19,6 +19,7 @@ __all__ = [
"derive_key_from_parent_seed",
"calculate_checksum",
"verify_checksum",
"lock_file",
"exclusive_lock",
"shared_lock",
"prompt_for_password",
]

View File

@@ -1,142 +1,46 @@
# utils/file_lock.py
"""File-based locking utilities using portalocker for cross-platform support."""
"""
File Lock Module
This module provides a single context manager, `lock_file`, for acquiring and releasing
locks on files using the `fcntl` library. It ensures that critical files are accessed
safely, preventing race conditions and maintaining data integrity when multiple processes
or threads attempt to read from or write to the same file concurrently.
I need to change this to something that supports Windows in the future.
Ensure that all dependencies are installed and properly configured in your environment.
"""
import os
import fcntl
import logging
from contextlib import contextmanager
from typing import Generator
from typing import Generator, Optional
from pathlib import Path
from termcolor import colored
import sys
import traceback
# Instantiate the logger
logger = logging.getLogger(__name__)
import portalocker
@contextmanager
def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
def exclusive_lock(
path: Path, timeout: Optional[float] = None
) -> Generator[None, None, None]:
"""Context manager that locks *path* exclusively.
The function opens the file in binary append mode and obtains an
exclusive lock using ``portalocker``. If ``timeout`` is provided,
acquiring the lock will wait for at most that many seconds before
raising ``portalocker.exceptions.LockException``.
"""
Context manager to acquire a lock on a file.
Parameters:
file_path (Path): The path to the file to lock.
lock_type (int): The type of lock to acquire (`fcntl.LOCK_EX` for exclusive,
`fcntl.LOCK_SH` for shared).
Yields:
None
Raises:
ValueError: If an invalid lock type is provided.
SystemExit: Exits the program if the lock cannot be acquired.
"""
if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH):
logging.error(
f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH."
)
print(colored("Error: Invalid lock type provided.", "red"))
sys.exit(1)
file = None
try:
# Determine the mode based on whether the file exists
mode = "rb+" if file_path.exists() else "wb"
# Open the file
file = open(file_path, mode)
logging.debug(f"Opened file '{file_path}' in mode '{mode}' for locking.")
# Acquire the lock
fcntl.flock(file, lock_type)
lock_type_str = "Exclusive" if lock_type == fcntl.LOCK_EX else "Shared"
logging.debug(f"{lock_type_str} lock acquired on '{file_path}'.")
yield # Control is transferred to the block inside the `with` statement
except IOError as e:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(
colored(
f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}",
"red",
)
)
sys.exit(1)
finally:
if file:
try:
# Release the lock
fcntl.flock(file, fcntl.LOCK_UN)
logging.debug(f"Lock released on '{file_path}'.")
except Exception as e:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.warning(
f"Failed to release {lock_type_str} lock on '{file_path}': {e}"
)
logging.error(traceback.format_exc()) # Log full traceback
print(
colored(
f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}",
"yellow",
)
)
finally:
# Close the file
try:
file.close()
logging.debug(f"File '{file_path}' closed successfully.")
except Exception as e:
logging.warning(f"Failed to close file '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(
colored(
f"Warning: Failed to close file '{file_path}': {e}",
"yellow",
)
)
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
lock = portalocker.Lock(str(path), mode="a+b", timeout=timeout)
with lock as fh:
yield fh
@contextmanager
def exclusive_lock(file_path: Path) -> Generator[None, None, None]:
def shared_lock(
path: Path, timeout: Optional[float] = None
) -> Generator[None, None, None]:
"""Context manager that locks *path* with a shared lock.
The function opens the file in binary read/write mode and obtains a
shared lock using ``portalocker``. If ``timeout`` is provided, acquiring
the lock will wait for at most that many seconds before raising
``portalocker.exceptions.LockException``.
"""
Convenience context manager to acquire an exclusive lock on a file.
Parameters:
file_path (Path): The path to the file to lock.
Yields:
None
"""
with lock_file(file_path, fcntl.LOCK_EX):
yield
@contextmanager
def shared_lock(file_path: Path) -> Generator[None, None, None]:
"""
Convenience context manager to acquire a shared lock on a file.
Parameters:
file_path (Path): The path to the file to lock.
Yields:
None
"""
with lock_file(file_path, fcntl.LOCK_SH):
yield
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.touch(exist_ok=True)
lock = portalocker.Lock(
str(path), mode="r+b", timeout=timeout, flags=portalocker.LockFlags.SHARED
)
with lock as fh:
fh.seek(0)
yield fh

View File

@@ -23,7 +23,11 @@ import traceback
from typing import Union
from bip_utils import Bip39SeedGenerator
from local_bip85.bip85 import BIP85
from monstr.encrypt import Keys
try:
from monstr.encrypt import Keys
except ImportError: # Fall back to local coincurve implementation
from nostr.coincurve_keys import Keys
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend