mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 23:38:49 +00:00
Merge pull request #543 from PR0M3TH3AN/codex/new-task
Update notification system
This commit is contained in:
53
src/main.py
53
src/main.py
@@ -26,6 +26,7 @@ from utils import (
|
|||||||
clear_screen,
|
clear_screen,
|
||||||
pause,
|
pause,
|
||||||
clear_header_with_notification,
|
clear_header_with_notification,
|
||||||
|
clear_and_print_fingerprint,
|
||||||
)
|
)
|
||||||
import queue
|
import queue
|
||||||
from local_bip85.bip85 import Bip85Error
|
from local_bip85.bip85 import Bip85Error
|
||||||
@@ -102,22 +103,34 @@ def confirm_action(prompt: str) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
def drain_notifications(pm: PasswordManager) -> str | None:
|
def drain_notifications(pm: PasswordManager) -> str | None:
|
||||||
"""Return the most recent queued notification message, clearing the queue."""
|
"""Return the next queued notification message if available."""
|
||||||
queue_obj = getattr(pm, "notifications", None)
|
queue_obj = getattr(pm, "notifications", None)
|
||||||
if queue_obj is None:
|
if queue_obj is None:
|
||||||
return None
|
return None
|
||||||
last_note = None
|
try:
|
||||||
while True:
|
note = queue_obj.get_nowait()
|
||||||
try:
|
except queue.Empty:
|
||||||
last_note = queue_obj.get_nowait()
|
|
||||||
except queue.Empty:
|
|
||||||
break
|
|
||||||
if not last_note:
|
|
||||||
return None
|
return None
|
||||||
category = getattr(last_note, "level", "info").lower()
|
category = getattr(note, "level", "info").lower()
|
||||||
if category not in ("info", "warning", "error"):
|
if category not in ("info", "warning", "error"):
|
||||||
category = "info"
|
category = "info"
|
||||||
return color_text(getattr(last_note, "message", ""), category)
|
return color_text(getattr(note, "message", ""), category)
|
||||||
|
|
||||||
|
|
||||||
|
def get_notification_text(pm: PasswordManager) -> str:
|
||||||
|
"""Return the current notification from ``pm`` as a colored string."""
|
||||||
|
note = None
|
||||||
|
if hasattr(pm, "get_current_notification"):
|
||||||
|
try:
|
||||||
|
note = pm.get_current_notification()
|
||||||
|
except Exception:
|
||||||
|
note = None
|
||||||
|
if not note:
|
||||||
|
return ""
|
||||||
|
category = getattr(note, "level", "info").lower()
|
||||||
|
if category not in ("info", "warning", "error"):
|
||||||
|
category = "info"
|
||||||
|
return color_text(getattr(note, "message", ""), category)
|
||||||
|
|
||||||
|
|
||||||
def handle_switch_fingerprint(password_manager: PasswordManager):
|
def handle_switch_fingerprint(password_manager: PasswordManager):
|
||||||
@@ -264,7 +277,7 @@ def _display_live_stats(
|
|||||||
if not sys.stdin or not sys.stdin.isatty():
|
if not sys.stdin or not sys.stdin.isatty():
|
||||||
clear_screen()
|
clear_screen()
|
||||||
display_fn()
|
display_fn()
|
||||||
note = drain_notifications(password_manager)
|
note = get_notification_text(password_manager)
|
||||||
if note:
|
if note:
|
||||||
print(note)
|
print(note)
|
||||||
print(colored("Press Enter to continue.", "cyan"))
|
print(colored("Press Enter to continue.", "cyan"))
|
||||||
@@ -274,13 +287,9 @@ def _display_live_stats(
|
|||||||
while True:
|
while True:
|
||||||
clear_screen()
|
clear_screen()
|
||||||
display_fn()
|
display_fn()
|
||||||
print()
|
note = get_notification_text(password_manager)
|
||||||
note = drain_notifications(password_manager)
|
|
||||||
sys.stdout.write("\033[F\033[2K")
|
|
||||||
if note:
|
if note:
|
||||||
print(note)
|
print(note)
|
||||||
else:
|
|
||||||
print()
|
|
||||||
print(colored("Press Enter to continue.", "cyan"))
|
print(colored("Press Enter to continue.", "cyan"))
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
try:
|
try:
|
||||||
@@ -940,12 +949,15 @@ def display_menu(
|
|||||||
"header_fingerprint_args",
|
"header_fingerprint_args",
|
||||||
(getattr(password_manager, "current_fingerprint", None), None, None),
|
(getattr(password_manager, "current_fingerprint", None), None, None),
|
||||||
)
|
)
|
||||||
clear_header_with_notification(
|
clear_and_print_fingerprint(
|
||||||
fp,
|
fp,
|
||||||
"Main Menu",
|
"Main Menu",
|
||||||
parent_fingerprint=parent_fp,
|
parent_fingerprint=parent_fp,
|
||||||
child_fingerprint=child_fp,
|
child_fingerprint=child_fp,
|
||||||
)
|
)
|
||||||
|
note_line = get_notification_text(password_manager)
|
||||||
|
if note_line:
|
||||||
|
print(note_line)
|
||||||
if time.time() - password_manager.last_activity > inactivity_timeout:
|
if time.time() - password_manager.last_activity > inactivity_timeout:
|
||||||
print(colored("Session timed out. Vault locked.", "yellow"))
|
print(colored("Session timed out. Vault locked.", "yellow"))
|
||||||
password_manager.lock_vault()
|
password_manager.lock_vault()
|
||||||
@@ -965,13 +977,6 @@ def display_menu(
|
|||||||
for handler in logging.getLogger().handlers:
|
for handler in logging.getLogger().handlers:
|
||||||
handler.flush()
|
handler.flush()
|
||||||
print(color_text(menu, "menu"))
|
print(color_text(menu, "menu"))
|
||||||
print()
|
|
||||||
last_note = drain_notifications(password_manager)
|
|
||||||
sys.stdout.write("\033[F\033[2K")
|
|
||||||
if last_note:
|
|
||||||
print(last_note)
|
|
||||||
else:
|
|
||||||
print()
|
|
||||||
try:
|
try:
|
||||||
choice = timed_input(
|
choice = timed_input(
|
||||||
"Enter your choice (1-8) or press Enter to exit: ",
|
"Enter your choice (1-8) or press Enter to exit: ",
|
||||||
|
@@ -39,26 +39,33 @@ def _make_pm(msg):
|
|||||||
def test_display_menu_prints_notifications(monkeypatch, capsys):
|
def test_display_menu_prints_notifications(monkeypatch, capsys):
|
||||||
pm = _make_pm("hello")
|
pm = _make_pm("hello")
|
||||||
monkeypatch.setattr(main, "_display_live_stats", lambda *_: None)
|
monkeypatch.setattr(main, "_display_live_stats", lambda *_: None)
|
||||||
monkeypatch.setattr(main, "clear_header_with_notification", lambda *a, **k: None)
|
monkeypatch.setattr(
|
||||||
|
main, "clear_and_print_fingerprint", lambda *a, **k: print("HEADER")
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(main, "get_notification_text", lambda *_: "hello")
|
||||||
monkeypatch.setattr(main, "timed_input", lambda *a, **k: "")
|
monkeypatch.setattr(main, "timed_input", lambda *a, **k: "")
|
||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
main.display_menu(pm, sync_interval=1000, inactivity_timeout=1000)
|
main.display_menu(pm, sync_interval=1000, inactivity_timeout=1000)
|
||||||
out = capsys.readouterr().out
|
out = capsys.readouterr().out
|
||||||
assert "\x1b[F\x1b[2K" in out
|
assert out.splitlines()[0] == "HEADER"
|
||||||
assert out.count("hello") == 1
|
assert out.splitlines()[1] == "hello"
|
||||||
|
|
||||||
|
|
||||||
def test_display_menu_reuses_notification_line(monkeypatch, capsys):
|
def test_display_menu_reuses_notification_line(monkeypatch, capsys):
|
||||||
pm = _make_pm(None)
|
pm = _make_pm(None)
|
||||||
msgs = iter(["first", "second"])
|
msgs = iter(["first", "second"])
|
||||||
monkeypatch.setattr(main, "_display_live_stats", lambda *_: None)
|
monkeypatch.setattr(main, "_display_live_stats", lambda *_: None)
|
||||||
monkeypatch.setattr(main, "clear_header_with_notification", lambda *a, **k: None)
|
monkeypatch.setattr(
|
||||||
|
main, "clear_and_print_fingerprint", lambda *a, **k: print("HEADER")
|
||||||
|
)
|
||||||
inputs = iter(["9", ""])
|
inputs = iter(["9", ""])
|
||||||
monkeypatch.setattr(main, "timed_input", lambda *a, **k: next(inputs))
|
monkeypatch.setattr(main, "timed_input", lambda *a, **k: next(inputs))
|
||||||
monkeypatch.setattr(main, "drain_notifications", lambda _pm: next(msgs, None))
|
monkeypatch.setattr(main, "get_notification_text", lambda _pm: next(msgs, ""))
|
||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
main.display_menu(pm, sync_interval=1000, inactivity_timeout=1000)
|
main.display_menu(pm, sync_interval=1000, inactivity_timeout=1000)
|
||||||
out = capsys.readouterr().out
|
out = capsys.readouterr().out
|
||||||
|
lines = out.splitlines()
|
||||||
|
assert lines[0] == "HEADER"
|
||||||
assert out.count("first") == 1
|
assert out.count("first") == 1
|
||||||
assert out.count("second") == 1
|
assert out.count("second") == 1
|
||||||
assert out.count("Select an option:") == 2
|
assert out.count("Select an option:") == 2
|
||||||
|
@@ -14,7 +14,7 @@ def _make_pm():
|
|||||||
|
|
||||||
def test_live_stats_shows_message(monkeypatch, capsys):
|
def test_live_stats_shows_message(monkeypatch, capsys):
|
||||||
pm = _make_pm()
|
pm = _make_pm()
|
||||||
monkeypatch.setattr(main, "drain_notifications", lambda *_: None)
|
monkeypatch.setattr(main, "get_notification_text", lambda *_: "")
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
main,
|
main,
|
||||||
"timed_input",
|
"timed_input",
|
||||||
@@ -27,7 +27,7 @@ def test_live_stats_shows_message(monkeypatch, capsys):
|
|||||||
|
|
||||||
def test_live_stats_shows_notification(monkeypatch, capsys):
|
def test_live_stats_shows_notification(monkeypatch, capsys):
|
||||||
pm = _make_pm()
|
pm = _make_pm()
|
||||||
monkeypatch.setattr(main, "drain_notifications", lambda *_: "note")
|
monkeypatch.setattr(main, "get_notification_text", lambda *_: "note")
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
main,
|
main,
|
||||||
"timed_input",
|
"timed_input",
|
||||||
|
Reference in New Issue
Block a user