mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Compare commits
9 Commits
fde09bd1a0
...
ca733be2e3
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ca733be2e3 | ||
![]() |
e528bebae3 | ||
![]() |
e760bf2b25 | ||
![]() |
d106802a18 | ||
![]() |
f2648a8c1d | ||
![]() |
d030cf9692 | ||
![]() |
bebbca8169 | ||
![]() |
4d7e3d4b63 | ||
![]() |
7b0344739f |
@@ -34,13 +34,9 @@ def initialize_app() -> None:
|
|||||||
"""Ensure the application directory exists."""
|
"""Ensure the application directory exists."""
|
||||||
try:
|
try:
|
||||||
APP_DIR.mkdir(exist_ok=True, parents=True)
|
APP_DIR.mkdir(exist_ok=True, parents=True)
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
logger.debug("Application directory created at %s", APP_DIR)
|
||||||
logger.info(f"Application directory created at {APP_DIR}")
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
logger.error("Failed to create application directory: %s", exc, exc_info=True)
|
||||||
logger.error(
|
|
||||||
f"Failed to create application directory: {exc}", exc_info=True
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------
|
# -----------------------------------
|
||||||
|
45
src/main.py
45
src/main.py
@@ -9,7 +9,6 @@ if vendor_dir.exists():
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
from logging.handlers import QueueHandler, QueueListener
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
import argparse
|
import argparse
|
||||||
@@ -39,8 +38,11 @@ from utils import (
|
|||||||
)
|
)
|
||||||
from utils.clipboard import ClipboardUnavailableError
|
from utils.clipboard import ClipboardUnavailableError
|
||||||
from utils.atomic_write import atomic_write
|
from utils.atomic_write import atomic_write
|
||||||
from utils.logging_utils import ConsolePauseFilter
|
from utils.logging_utils import (
|
||||||
import queue
|
ConsolePauseFilter,
|
||||||
|
ChecksumWarningFilter,
|
||||||
|
pause_logging_for_ui,
|
||||||
|
)
|
||||||
from local_bip85.bip85 import Bip85Error
|
from local_bip85.bip85 import Bip85Error
|
||||||
|
|
||||||
|
|
||||||
@@ -59,7 +61,7 @@ def _warn_missing_optional_dependencies() -> None:
|
|||||||
try:
|
try:
|
||||||
importlib.import_module(module)
|
importlib.import_module(module)
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
logging.warning(
|
logging.debug(
|
||||||
"Optional dependency '%s' is not installed; %s will be unavailable.",
|
"Optional dependency '%s' is not installed; %s will be unavailable.",
|
||||||
module,
|
module,
|
||||||
feature,
|
feature,
|
||||||
@@ -79,28 +81,16 @@ def load_global_config() -> dict:
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
_queue_listener: QueueListener | None = None
|
def configure_logging() -> None:
|
||||||
|
"""Configure application-wide logging handlers."""
|
||||||
|
|
||||||
def configure_logging():
|
|
||||||
"""Configure application-wide logging with queue-based handlers."""
|
|
||||||
global _queue_listener
|
|
||||||
|
|
||||||
logger = logging.getLogger()
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
for handler in logger.handlers[:]:
|
|
||||||
logger.removeHandler(handler)
|
|
||||||
|
|
||||||
log_directory = Path("logs")
|
log_directory = Path("logs")
|
||||||
log_directory.mkdir(parents=True, exist_ok=True)
|
log_directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
|
|
||||||
queue_handler = QueueHandler(log_queue)
|
|
||||||
|
|
||||||
console_handler = logging.StreamHandler(sys.stderr)
|
console_handler = logging.StreamHandler(sys.stderr)
|
||||||
console_handler.setLevel(logging.ERROR)
|
console_handler.setLevel(logging.WARNING)
|
||||||
console_handler.addFilter(ConsolePauseFilter())
|
console_handler.addFilter(ConsolePauseFilter())
|
||||||
|
console_handler.addFilter(ChecksumWarningFilter())
|
||||||
|
|
||||||
file_handler = logging.FileHandler(log_directory / "main.log")
|
file_handler = logging.FileHandler(log_directory / "main.log")
|
||||||
file_handler.setLevel(logging.DEBUG)
|
file_handler.setLevel(logging.DEBUG)
|
||||||
@@ -111,15 +101,19 @@ def configure_logging():
|
|||||||
console_handler.setFormatter(formatter)
|
console_handler.setFormatter(formatter)
|
||||||
file_handler.setFormatter(formatter)
|
file_handler.setFormatter(formatter)
|
||||||
|
|
||||||
_queue_listener = QueueListener(log_queue, console_handler, file_handler)
|
root_logger = logging.getLogger()
|
||||||
_queue_listener.start()
|
root_logger.setLevel(logging.DEBUG)
|
||||||
|
root_logger.handlers.clear()
|
||||||
|
root_logger.addHandler(console_handler)
|
||||||
|
root_logger.addHandler(file_handler)
|
||||||
|
|
||||||
logger.addHandler(queue_handler)
|
logging.captureWarnings(True)
|
||||||
|
|
||||||
logging.getLogger("monstr").setLevel(logging.WARNING)
|
logging.getLogger("monstr").setLevel(logging.ERROR)
|
||||||
logging.getLogger("nostr").setLevel(logging.WARNING)
|
logging.getLogger("nostr").setLevel(logging.ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def confirm_action(prompt: str) -> bool:
|
def confirm_action(prompt: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Prompts the user for confirmation.
|
Prompts the user for confirmation.
|
||||||
@@ -168,6 +162,7 @@ def get_notification_text(pm: PasswordManager) -> str:
|
|||||||
return color_text(getattr(note, "message", ""), category)
|
return color_text(getattr(note, "message", ""), category)
|
||||||
|
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_switch_fingerprint(password_manager: PasswordManager):
|
def handle_switch_fingerprint(password_manager: PasswordManager):
|
||||||
"""
|
"""
|
||||||
Handles switching the active fingerprint.
|
Handles switching the active fingerprint.
|
||||||
|
@@ -29,6 +29,7 @@ from utils.file_lock import exclusive_lock
|
|||||||
from mnemonic import Mnemonic
|
from mnemonic import Mnemonic
|
||||||
from utils.password_prompt import prompt_existing_password
|
from utils.password_prompt import prompt_existing_password
|
||||||
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
|
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
|
||||||
|
from .errors import DecryptionError
|
||||||
|
|
||||||
# Instantiate the logger
|
# Instantiate the logger
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -137,12 +138,15 @@ class EncryptionManager:
|
|||||||
ciphertext = encrypted_data[15:]
|
ciphertext = encrypted_data[15:]
|
||||||
if len(ciphertext) < 16:
|
if len(ciphertext) < 16:
|
||||||
logger.error("AES-GCM payload too short")
|
logger.error("AES-GCM payload too short")
|
||||||
raise InvalidToken("AES-GCM payload too short")
|
raise DecryptionError(
|
||||||
|
f"Failed to decrypt{ctx}: AES-GCM payload too short"
|
||||||
|
)
|
||||||
return self.cipher.decrypt(nonce, ciphertext, None)
|
return self.cipher.decrypt(nonce, ciphertext, None)
|
||||||
except InvalidTag as e:
|
except InvalidTag as e:
|
||||||
msg = f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
logger.error(f"Failed to decrypt{ctx}: invalid key or corrupt file")
|
||||||
logger.error(msg)
|
raise DecryptionError(
|
||||||
raise InvalidToken(msg) from e
|
f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
||||||
|
) from e
|
||||||
|
|
||||||
# Next try the older V2 format
|
# Next try the older V2 format
|
||||||
if encrypted_data.startswith(b"V2:"):
|
if encrypted_data.startswith(b"V2:"):
|
||||||
@@ -151,7 +155,9 @@ class EncryptionManager:
|
|||||||
ciphertext = encrypted_data[15:]
|
ciphertext = encrypted_data[15:]
|
||||||
if len(ciphertext) < 16:
|
if len(ciphertext) < 16:
|
||||||
logger.error("AES-GCM payload too short")
|
logger.error("AES-GCM payload too short")
|
||||||
raise InvalidToken("AES-GCM payload too short")
|
raise DecryptionError(
|
||||||
|
f"Failed to decrypt{ctx}: AES-GCM payload too short"
|
||||||
|
)
|
||||||
return self.cipher.decrypt(nonce, ciphertext, None)
|
return self.cipher.decrypt(nonce, ciphertext, None)
|
||||||
except InvalidTag as e:
|
except InvalidTag as e:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@@ -164,9 +170,12 @@ class EncryptionManager:
|
|||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
except InvalidToken:
|
except InvalidToken:
|
||||||
msg = f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
logger.error(
|
||||||
logger.error(msg)
|
f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
||||||
raise InvalidToken(msg) from e
|
)
|
||||||
|
raise DecryptionError(
|
||||||
|
f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
||||||
|
) from e
|
||||||
|
|
||||||
# If it's neither V3 nor V2, assume legacy Fernet format
|
# If it's neither V3 nor V2, assume legacy Fernet format
|
||||||
logger.warning("Data is in legacy Fernet format. Attempting migration.")
|
logger.warning("Data is in legacy Fernet format. Attempting migration.")
|
||||||
@@ -176,18 +185,23 @@ class EncryptionManager:
|
|||||||
logger.error(
|
logger.error(
|
||||||
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
|
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
|
||||||
)
|
)
|
||||||
raise e
|
raise DecryptionError(
|
||||||
|
f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
||||||
|
) from e
|
||||||
|
|
||||||
except (InvalidToken, InvalidTag) as e:
|
except DecryptionError as e:
|
||||||
if encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:"):
|
if (
|
||||||
# Already determined not to be legacy; re-raise
|
encrypted_data.startswith(b"V3|")
|
||||||
raise
|
or encrypted_data.startswith(b"V2:")
|
||||||
if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short":
|
or not self._legacy_migrate_flag
|
||||||
raise
|
):
|
||||||
if not self._legacy_migrate_flag:
|
|
||||||
raise
|
raise
|
||||||
logger.debug(f"Could not decrypt data{ctx}: {e}")
|
logger.debug(f"Could not decrypt data{ctx}: {e}")
|
||||||
raise LegacyFormatRequiresMigrationError(context)
|
raise LegacyFormatRequiresMigrationError(context) from e
|
||||||
|
except (InvalidToken, InvalidTag) as e: # pragma: no cover - safety net
|
||||||
|
raise DecryptionError(
|
||||||
|
f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
||||||
|
) from e
|
||||||
|
|
||||||
def decrypt_legacy(
|
def decrypt_legacy(
|
||||||
self, encrypted_data: bytes, password: str, context: Optional[str] = None
|
self, encrypted_data: bytes, password: str, context: Optional[str] = None
|
||||||
@@ -224,8 +238,8 @@ class EncryptionManager:
|
|||||||
except Exception as e2: # pragma: no cover - try next iteration
|
except Exception as e2: # pragma: no cover - try next iteration
|
||||||
last_exc = e2
|
last_exc = e2
|
||||||
logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True)
|
logger.error(f"Failed legacy decryption attempt: {last_exc}", exc_info=True)
|
||||||
raise InvalidToken(
|
raise DecryptionError(
|
||||||
f"Could not decrypt{ctx} with any available method."
|
f"Failed to decrypt{ctx}: invalid key or corrupt file"
|
||||||
) from last_exc
|
) from last_exc
|
||||||
|
|
||||||
# --- All functions below this point now use the smart `decrypt_data` method ---
|
# --- All functions below this point now use the smart `decrypt_data` method ---
|
||||||
@@ -409,10 +423,16 @@ class EncryptionManager:
|
|||||||
if return_kdf:
|
if return_kdf:
|
||||||
return data, kdf
|
return data, kdf
|
||||||
return data
|
return data
|
||||||
except (InvalidToken, InvalidTag) as e:
|
except DecryptionError as e:
|
||||||
msg = f"Failed to decrypt or parse data from {file_path}: {e}"
|
msg = f"Failed to decrypt or parse data from {file_path}: {e}"
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
raise InvalidToken(msg) from e
|
raise
|
||||||
|
except (InvalidToken, InvalidTag) as e: # pragma: no cover - legacy safety
|
||||||
|
msg = f"Failed to decrypt or parse data from {file_path}: {e}"
|
||||||
|
logger.error(msg)
|
||||||
|
raise DecryptionError(
|
||||||
|
f"Failed to decrypt {file_path}: invalid key or corrupt file"
|
||||||
|
) from e
|
||||||
except JSONDecodeError as e:
|
except JSONDecodeError as e:
|
||||||
msg = f"Failed to parse JSON data from {file_path}: {e}"
|
msg = f"Failed to parse JSON data from {file_path}: {e}"
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
@@ -484,7 +504,7 @@ class EncryptionManager:
|
|||||||
logger.info("Index file from Nostr was processed and saved successfully.")
|
logger.info("Index file from Nostr was processed and saved successfully.")
|
||||||
self.last_migration_performed = is_legacy
|
self.last_migration_performed = is_legacy
|
||||||
return True
|
return True
|
||||||
except (InvalidToken, LegacyFormatRequiresMigrationError):
|
except (DecryptionError, LegacyFormatRequiresMigrationError):
|
||||||
try:
|
try:
|
||||||
password = prompt_existing_password(
|
password = prompt_existing_password(
|
||||||
"Enter your master password for legacy decryption: "
|
"Enter your master password for legacy decryption: "
|
||||||
|
@@ -9,6 +9,7 @@ exception, displaying a friendly message and exiting with code ``1``.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
from click import ClickException
|
from click import ClickException
|
||||||
|
from cryptography.fernet import InvalidToken
|
||||||
|
|
||||||
|
|
||||||
class SeedPassError(ClickException):
|
class SeedPassError(ClickException):
|
||||||
@@ -18,4 +19,12 @@ class SeedPassError(ClickException):
|
|||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["SeedPassError"]
|
class DecryptionError(InvalidToken, SeedPassError):
|
||||||
|
"""Raised when encrypted data cannot be decrypted.
|
||||||
|
|
||||||
|
Subclasses :class:`cryptography.fernet.InvalidToken` so callers expecting
|
||||||
|
the cryptography exception continue to work.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["SeedPassError", "DecryptionError"]
|
||||||
|
@@ -37,8 +37,7 @@ from .password_generation import PasswordGenerator
|
|||||||
from .backup import BackupManager
|
from .backup import BackupManager
|
||||||
from .vault import Vault
|
from .vault import Vault
|
||||||
from .portable_backup import export_backup, import_backup, PortableMode
|
from .portable_backup import export_backup, import_backup, PortableMode
|
||||||
from cryptography.fernet import InvalidToken
|
from .errors import SeedPassError, DecryptionError
|
||||||
from .errors import SeedPassError
|
|
||||||
from .totp import TotpManager
|
from .totp import TotpManager
|
||||||
from .entry_types import EntryType
|
from .entry_types import EntryType
|
||||||
from .pubsub import bus
|
from .pubsub import bus
|
||||||
@@ -103,6 +102,7 @@ from mnemonic import Mnemonic
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from utils.fingerprint_manager import FingerprintManager
|
from utils.fingerprint_manager import FingerprintManager
|
||||||
|
from utils.logging_utils import pause_logging_for_ui
|
||||||
|
|
||||||
# Import NostrClient
|
# Import NostrClient
|
||||||
from nostr.client import NostrClient
|
from nostr.client import NostrClient
|
||||||
@@ -751,13 +751,11 @@ class PasswordManager:
|
|||||||
):
|
):
|
||||||
self.config_manager.set_kdf_iterations(iter_try)
|
self.config_manager.set_kdf_iterations(iter_try)
|
||||||
break
|
break
|
||||||
except InvalidToken:
|
except DecryptionError:
|
||||||
seed_mgr = None
|
seed_mgr = None
|
||||||
|
|
||||||
if seed_mgr is None:
|
if seed_mgr is None:
|
||||||
msg = (
|
msg = "Incorrect password or corrupt file"
|
||||||
"Invalid password for selected seed profile. Please try again."
|
|
||||||
)
|
|
||||||
print(colored(msg, "red"))
|
print(colored(msg, "red"))
|
||||||
attempts += 1
|
attempts += 1
|
||||||
password = None
|
password = None
|
||||||
@@ -829,11 +827,16 @@ class PasswordManager:
|
|||||||
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
||||||
self.derive_key_hierarchy(seed_bytes)
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
self.bip85 = BIP85(seed_bytes)
|
self.bip85 = BIP85(seed_bytes)
|
||||||
|
except DecryptionError as e:
|
||||||
|
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
|
||||||
|
print(colored("Incorrect password or corrupt file", "red"))
|
||||||
|
raise SeedPassError("Incorrect password or corrupt file") from e
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
|
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to load parent seed: {e}", "red"))
|
print(colored(f"Error: Failed to load parent seed: {e}", "red"))
|
||||||
raise SeedPassError(f"Failed to load parent seed: {e}") from e
|
raise SeedPassError(f"Failed to load parent seed: {e}") from e
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
@requires_unlocked
|
@requires_unlocked
|
||||||
def handle_switch_fingerprint(self, *, password: Optional[str] = None) -> bool:
|
def handle_switch_fingerprint(self, *, password: Optional[str] = None) -> bool:
|
||||||
return self.profile_service.handle_switch_fingerprint(password=password)
|
return self.profile_service.handle_switch_fingerprint(password=password)
|
||||||
@@ -894,6 +897,7 @@ class PasswordManager:
|
|||||||
self.update_activity()
|
self.update_activity()
|
||||||
self.start_background_sync()
|
self.start_background_sync()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_existing_seed(self, *, password: Optional[str] = None) -> None:
|
def handle_existing_seed(self, *, password: Optional[str] = None) -> None:
|
||||||
"""
|
"""
|
||||||
Handles the scenario where an existing parent seed file is found.
|
Handles the scenario where an existing parent seed file is found.
|
||||||
@@ -980,6 +984,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to decrypt parent seed: {e}", "red"))
|
print(colored(f"Error: Failed to decrypt parent seed: {e}", "red"))
|
||||||
raise SeedPassError(f"Failed to decrypt parent seed: {e}") from e
|
raise SeedPassError(f"Failed to decrypt parent seed: {e}") from e
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_new_seed_setup(self) -> None:
|
def handle_new_seed_setup(self) -> None:
|
||||||
"""
|
"""
|
||||||
Handles the setup process when no existing parent seed is found.
|
Handles the setup process when no existing parent seed is found.
|
||||||
@@ -1881,9 +1886,11 @@ class PasswordManager:
|
|||||||
self.notify("Starting with a new, empty vault.", level="INFO")
|
self.notify("Starting with a new, empty vault.", level="INFO")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_password(self) -> None:
|
def handle_add_password(self) -> None:
|
||||||
self.entry_service.handle_add_password()
|
self.entry_service.handle_add_password()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_totp(self) -> None:
|
def handle_add_totp(self) -> None:
|
||||||
"""Add a TOTP entry either derived from the seed or imported."""
|
"""Add a TOTP entry either derived from the seed or imported."""
|
||||||
try:
|
try:
|
||||||
@@ -2025,6 +2032,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to add TOTP: {e}", "red"))
|
print(colored(f"Error: Failed to add TOTP: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_ssh_key(self) -> None:
|
def handle_add_ssh_key(self) -> None:
|
||||||
"""Add an SSH key pair entry and display the derived keys."""
|
"""Add an SSH key pair entry and display the derived keys."""
|
||||||
try:
|
try:
|
||||||
@@ -2082,6 +2090,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to add SSH key: {e}", "red"))
|
print(colored(f"Error: Failed to add SSH key: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_seed(self) -> None:
|
def handle_add_seed(self) -> None:
|
||||||
"""Add a derived BIP-39 seed phrase entry."""
|
"""Add a derived BIP-39 seed phrase entry."""
|
||||||
try:
|
try:
|
||||||
@@ -2151,6 +2160,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to add seed phrase: {e}", "red"))
|
print(colored(f"Error: Failed to add seed phrase: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_pgp(self) -> None:
|
def handle_add_pgp(self) -> None:
|
||||||
"""Add a PGP key entry and display the generated key."""
|
"""Add a PGP key entry and display the generated key."""
|
||||||
try:
|
try:
|
||||||
@@ -2218,6 +2228,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to add PGP key: {e}", "red"))
|
print(colored(f"Error: Failed to add PGP key: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_nostr_key(self) -> None:
|
def handle_add_nostr_key(self) -> None:
|
||||||
"""Add a Nostr key entry and display the derived keys."""
|
"""Add a Nostr key entry and display the derived keys."""
|
||||||
try:
|
try:
|
||||||
@@ -2277,6 +2288,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to add Nostr key: {e}", "red"))
|
print(colored(f"Error: Failed to add Nostr key: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_key_value(self) -> None:
|
def handle_add_key_value(self) -> None:
|
||||||
"""Add a generic key/value entry."""
|
"""Add a generic key/value entry."""
|
||||||
try:
|
try:
|
||||||
@@ -2358,6 +2370,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to add key/value entry: {e}", "red"))
|
print(colored(f"Error: Failed to add key/value entry: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_add_managed_account(self) -> None:
|
def handle_add_managed_account(self) -> None:
|
||||||
"""Add a managed account seed entry."""
|
"""Add a managed account seed entry."""
|
||||||
try:
|
try:
|
||||||
@@ -3147,6 +3160,7 @@ class PasswordManager:
|
|||||||
print(colored("Error: Failed to retrieve the password.", "red"))
|
print(colored("Error: Failed to retrieve the password.", "red"))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_retrieve_entry(self) -> None:
|
def handle_retrieve_entry(self) -> None:
|
||||||
"""Prompt for an index and display the corresponding entry."""
|
"""Prompt for an index and display the corresponding entry."""
|
||||||
try:
|
try:
|
||||||
@@ -3181,6 +3195,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to retrieve password: {e}", "red"))
|
print(colored(f"Error: Failed to retrieve password: {e}", "red"))
|
||||||
pause()
|
pause()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_modify_entry(self) -> None:
|
def handle_modify_entry(self) -> None:
|
||||||
"""
|
"""
|
||||||
Handles modifying an existing password entry by prompting the user for the index number
|
Handles modifying an existing password entry by prompting the user for the index number
|
||||||
@@ -3621,6 +3636,7 @@ class PasswordManager:
|
|||||||
logging.error(f"Error during modifying entry: {e}", exc_info=True)
|
logging.error(f"Error during modifying entry: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to modify entry: {e}", "red"))
|
print(colored(f"Error: Failed to modify entry: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_search_entries(self) -> None:
|
def handle_search_entries(self) -> None:
|
||||||
"""Prompt for a query, list matches and optionally show details."""
|
"""Prompt for a query, list matches and optionally show details."""
|
||||||
try:
|
try:
|
||||||
@@ -3830,6 +3846,7 @@ class PasswordManager:
|
|||||||
)
|
)
|
||||||
print("-" * 40)
|
print("-" * 40)
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_list_entries(self) -> None:
|
def handle_list_entries(self) -> None:
|
||||||
self.menu_handler.handle_list_entries()
|
self.menu_handler.handle_list_entries()
|
||||||
|
|
||||||
@@ -3870,6 +3887,7 @@ class PasswordManager:
|
|||||||
logging.error(f"Error during entry deletion: {e}", exc_info=True)
|
logging.error(f"Error during entry deletion: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to delete entry: {e}", "red"))
|
print(colored(f"Error: Failed to delete entry: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_archive_entry(self) -> None:
|
def handle_archive_entry(self) -> None:
|
||||||
"""Archive an entry without deleting it."""
|
"""Archive an entry without deleting it."""
|
||||||
try:
|
try:
|
||||||
@@ -3888,6 +3906,7 @@ class PasswordManager:
|
|||||||
logging.error(f"Error archiving entry: {e}", exc_info=True)
|
logging.error(f"Error archiving entry: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to archive entry: {e}", "red"))
|
print(colored(f"Error: Failed to archive entry: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_view_archived_entries(self) -> None:
|
def handle_view_archived_entries(self) -> None:
|
||||||
"""Display archived entries and optionally view or restore them."""
|
"""Display archived entries and optionally view or restore them."""
|
||||||
try:
|
try:
|
||||||
@@ -3955,9 +3974,11 @@ class PasswordManager:
|
|||||||
logging.error(f"Error viewing archived entries: {e}", exc_info=True)
|
logging.error(f"Error viewing archived entries: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to view archived entries: {e}", "red"))
|
print(colored(f"Error: Failed to view archived entries: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_display_totp_codes(self) -> None:
|
def handle_display_totp_codes(self) -> None:
|
||||||
self.menu_handler.handle_display_totp_codes()
|
self.menu_handler.handle_display_totp_codes()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_verify_checksum(self) -> None:
|
def handle_verify_checksum(self) -> None:
|
||||||
"""
|
"""
|
||||||
Handles verifying the script's checksum against the stored checksum to ensure integrity.
|
Handles verifying the script's checksum against the stored checksum to ensure integrity.
|
||||||
@@ -3997,6 +4018,7 @@ class PasswordManager:
|
|||||||
logging.error(f"Error during checksum verification: {e}", exc_info=True)
|
logging.error(f"Error during checksum verification: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to verify checksum: {e}", "red"))
|
print(colored(f"Error: Failed to verify checksum: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_update_script_checksum(self) -> None:
|
def handle_update_script_checksum(self) -> None:
|
||||||
"""Generate a new checksum for the manager script."""
|
"""Generate a new checksum for the manager script."""
|
||||||
if not confirm_action("Generate new script checksum? (Y/N): "):
|
if not confirm_action("Generate new script checksum? (Y/N): "):
|
||||||
@@ -4142,6 +4164,7 @@ class PasswordManager:
|
|||||||
logging.error(f"Failed to restore backup: {e}", exc_info=True)
|
logging.error(f"Failed to restore backup: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to restore backup: {e}", "red"))
|
print(colored(f"Error: Failed to restore backup: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_export_database(
|
def handle_export_database(
|
||||||
self,
|
self,
|
||||||
dest: Path | None = None,
|
dest: Path | None = None,
|
||||||
@@ -4184,6 +4207,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to export database: {e}", "red"))
|
print(colored(f"Error: Failed to export database: {e}", "red"))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_import_database(self, src: Path) -> None:
|
def handle_import_database(self, src: Path) -> None:
|
||||||
"""Import a portable database file, replacing the current index."""
|
"""Import a portable database file, replacing the current index."""
|
||||||
|
|
||||||
@@ -4223,7 +4247,7 @@ class PasswordManager:
|
|||||||
src,
|
src,
|
||||||
parent_seed=self.parent_seed,
|
parent_seed=self.parent_seed,
|
||||||
)
|
)
|
||||||
except InvalidToken:
|
except DecryptionError:
|
||||||
logging.error("Invalid backup token during import", exc_info=True)
|
logging.error("Invalid backup token during import", exc_info=True)
|
||||||
print(
|
print(
|
||||||
colored(
|
colored(
|
||||||
@@ -4266,6 +4290,7 @@ class PasswordManager:
|
|||||||
print(colored("Database imported successfully.", "green"))
|
print(colored("Database imported successfully.", "green"))
|
||||||
self.sync_vault()
|
self.sync_vault()
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_export_totp_codes(self) -> Path | None:
|
def handle_export_totp_codes(self) -> Path | None:
|
||||||
"""Export all 2FA codes to a JSON file for other authenticator apps."""
|
"""Export all 2FA codes to a JSON file for other authenticator apps."""
|
||||||
try:
|
try:
|
||||||
@@ -4336,6 +4361,7 @@ class PasswordManager:
|
|||||||
print(colored(f"Error: Failed to export 2FA codes: {e}", "red"))
|
print(colored(f"Error: Failed to export 2FA codes: {e}", "red"))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_backup_reveal_parent_seed(
|
def handle_backup_reveal_parent_seed(
|
||||||
self, file: Path | None = None, *, password: Optional[str] = None
|
self, file: Path | None = None, *, password: Optional[str] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -4463,7 +4489,7 @@ class PasswordManager:
|
|||||||
else:
|
else:
|
||||||
logging.warning("Password verification failed.")
|
logging.warning("Password verification failed.")
|
||||||
return is_correct
|
return is_correct
|
||||||
except InvalidToken as e:
|
except DecryptionError as e:
|
||||||
logging.error(f"Failed to decrypt config: {e}")
|
logging.error(f"Failed to decrypt config: {e}")
|
||||||
print(
|
print(
|
||||||
colored(
|
colored(
|
||||||
|
@@ -10,6 +10,7 @@ from .entry_types import EntryType, ALL_ENTRY_TYPES
|
|||||||
import seedpass.core.manager as manager_module
|
import seedpass.core.manager as manager_module
|
||||||
from utils.color_scheme import color_text
|
from utils.color_scheme import color_text
|
||||||
from utils.terminal_utils import clear_header_with_notification
|
from utils.terminal_utils import clear_header_with_notification
|
||||||
|
from utils.logging_utils import pause_logging_for_ui
|
||||||
|
|
||||||
if TYPE_CHECKING: # pragma: no cover - typing only
|
if TYPE_CHECKING: # pragma: no cover - typing only
|
||||||
from .manager import PasswordManager
|
from .manager import PasswordManager
|
||||||
@@ -21,6 +22,7 @@ class MenuHandler:
|
|||||||
def __init__(self, manager: PasswordManager) -> None:
|
def __init__(self, manager: PasswordManager) -> None:
|
||||||
self.manager = manager
|
self.manager = manager
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_list_entries(self) -> None:
|
def handle_list_entries(self) -> None:
|
||||||
"""List entries and optionally show details."""
|
"""List entries and optionally show details."""
|
||||||
pm = self.manager
|
pm = self.manager
|
||||||
@@ -86,6 +88,7 @@ class MenuHandler:
|
|||||||
logging.error(f"Failed to list entries: {e}", exc_info=True)
|
logging.error(f"Failed to list entries: {e}", exc_info=True)
|
||||||
print(colored(f"Error: Failed to list entries: {e}", "red"))
|
print(colored(f"Error: Failed to list entries: {e}", "red"))
|
||||||
|
|
||||||
|
@pause_logging_for_ui
|
||||||
def handle_display_totp_codes(self) -> None:
|
def handle_display_totp_codes(self) -> None:
|
||||||
"""Display all stored TOTP codes with a countdown progress bar."""
|
"""Display all stored TOTP codes with a countdown progress bar."""
|
||||||
pm = self.manager
|
pm = self.manager
|
||||||
|
23
src/tests/test_invalid_password_message.py
Normal file
23
src/tests/test_invalid_password_message.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from seedpass.core.manager import PasswordManager
|
||||||
|
from seedpass.core.config_manager import ConfigManager
|
||||||
|
from seedpass.core.errors import SeedPassError
|
||||||
|
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_password_shows_friendly_message_once(capsys):
|
||||||
|
with TemporaryDirectory() as tmpdir:
|
||||||
|
tmp_path = Path(tmpdir)
|
||||||
|
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
|
||||||
|
pm = PasswordManager.__new__(PasswordManager)
|
||||||
|
pm.config_manager = ConfigManager(vault, tmp_path)
|
||||||
|
pm.fingerprint_dir = tmp_path
|
||||||
|
pm.parent_seed = ""
|
||||||
|
with pytest.raises(SeedPassError):
|
||||||
|
pm.load_parent_seed(tmp_path, password="wrongpass")
|
||||||
|
captured = capsys.readouterr().out
|
||||||
|
assert captured.count("Incorrect password or corrupt file") == 1
|
@@ -1,4 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
_console_paused = False
|
_console_paused = False
|
||||||
|
|
||||||
@@ -12,6 +15,20 @@ class ConsolePauseFilter(logging.Filter):
|
|||||||
return not _console_paused
|
return not _console_paused
|
||||||
|
|
||||||
|
|
||||||
|
class ChecksumWarningFilter(logging.Filter):
|
||||||
|
"""Filter allowing only checksum warnings and errors to surface."""
|
||||||
|
|
||||||
|
def filter(
|
||||||
|
self, record: logging.LogRecord
|
||||||
|
) -> bool: # pragma: no cover - simple filter
|
||||||
|
if record.levelno >= logging.ERROR:
|
||||||
|
return True
|
||||||
|
return (
|
||||||
|
record.levelno == logging.WARNING
|
||||||
|
and Path(record.pathname).name == "checksum.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def pause_console_logging() -> None:
|
def pause_console_logging() -> None:
|
||||||
"""Temporarily pause logging to console handlers."""
|
"""Temporarily pause logging to console handlers."""
|
||||||
global _console_paused
|
global _console_paused
|
||||||
@@ -22,3 +39,24 @@ def resume_console_logging() -> None:
|
|||||||
"""Resume logging to console handlers."""
|
"""Resume logging to console handlers."""
|
||||||
global _console_paused
|
global _console_paused
|
||||||
_console_paused = False
|
_console_paused = False
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def console_logging_paused() -> None:
|
||||||
|
"""Context manager to pause console logging within a block."""
|
||||||
|
pause_console_logging()
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
resume_console_logging()
|
||||||
|
|
||||||
|
|
||||||
|
def pause_logging_for_ui(func):
|
||||||
|
"""Decorator to pause console logging while ``func`` executes."""
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
with console_logging_paused():
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
Reference in New Issue
Block a user