mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-07 06:48:52 +00:00
Compare commits
4 Commits
d9f76ee668
...
fde09bd1a0
Author | SHA1 | Date | |
---|---|---|---|
![]() |
fde09bd1a0 | ||
![]() |
b307728c05 | ||
![]() |
8ade9e3028 | ||
![]() |
c0a6187478 |
48
src/main.py
48
src/main.py
@@ -9,6 +9,7 @@ if vendor_dir.exists():
|
||||
|
||||
import os
|
||||
import logging
|
||||
from logging.handlers import QueueHandler, QueueListener
|
||||
import signal
|
||||
import time
|
||||
import argparse
|
||||
@@ -38,6 +39,7 @@ from utils import (
|
||||
)
|
||||
from utils.clipboard import ClipboardUnavailableError
|
||||
from utils.atomic_write import atomic_write
|
||||
from utils.logging_utils import ConsolePauseFilter
|
||||
import queue
|
||||
from local_bip85.bip85 import Bip85Error
|
||||
|
||||
@@ -77,39 +79,43 @@ def load_global_config() -> dict:
|
||||
return {}
|
||||
|
||||
|
||||
def configure_logging():
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.DEBUG) # Keep this as DEBUG to capture all logs
|
||||
_queue_listener: QueueListener | None = None
|
||||
|
||||
|
||||
def configure_logging():
|
||||
"""Configure application-wide logging with queue-based handlers."""
|
||||
global _queue_listener
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
# Remove all handlers associated with the root logger object
|
||||
for handler in logger.handlers[:]:
|
||||
logger.removeHandler(handler)
|
||||
|
||||
# Ensure the 'logs' directory exists
|
||||
log_directory = Path("logs")
|
||||
if not log_directory.exists():
|
||||
log_directory.mkdir(parents=True, exist_ok=True)
|
||||
log_directory.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create handlers
|
||||
c_handler = logging.StreamHandler(sys.stdout)
|
||||
f_handler = logging.FileHandler(log_directory / "main.log")
|
||||
log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
|
||||
queue_handler = QueueHandler(log_queue)
|
||||
|
||||
# Set levels: only errors and critical messages will be shown in the console
|
||||
c_handler.setLevel(logging.ERROR)
|
||||
f_handler.setLevel(logging.DEBUG)
|
||||
console_handler = logging.StreamHandler(sys.stderr)
|
||||
console_handler.setLevel(logging.ERROR)
|
||||
console_handler.addFilter(ConsolePauseFilter())
|
||||
|
||||
file_handler = logging.FileHandler(log_directory / "main.log")
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
|
||||
# Create formatters and add them to handlers
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]"
|
||||
"%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]",
|
||||
)
|
||||
c_handler.setFormatter(formatter)
|
||||
f_handler.setFormatter(formatter)
|
||||
console_handler.setFormatter(formatter)
|
||||
file_handler.setFormatter(formatter)
|
||||
|
||||
# Add handlers to the logger
|
||||
logger.addHandler(c_handler)
|
||||
logger.addHandler(f_handler)
|
||||
_queue_listener = QueueListener(log_queue, console_handler, file_handler)
|
||||
_queue_listener.start()
|
||||
|
||||
logger.addHandler(queue_handler)
|
||||
|
||||
# Set logging level for third-party libraries to WARNING to suppress their debug logs
|
||||
logging.getLogger("monstr").setLevel(logging.WARNING)
|
||||
logging.getLogger("nostr").setLevel(logging.WARNING)
|
||||
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import types
|
||||
import pytest
|
||||
from utils import seed_prompt
|
||||
|
||||
|
||||
@@ -46,6 +47,37 @@ def test_masked_input_windows_space(monkeypatch, capsys):
|
||||
assert out.count("*") == 4
|
||||
|
||||
|
||||
def test_masked_input_posix_ctrl_c(monkeypatch):
|
||||
seq = iter(["\x03"])
|
||||
monkeypatch.setattr(seed_prompt.sys.stdin, "read", lambda n=1: next(seq))
|
||||
monkeypatch.setattr(seed_prompt.sys.stdin, "fileno", lambda: 0)
|
||||
|
||||
calls: list[tuple[str, int]] = []
|
||||
fake_termios = types.SimpleNamespace(
|
||||
tcgetattr=lambda fd: "old",
|
||||
tcsetattr=lambda fd, *_: calls.append(("tcsetattr", fd)),
|
||||
TCSADRAIN=1,
|
||||
)
|
||||
fake_tty = types.SimpleNamespace(setraw=lambda fd: calls.append(("setraw", fd)))
|
||||
monkeypatch.setattr(seed_prompt, "termios", fake_termios)
|
||||
monkeypatch.setattr(seed_prompt, "tty", fake_tty)
|
||||
monkeypatch.setattr(seed_prompt.sys, "platform", "linux", raising=False)
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
seed_prompt.masked_input("Enter: ")
|
||||
assert calls == [("setraw", 0), ("tcsetattr", 0)]
|
||||
|
||||
|
||||
def test_masked_input_windows_ctrl_c(monkeypatch):
|
||||
seq = iter(["\x03"])
|
||||
fake_msvcrt = types.SimpleNamespace(getwch=lambda: next(seq))
|
||||
monkeypatch.setattr(seed_prompt, "msvcrt", fake_msvcrt)
|
||||
monkeypatch.setattr(seed_prompt.sys, "platform", "win32", raising=False)
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
seed_prompt.masked_input("Password: ")
|
||||
|
||||
|
||||
def test_prompt_seed_words_valid(monkeypatch):
|
||||
from mnemonic import Mnemonic
|
||||
|
||||
|
24
src/utils/logging_utils.py
Normal file
24
src/utils/logging_utils.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import logging
|
||||
|
||||
_console_paused = False
|
||||
|
||||
|
||||
class ConsolePauseFilter(logging.Filter):
|
||||
"""Filter that blocks records when console logging is paused."""
|
||||
|
||||
def filter(
|
||||
self, record: logging.LogRecord
|
||||
) -> bool: # pragma: no cover - small utility
|
||||
return not _console_paused
|
||||
|
||||
|
||||
def pause_console_logging() -> None:
|
||||
"""Temporarily pause logging to console handlers."""
|
||||
global _console_paused
|
||||
_console_paused = True
|
||||
|
||||
|
||||
def resume_console_logging() -> None:
|
||||
"""Resume logging to console handlers."""
|
||||
global _console_paused
|
||||
_console_paused = False
|
@@ -15,6 +15,7 @@ except ImportError: # pragma: no cover - POSIX only
|
||||
tty = None # type: ignore
|
||||
|
||||
from utils.terminal_utils import clear_screen
|
||||
from utils.logging_utils import pause_console_logging, resume_console_logging
|
||||
|
||||
|
||||
DEFAULT_MAX_ATTEMPTS = 5
|
||||
@@ -58,6 +59,8 @@ def _masked_input_windows(prompt: str) -> str:
|
||||
buffer: list[str] = []
|
||||
while True:
|
||||
ch = msvcrt.getwch()
|
||||
if ch == "\x03":
|
||||
raise KeyboardInterrupt
|
||||
if ch in ("\r", "\n"):
|
||||
sys.stdout.write("\n")
|
||||
return "".join(buffer)
|
||||
@@ -85,6 +88,8 @@ def _masked_input_posix(prompt: str) -> str:
|
||||
tty.setraw(fd)
|
||||
while True:
|
||||
ch = sys.stdin.read(1)
|
||||
if ch == "\x03":
|
||||
raise KeyboardInterrupt
|
||||
if ch in ("\r", "\n"):
|
||||
sys.stdout.write("\n")
|
||||
return "".join(buffer)
|
||||
@@ -103,10 +108,15 @@ def _masked_input_posix(prompt: str) -> str:
|
||||
def masked_input(prompt: str) -> str:
|
||||
"""Return input from the user while masking typed characters."""
|
||||
func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix
|
||||
pause_console_logging()
|
||||
try:
|
||||
return func(prompt)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Exception: # pragma: no cover - fallback when TTY operations fail
|
||||
return input(prompt)
|
||||
finally:
|
||||
resume_console_logging()
|
||||
|
||||
|
||||
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str:
|
||||
|
Reference in New Issue
Block a user