mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-09 15:58:48 +00:00
test: cover audit logging
This commit is contained in:
@@ -14,7 +14,8 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import hashlib
|
import hashlib
|
||||||
from typing import Optional, Literal
|
import hmac
|
||||||
|
from typing import Optional, Literal, Any
|
||||||
import shutil
|
import shutil
|
||||||
import time
|
import time
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
@@ -110,6 +111,43 @@ from .stats_manager import StatsManager
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class AuditLogger:
|
||||||
|
"""Append-only logger producing tamper-evident JSON records."""
|
||||||
|
|
||||||
|
def __init__(self, key: bytes, path: Path | None = None) -> None:
|
||||||
|
self.key = key
|
||||||
|
self.path = path or (Path.home() / ".seedpass" / "audit.log")
|
||||||
|
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self.last_sig = "0" * 64
|
||||||
|
if self.path.exists():
|
||||||
|
try:
|
||||||
|
with self.path.open("r", encoding="utf-8") as fh:
|
||||||
|
last = None
|
||||||
|
for line in fh:
|
||||||
|
if line.strip():
|
||||||
|
last = line
|
||||||
|
if last is not None:
|
||||||
|
self.last_sig = json.loads(last).get("sig", self.last_sig)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def log(self, event: str, details: dict[str, Any] | None = None) -> None:
|
||||||
|
details = details or {}
|
||||||
|
entry = {
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"event": event,
|
||||||
|
"details": details,
|
||||||
|
}
|
||||||
|
payload = json.dumps(entry, sort_keys=True, separators=(",", ":"))
|
||||||
|
sig = hmac.new(
|
||||||
|
self.key, f"{self.last_sig}{payload}".encode(), hashlib.sha256
|
||||||
|
).hexdigest()
|
||||||
|
entry["sig"] = sig
|
||||||
|
with self.path.open("a", encoding="utf-8") as fh:
|
||||||
|
fh.write(json.dumps(entry, sort_keys=True) + "\n")
|
||||||
|
self.last_sig = sig
|
||||||
|
|
||||||
|
|
||||||
def calculate_profile_id(seed: str) -> str:
|
def calculate_profile_id(seed: str) -> str:
|
||||||
"""Return the fingerprint identifier for ``seed``."""
|
"""Return the fingerprint identifier for ``seed``."""
|
||||||
fp = generate_fingerprint(seed)
|
fp = generate_fingerprint(seed)
|
||||||
@@ -181,6 +219,7 @@ class PasswordManager:
|
|||||||
self._current_notification: Optional[Notification] = None
|
self._current_notification: Optional[Notification] = None
|
||||||
self._notification_expiry: float = 0.0
|
self._notification_expiry: float = 0.0
|
||||||
self._bip85_cache: dict[tuple[int, int], bytes] = {}
|
self._bip85_cache: dict[tuple[int, int], bytes] = {}
|
||||||
|
self.audit_logger: Optional[AuditLogger] = None
|
||||||
|
|
||||||
# Track changes to trigger periodic Nostr sync
|
# Track changes to trigger periodic Nostr sync
|
||||||
self.is_dirty: bool = False
|
self.is_dirty: bool = False
|
||||||
@@ -374,6 +413,12 @@ class PasswordManager:
|
|||||||
self.initialize_managers()
|
self.initialize_managers()
|
||||||
self.locked = False
|
self.locked = False
|
||||||
self.update_activity()
|
self.update_activity()
|
||||||
|
if (
|
||||||
|
getattr(self, "audit_logger", None) is None
|
||||||
|
and getattr(self, "_parent_seed_secret", None) is not None
|
||||||
|
):
|
||||||
|
key = hashlib.sha256(self.parent_seed.encode("utf-8")).digest()
|
||||||
|
self.audit_logger = AuditLogger(key)
|
||||||
if (
|
if (
|
||||||
getattr(self, "config_manager", None)
|
getattr(self, "config_manager", None)
|
||||||
and self.config_manager.get_quick_unlock()
|
and self.config_manager.get_quick_unlock()
|
||||||
@@ -383,6 +428,11 @@ class PasswordManager:
|
|||||||
self.current_fingerprint or "unknown",
|
self.current_fingerprint or "unknown",
|
||||||
datetime.now(timezone.utc).isoformat(),
|
datetime.now(timezone.utc).isoformat(),
|
||||||
)
|
)
|
||||||
|
audit_logger = getattr(self, "audit_logger", None)
|
||||||
|
if audit_logger:
|
||||||
|
audit_logger.log(
|
||||||
|
"quick_unlock", {"fingerprint": self.current_fingerprint}
|
||||||
|
)
|
||||||
self.last_unlock_duration = time.perf_counter() - start
|
self.last_unlock_duration = time.perf_counter() - start
|
||||||
if getattr(self, "verbose_timing", False):
|
if getattr(self, "verbose_timing", False):
|
||||||
logger.info("Vault unlocked in %.2f seconds", self.last_unlock_duration)
|
logger.info("Vault unlocked in %.2f seconds", self.last_unlock_duration)
|
||||||
@@ -4324,6 +4374,9 @@ class PasswordManager:
|
|||||||
parent_seed=self.parent_seed,
|
parent_seed=self.parent_seed,
|
||||||
)
|
)
|
||||||
print(colored(f"Database exported to '{path}'.", "green"))
|
print(colored(f"Database exported to '{path}'.", "green"))
|
||||||
|
audit_logger = getattr(self, "audit_logger", None)
|
||||||
|
if audit_logger and path is not None:
|
||||||
|
audit_logger.log("backup_export", {"path": str(path)})
|
||||||
return path
|
return path
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Failed to export database: {e}", exc_info=True)
|
logging.error(f"Failed to export database: {e}", exc_info=True)
|
||||||
@@ -4538,7 +4591,12 @@ class PasswordManager:
|
|||||||
"green",
|
"green",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
audit_logger = getattr(self, "audit_logger", None)
|
||||||
|
if audit_logger:
|
||||||
|
details = {
|
||||||
|
"backup_path": str(backup_path) if save and backup_path else None
|
||||||
|
}
|
||||||
|
audit_logger.log("seed_reveal", details)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error during parent seed backup/reveal: {e}", exc_info=True)
|
logging.error(f"Error during parent seed backup/reveal: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to backup/reveal parent seed: {e}", "red"))
|
print(colored(f"Error: Failed to backup/reveal parent seed: {e}", "red"))
|
||||||
|
84
src/tests/test_audit_logger.py
Normal file
84
src/tests/test_audit_logger.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import json
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import queue
|
||||||
|
from pathlib import Path
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import importlib
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from seedpass.core.manager import PasswordManager, AuditLogger
|
||||||
|
import seedpass.core.manager as manager_module
|
||||||
|
|
||||||
|
|
||||||
|
def test_audit_logger_records_events(monkeypatch, tmp_path):
|
||||||
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||||
|
|
||||||
|
pm = PasswordManager.__new__(PasswordManager)
|
||||||
|
pm.fingerprint_dir = tmp_path
|
||||||
|
pm.current_fingerprint = "user123"
|
||||||
|
pm.profile_stack = []
|
||||||
|
pm.setup_encryption_manager = lambda *a, **k: None
|
||||||
|
pm.initialize_bip85 = lambda: None
|
||||||
|
pm.initialize_managers = lambda: None
|
||||||
|
pm.update_activity = lambda: None
|
||||||
|
pm.verify_password = lambda pw: True
|
||||||
|
pm.notifications = queue.Queue()
|
||||||
|
pm.parent_seed = "seed phrase"
|
||||||
|
pm.config_manager = SimpleNamespace(get_quick_unlock=lambda: True)
|
||||||
|
|
||||||
|
manager_module.clear_header_with_notification = lambda *a, **k: None
|
||||||
|
|
||||||
|
pm.unlock_vault(password="pw")
|
||||||
|
|
||||||
|
dest = tmp_path / "db.json.enc"
|
||||||
|
monkeypatch.setattr(manager_module, "export_backup", lambda *a, **k: dest)
|
||||||
|
pm.vault = object()
|
||||||
|
pm.backup_manager = object()
|
||||||
|
pm.handle_export_database(dest)
|
||||||
|
|
||||||
|
confirms = iter([True, False])
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"seedpass.core.manager.confirm_action", lambda *_a, **_k: next(confirms)
|
||||||
|
)
|
||||||
|
pm.encryption_manager = SimpleNamespace(encrypt_and_save_file=lambda *a, **k: None)
|
||||||
|
pm.handle_backup_reveal_parent_seed(password="pw")
|
||||||
|
|
||||||
|
log_path = tmp_path / ".seedpass" / "audit.log"
|
||||||
|
lines = [json.loads(l) for l in log_path.read_text().splitlines()]
|
||||||
|
events = [e["event"] for e in lines]
|
||||||
|
assert "quick_unlock" in events
|
||||||
|
assert "backup_export" in events
|
||||||
|
assert "seed_reveal" in events
|
||||||
|
|
||||||
|
|
||||||
|
def _verify_chain(path: Path, key: bytes) -> bool:
|
||||||
|
prev = "0" * 64
|
||||||
|
for line in path.read_text().splitlines():
|
||||||
|
data = json.loads(line)
|
||||||
|
sig = data.pop("sig")
|
||||||
|
payload = json.dumps(data, sort_keys=True, separators=(",", ":"))
|
||||||
|
expected = hmac.new(
|
||||||
|
key, f"{prev}{payload}".encode(), hashlib.sha256
|
||||||
|
).hexdigest()
|
||||||
|
if sig != expected:
|
||||||
|
return False
|
||||||
|
prev = sig
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def test_audit_log_tamper_evident(monkeypatch, tmp_path):
|
||||||
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||||
|
key = hashlib.sha256(b"seed").digest()
|
||||||
|
logger = AuditLogger(key)
|
||||||
|
logger.log("one", {})
|
||||||
|
logger.log("two", {})
|
||||||
|
log_path = tmp_path / ".seedpass" / "audit.log"
|
||||||
|
assert _verify_chain(log_path, key)
|
||||||
|
lines = log_path.read_text().splitlines()
|
||||||
|
tampered = json.loads(lines[0])
|
||||||
|
tampered["event"] = "evil"
|
||||||
|
lines[0] = json.dumps(tampered)
|
||||||
|
log_path.write_text("\n".join(lines) + "\n")
|
||||||
|
assert not _verify_chain(log_path, key)
|
Reference in New Issue
Block a user