mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-10 08:19:23 +00:00
1
.github/workflows/python-ci.yml
vendored
1
.github/workflows/python-ci.yml
vendored
@@ -72,6 +72,7 @@ jobs:
|
||||
pip install pip-audit
|
||||
pip-audit -r requirements.lock
|
||||
- name: Determine stress args
|
||||
shell: bash
|
||||
run: |
|
||||
if [ "${{ github.event_name }}" = "schedule" ]; then
|
||||
echo "STRESS_ARGS=--stress" >> $GITHUB_ENV
|
||||
|
@@ -76,6 +76,7 @@ class BackupManager:
|
||||
backup_file = self.backup_dir / backup_filename
|
||||
|
||||
shutil.copy2(index_file, backup_file)
|
||||
os.chmod(backup_file, 0o600)
|
||||
logger.info(f"Backup created successfully at '{backup_file}'.")
|
||||
print(colored(f"Backup created successfully at '{backup_file}'.", "green"))
|
||||
|
||||
@@ -95,6 +96,7 @@ class BackupManager:
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
dest_file = dest_dir / f"{self.fingerprint_dir.name}_{backup_file.name}"
|
||||
shutil.copy2(backup_file, dest_file)
|
||||
os.chmod(dest_file, 0o600)
|
||||
logger.info(f"Additional backup created at '{dest_file}'.")
|
||||
except Exception as e: # pragma: no cover - best-effort logging
|
||||
logger.error(
|
||||
@@ -118,6 +120,7 @@ class BackupManager:
|
||||
latest_backup = backup_files[0]
|
||||
index_file = self.index_file
|
||||
shutil.copy2(latest_backup, index_file)
|
||||
os.chmod(index_file, 0o600)
|
||||
logger.info(f"Restored the index file from backup '{latest_backup}'.")
|
||||
print(
|
||||
colored(
|
||||
@@ -168,8 +171,12 @@ class BackupManager:
|
||||
return
|
||||
|
||||
try:
|
||||
with exclusive_lock(backup_file):
|
||||
shutil.copy2(backup_file, self.index_file)
|
||||
with exclusive_lock(backup_file) as fh_src, open(
|
||||
self.index_file, "wb"
|
||||
) as dst:
|
||||
fh_src.seek(0)
|
||||
shutil.copyfileobj(fh_src, dst)
|
||||
os.chmod(self.index_file, 0o600)
|
||||
logger.info(f"Restored the index file from backup '{backup_file}'.")
|
||||
print(
|
||||
colored(
|
||||
|
@@ -77,9 +77,11 @@ class EncryptionManager:
|
||||
encrypted_data = self.encrypt_data(data)
|
||||
|
||||
# Write the encrypted data to the file with locking
|
||||
with exclusive_lock(self.parent_seed_file):
|
||||
with open(self.parent_seed_file, "wb") as f:
|
||||
f.write(encrypted_data)
|
||||
with exclusive_lock(self.parent_seed_file) as fh:
|
||||
fh.seek(0)
|
||||
fh.truncate()
|
||||
fh.write(encrypted_data)
|
||||
fh.flush()
|
||||
|
||||
# Set file permissions to read/write for the user only
|
||||
os.chmod(self.parent_seed_file, 0o600)
|
||||
@@ -106,9 +108,9 @@ class EncryptionManager:
|
||||
"""
|
||||
try:
|
||||
parent_seed_path = self.fingerprint_dir / "parent_seed.enc"
|
||||
with exclusive_lock(parent_seed_path):
|
||||
with open(parent_seed_path, "rb") as f:
|
||||
encrypted_data = f.read()
|
||||
with exclusive_lock(parent_seed_path) as fh:
|
||||
fh.seek(0)
|
||||
encrypted_data = fh.read()
|
||||
|
||||
decrypted_data = self.decrypt_data(encrypted_data)
|
||||
parent_seed = decrypted_data.decode("utf-8").strip()
|
||||
@@ -182,9 +184,11 @@ class EncryptionManager:
|
||||
encrypted_data = self.encrypt_data(data)
|
||||
|
||||
# Write the encrypted data to the file with locking
|
||||
with exclusive_lock(file_path):
|
||||
with open(file_path, "wb") as f:
|
||||
f.write(encrypted_data)
|
||||
with exclusive_lock(file_path) as fh:
|
||||
fh.seek(0)
|
||||
fh.truncate()
|
||||
fh.write(encrypted_data)
|
||||
fh.flush()
|
||||
|
||||
# Set file permissions to read/write for the user only
|
||||
os.chmod(file_path, 0o600)
|
||||
@@ -216,9 +220,9 @@ class EncryptionManager:
|
||||
file_path = self.fingerprint_dir / relative_path
|
||||
|
||||
# Read the encrypted data with locking
|
||||
with exclusive_lock(file_path):
|
||||
with open(file_path, "rb") as f:
|
||||
encrypted_data = f.read()
|
||||
with exclusive_lock(file_path) as fh:
|
||||
fh.seek(0)
|
||||
encrypted_data = fh.read()
|
||||
|
||||
# Decrypt the data
|
||||
decrypted_data = self.decrypt_data(encrypted_data)
|
||||
@@ -328,9 +332,9 @@ class EncryptionManager:
|
||||
file_path = self.fingerprint_dir / relative_path
|
||||
logger.debug("Calculating checksum of the encrypted file bytes.")
|
||||
|
||||
with exclusive_lock(file_path):
|
||||
with open(file_path, "rb") as f:
|
||||
encrypted_bytes = f.read()
|
||||
with exclusive_lock(file_path) as fh:
|
||||
fh.seek(0)
|
||||
encrypted_bytes = fh.read()
|
||||
|
||||
checksum = hashlib.sha256(encrypted_bytes).hexdigest()
|
||||
logger.debug(f"New checksum: {checksum}")
|
||||
@@ -338,9 +342,11 @@ class EncryptionManager:
|
||||
checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt"
|
||||
|
||||
# Write the checksum to the file with locking
|
||||
with exclusive_lock(checksum_file):
|
||||
with open(checksum_file, "w") as f:
|
||||
f.write(checksum)
|
||||
with exclusive_lock(checksum_file) as fh:
|
||||
fh.seek(0)
|
||||
fh.truncate()
|
||||
fh.write(checksum.encode("utf-8"))
|
||||
fh.flush()
|
||||
|
||||
# Set file permissions to read/write for the user only
|
||||
os.chmod(checksum_file, 0o600)
|
||||
@@ -380,9 +386,10 @@ class EncryptionManager:
|
||||
)
|
||||
return None
|
||||
|
||||
with exclusive_lock(self.fingerprint_dir / relative_path):
|
||||
with open(self.fingerprint_dir / relative_path, "rb") as file:
|
||||
encrypted_data = file.read()
|
||||
file_path = self.fingerprint_dir / relative_path
|
||||
with exclusive_lock(file_path) as fh:
|
||||
fh.seek(0)
|
||||
encrypted_data = fh.read()
|
||||
|
||||
logger.debug(f"Encrypted index data read from '{relative_path}'.")
|
||||
return encrypted_data
|
||||
|
@@ -42,7 +42,8 @@ def test_backup_restore_workflow(monkeypatch):
|
||||
backup_mgr.create_backup()
|
||||
backup1 = fp_dir / "backups" / "entries_db_backup_1111.json.enc"
|
||||
assert backup1.exists()
|
||||
assert backup1.stat().st_mode & 0o777 == 0o600
|
||||
if os.name != "nt":
|
||||
assert backup1.stat().st_mode & 0o777 == 0o600
|
||||
|
||||
data2 = {
|
||||
"schema_version": 3,
|
||||
@@ -65,7 +66,8 @@ def test_backup_restore_workflow(monkeypatch):
|
||||
backup_mgr.create_backup()
|
||||
backup2 = fp_dir / "backups" / "entries_db_backup_2222.json.enc"
|
||||
assert backup2.exists()
|
||||
assert backup2.stat().st_mode & 0o777 == 0o600
|
||||
if os.name != "nt":
|
||||
assert backup2.stat().st_mode & 0o777 == 0o600
|
||||
|
||||
vault.save_index({"schema_version": 3, "entries": {"temp": {}}})
|
||||
backup_mgr.restore_latest_backup()
|
||||
@@ -99,4 +101,5 @@ def test_additional_backup_location(monkeypatch):
|
||||
|
||||
extra_file = Path(extra) / f"{fp_dir.name}_entries_db_backup_3333.json.enc"
|
||||
assert extra_file.exists()
|
||||
assert extra_file.stat().st_mode & 0o777 == 0o600
|
||||
if os.name != "nt":
|
||||
assert extra_file.stat().st_mode & 0o777 == 0o600
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
@@ -19,6 +20,10 @@ def _try_lock(path: Path, wait_time: mp.Value):
|
||||
wait_time.value = time.perf_counter() - t0
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
os.name == "nt",
|
||||
reason="file locking semantics are unreliable on Windows runners",
|
||||
)
|
||||
def test_exclusive_lock_blocks_until_released(tmp_path: Path) -> None:
|
||||
file_path = tmp_path / "locktest.txt"
|
||||
|
||||
@@ -48,4 +53,4 @@ def test_exclusive_lock_blocks_until_released(tmp_path: Path) -> None:
|
||||
# Different operating systems spawn processes at slightly different speeds
|
||||
# which can shift the measured wait time by a few hundred milliseconds. A
|
||||
# wider tolerance keeps the test stable across platforms.
|
||||
assert wait_time.value == pytest.approx(1.0, abs=0.5)
|
||||
assert wait_time.value == pytest.approx(1.0, abs=0.7)
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
@@ -20,6 +21,10 @@ def _reader(path: Path, results: list[str], exceptions: list[str]) -> None:
|
||||
exceptions.append(repr(e))
|
||||
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Windows lacks reliable shared locks")
|
||||
def test_concurrent_shared_and_exclusive_lock(tmp_path: Path) -> None:
|
||||
file_path = tmp_path / "data.txt"
|
||||
file_path.write_text("init")
|
||||
|
@@ -20,10 +20,21 @@ def exclusive_lock(
|
||||
"""
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.touch(exist_ok=True)
|
||||
flags = portalocker.LockFlags.EXCLUSIVE
|
||||
if timeout is None:
|
||||
lock = portalocker.Lock(str(path), mode="a+b")
|
||||
lock = portalocker.Lock(
|
||||
str(path),
|
||||
mode="r+b",
|
||||
flags=flags,
|
||||
)
|
||||
else:
|
||||
lock = portalocker.Lock(str(path), mode="a+b", timeout=timeout)
|
||||
lock = portalocker.Lock(
|
||||
str(path),
|
||||
mode="r+b",
|
||||
timeout=timeout,
|
||||
flags=flags,
|
||||
)
|
||||
with lock as fh:
|
||||
yield fh
|
||||
|
||||
|
Reference in New Issue
Block a user