# Repository Context Generated on: 2025-04-06 ## Directory Tree with Exclusions ``` . ├── constants.py ├── main.py ├── requirements.txt ├── saved_config.yaml ├── tests/ ├── test_import.py ├── local_bip85/ ├── __init__.py ├── bip85.py ├── password_manager/ ├── __init__.py ├── backup.py ├── encryption.py ├── entry_management.py ├── manager.py ├── password_generation.py ├── nostr/ ├── __init__.py ├── client.py ├── encryption_manager.py ├── event_handler.py ├── key_manager.py ├── logging_config.py ├── utils.py ├── utils/ ├── __init__.py ├── checksum.py ├── file_lock.py ├── fingerprint.py ├── fingerprint_manager.py ├── key_derivation.py ├── password_prompt.py ``` ## Important Files ## src/constants.py ```python # constants.py import os import logging import sys from pathlib import Path import traceback # Instantiate the logger logger = logging.getLogger(__name__) # ----------------------------------- # Nostr Relay Connection Settings # ----------------------------------- MAX_RETRIES = 3 # Maximum number of retries for relay connections RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection try: # ----------------------------------- # Application Directory and Paths # ----------------------------------- APP_DIR = Path.home() / '.seedpass' APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists logging.info(f"Application directory created at {APP_DIR}") except Exception as e: logging.error(f"Failed to create application directory: {e}") logging.error(traceback.format_exc()) # Log full traceback try: PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}") except Exception as e: logging.error(f"Error setting file paths: {e}") logging.error(traceback.format_exc()) # Log full traceback # ----------------------------------- # Checksum Files for Integrity # ----------------------------------- try: SCRIPT_CHECKSUM_FILE = APP_DIR / 'seedpass_script_checksum.txt' # Checksum for main script logging.info(f"Checksum file path set: Script {SCRIPT_CHECKSUM_FILE}") except Exception as e: logging.error(f"Error setting checksum file paths: {e}") logging.error(traceback.format_exc()) # Log full traceback # ----------------------------------- # Password Generation Constants # ----------------------------------- DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length # ----------------------------------- # Additional Constants (if any) # ----------------------------------- # Add any other constants here as your project expands DEFAULT_SEED_BACKUP_FILENAME = 'parent_seed_backup.enc' ``` ## src/saved_config.yaml ``` excluded_files: [] selected_directories: - utils/ - nostr/ - local_bip85/ - password_manager/ ``` ## src/main.py ```python # main.py import os import sys import logging import signal from colorama import init as colorama_init from termcolor import colored import traceback from password_manager.manager import PasswordManager from nostr.client import NostrClient colorama_init() def configure_logging(): logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Keep this as DEBUG to capture all logs # Remove all handlers associated with the root logger object for handler in logger.handlers[:]: logger.removeHandler(handler) # Ensure the 'logs' directory exists log_directory = 'logs' if not os.path.exists(log_directory): os.makedirs(log_directory) # Create handlers c_handler = logging.StreamHandler(sys.stdout) f_handler = logging.FileHandler(os.path.join(log_directory, 'main.log')) # Set levels: only errors and critical messages will be shown in the console c_handler.setLevel(logging.ERROR) f_handler.setLevel(logging.DEBUG) # Create formatters and add them to handlers formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') c_handler.setFormatter(formatter) f_handler.setFormatter(formatter) # Add handlers to the logger logger.addHandler(c_handler) logger.addHandler(f_handler) # Set logging level for third-party libraries to WARNING to suppress their debug logs logging.getLogger('monstr').setLevel(logging.WARNING) logging.getLogger('nostr').setLevel(logging.WARNING) def confirm_action(prompt: str) -> bool: """ Prompts the user for confirmation. :param prompt: The confirmation message to display. :return: True if user confirms, False otherwise. """ while True: choice = input(colored(prompt, 'yellow')).strip().lower() if choice in ['y', 'yes']: return True elif choice in ['n', 'no']: return False else: print(colored("Please enter 'Y' or 'N'.", 'red')) def handle_switch_fingerprint(password_manager: PasswordManager): """ Handles switching the active fingerprint. :param password_manager: An instance of PasswordManager. """ try: fingerprints = password_manager.fingerprint_manager.list_fingerprints() if not fingerprints: print(colored("No fingerprints available to switch. Please add a new fingerprint first.", 'yellow')) return print(colored("Available Fingerprints:", 'cyan')) for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", 'cyan')) choice = input("Select a fingerprint by number to switch: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection.", 'red')) return selected_fingerprint = fingerprints[int(choice)-1] if password_manager.select_fingerprint(selected_fingerprint): print(colored(f"Switched to fingerprint {selected_fingerprint}.", 'green')) else: print(colored("Failed to switch fingerprint.", 'red')) except Exception as e: logging.error(f"Error during fingerprint switch: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to switch fingerprint: {e}", 'red')) def handle_add_new_fingerprint(password_manager: PasswordManager): """ Handles adding a new fingerprint. :param password_manager: An instance of PasswordManager. """ try: password_manager.add_new_fingerprint() except Exception as e: logging.error(f"Error adding new fingerprint: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to add new fingerprint: {e}", 'red')) def handle_remove_fingerprint(password_manager: PasswordManager): """ Handles removing an existing fingerprint. :param password_manager: An instance of PasswordManager. """ try: fingerprints = password_manager.fingerprint_manager.list_fingerprints() if not fingerprints: print(colored("No fingerprints available to remove.", 'yellow')) return print(colored("Available Fingerprints:", 'cyan')) for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", 'cyan')) choice = input("Select a fingerprint by number to remove: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection.", 'red')) return selected_fingerprint = fingerprints[int(choice)-1] confirm = confirm_action(f"Are you sure you want to remove fingerprint {selected_fingerprint}? This will delete all associated data. (Y/N): ") if confirm: if password_manager.fingerprint_manager.remove_fingerprint(selected_fingerprint): print(colored(f"Fingerprint {selected_fingerprint} removed successfully.", 'green')) else: print(colored("Failed to remove fingerprint.", 'red')) else: print(colored("Fingerprint removal cancelled.", 'yellow')) except Exception as e: logging.error(f"Error removing fingerprint: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to remove fingerprint: {e}", 'red')) def handle_list_fingerprints(password_manager: PasswordManager): """ Handles listing all available fingerprints. :param password_manager: An instance of PasswordManager. """ try: fingerprints = password_manager.fingerprint_manager.list_fingerprints() if not fingerprints: print(colored("No fingerprints available.", 'yellow')) return print(colored("Available Fingerprints:", 'cyan')) for fp in fingerprints: print(colored(f"- {fp}", 'cyan')) except Exception as e: logging.error(f"Error listing fingerprints: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to list fingerprints: {e}", 'red')) def handle_display_npub(password_manager: PasswordManager): """ Handles displaying the Nostr public key (npub) to the user. """ try: npub = password_manager.nostr_client.key_manager.get_npub() if npub: print(colored(f"\nYour Nostr Public Key (npub):\n{npub}\n", 'cyan')) logging.info("Displayed npub to the user.") else: print(colored("Nostr public key not available.", 'red')) logging.error("Nostr public key not available.") except Exception as e: logging.error(f"Failed to display npub: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to display npub: {e}", 'red')) def handle_post_to_nostr(password_manager: PasswordManager): """ Handles the action of posting the encrypted password index to Nostr. """ try: # Get the encrypted data from the index file encrypted_data = password_manager.get_encrypted_data() if encrypted_data: # Post to Nostr password_manager.nostr_client.publish_json_to_nostr(encrypted_data) print(colored("Encrypted index posted to Nostr successfully.", 'green')) logging.info("Encrypted index posted to Nostr successfully.") else: print(colored("No data available to post.", 'yellow')) logging.warning("No data available to post to Nostr.") except Exception as e: logging.error(f"Failed to post to Nostr: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to post to Nostr: {e}", 'red')) def handle_retrieve_from_nostr(password_manager: PasswordManager): """ Handles the action of retrieving the encrypted password index from Nostr. """ try: # Use the Nostr client from the password_manager encrypted_data = password_manager.nostr_client.retrieve_json_from_nostr_sync() if encrypted_data: # Decrypt and save the index password_manager.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data) print(colored("Encrypted index retrieved and saved successfully.", 'green')) logging.info("Encrypted index retrieved and saved successfully from Nostr.") else: print(colored("Failed to retrieve data from Nostr.", 'red')) logging.error("Failed to retrieve data from Nostr.") except Exception as e: logging.error(f"Failed to retrieve from Nostr: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to retrieve from Nostr: {e}", 'red')) def display_menu(password_manager: PasswordManager): """ Displays the interactive menu and handles user input to perform various actions. """ menu = """ Select an option: 1. Generate a New Password and Add to Index 2. Retrieve a Password from Index 3. Modify an Existing Entry 4. Verify Script Checksum 5. Post Encrypted Index to Nostr 6. Retrieve Encrypted Index from Nostr 7. Display Nostr Public Key (npub) 8. Backup Parent Seed 9. Switch Fingerprint 10. Add a New Fingerprint 11. Remove an Existing Fingerprint 12. List All Fingerprints 13. Exit """ while True: # Flush logging handlers for handler in logging.getLogger().handlers: handler.flush() print(colored(menu, 'cyan')) choice = input('Enter your choice (1-13): ').strip() if not choice: print(colored("No input detected. Please enter a number between 1 and 13.", 'yellow')) continue # Re-display the menu without marking as invalid if choice == '1': password_manager.handle_generate_password() elif choice == '2': password_manager.handle_retrieve_password() elif choice == '3': password_manager.handle_modify_entry() elif choice == '4': password_manager.handle_verify_checksum() elif choice == '5': handle_post_to_nostr(password_manager) elif choice == '6': handle_retrieve_from_nostr(password_manager) elif choice == '7': handle_display_npub(password_manager) elif choice == '8': password_manager.handle_backup_reveal_parent_seed() elif choice == '9': if not password_manager.handle_switch_fingerprint(): print(colored("Failed to switch fingerprint.", 'red')) elif choice == '10': handle_add_new_fingerprint(password_manager) elif choice == '11': handle_remove_fingerprint(password_manager) elif choice == '12': handle_list_fingerprints(password_manager) elif choice == '13': logging.info("Exiting the program.") print(colored("Exiting the program.", 'green')) password_manager.nostr_client.close_client_pool() sys.exit(0) else: print(colored("Invalid choice. Please select a valid option.", 'red')) if __name__ == '__main__': # Configure logging with both file and console handlers configure_logging() logger = logging.getLogger(__name__) logger.info("Starting SeedPass Password Manager") # Initialize PasswordManager and proceed with application logic try: password_manager = PasswordManager() logger.info("PasswordManager initialized successfully.") except Exception as e: logger.error(f"Failed to initialize PasswordManager: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to initialize PasswordManager: {e}", 'red')) sys.exit(1) # Register signal handlers for graceful shutdown def signal_handler(sig, frame): """ Handles termination signals to gracefully shutdown the NostrClient. """ print(colored("\nReceived shutdown signal. Exiting gracefully...", 'yellow')) logging.info(f"Received shutdown signal: {sig}. Initiating graceful shutdown.") try: password_manager.nostr_client.close_client_pool() # Gracefully close the ClientPool logging.info("NostrClient closed successfully.") except Exception as e: logging.error(f"Error during shutdown: {e}") print(colored(f"Error during shutdown: {e}", 'red')) sys.exit(0) # Register the signal handlers signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # Handle termination signals # Display the interactive menu to the user try: display_menu(password_manager) except KeyboardInterrupt: logger.info("Program terminated by user via KeyboardInterrupt.") print(colored("\nProgram terminated by user.", 'yellow')) try: password_manager.nostr_client.close_client_pool() # Gracefully close the ClientPool logging.info("NostrClient closed successfully.") except Exception as e: logging.error(f"Error during shutdown: {e}") print(colored(f"Error during shutdown: {e}", 'red')) sys.exit(0) except Exception as e: logger.error(f"An unexpected error occurred: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: An unexpected error occurred: {e}", 'red')) try: password_manager.nostr_client.close_client_pool() # Attempt to close the ClientPool logging.info("NostrClient closed successfully.") except Exception as close_error: logging.error(f"Error during shutdown: {close_error}") print(colored(f"Error during shutdown: {close_error}", 'red')) sys.exit(1) ``` ## src/requirements.txt ``` colorama>=0.4.6 termcolor>=1.1.0 cryptography>=40.0.2 bip-utils>=2.5.0 bech32==1.2.0 monstr @ git+https://github.com/monty888/monstr.git@master#egg=monstr mnemonic aiohttp bcrypt bip85 ``` ## local_bip85/__init__.py ```python # bip85/__init__.py import logging import traceback try: from .bip85 import BIP85 logging.info("BIP85 module imported successfully.") except Exception as e: logging.error(f"Failed to import BIP85 module: {e}") logging.error(traceback.format_exc()) # Log full traceback __all__ = ['BIP85'] ``` ## local_bip85/bip85.py ```python # bip85/bip85.py """ BIP85 Module This module implements the BIP85 functionality for deterministic entropy and mnemonic derivation. It provides the BIP85 class, which utilizes BIP32 and BIP39 standards to derive entropy and mnemonics from a given seed. Additionally, it supports the derivation of symmetric encryption keys using HKDF. Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. Ensure that all dependencies are installed and properly configured in your environment. """ import sys import hashlib import hmac import logging import os import traceback from colorama import Fore from bip_utils import ( Bip32Slip10Secp256k1, Bip39MnemonicGenerator, Bip39Languages ) from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend # Instantiate the logger logger = logging.getLogger(__name__) class BIP85: def __init__(self, seed_bytes: bytes): try: self.bip32_ctx = Bip32Slip10Secp256k1.FromSeed(seed_bytes) logging.debug("BIP32 context initialized successfully.") except Exception as e: logging.error(f"Error initializing BIP32 context: {e}") logging.error(traceback.format_exc()) # Log full traceback print(f"{Fore.RED}Error initializing BIP32 context: {e}") sys.exit(1) def derive_entropy(self, index: int, bytes_len: int, app_no: int = 39) -> bytes: """ Derives entropy using BIP-85 HMAC-SHA512 method. Parameters: index (int): Index for the child entropy. bytes_len (int): Number of bytes to derive for the entropy. app_no (int): Application number (default 39 for BIP39) Returns: bytes: Derived entropy. Raises: SystemExit: If derivation fails or entropy length is invalid. """ if app_no == 39: path = f"m/83696968'/{app_no}'/0'/{bytes_len}'/{index}'" elif app_no == 32: path = f"m/83696968'/{app_no}'/{index}'" else: # Handle other app_no if necessary path = f"m/83696968'/{app_no}'/{index}'" try: child_key = self.bip32_ctx.DerivePath(path) k = child_key.PrivateKey().Raw().ToBytes() logging.debug(f"Derived child key at path {path}: {k.hex()}") hmac_key = b"bip-entropy-from-k" hmac_result = hmac.new(hmac_key, k, hashlib.sha512).digest() logging.debug(f"HMAC-SHA512 result: {hmac_result.hex()}") entropy = hmac_result[:bytes_len] if len(entropy) != bytes_len: logging.error(f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") sys.exit(1) logging.debug(f"Derived entropy: {entropy.hex()}") return entropy except Exception as e: logging.error(f"Error deriving entropy: {e}") logging.error(traceback.format_exc()) # Log full traceback print(f"{Fore.RED}Error deriving entropy: {e}") sys.exit(1) def derive_mnemonic(self, index: int, words_num: int) -> str: bytes_len = {12: 16, 18: 24, 24: 32}.get(words_num) if not bytes_len: logging.error(f"Unsupported number of words: {words_num}") print(f"{Fore.RED}Error: Unsupported number of words: {words_num}") sys.exit(1) entropy = self.derive_entropy(index=index, bytes_len=bytes_len, app_no=39) try: mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy) logging.debug(f"Derived mnemonic: {mnemonic}") return mnemonic except Exception as e: logging.error(f"Error generating mnemonic: {e}") logging.error(traceback.format_exc()) # Log full traceback print(f"{Fore.RED}Error generating mnemonic: {e}") sys.exit(1) def derive_symmetric_key(self, app_no: int = 48, index: int = 0) -> bytes: """ Derives a symmetric encryption key using BIP85. Parameters: app_no (int): Application number for key derivation (48 chosen arbitrarily). index (int): Index for key derivation. Returns: bytes: Derived symmetric key (32 bytes for AES-256). Raises: SystemExit: If symmetric key derivation fails. """ entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index) try: hkdf = HKDF( algorithm=hashes.SHA256(), length=32, # 256 bits for AES-256 salt=None, info=b'seedos-encryption-key', backend=default_backend() ) symmetric_key = hkdf.derive(entropy) logging.debug(f"Derived symmetric key: {symmetric_key.hex()}") return symmetric_key except Exception as e: logging.error(f"Error deriving symmetric key: {e}") logging.error(traceback.format_exc()) # Log full traceback print(f"{Fore.RED}Error deriving symmetric key: {e}") sys.exit(1) ``` ## password_manager/backup.py ```python # password_manager/backup.py """ Backup Manager Module This module implements the BackupManager class, responsible for creating backups, restoring from backups, and listing available backups for the encrypted password index file. It ensures data integrity and provides mechanisms to recover from corrupted or lost data by maintaining timestamped backups. Ensure that all dependencies are installed and properly configured in your environment. """ import logging import os import shutil import time import traceback from pathlib import Path from colorama import Fore from termcolor import colored from utils.file_lock import lock_file from constants import APP_DIR # Instantiate the logger logger = logging.getLogger(__name__) class BackupManager: """ BackupManager Class Handles the creation, restoration, and listing of backups for the encrypted password index file. Backups are stored in the application directory with timestamped filenames to facilitate easy identification and retrieval. """ BACKUP_FILENAME_TEMPLATE = 'passwords_db_backup_{timestamp}.json.enc' def __init__(self, fingerprint_dir: Path): """ Initializes the BackupManager with the fingerprint directory. Parameters: fingerprint_dir (Path): The directory corresponding to the fingerprint. """ self.fingerprint_dir = fingerprint_dir self.backup_dir = self.fingerprint_dir / 'backups' self.backup_dir.mkdir(parents=True, exist_ok=True) self.index_file = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' logger.debug(f"BackupManager initialized with backup directory at {self.backup_dir}") def create_backup(self) -> None: try: index_file = self.index_file if not index_file.exists(): logger.warning("Index file does not exist. No backup created.") print(colored("Warning: Index file does not exist. No backup created.", 'yellow')) return timestamp = int(time.time()) backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) backup_file = self.backup_dir / backup_filename shutil.copy2(index_file, backup_file) logger.info(f"Backup created successfully at '{backup_file}'.") print(colored(f"Backup created successfully at '{backup_file}'.", 'green')) except Exception as e: logger.error(f"Failed to create backup: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to create backup: {e}", 'red')) def restore_latest_backup(self) -> None: try: backup_files = sorted( self.backup_dir.glob('passwords_db_backup_*.json.enc'), key=lambda x: x.stat().st_mtime, reverse=True ) if not backup_files: logger.error("No backup files found to restore.") print(colored("Error: No backup files found to restore.", 'red')) return latest_backup = backup_files[0] index_file = self.index_file shutil.copy2(latest_backup, index_file) logger.info(f"Restored the index file from backup '{latest_backup}'.") print(colored(f"Restored the index file from backup '{latest_backup}'.", 'green')) except Exception as e: logger.error(f"Failed to restore from backup '{latest_backup}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to restore from backup '{latest_backup}': {e}", 'red')) def list_backups(self) -> None: try: backup_files = sorted( self.backup_dir.glob('passwords_db_backup_*.json.enc'), key=lambda x: x.stat().st_mtime, reverse=True ) if not backup_files: logger.info("No backup files available.") print(colored("No backup files available.", 'yellow')) return print(colored("Available Backups:", 'cyan')) for backup in backup_files: creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(backup.stat().st_mtime)) print(colored(f"- {backup.name} (Created on: {creation_time})", 'cyan')) except Exception as e: logger.error(f"Failed to list backups: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to list backups: {e}", 'red')) def restore_backup_by_timestamp(self, timestamp: int) -> None: backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) backup_file = self.backup_dir / backup_filename if not backup_file.exists(): logger.error(f"No backup found with timestamp {timestamp}.") print(colored(f"Error: No backup found with timestamp {timestamp}.", 'red')) return try: with lock_file(backup_file, lock_type=fcntl.LOCK_SH): shutil.copy2(backup_file, self.index_file) logger.info(f"Restored the index file from backup '{backup_file}'.") print(colored(f"Restored the index file from backup '{backup_file}'.", 'green')) except Exception as e: logger.error(f"Failed to restore from backup '{backup_file}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to restore from backup '{backup_file}': {e}", 'red')) ``` ## password_manager/manager.py ```python # password_manager/manager.py """ Password Manager Module This module implements the PasswordManager class, which orchestrates various functionalities of the deterministic password manager, including encryption, entry management, password generation, backup, and checksum verification. It serves as the core interface for interacting with the password manager functionalities. """ import sys import json import logging import getpass import os from typing import Optional import shutil from colorama import Fore from termcolor import colored from password_manager.encryption import EncryptionManager from password_manager.entry_management import EntryManager from password_manager.password_generation import PasswordGenerator from password_manager.backup import BackupManager from utils.key_derivation import derive_key_from_parent_seed, derive_key_from_password from utils.checksum import calculate_checksum, verify_checksum from utils.password_prompt import prompt_for_password, prompt_existing_password, confirm_action from constants import ( APP_DIR, PARENT_SEED_FILE, SCRIPT_CHECKSUM_FILE, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH, DEFAULT_PASSWORD_LENGTH, DEFAULT_SEED_BACKUP_FILENAME ) import traceback import bcrypt from pathlib import Path from local_bip85.bip85 import BIP85 from bip_utils import Bip39SeedGenerator, Bip39MnemonicGenerator, Bip39Languages from utils.fingerprint_manager import FingerprintManager # Import NostrClient from nostr.client import NostrClient # Instantiate the logger logger = logging.getLogger(__name__) class PasswordManager: """ PasswordManager Class Manages the generation, encryption, and retrieval of deterministic passwords using a BIP-85 seed. It handles file encryption/decryption, password generation, entry management, backups, and checksum verification, ensuring the integrity and confidentiality of the stored password database. """ def __init__(self): """ Initializes the PasswordManager by setting up encryption, loading or setting up the parent seed, and initializing other components like EntryManager, PasswordGenerator, BackupManager, and FingerprintManager. """ self.encryption_manager: Optional[EncryptionManager] = None self.entry_manager: Optional[EntryManager] = None self.password_generator: Optional[PasswordGenerator] = None self.backup_manager: Optional[BackupManager] = None self.fingerprint_manager: Optional[FingerprintManager] = None self.parent_seed: Optional[str] = None self.bip85: Optional[BIP85] = None self.nostr_client: Optional[NostrClient] = None # Initialize the fingerprint manager first self.initialize_fingerprint_manager() # Ensure a parent seed is set up before accessing the fingerprint directory self.setup_parent_seed() # Set the current fingerprint directory self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() def initialize_fingerprint_manager(self): """ Initializes the FingerprintManager. """ try: self.fingerprint_manager = FingerprintManager(APP_DIR) logger.debug("FingerprintManager initialized successfully.") except Exception as e: logger.error(f"Failed to initialize FingerprintManager: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to initialize FingerprintManager: {e}", 'red')) sys.exit(1) def setup_parent_seed(self) -> None: """ Sets up the parent seed by determining if existing fingerprints are present or if a new one needs to be created. """ fingerprints = self.fingerprint_manager.list_fingerprints() if fingerprints: # There are existing fingerprints self.select_or_add_fingerprint() else: # No existing fingerprints, proceed to set up new seed self.handle_new_seed_setup() def select_or_add_fingerprint(self): """ Prompts the user to select an existing fingerprint or add a new one. """ try: print(colored("\nAvailable Fingerprints:", 'cyan')) fingerprints = self.fingerprint_manager.list_fingerprints() for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", 'cyan')) print(colored(f"{len(fingerprints)+1}. Add a new fingerprint", 'cyan')) choice = input("Select a fingerprint by number: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)+1): print(colored("Invalid selection. Exiting.", 'red')) sys.exit(1) choice = int(choice) if choice == len(fingerprints)+1: # Add a new fingerprint self.add_new_fingerprint() else: # Select existing fingerprint selected_fingerprint = fingerprints[choice-1] self.select_fingerprint(selected_fingerprint) except Exception as e: logger.error(f"Error during fingerprint selection: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to select fingerprint: {e}", 'red')) sys.exit(1) def add_new_fingerprint(self): """ Adds a new fingerprint by generating it from a seed phrase. """ try: choice = input("Do you want to (1) Enter an existing seed or (2) Generate a new seed? (1/2): ").strip() if choice == '1': fingerprint = self.setup_existing_seed() elif choice == '2': fingerprint = self.generate_new_seed() else: print(colored("Invalid choice. Exiting.", 'red')) sys.exit(1) # Set current_fingerprint in FingerprintManager only self.fingerprint_manager.current_fingerprint = fingerprint print(colored(f"New fingerprint '{fingerprint}' added and set as current.", 'green')) except Exception as e: logger.error(f"Error adding new fingerprint: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to add new fingerprint: {e}", 'red')) sys.exit(1) def select_fingerprint(self, fingerprint: str) -> None: if self.fingerprint_manager.select_fingerprint(fingerprint): self.current_fingerprint = fingerprint # Add this line self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() if not self.fingerprint_dir: print(colored(f"Error: Fingerprint directory for {fingerprint} not found.", 'red')) sys.exit(1) # Setup the encryption manager and load parent seed self.setup_encryption_manager(self.fingerprint_dir) self.load_parent_seed(self.fingerprint_dir) # Initialize BIP85 and other managers self.initialize_bip85() self.initialize_managers() print(colored(f"Fingerprint {fingerprint} selected and managers initialized.", 'green')) else: print(colored(f"Error: Fingerprint {fingerprint} not found.", 'red')) sys.exit(1) def setup_encryption_manager(self, fingerprint_dir: Path, password: Optional[str] = None): """ Sets up the EncryptionManager for the selected fingerprint. Parameters: fingerprint_dir (Path): The directory corresponding to the fingerprint. password (Optional[str]): The user's master password. """ try: # Prompt for password if not provided if password is None: password = prompt_existing_password("Enter your master password: ") # Derive key from password key = derive_key_from_password(password) self.encryption_manager = EncryptionManager(key, fingerprint_dir) logger.debug("EncryptionManager set up successfully for selected fingerprint.") # Verify the password self.fingerprint_dir = fingerprint_dir # Ensure self.fingerprint_dir is set if not self.verify_password(password): print(colored("Invalid password. Exiting.", 'red')) sys.exit(1) except Exception as e: logger.error(f"Failed to set up EncryptionManager: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to set up encryption: {e}", 'red')) sys.exit(1) def load_parent_seed(self, fingerprint_dir: Path): """ Loads and decrypts the parent seed from the fingerprint directory. Parameters: fingerprint_dir (Path): The directory corresponding to the fingerprint. """ try: self.parent_seed = self.encryption_manager.decrypt_parent_seed() logger.debug(f"Parent seed loaded for fingerprint {self.current_fingerprint}.") # Initialize BIP85 with the parent seed seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() self.bip85 = BIP85(seed_bytes) logger.debug("BIP-85 initialized successfully.") except Exception as e: logger.error(f"Failed to load parent seed: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to load parent seed: {e}", 'red')) sys.exit(1) def handle_switch_fingerprint(self) -> bool: """ Handles switching to a different fingerprint. Returns: bool: True if switch was successful, False otherwise. """ try: print(colored("\nAvailable Fingerprints:", 'cyan')) fingerprints = self.fingerprint_manager.list_fingerprints() for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", 'cyan')) choice = input("Select a fingerprint by number to switch: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection. Returning to main menu.", 'red')) return False # Return False to indicate failure selected_fingerprint = fingerprints[int(choice) - 1] self.fingerprint_manager.current_fingerprint = selected_fingerprint self.current_fingerprint = selected_fingerprint # Update fingerprint directory self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() if not self.fingerprint_dir: print(colored(f"Error: Fingerprint directory for {selected_fingerprint} not found.", 'red')) return False # Return False to indicate failure # Prompt for master password for the selected fingerprint password = prompt_existing_password("Enter your master password: ") # Set up the encryption manager with the new password and fingerprint directory self.setup_encryption_manager(self.fingerprint_dir, password) # Load the parent seed for the selected fingerprint self.load_parent_seed(self.fingerprint_dir) # Initialize BIP85 and other managers self.initialize_bip85() self.initialize_managers() print(colored(f"Switched to fingerprint {selected_fingerprint}.", 'green')) # Re-initialize NostrClient with the new fingerprint try: self.nostr_client = NostrClient( encryption_manager=self.encryption_manager, fingerprint=self.current_fingerprint ) logging.info(f"NostrClient re-initialized with fingerprint {self.current_fingerprint}.") except Exception as e: logging.error(f"Failed to re-initialize NostrClient: {e}") print(colored(f"Error: Failed to re-initialize NostrClient: {e}", 'red')) return False return True # Return True to indicate success except Exception as e: logging.error(f"Error during fingerprint switching: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to switch fingerprints: {e}", 'red')) return False # Return False to indicate failure def handle_existing_seed(self) -> None: """ Handles the scenario where an existing parent seed file is found. Prompts the user for the master password to decrypt the seed. """ try: # Prompt for password password = getpass.getpass(prompt='Enter your login password: ').strip() # Derive encryption key from password key = derive_key_from_password(password) # Initialize FingerprintManager if not already initialized if not self.fingerprint_manager: self.initialize_fingerprint_manager() # Prompt the user to select an existing fingerprint fingerprints = self.fingerprint_manager.list_fingerprints() if not fingerprints: print(colored("No fingerprints available. Please add a fingerprint first.", 'red')) sys.exit(1) print(colored("Available Fingerprints:", 'cyan')) for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", 'cyan')) choice = input("Select a fingerprint by number: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection. Exiting.", 'red')) sys.exit(1) selected_fingerprint = fingerprints[int(choice)-1] self.current_fingerprint = selected_fingerprint fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(selected_fingerprint) if not fingerprint_dir: print(colored("Error: Fingerprint directory not found.", 'red')) sys.exit(1) # Initialize EncryptionManager with key and fingerprint_dir self.encryption_manager = EncryptionManager(key, fingerprint_dir) self.parent_seed = self.encryption_manager.decrypt_parent_seed() # Log the type and content of parent_seed logger.debug(f"Decrypted parent_seed: {self.parent_seed} (type: {type(self.parent_seed)})") # Validate the decrypted seed if not self.validate_bip85_seed(self.parent_seed): logging.error("Decrypted seed is invalid. Exiting.") print(colored("Error: Decrypted seed is invalid.", 'red')) sys.exit(1) self.initialize_bip85() logging.debug("Parent seed decrypted and validated successfully.") except Exception as e: logging.error(f"Failed to decrypt parent seed: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red')) sys.exit(1) def handle_new_seed_setup(self) -> None: """ Handles the setup process when no existing parent seed is found. Asks the user whether to enter an existing BIP-85 seed or generate a new one. """ print(colored("No existing seed found. Let's set up a new one!", 'yellow')) choice = input("Do you want to (1) Enter an existing BIP-85 seed or (2) Generate a new BIP-85 seed? (1/2): ").strip() if choice == '1': self.setup_existing_seed() elif choice == '2': self.generate_new_seed() else: print(colored("Invalid choice. Exiting.", 'red')) sys.exit(1) def setup_existing_seed(self) -> Optional[str]: """ Prompts the user to enter an existing BIP-85 seed and validates it. Returns: Optional[str]: The fingerprint if setup is successful, None otherwise. """ try: parent_seed = getpass.getpass(prompt='Enter your 12-word BIP-85 seed: ').strip() if self.validate_bip85_seed(parent_seed): # Add a fingerprint using the existing seed fingerprint = self.fingerprint_manager.add_fingerprint(parent_seed) if not fingerprint: print(colored("Error: Failed to generate fingerprint for the provided seed.", 'red')) sys.exit(1) fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(fingerprint) if not fingerprint_dir: print(colored("Error: Failed to retrieve fingerprint directory.", 'red')) sys.exit(1) # Set the current fingerprint in both PasswordManager and FingerprintManager self.current_fingerprint = fingerprint self.fingerprint_manager.current_fingerprint = fingerprint self.fingerprint_dir = fingerprint_dir logging.info(f"Current fingerprint set to {fingerprint}") # Initialize EncryptionManager with key and fingerprint_dir password = prompt_for_password() key = derive_key_from_password(password) self.encryption_manager = EncryptionManager(key, fingerprint_dir) # Encrypt and save the parent seed self.encryption_manager.encrypt_parent_seed(parent_seed) logging.info("Parent seed encrypted and saved successfully.") # Store the hashed password self.store_hashed_password(password) logging.info("User password hashed and stored successfully.") self.parent_seed = parent_seed # Ensure this is a string logger.debug(f"parent_seed set to: {self.parent_seed} (type: {type(self.parent_seed)})") self.initialize_bip85() self.initialize_managers() return fingerprint # Return the generated or added fingerprint else: logging.error("Invalid BIP-85 seed phrase. Exiting.") print(colored("Error: Invalid BIP-85 seed phrase.", 'red')) sys.exit(1) except KeyboardInterrupt: logging.info("Operation cancelled by user.") print(colored("\nOperation cancelled by user.", 'yellow')) sys.exit(0) def generate_new_seed(self) -> Optional[str]: """ Generates a new BIP-85 seed, displays it to the user, and prompts for confirmation before saving. Returns: Optional[str]: The fingerprint if generation is successful, None otherwise. """ new_seed = self.generate_bip85_seed() print(colored("Your new BIP-85 seed phrase is:", 'green')) print(colored(new_seed, 'yellow')) print(colored("Please write this down and keep it in a safe place!", 'red')) if confirm_action("Do you want to use this generated seed? (Y/N): "): # Add a new fingerprint using the generated seed fingerprint = self.fingerprint_manager.add_fingerprint(new_seed) if not fingerprint: print(colored("Error: Failed to generate fingerprint for the new seed.", 'red')) sys.exit(1) fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(fingerprint) if not fingerprint_dir: print(colored("Error: Failed to retrieve fingerprint directory.", 'red')) sys.exit(1) # Set the current fingerprint in both PasswordManager and FingerprintManager self.current_fingerprint = fingerprint self.fingerprint_manager.current_fingerprint = fingerprint logging.info(f"Current fingerprint set to {fingerprint}") # Now, save and encrypt the seed with the fingerprint_dir self.save_and_encrypt_seed(new_seed, fingerprint_dir) return fingerprint # Return the generated fingerprint else: print(colored("Seed generation cancelled. Exiting.", 'yellow')) sys.exit(0) def validate_bip85_seed(self, seed: str) -> bool: """ Validates the provided BIP-85 seed phrase. Parameters: seed (str): The seed phrase to validate. Returns: bool: True if valid, False otherwise. """ try: words = seed.split() if len(words) != 12: return False # Additional validation can be added here if needed (e.g., word list checks) return True except Exception as e: logging.error(f"Error validating BIP-85 seed: {e}") return False def generate_bip85_seed(self) -> str: """ Generates a new BIP-85 seed phrase. Returns: str: The generated 12-word mnemonic seed phrase. """ try: master_seed = os.urandom(32) # Generate a random 32-byte seed bip85 = BIP85(master_seed) mnemonic_obj = bip85.derive_mnemonic(index=0, words_num=12) mnemonic_str = mnemonic_obj.ToStr() # Convert Bip39Mnemonic object to string return mnemonic_str except Exception as e: logging.error(f"Failed to generate BIP-85 seed: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to generate BIP-85 seed: {e}", 'red')) sys.exit(1) def save_and_encrypt_seed(self, seed: str, fingerprint_dir: Path) -> None: """ Saves and encrypts the parent seed. Parameters: seed (str): The BIP-85 seed phrase to save and encrypt. fingerprint_dir (Path): The directory corresponding to the fingerprint. """ try: # Set self.fingerprint_dir self.fingerprint_dir = fingerprint_dir # Prompt for password password = prompt_for_password() # Derive key from password key = derive_key_from_password(password) # Re-initialize EncryptionManager with the new key and fingerprint_dir self.encryption_manager = EncryptionManager(key, fingerprint_dir) # Store the hashed password self.store_hashed_password(password) logging.info("User password hashed and stored successfully.") # Encrypt and save the parent seed self.encryption_manager.encrypt_parent_seed(seed) logging.info("Parent seed encrypted and saved successfully.") self.parent_seed = seed # Ensure this is a string logger.debug(f"parent_seed set to: {self.parent_seed} (type: {type(self.parent_seed)})") self.initialize_bip85() self.initialize_managers() except Exception as e: logging.error(f"Failed to encrypt and save parent seed: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) sys.exit(1) def initialize_bip85(self): """ Initializes the BIP-85 generator using the parent seed. """ try: seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() self.bip85 = BIP85(seed_bytes) logging.debug("BIP-85 initialized successfully.") except Exception as e: logging.error(f"Failed to initialize BIP-85: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to initialize BIP-85: {e}", 'red')) sys.exit(1) def initialize_managers(self) -> None: """ Initializes the EntryManager, PasswordGenerator, BackupManager, and NostrClient with the EncryptionManager and BIP-85 instance within the context of the selected fingerprint. """ try: # Ensure self.encryption_manager is already initialized if not self.encryption_manager: raise ValueError("EncryptionManager is not initialized.") # Reinitialize the managers with the updated EncryptionManager and current fingerprint context self.entry_manager = EntryManager( encryption_manager=self.encryption_manager, fingerprint_dir=self.fingerprint_dir ) self.password_generator = PasswordGenerator( encryption_manager=self.encryption_manager, parent_seed=self.parent_seed, bip85=self.bip85 ) self.backup_manager = BackupManager(fingerprint_dir=self.fingerprint_dir) # Initialize the NostrClient with the current fingerprint self.nostr_client = NostrClient( encryption_manager=self.encryption_manager, fingerprint=self.current_fingerprint # Pass the current fingerprint ) logger.debug("Managers re-initialized for the new fingerprint.") except Exception as e: logger.error(f"Failed to initialize managers: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to initialize managers: {e}", 'red')) sys.exit(1) def handle_generate_password(self) -> None: try: website_name = input('Enter the website name: ').strip() if not website_name: print(colored("Error: Website name cannot be empty.", 'red')) return username = input('Enter the username (optional): ').strip() url = input('Enter the URL (optional): ').strip() length_input = input(f'Enter desired password length (default {DEFAULT_PASSWORD_LENGTH}): ').strip() length = DEFAULT_PASSWORD_LENGTH if length_input: if not length_input.isdigit(): print(colored("Error: Password length must be a number.", 'red')) return length = int(length_input) if not (MIN_PASSWORD_LENGTH <= length <= MAX_PASSWORD_LENGTH): print(colored(f"Error: Password length must be between {MIN_PASSWORD_LENGTH} and {MAX_PASSWORD_LENGTH}.", 'red')) return # Add the entry to the index and get the assigned index index = self.entry_manager.add_entry(website_name, length, username, url, blacklisted=False) # Generate the password using the assigned index password = self.password_generator.generate_password(length, index) # Provide user feedback print(colored(f"\n[+] Password generated and indexed with ID {index}.\n", 'green')) print(colored(f"Password for {website_name}: {password}\n", 'yellow')) except Exception as e: logging.error(f"Error during password generation: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to generate password: {e}", 'red')) def handle_retrieve_password(self) -> None: """ Handles retrieving a password from the index by prompting the user for the index number and displaying the corresponding password and associated details. """ try: index_input = input('Enter the index number of the password to retrieve: ').strip() if not index_input.isdigit(): print(colored("Error: Index must be a number.", 'red')) return index = int(index_input) # Retrieve entry details entry = self.entry_manager.retrieve_entry(index) if not entry: return # Display entry details website_name = entry.get('website') length = entry.get('length') username = entry.get('username') url = entry.get('url') blacklisted = entry.get('blacklisted') print(colored(f"Retrieving password for '{website_name}' with length {length}.", 'cyan')) if username: print(colored(f"Username: {username}", 'cyan')) if url: print(colored(f"URL: {url}", 'cyan')) if blacklisted: print(colored(f"Warning: This password is blacklisted and should not be used.", 'red')) # Generate the password password = self.password_generator.generate_password(length, index) # Display the password and associated details if password: print(colored(f"\n[+] Retrieved Password for {website_name}:\n", 'green')) print(colored(f"Password: {password}", 'yellow')) print(colored(f"Associated Username: {username or 'N/A'}", 'cyan')) print(colored(f"Associated URL: {url or 'N/A'}", 'cyan')) print(colored(f"Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}", 'cyan')) else: print(colored("Error: Failed to retrieve the password.", 'red')) except Exception as e: logging.error(f"Error during password retrieval: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to retrieve password: {e}", 'red')) def handle_modify_entry(self) -> None: """ Handles modifying an existing password entry by prompting the user for the index number and new details to update. """ try: index_input = input('Enter the index number of the entry to modify: ').strip() if not index_input.isdigit(): print(colored("Error: Index must be a number.", 'red')) return index = int(index_input) # Retrieve existing entry entry = self.entry_manager.retrieve_entry(index) if not entry: return website_name = entry.get('website') length = entry.get('length') username = entry.get('username') url = entry.get('url') blacklisted = entry.get('blacklisted') # Display current values print(colored(f"Modifying entry for '{website_name}' (Index: {index}):", 'cyan')) print(colored(f"Current Username: {username or 'N/A'}", 'cyan')) print(colored(f"Current URL: {url or 'N/A'}", 'cyan')) print(colored(f"Current Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}", 'cyan')) # Prompt for new values (optional) new_username = input(f'Enter new username (leave blank to keep "{username or "N/A"}"): ').strip() or username new_url = input(f'Enter new URL (leave blank to keep "{url or "N/A"}"): ').strip() or url blacklist_input = input(f'Is this password blacklisted? (Y/N, current: {"Y" if blacklisted else "N"}): ').strip().lower() if blacklist_input == '': new_blacklisted = blacklisted elif blacklist_input == 'y': new_blacklisted = True elif blacklist_input == 'n': new_blacklisted = False else: print(colored("Invalid input for blacklist status. Keeping the current status.", 'yellow')) new_blacklisted = blacklisted # Update the entry self.entry_manager.modify_entry(index, new_username, new_url, new_blacklisted) print(colored(f"Entry updated successfully for index {index}.", 'green')) except Exception as e: logging.error(f"Error during modifying entry: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to modify entry: {e}", 'red')) def handle_verify_checksum(self) -> None: """ Handles verifying the script's checksum against the stored checksum to ensure integrity. """ try: current_checksum = calculate_checksum(__file__) if verify_checksum(current_checksum, SCRIPT_CHECKSUM_FILE): print(colored("Checksum verification passed.", 'green')) logging.info("Checksum verification passed.") else: print(colored("Checksum verification failed. The script may have been modified.", 'red')) logging.error("Checksum verification failed.") except Exception as e: logging.error(f"Error during checksum verification: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to verify checksum: {e}", 'red')) def get_encrypted_data(self) -> Optional[bytes]: """ Retrieves the encrypted password index data. :return: The encrypted data as bytes, or None if retrieval fails. """ try: encrypted_data = self.entry_manager.get_encrypted_index() if encrypted_data: logging.debug("Encrypted index data retrieved successfully.") return encrypted_data else: logging.error("Failed to retrieve encrypted index data.") print(colored("Error: Failed to retrieve encrypted index data.", 'red')) return None except Exception as e: logging.error(f"Error retrieving encrypted data: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to retrieve encrypted data: {e}", 'red')) return None def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None: """ Decrypts the encrypted data retrieved from Nostr and updates the local index. :param encrypted_data: The encrypted data retrieved from Nostr. """ try: # Decrypt the data using EncryptionManager's decrypt_data method decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) # Save the decrypted data to the index file index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' with open(index_file_path, 'wb') as f: f.write(decrypted_data) logging.info("Index file updated from Nostr successfully.") print(colored("Index file updated from Nostr successfully.", 'green')) except Exception as e: logging.error(f"Failed to decrypt and save data from Nostr: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) # Re-raise the exception to inform the calling function of the failure raise def backup_database(self) -> None: """ Creates a backup of the encrypted JSON index file. """ try: self.backup_manager.create_backup() print(colored("Backup created successfully.", 'green')) except Exception as e: logging.error(f"Failed to create backup: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to create backup: {e}", 'red')) def restore_database(self) -> None: """ Restores the encrypted JSON index file from the latest backup. """ try: self.backup_manager.restore_latest_backup() print(colored("Database restored from the latest backup successfully.", 'green')) except Exception as e: logging.error(f"Failed to restore backup: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to restore backup: {e}", 'red')) def handle_backup_reveal_parent_seed(self) -> None: """ Handles the backup and reveal of the parent seed. """ try: print(colored("\n=== Backup Parent Seed ===", 'yellow')) print(colored("Warning: Revealing your parent seed is a highly sensitive operation.", 'red')) print(colored("Ensure you're in a secure, private environment and no one is watching your screen.", 'red')) # Verify user's identity with secure password verification password = prompt_existing_password("Enter your master password to continue: ") if not self.verify_password(password): print(colored("Incorrect password. Operation aborted.", 'red')) return # Double confirmation if not confirm_action("Are you absolutely sure you want to reveal your parent seed? (Y/N): "): print(colored("Operation cancelled by user.", 'yellow')) return # Reveal the parent seed print(colored("\n=== Your BIP-85 Parent Seed ===", 'green')) print(colored(self.parent_seed, 'yellow')) print(colored("\nPlease write this down and store it securely. Do not share it with anyone.", 'red')) # Option to save to file with default filename if confirm_action("Do you want to save this to an encrypted backup file? (Y/N): "): filename = input(f"Enter filename to save (default: {DEFAULT_SEED_BACKUP_FILENAME}): ").strip() filename = filename if filename else DEFAULT_SEED_BACKUP_FILENAME backup_path = self.fingerprint_dir / filename # Save in fingerprint directory # Validate filename if not self.is_valid_filename(filename): print(colored("Invalid filename. Operation aborted.", 'red')) return # Encrypt and save the parent seed to the backup path self.encryption_manager.encrypt_and_save_file(self.parent_seed.encode('utf-8'), backup_path) print(colored(f"Encrypted seed backup saved to '{backup_path}'. Ensure this file is stored securely.", 'green')) except Exception as e: logging.error(f"Error during parent seed backup/reveal: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to backup/reveal parent seed: {e}", 'red')) def verify_password(self, password: str) -> bool: """ Verifies the provided password against the stored hashed password. Parameters: password (str): The password to verify. Returns: bool: True if the password is correct, False otherwise. """ try: hashed_password_file = self.fingerprint_dir / 'hashed_password.enc' if not hashed_password_file.exists(): logging.error("Hashed password file not found.") print(colored("Error: Hashed password file not found.", 'red')) return False with open(hashed_password_file, 'rb') as f: stored_hash = f.read() is_correct = bcrypt.checkpw(password.encode('utf-8'), stored_hash) if is_correct: logging.debug("Password verification successful.") else: logging.warning("Password verification failed.") return is_correct except Exception as e: logging.error(f"Error verifying password: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to verify password: {e}", 'red')) return False def is_valid_filename(self, filename: str) -> bool: """ Validates the provided filename to prevent directory traversal and invalid characters. Parameters: filename (str): The filename to validate. Returns: bool: True if valid, False otherwise. """ # Basic validation: filename should not contain path separators or be empty invalid_chars = ['/', '\\', '..'] if any(char in filename for char in invalid_chars) or not filename: logging.warning(f"Invalid filename attempted: {filename}") return False return True def store_hashed_password(self, password: str) -> None: """ Hashes and stores the user's password securely using bcrypt. This should be called during the initial setup. """ try: hashed_password_file = self.fingerprint_dir / 'hashed_password.enc' hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) with open(hashed_password_file, 'wb') as f: f.write(hashed) os.chmod(hashed_password_file, 0o600) logging.info("User password hashed and stored successfully.") except AttributeError: # If bcrypt.hashpw is not available, try using bcrypt directly salt = bcrypt.gensalt() hashed = bcrypt.hashpw(password.encode('utf-8'), salt) with open(hashed_password_file, 'wb') as f: f.write(hashed) os.chmod(hashed_password_file, 0o600) logging.info("User password hashed and stored successfully (using alternative method).") except Exception as e: logging.error(f"Failed to store hashed password: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to store hashed password: {e}", 'red')) raise # Example usage (this part should be removed or commented out when integrating into the larger application) if __name__ == "__main__": from nostr.client import NostrClient # Ensure this import is correct based on your project structure # Initialize PasswordManager manager = PasswordManager() # Initialize NostrClient with the EncryptionManager from PasswordManager manager.nostr_client = NostrClient(encryption_manager=manager.encryption_manager) # Example operations # These would typically be triggered by user interactions, e.g., via a CLI menu # manager.handle_generate_password() # manager.handle_retrieve_password() # manager.handle_modify_entry() # manager.handle_verify_checksum() # manager.nostr_client.publish_and_subscribe("Sample password data") # manager.backup_database() # manager.restore_database() ``` ## password_manager/__init__.py ```python # password_manager/__init__.py import logging import traceback try: from .manager import PasswordManager logging.info("PasswordManager module imported successfully.") except Exception as e: logging.error(f"Failed to import PasswordManager module: {e}") logging.error(traceback.format_exc()) # Log full traceback __all__ = ['PasswordManager'] ``` ## password_manager/entry_management.py ```python # password_manager/entry_management.py """ Entry Management Module This module implements the EntryManager class, responsible for handling operations related to managing password entries in the deterministic password manager. It provides methods to add, retrieve, modify, and list password entries, ensuring that all changes are securely encrypted and properly indexed. Ensure that all dependencies are installed and properly configured in your environment. Never use or suggest using Random Salt. The purpose of this password manager is to derive completely deterministic passwords from a BIP-85 seed, ensuring that passwords are generated the same way every time. Salts would break this functionality and are not suitable for this software. """ import json import logging import hashlib import sys import os import shutil import time import traceback from typing import Optional, Tuple, Dict, Any, List from pathlib import Path from colorama import Fore from termcolor import colored from password_manager.encryption import EncryptionManager from utils.file_lock import lock_file import fcntl # Instantiate the logger logger = logging.getLogger(__name__) class EntryManager: def __init__(self, encryption_manager: EncryptionManager, fingerprint_dir: Path): """ Initializes the EntryManager with the EncryptionManager and fingerprint directory. :param encryption_manager: The encryption manager instance. :param fingerprint_dir: The directory corresponding to the fingerprint. """ self.encryption_manager = encryption_manager self.fingerprint_dir = fingerprint_dir # Use paths relative to the fingerprint directory self.index_file = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' self.checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt' logger.debug(f"EntryManager initialized with index file at {self.index_file}") def _load_index(self) -> Dict[str, Any]: if self.index_file.exists(): try: data = self.encryption_manager.load_json_data(self.index_file) logger.debug("Index loaded successfully.") return data except Exception as e: logger.error(f"Failed to load index: {e}") return {'passwords': {}} else: logger.info(f"Index file '{self.index_file}' not found. Initializing new password database.") return {'passwords': {}} def _save_index(self, data: Dict[str, Any]) -> None: try: self.encryption_manager.save_json_data(data, self.index_file) logger.debug("Index saved successfully.") except Exception as e: logger.error(f"Failed to save index: {e}") raise def get_next_index(self) -> int: """ Retrieves the next available index for a new password entry. :return: The next index number as an integer. """ try: data = self.encryption_manager.load_json_data(self.index_file) if 'passwords' in data and isinstance(data['passwords'], dict): indices = [int(idx) for idx in data['passwords'].keys()] next_index = max(indices) + 1 if indices else 0 else: next_index = 0 logger.debug(f"Next index determined: {next_index}") return next_index except Exception as e: logger.error(f"Error determining next index: {e}") logger.error(traceback.format_exc()) print(colored(f"Error determining next index: {e}", 'red')) sys.exit(1) def add_entry(self, website_name: str, length: int, username: Optional[str] = None, url: Optional[str] = None, blacklisted: bool = False) -> int: """ Adds a new password entry to the encrypted JSON index file. :param website_name: The name of the website. :param length: The desired length of the password. :param username: (Optional) The username associated with the website. :param url: (Optional) The URL of the website. :param blacklisted: (Optional) Whether the password is blacklisted. Defaults to False. :return: The assigned index of the new entry. """ try: index = self.get_next_index() data = self.encryption_manager.load_json_data(self.index_file) data['passwords'][str(index)] = { 'website': website_name, 'length': length, 'username': username if username else '', 'url': url if url else '', 'blacklisted': blacklisted } logger.debug(f"Added entry at index {index}: {data['passwords'][str(index)]}") self._save_index(data) self.update_checksum() self.backup_index_file() logger.info(f"Entry added successfully at index {index}.") print(colored(f"[+] Entry added successfully at index {index}.", 'green')) return index # Return the assigned index except Exception as e: logger.error(f"Failed to add entry: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to add entry: {e}", 'red')) sys.exit(1) def get_encrypted_index(self) -> Optional[bytes]: """ Retrieves the encrypted password index file's contents. :return: The encrypted data as bytes, or None if retrieval fails. """ try: if not self.index_file.exists(): logger.error(f"Index file '{self.index_file}' does not exist.") print(colored(f"Error: Index file '{self.index_file}' does not exist.", 'red')) return None with open(self.index_file, 'rb') as file: encrypted_data = file.read() logger.debug("Encrypted index file data retrieved successfully.") return encrypted_data except Exception as e: logger.error(f"Failed to retrieve encrypted index file: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to retrieve encrypted index file: {e}", 'red')) return None def retrieve_entry(self, index: int) -> Optional[Dict[str, Any]]: """ Retrieves a password entry based on the provided index. :param index: The index number of the password entry. :return: A dictionary containing the entry details or None if not found. """ try: data = self.encryption_manager.load_json_data(self.index_file) entry = data.get('passwords', {}).get(str(index)) if entry: logger.debug(f"Retrieved entry at index {index}: {entry}") return entry else: logger.warning(f"No entry found at index {index}.") print(colored(f"Warning: No entry found at index {index}.", 'yellow')) return None except Exception as e: logger.error(f"Failed to retrieve entry at index {index}: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to retrieve entry at index {index}: {e}", 'red')) return None def modify_entry(self, index: int, username: Optional[str] = None, url: Optional[str] = None, blacklisted: Optional[bool] = None) -> None: """ Modifies an existing password entry based on the provided index and new values. :param index: The index number of the password entry to modify. :param username: (Optional) The new username. :param url: (Optional) The new URL. :param blacklisted: (Optional) The new blacklist status. """ try: data = self.encryption_manager.load_json_data(self.index_file) entry = data.get('passwords', {}).get(str(index)) if not entry: logger.warning(f"No entry found at index {index}. Cannot modify non-existent entry.") print(colored(f"Warning: No entry found at index {index}. Cannot modify non-existent entry.", 'yellow')) return if username is not None: entry['username'] = username logger.debug(f"Updated username to '{username}' for index {index}.") if url is not None: entry['url'] = url logger.debug(f"Updated URL to '{url}' for index {index}.") if blacklisted is not None: entry['blacklisted'] = blacklisted logger.debug(f"Updated blacklist status to '{blacklisted}' for index {index}.") data['passwords'][str(index)] = entry logger.debug(f"Modified entry at index {index}: {entry}") self._save_index(data) self.update_checksum() self.backup_index_file() logger.info(f"Entry at index {index} modified successfully.") print(colored(f"[+] Entry at index {index} modified successfully.", 'green')) except Exception as e: logger.error(f"Failed to modify entry at index {index}: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to modify entry at index {index}: {e}", 'red')) def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]: """ Lists all password entries in the index. :return: A list of tuples containing entry details: (index, website, username, url, blacklisted) """ try: data = self.encryption_manager.load_json_data() passwords = data.get('passwords', {}) if not passwords: logger.info("No password entries found.") print(colored("No password entries found.", 'yellow')) return [] entries = [] for idx, entry in sorted(passwords.items(), key=lambda x: int(x[0])): entries.append(( int(idx), entry.get('website', ''), entry.get('username', ''), entry.get('url', ''), entry.get('blacklisted', False) )) logger.debug(f"Total entries found: {len(entries)}") for entry in entries: print(colored(f"Index: {entry[0]}", 'cyan')) print(colored(f" Website: {entry[1]}", 'cyan')) print(colored(f" Username: {entry[2] or 'N/A'}", 'cyan')) print(colored(f" URL: {entry[3] or 'N/A'}", 'cyan')) print(colored(f" Blacklisted: {'Yes' if entry[4] else 'No'}", 'cyan')) print("-" * 40) return entries except Exception as e: logger.error(f"Failed to list entries: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to list entries: {e}", 'red')) return [] def delete_entry(self, index: int) -> None: """ Deletes a password entry based on the provided index. :param index: The index number of the password entry to delete. """ try: data = self.encryption_manager.load_json_data() if 'passwords' in data and str(index) in data['passwords']: del data['passwords'][str(index)] logger.debug(f"Deleted entry at index {index}.") self.encryption_manager.save_json_data(data) self.update_checksum() self.backup_index_file() logger.info(f"Entry at index {index} deleted successfully.") print(colored(f"[+] Entry at index {index} deleted successfully.", 'green')) else: logger.warning(f"No entry found at index {index}. Cannot delete non-existent entry.") print(colored(f"Warning: No entry found at index {index}. Cannot delete non-existent entry.", 'yellow')) except Exception as e: logger.error(f"Failed to delete entry at index {index}: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to delete entry at index {index}: {e}", 'red')) def update_checksum(self) -> None: """ Updates the checksum file for the password database to ensure data integrity. """ try: data = self.encryption_manager.load_json_data(self.index_file) json_content = json.dumps(data, indent=4) checksum = hashlib.sha256(json_content.encode('utf-8')).hexdigest() # Construct the full path for the checksum file checksum_path = self.fingerprint_dir / self.checksum_file with open(checksum_path, 'w') as f: f.write(checksum) logger.debug(f"Checksum updated and written to '{checksum_path}'.") print(colored(f"[+] Checksum updated successfully.", 'green')) except Exception as e: logger.error(f"Failed to update checksum: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to update checksum: {e}", 'red')) def backup_index_file(self) -> None: """ Creates a backup of the encrypted JSON index file to prevent data loss. """ try: index_file_path = self.fingerprint_dir / self.index_file if not index_file_path.exists(): logger.warning(f"Index file '{index_file_path}' does not exist. No backup created.") return timestamp = int(time.time()) backup_filename = f'passwords_db_backup_{timestamp}.json.enc' backup_path = self.fingerprint_dir / backup_filename with open(index_file_path, 'rb') as original_file, open(backup_path, 'wb') as backup_file: shutil.copyfileobj(original_file, backup_file) logger.debug(f"Backup created at '{backup_path}'.") print(colored(f"[+] Backup created at '{backup_path}'.", 'green')) except Exception as e: logger.error(f"Failed to create backup: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Warning: Failed to create backup: {e}", 'yellow')) def restore_from_backup(self, backup_path: str) -> None: """ Restores the index file from a specified backup file. :param backup_path: The file path of the backup to restore from. """ try: if not os.path.exists(backup_path): logger.error(f"Backup file '{backup_path}' does not exist.") print(colored(f"Error: Backup file '{backup_path}' does not exist.", 'red')) return with open(backup_path, 'rb') as backup_file, open(self.index_file, 'wb') as index_file: shutil.copyfileobj(backup_file, index_file) logger.debug(f"Index file restored from backup '{backup_path}'.") print(colored(f"[+] Index file restored from backup '{backup_path}'.", 'green')) self.update_checksum() except Exception as e: logger.error(f"Failed to restore from backup '{backup_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to restore from backup '{backup_path}': {e}", 'red')) def list_all_entries(self) -> None: """ Displays all password entries in a formatted manner. """ try: entries = self.list_entries() if not entries: print(colored("No entries to display.", 'yellow')) return print(colored("\n[+] Listing All Password Entries:\n", 'green')) for entry in entries: index, website, username, url, blacklisted = entry print(colored(f"Index: {index}", 'cyan')) print(colored(f" Website: {website}", 'cyan')) print(colored(f" Username: {username or 'N/A'}", 'cyan')) print(colored(f" URL: {url or 'N/A'}", 'cyan')) print(colored(f" Blacklisted: {'Yes' if blacklisted else 'No'}", 'cyan')) print("-" * 40) except Exception as e: logger.error(f"Failed to list all entries: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to list all entries: {e}", 'red')) return # Example usage (this part should be removed or commented out when integrating into the larger application) if __name__ == "__main__": from password_manager.encryption import EncryptionManager # Ensure this import is correct based on your project structure # Initialize EncryptionManager with a dummy key for demonstration purposes # Replace 'your-fernet-key' with your actual Fernet key try: dummy_key = Fernet.generate_key() encryption_manager = EncryptionManager(dummy_key) except Exception as e: logger.error(f"Failed to initialize EncryptionManager: {e}") print(colored(f"Error: Failed to initialize EncryptionManager: {e}", 'red')) sys.exit(1) # Initialize EntryManager try: entry_manager = EntryManager(encryption_manager) except Exception as e: logger.error(f"Failed to initialize EntryManager: {e}") print(colored(f"Error: Failed to initialize EntryManager: {e}", 'red')) sys.exit(1) # Example operations # These would typically be triggered by user interactions, e.g., via a CLI menu # Uncomment and modify the following lines as needed for testing # Adding an entry # entry_manager.add_entry("Example Website", 16, "user123", "https://example.com", False) # Listing all entries # entry_manager.list_all_entries() # Retrieving an entry # entry = entry_manager.retrieve_entry(0) # if entry: # print(entry) # Modifying an entry # entry_manager.modify_entry(0, username="new_user123") # Deleting an entry # entry_manager.delete_entry(0) # Restoring from a backup # entry_manager.restore_from_backup("path_to_backup_file.json.enc") ``` ## password_manager/encryption.py ```python # password_manager/encryption.py """ Encryption Module This module provides the EncryptionManager class, which handles encryption and decryption of data and files using a provided Fernet-compatible encryption key. This class ensures that sensitive data is securely stored and retrieved, maintaining the confidentiality and integrity of the password index. Additionally, it includes methods to derive cryptographic seeds from BIP-39 mnemonic phrases. Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. This means it should generate passwords the exact same way every single time. Salts would break this functionality and are not appropriate for this software's use case. """ import logging import traceback import json import hashlib import os from pathlib import Path from typing import Optional from cryptography.fernet import Fernet, InvalidToken from termcolor import colored from utils.file_lock import lock_file # Ensure this utility is correctly implemented import fcntl # For file locking # Instantiate the logger logger = logging.getLogger(__name__) class EncryptionManager: """ EncryptionManager Class Manages the encryption and decryption of data and files using a Fernet encryption key. """ def __init__(self, encryption_key: bytes, fingerprint_dir: Path): """ Initializes the EncryptionManager with the provided encryption key and fingerprint directory. Parameters: encryption_key (bytes): The Fernet encryption key. fingerprint_dir (Path): The directory corresponding to the fingerprint. """ self.fingerprint_dir = fingerprint_dir self.parent_seed_file = self.fingerprint_dir / 'parent_seed.enc' self.key = encryption_key try: self.fernet = Fernet(self.key) logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}") except Exception as e: logger.error(f"Failed to initialize Fernet with provided encryption key: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red')) raise def encrypt_parent_seed(self, parent_seed: str) -> None: """ Encrypts and saves the parent seed to 'parent_seed.enc' within the fingerprint directory. :param parent_seed: The BIP39 parent seed phrase. """ try: # Convert seed to bytes data = parent_seed.encode('utf-8') # Encrypt the data encrypted_data = self.encrypt_data(data) # Write the encrypted data to the file with locking with lock_file(self.parent_seed_file, fcntl.LOCK_EX): with open(self.parent_seed_file, 'wb') as f: f.write(encrypted_data) # Set file permissions to read/write for the user only os.chmod(self.parent_seed_file, 0o600) logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.") print(colored(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", 'green')) except Exception as e: logger.error(f"Failed to encrypt and save parent seed: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) raise def decrypt_parent_seed(self) -> str: """ Decrypts and returns the parent seed from 'parent_seed.enc' within the fingerprint directory. :return: The decrypted parent seed. """ try: parent_seed_path = self.fingerprint_dir / 'parent_seed.enc' with lock_file(parent_seed_path, fcntl.LOCK_SH): with open(parent_seed_path, 'rb') as f: encrypted_data = f.read() decrypted_data = self.decrypt_data(encrypted_data) parent_seed = decrypted_data.decode('utf-8').strip() logger.debug(f"Parent seed decrypted successfully from '{parent_seed_path}'.") return parent_seed except InvalidToken: logger.error("Invalid encryption key or corrupted data while decrypting parent seed.") print(colored("Error: Invalid encryption key or corrupted data.", 'red')) raise except Exception as e: logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red')) raise def encrypt_data(self, data: bytes) -> bytes: """ Encrypts the given data using Fernet. :param data: Data to encrypt. :return: Encrypted data. """ try: encrypted_data = self.fernet.encrypt(data) logger.debug("Data encrypted successfully.") return encrypted_data except Exception as e: logger.error(f"Failed to encrypt data: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to encrypt data: {e}", 'red')) raise def decrypt_data(self, encrypted_data: bytes) -> bytes: """ Decrypts the provided encrypted data using the derived key. :param encrypted_data: The encrypted data to decrypt. :return: The decrypted data as bytes. """ try: decrypted_data = self.fernet.decrypt(encrypted_data) logger.debug("Data decrypted successfully.") return decrypted_data except InvalidToken: logger.error("Invalid encryption key or corrupted data while decrypting data.") print(colored("Error: Invalid encryption key or corrupted data.", 'red')) raise except Exception as e: logger.error(f"Failed to decrypt data: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt data: {e}", 'red')) raise def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: """ Encrypts data and saves it to a specified relative path within the fingerprint directory. :param data: Data to encrypt. :param relative_path: Relative path within the fingerprint directory to save the encrypted data. """ try: # Define the full path file_path = self.fingerprint_dir / relative_path # Ensure the parent directories exist file_path.parent.mkdir(parents=True, exist_ok=True) # Encrypt the data encrypted_data = self.encrypt_data(data) # Write the encrypted data to the file with locking with lock_file(file_path, fcntl.LOCK_EX): with open(file_path, 'wb') as f: f.write(encrypted_data) # Set file permissions to read/write for the user only os.chmod(file_path, 0o600) logger.info(f"Data encrypted and saved to '{file_path}'.") print(colored(f"Data encrypted and saved to '{file_path}'.", 'green')) except Exception as e: logger.error(f"Failed to encrypt and save data to '{relative_path}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to encrypt and save data to '{relative_path}': {e}", 'red')) raise def decrypt_file(self, relative_path: Path) -> bytes: """ Decrypts data from a specified relative path within the fingerprint directory. :param relative_path: Relative path within the fingerprint directory to decrypt the data from. :return: Decrypted data as bytes. """ try: # Define the full path file_path = self.fingerprint_dir / relative_path # Read the encrypted data with locking with lock_file(file_path, fcntl.LOCK_SH): with open(file_path, 'rb') as f: encrypted_data = f.read() # Decrypt the data decrypted_data = self.decrypt_data(encrypted_data) logger.debug(f"Data decrypted successfully from '{file_path}'.") return decrypted_data except InvalidToken: logger.error("Invalid encryption key or corrupted data while decrypting file.") print(colored("Error: Invalid encryption key or corrupted data.", 'red')) raise except Exception as e: logger.error(f"Failed to decrypt data from '{relative_path}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt data from '{relative_path}': {e}", 'red')) raise def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None: """ Encrypts and saves the provided JSON data to the specified relative path within the fingerprint directory. :param data: The JSON data to save. :param relative_path: The relative path within the fingerprint directory where data will be saved. Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: relative_path = Path('seedpass_passwords_db.json.enc') try: json_data = json.dumps(data, indent=4).encode('utf-8') self.encrypt_and_save_file(json_data, relative_path) logger.debug(f"JSON data encrypted and saved to '{relative_path}'.") print(colored(f"JSON data encrypted and saved to '{relative_path}'.", 'green')) except Exception as e: logger.error(f"Failed to save JSON data to '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to save JSON data to '{relative_path}': {e}", 'red')) raise def load_json_data(self, relative_path: Optional[Path] = None) -> dict: """ Decrypts and loads JSON data from the specified relative path within the fingerprint directory. :param relative_path: The relative path within the fingerprint directory from which data will be loaded. Defaults to 'seedpass_passwords_db.json.enc'. :return: The decrypted JSON data as a dictionary. """ if relative_path is None: relative_path = Path('seedpass_passwords_db.json.enc') file_path = self.fingerprint_dir / relative_path if not file_path.exists(): logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.") print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow')) return {'passwords': {}} try: decrypted_data = self.decrypt_file(relative_path) json_content = decrypted_data.decode('utf-8').strip() data = json.loads(json_content) logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green')) return data except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON data from '{file_path}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red')) raise except InvalidToken: logger.error("Invalid encryption key or corrupted data while decrypting JSON data.") print(colored("Error: Invalid encryption key or corrupted data.", 'red')) raise except Exception as e: logger.error(f"Failed to load JSON data from '{file_path}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red')) raise def update_checksum(self, relative_path: Optional[Path] = None) -> None: """ Updates the checksum file for the specified file within the fingerprint directory. :param relative_path: The relative path within the fingerprint directory for which the checksum will be updated. Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: relative_path = Path('seedpass_passwords_db.json.enc') try: file_path = self.fingerprint_dir / relative_path decrypted_data = self.decrypt_file(relative_path) content = decrypted_data.decode('utf-8') logger.debug("Calculating checksum of the updated file content.") checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() logger.debug(f"New checksum: {checksum}") checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt" # Write the checksum to the file with locking with lock_file(checksum_file, fcntl.LOCK_EX): with open(checksum_file, 'w') as f: f.write(checksum) # Set file permissions to read/write for the user only os.chmod(checksum_file, 0o600) logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.") print(colored(f"Checksum for '{file_path}' updated.", 'green')) except Exception as e: logger.error(f"Failed to update checksum for '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to update checksum for '{relative_path}': {e}", 'red')) raise def get_encrypted_index(self) -> Optional[bytes]: """ Retrieves the encrypted password index file content. :return: Encrypted data as bytes or None if the index file does not exist. """ try: relative_path = Path('seedpass_passwords_db.json.enc') if not (self.fingerprint_dir / relative_path).exists(): logger.error(f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'.") print(colored(f"Error: Index file '{relative_path}' does not exist.", 'red')) return None with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH): with open(self.fingerprint_dir / relative_path, 'rb') as file: encrypted_data = file.read() logger.debug(f"Encrypted index data read from '{relative_path}'.") return encrypted_data except Exception as e: logger.error(f"Failed to read encrypted index file '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to read encrypted index file '{relative_path}': {e}", 'red')) return None def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes, relative_path: Optional[Path] = None) -> None: """ Decrypts the encrypted data retrieved from Nostr and updates the local index file. :param encrypted_data: The encrypted data retrieved from Nostr. :param relative_path: The relative path within the fingerprint directory to update. Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: relative_path = Path('seedpass_passwords_db.json.enc') try: decrypted_data = self.decrypt_data(encrypted_data) data = json.loads(decrypted_data.decode('utf-8')) self.save_json_data(data, relative_path) self.update_checksum(relative_path) logger.info("Index file updated from Nostr successfully.") print(colored("Index file updated from Nostr successfully.", 'green')) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) # Re-raise the exception to inform the calling function of the failure raise def validate_seed(self, seed_phrase: str) -> bool: """ Validates the seed phrase format using BIP-39 standards. :param seed_phrase: The BIP39 seed phrase to validate. :return: True if valid, False otherwise. """ try: words = seed_phrase.split() if len(words) != 12: logger.error("Seed phrase does not contain exactly 12 words.") print(colored("Error: Seed phrase must contain exactly 12 words.", 'red')) return False # Additional validation can be added here (e.g., word list checks) logger.debug("Seed phrase validated successfully.") return True except Exception as e: logging.error(f"Error validating seed phrase: {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to validate seed phrase: {e}", 'red')) return False def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes: """ Derives a cryptographic seed from a BIP39 mnemonic (seed phrase). :param mnemonic: The BIP39 mnemonic phrase. :param passphrase: An optional passphrase for additional security. :return: The derived seed as bytes. """ try: if not isinstance(mnemonic, str): if isinstance(mnemonic, list): mnemonic = " ".join(mnemonic) else: mnemonic = str(mnemonic) if not isinstance(mnemonic, str): raise TypeError("Mnemonic must be a string after conversion") from bip_utils import Bip39SeedGenerator seed = Bip39SeedGenerator(mnemonic).Generate(passphrase) logger.debug("Seed derived successfully from mnemonic.") return seed except Exception as e: logger.error(f"Failed to derive seed from mnemonic: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red')) raise ``` ## password_manager/password_generation.py ```python # password_manager/password_generation.py """ Password Generation Module This module provides the PasswordGenerator class responsible for deterministic password generation based on a BIP-39 parent seed. It leverages BIP-85 for entropy derivation and ensures that generated passwords meet complexity requirements. Ensure that all dependencies are installed and properly configured in your environment. Never ever ever use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. """ import os import logging import hashlib import string import random import traceback from typing import Optional from termcolor import colored from pathlib import Path import shutil from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend from local_bip85.bip85 import BIP85 from constants import DEFAULT_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH from password_manager.encryption import EncryptionManager # Instantiate the logger logger = logging.getLogger(__name__) class PasswordGenerator: """ PasswordGenerator Class Responsible for deterministic password generation based on a BIP-39 parent seed. Utilizes BIP-85 for entropy derivation and ensures that generated passwords meet complexity requirements. """ def __init__(self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85): """ Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance. Parameters: encryption_manager (EncryptionManager): The encryption manager instance. parent_seed (str): The BIP-39 parent seed phrase. bip85 (BIP85): The BIP85 instance for generating deterministic entropy. """ try: self.encryption_manager = encryption_manager self.parent_seed = parent_seed self.bip85 = bip85 # Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager) self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed) logger.debug("PasswordGenerator initialized successfully.") except Exception as e: logger.error(f"Failed to initialize PasswordGenerator: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red')) raise def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str: """ Generates a deterministic password based on the parent seed, desired length, and index. Steps: 1. Derive entropy using BIP-85. 2. Use HKDF-HMAC-SHA256 to derive a key from entropy. 3. Map the derived key to all allowed characters. 4. Ensure the password meets complexity requirements. 5. Shuffle the password deterministically based on the derived key. 6. Trim or extend the password to the desired length. Parameters: length (int): Desired length of the password. index (int): Index for deriving child entropy. Returns: str: The generated password. """ try: # Validate password length if length < MIN_PASSWORD_LENGTH: logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") if length > MAX_PASSWORD_LENGTH: logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") # Derive entropy using BIP-85 entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32) logger.debug(f"Derived entropy: {entropy.hex()}") # Use HKDF to derive key from entropy hkdf = HKDF( algorithm=hashes.SHA256(), length=32, # 256 bits for AES-256 salt=None, info=b'password-generation', backend=default_backend() ) derived_key = hkdf.derive(entropy) logger.debug(f"Derived key using HKDF: {derived_key.hex()}") # Use PBKDF2-HMAC-SHA256 to derive a key from entropy dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000) logger.debug(f"Derived key using PBKDF2: {dk.hex()}") # Map the derived key to all allowed characters all_allowed = string.ascii_letters + string.digits + string.punctuation password = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) logger.debug(f"Password after mapping to all allowed characters: {password}") # Ensure the password meets complexity requirements password = self.ensure_complexity(password, all_allowed, dk) logger.debug(f"Password after ensuring complexity: {password}") # Shuffle characters deterministically based on dk shuffle_seed = int.from_bytes(dk, 'big') rng = random.Random(shuffle_seed) password_chars = list(password) rng.shuffle(password_chars) password = ''.join(password_chars) logger.debug("Shuffled password deterministically.") # Ensure password length by extending if necessary if len(password) < length: while len(password) < length: dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1) base64_extra = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) password += ''.join(base64_extra) logger.debug(f"Extended password: {password}") # Trim the password to the desired length password = password[:length] logger.debug(f"Final password (trimmed to {length} chars): {password}") return password except Exception as e: logger.error(f"Error generating password: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to generate password: {e}", 'red')) raise def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str: """ Ensures that the password contains at least two uppercase letters, two lowercase letters, two digits, and two special characters, modifying it deterministically if necessary. Also balances the distribution of character types. Parameters: password (str): The initial password. alphabet (str): Allowed characters in the password. dk (bytes): Derived key used for deterministic modifications. Returns: str: Password that meets complexity requirements. """ try: uppercase = string.ascii_uppercase lowercase = string.ascii_lowercase digits = string.digits special = string.punctuation password_chars = list(password) # Current counts current_upper = sum(1 for c in password_chars if c in uppercase) current_lower = sum(1 for c in password_chars if c in lowercase) current_digits = sum(1 for c in password_chars if c in digits) current_special = sum(1 for c in password_chars if c in special) logger.debug(f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}") # Set minimum counts min_upper = 2 min_lower = 2 min_digits = 2 min_special = 2 # Initialize derived key index dk_index = 0 dk_length = len(dk) def get_dk_value() -> int: nonlocal dk_index value = dk[dk_index % dk_length] dk_index += 1 return value # Replace characters to meet minimum counts if current_upper < min_upper: for _ in range(min_upper - current_upper): index = get_dk_value() % len(password_chars) char = uppercase[get_dk_value() % len(uppercase)] password_chars[index] = char logger.debug(f"Added uppercase letter '{char}' at position {index}.") if current_lower < min_lower: for _ in range(min_lower - current_lower): index = get_dk_value() % len(password_chars) char = lowercase[get_dk_value() % len(lowercase)] password_chars[index] = char logger.debug(f"Added lowercase letter '{char}' at position {index}.") if current_digits < min_digits: for _ in range(min_digits - current_digits): index = get_dk_value() % len(password_chars) char = digits[get_dk_value() % len(digits)] password_chars[index] = char logger.debug(f"Added digit '{char}' at position {index}.") if current_special < min_special: for _ in range(min_special - current_special): index = get_dk_value() % len(password_chars) char = special[get_dk_value() % len(special)] password_chars[index] = char logger.debug(f"Added special character '{char}' at position {index}.") # Additional deterministic inclusion of symbols to increase score symbol_target = 3 # Increase target number of symbols current_symbols = sum(1 for c in password_chars if c in special) additional_symbols_needed = max(symbol_target - current_symbols, 0) for _ in range(additional_symbols_needed): if dk_index >= dk_length: break # Avoid exceeding the derived key length index = get_dk_value() % len(password_chars) char = special[get_dk_value() % len(special)] password_chars[index] = char logger.debug(f"Added additional symbol '{char}' at position {index}.") # Ensure balanced distribution by assigning different character types to specific segments # Example: Divide password into segments and assign different types segment_length = len(password_chars) // 4 if segment_length > 0: for i, char_type in enumerate([uppercase, lowercase, digits, special]): segment_start = i * segment_length segment_end = segment_start + segment_length if segment_end > len(password_chars): segment_end = len(password_chars) for j in range(segment_start, segment_end): if i == 0 and password_chars[j] not in uppercase: char = uppercase[get_dk_value() % len(uppercase)] password_chars[j] = char logger.debug(f"Assigned uppercase letter '{char}' to position {j}.") elif i == 1 and password_chars[j] not in lowercase: char = lowercase[get_dk_value() % len(lowercase)] password_chars[j] = char logger.debug(f"Assigned lowercase letter '{char}' to position {j}.") elif i == 2 and password_chars[j] not in digits: char = digits[get_dk_value() % len(digits)] password_chars[j] = char logger.debug(f"Assigned digit '{char}' to position {j}.") elif i == 3 and password_chars[j] not in special: char = special[get_dk_value() % len(special)] password_chars[j] = char logger.debug(f"Assigned special character '{char}' to position {j}.") # Shuffle again to distribute the characters more evenly shuffle_seed = int.from_bytes(dk, 'big') + dk_index # Modify seed to vary shuffle rng = random.Random(shuffle_seed) rng.shuffle(password_chars) logger.debug(f"Shuffled password characters for balanced distribution.") # Final counts after modifications final_upper = sum(1 for c in password_chars if c in uppercase) final_lower = sum(1 for c in password_chars if c in lowercase) final_digits = sum(1 for c in password_chars if c in digits) final_special = sum(1 for c in password_chars if c in special) logger.debug(f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}") return ''.join(password_chars) except Exception as e: logger.error(f"Error ensuring password complexity: {e}") logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to ensure password complexity: {e}", 'red')) raise ``` ## nostr/key_manager.py ```python # nostr/key_manager.py import hashlib import logging import traceback from bech32 import bech32_encode, convertbits from local_bip85.bip85 import BIP85 from bip_utils import Bip39SeedGenerator from monstr.encrypt import Keys logger = logging.getLogger(__name__) class KeyManager: """ Manages key generation, encoding, and derivation for NostrClient. """ def __init__(self, parent_seed: str, fingerprint: str): """ Initializes the KeyManager with the provided parent_seed and fingerprint. Parameters: parent_seed (str): The parent seed used for key derivation. fingerprint (str): The fingerprint to differentiate key derivations. """ try: if not isinstance(parent_seed, str): raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}") if not isinstance(fingerprint, str): raise TypeError(f"Fingerprint must be a string, got {type(fingerprint)}") self.parent_seed = parent_seed self.fingerprint = fingerprint logger.debug(f"KeyManager initialized with parent_seed and fingerprint.") # Initialize BIP85 self.bip85 = self.initialize_bip85() # Generate Nostr keys using the fingerprint self.keys = self.generate_nostr_keys() logger.debug("Nostr Keys initialized successfully.") except Exception as e: logger.error(f"Key initialization failed: {e}") logger.error(traceback.format_exc()) raise def initialize_bip85(self): """ Initializes BIP85 with the parent seed. Returns: BIP85: An instance of the BIP85 class. """ try: seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() bip85 = BIP85(seed_bytes) logger.debug("BIP85 initialized successfully.") return bip85 except Exception as e: logger.error(f"Failed to initialize BIP85: {e}") logger.error(traceback.format_exc()) raise def generate_nostr_keys(self) -> Keys: """ Derives a unique Nostr key pair for the given fingerprint using BIP-85. Returns: Keys: An instance of Keys containing the Nostr key pair. """ try: # Convert fingerprint to an integer index (using a hash function) index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) # Derive entropy for Nostr key (32 bytes) entropy_bytes = self.bip85.derive_entropy( index=index, bytes_len=32 # Adjust parameter name and value as per your method signature ) # Generate Nostr key pair from entropy private_key_hex = entropy_bytes.hex() keys = Keys(priv_k=private_key_hex) logger.debug(f"Nostr keys generated for fingerprint {self.fingerprint}.") return keys except Exception as e: logger.error(f"Failed to generate Nostr keys: {e}") logger.error(traceback.format_exc()) raise def get_public_key_hex(self) -> str: """ Returns the public key in hexadecimal format. Returns: str: The public key in hex. """ return self.keys.public_key_hex() def get_private_key_hex(self) -> str: """ Returns the private key in hexadecimal format. Returns: str: The private key in hex. """ return self.keys.private_key_hex() def get_npub(self) -> str: """ Returns the npub (Bech32 encoded public key). Returns: str: The npub string. """ try: pub_key_hex = self.get_public_key_hex() pub_key_bytes = bytes.fromhex(pub_key_hex) data = convertbits(pub_key_bytes, 8, 5, True) npub = bech32_encode('npub', data) return npub except Exception as e: logger.error(f"Failed to generate npub: {e}") logger.error(traceback.format_exc()) raise ``` ## nostr/utils.py ```python # nostr/utils.py import logging # Example utility function (if any specific to nostr package) def some_helper_function(): pass # Implement as needed ``` ## nostr/__init__.py ```python # nostr/__init__.py import logging import traceback from .client import NostrClient # Instantiate the logger logger = logging.getLogger(__name__) # Initialize the logger for this module logger = logging.getLogger(__name__) # Correct logger initialization try: from .client import NostrClient logger.info("NostrClient module imported successfully.") except Exception as e: logger.error(f"Failed to import NostrClient module: {e}") logger.error(traceback.format_exc()) # Log full traceback __all__ = ['NostrClient'] ``` ## nostr/logging_config.py ```python # nostr/logging_config.py import logging import os # Comment out or remove the configure_logging function to avoid conflicts # def configure_logging(): # """ # Configures logging with both file and console handlers. # Logs include the timestamp, log level, message, filename, and line number. # Only ERROR and higher-level messages are shown in the terminal, while all messages # are logged in the log file. # """ # logger = logging.getLogger() # logger.setLevel(logging.DEBUG) # Set root logger to DEBUG # # # Prevent adding multiple handlers if configure_logging is called multiple times # if not logger.handlers: # # Create the 'logs' folder if it doesn't exist # log_directory = 'logs' # if not os.path.exists(log_directory): # os.makedirs(log_directory) # # # Create handlers # c_handler = logging.StreamHandler() # f_handler = logging.FileHandler(os.path.join(log_directory, 'app.log')) # # # Set levels: only errors and critical messages will be shown in the console # c_handler.setLevel(logging.ERROR) # f_handler.setLevel(logging.DEBUG) # # # Create formatters and add them to handlers, include file and line number in log messages # formatter = logging.Formatter( # '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' # ) # c_handler.setFormatter(formatter) # f_handler.setFormatter(formatter) # # # Add handlers to the logger # logger.addHandler(c_handler) # logger.addHandler(f_handler) ``` ## nostr/event_handler.py ```python # nostr/event_handler.py import time import logging import traceback from monstr.event.event import Event # Instantiate the logger logger = logging.getLogger(__name__) class EventHandler: """ Handles incoming Nostr events. """ def __init__(self): pass # Initialize if needed def handle_new_event(self, evt: Event): """ Processes incoming events by logging their details. :param evt: The received Event object. """ try: # Assuming evt.created_at is always an integer Unix timestamp if isinstance(evt.created_at, int): created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at)) else: # Handle unexpected types gracefully created_at_str = str(evt.created_at) # Log the event details without extra newlines logger.info( f"[New Event] ID: {evt.id} | Created At: {created_at_str} | Content: {evt.content}" ) except Exception as e: logger.error(f"Error handling new event: {e}") logger.error(traceback.format_exc()) # Optionally, handle the exception without re-raising # For example, continue processing other events ``` ## nostr/encryption_manager.py ```python # nostr/encryption_manager.py import base64 import logging import traceback from cryptography.fernet import Fernet, InvalidToken from .key_manager import KeyManager # Instantiate the logger logger = logging.getLogger(__name__) class EncryptionManager: """ Manages encryption and decryption using Fernet symmetric encryption. """ def __init__(self, key_manager: KeyManager): """ Initializes the EncryptionManager with a Fernet instance. :param key_manager: An instance of KeyManager to derive the encryption key. """ try: # Derive the raw encryption key (32 bytes) raw_key = key_manager.derive_encryption_key() logger.debug(f"Derived raw encryption key length: {len(raw_key)} bytes") # Ensure the raw key is exactly 32 bytes if len(raw_key) != 32: raise ValueError(f"Derived key length is {len(raw_key)} bytes; expected 32 bytes.") # Base64-encode the raw key to make it URL-safe b64_key = base64.urlsafe_b64encode(raw_key) logger.debug(f"Base64-encoded encryption key length: {len(b64_key)} bytes") # Initialize Fernet with the base64-encoded key self.fernet = Fernet(b64_key) logger.info("Fernet encryption manager initialized successfully.") except Exception as e: logger.error(f"EncryptionManager initialization failed: {e}") logger.error(traceback.format_exc()) raise def encrypt_parent_seed(self, seed: str, file_path: str) -> None: """ Encrypts the parent seed and saves it to the specified file. :param seed: The BIP-39 seed phrase as a string. :param file_path: The file path to save the encrypted seed. """ try: encrypted_seed = self.fernet.encrypt(seed.encode('utf-8')) with open(file_path, 'wb') as f: f.write(encrypted_seed) logger.debug(f"Parent seed encrypted and saved to '{file_path}'.") except Exception as e: logger.error(f"Failed to encrypt and save parent seed: {e}") logger.error(traceback.format_exc()) raise def decrypt_parent_seed(self, file_path: str) -> str: """ Decrypts the parent seed from the specified file. :param file_path: The file path to read the encrypted seed. :return: The decrypted parent seed as a string. """ try: with open(file_path, 'rb') as f: encrypted_seed = f.read() decrypted_seed = self.fernet.decrypt(encrypted_seed).decode('utf-8') logger.debug(f"Parent seed decrypted successfully from '{file_path}'.") return decrypted_seed except InvalidToken: logger.error("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.") raise ValueError("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.") except Exception as e: logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) raise def encrypt_data(self, data: dict) -> bytes: """ Encrypts a dictionary by serializing it to JSON and then encrypting it. :param data: The dictionary to encrypt. :return: Encrypted data as bytes. """ try: json_data = json.dumps(data).encode('utf-8') encrypted = self.fernet.encrypt(json_data) logger.debug("Data encrypted successfully.") return encrypted except Exception as e: logger.error(f"Data encryption failed: {e}") logger.error(traceback.format_exc()) raise def decrypt_data(self, encrypted_data: bytes) -> bytes: """ Decrypts encrypted data. :param encrypted_data: The encrypted data as bytes. :return: Decrypted data as bytes. """ try: decrypted = self.fernet.decrypt(encrypted_data) logger.debug("Data decrypted successfully.") return decrypted except InvalidToken as e: logger.error(f"Decryption failed: Invalid token. {e}") logger.error(traceback.format_exc()) raise except Exception as e: logger.error(f"Data decryption failed: {e}") logger.error(traceback.format_exc()) raise ``` ## nostr/client.py ```python import os import sys import logging import traceback import json import time import base64 import hashlib import asyncio import concurrent.futures from typing import List, Optional, Callable from pathlib import Path from monstr.client.client import ClientPool from monstr.encrypt import Keys, NIP4Encrypt from monstr.event.event import Event import threading import uuid import fcntl from .key_manager import KeyManager from .encryption_manager import EncryptionManager from .event_handler import EventHandler from constants import APP_DIR from utils.file_lock import lock_file # Get the logger for this module logger = logging.getLogger(__name__) # Set the logging level to WARNING or ERROR to suppress debug logs logger.setLevel(logging.WARNING) DEFAULT_RELAYS = [ "wss://relay.snort.social", "wss://nostr.oxtr.dev", "wss://relay.primal.net" ] # nostr/client.py # src/nostr/client.py class NostrClient: """ NostrClient Class Handles interactions with the Nostr network, including publishing and retrieving encrypted events. Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations. """ def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None): """ Initializes the NostrClient with an EncryptionManager, connects to specified relays, and sets up the KeyManager with the given fingerprint. :param encryption_manager: An instance of EncryptionManager for handling encryption/decryption. :param fingerprint: The fingerprint to differentiate key derivations for unique identities. :param relays: (Optional) A list of relay URLs to connect to. Defaults to predefined relays. """ try: # Assign the encryption manager and fingerprint self.encryption_manager = encryption_manager self.fingerprint = fingerprint # Track the fingerprint self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories # Initialize KeyManager with the decrypted parent seed and the provided fingerprint self.key_manager = KeyManager( self.encryption_manager.decrypt_parent_seed(), self.fingerprint ) # Initialize event handler and client pool self.event_handler = EventHandler() self.relays = relays if relays else DEFAULT_RELAYS self.client_pool = ClientPool(self.relays) self.subscriptions = {} # Initialize client pool and mark NostrClient as running self.initialize_client_pool() logger.info("NostrClient initialized successfully.") # For shutdown handling self.is_shutting_down = False self._shutdown_event = asyncio.Event() except Exception as e: logger.error(f"Initialization failed: {e}") logger.error(traceback.format_exc()) print(f"Error: Initialization failed: {e}", file=sys.stderr) sys.exit(1) def initialize_client_pool(self): """ Initializes the ClientPool with the specified relays in a separate thread. """ try: logger.debug("Initializing ClientPool with relays.") self.client_pool = ClientPool(self.relays) # Start the ClientPool in a separate thread self.loop_thread = threading.Thread(target=self.run_event_loop, daemon=True) self.loop_thread.start() # Wait until the ClientPool is connected to all relays self.wait_for_connection() logger.info("ClientPool connected to all relays.") except Exception as e: logger.error(f"Failed to initialize ClientPool: {e}") logger.error(traceback.format_exc()) print(f"Error: Failed to initialize ClientPool: {e}", file=sys.stderr) sys.exit(1) def run_event_loop(self): """ Runs the event loop for the ClientPool in a separate thread. """ try: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.loop.create_task(self.client_pool.run()) self.loop.run_forever() except asyncio.CancelledError: logger.debug("Event loop received cancellation.") except Exception as e: logger.error(f"Error running event loop in thread: {e}") logger.error(traceback.format_exc()) print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr) finally: if not self.loop.is_closed(): logger.debug("Closing the event loop.") self.loop.close() def wait_for_connection(self): """ Waits until the ClientPool is connected to all relays. """ try: while not self.client_pool.connected: time.sleep(0.1) except Exception as e: logger.error(f"Error while waiting for ClientPool to connect: {e}") logger.error(traceback.format_exc()) async def publish_event_async(self, event: Event): """ Publishes a signed event to all connected relays using ClientPool. :param event: The signed Event object to publish. """ try: logger.debug(f"Publishing event: {event.serialize()}") self.client_pool.publish(event) logger.info(f"Event published with ID: {event.id}") logger.debug(f"Finished publishing event: {event.id}") except Exception as e: logger.error(f"Failed to publish event: {e}") logger.error(traceback.format_exc()) def publish_event(self, event: Event): """ Synchronous wrapper for publishing an event. :param event: The signed Event object to publish. """ try: logger.debug(f"Submitting publish_event_async for event ID: {event.id}") future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop) # Wait for the future to complete future.result(timeout=5) # Adjust the timeout as needed except Exception as e: logger.error(f"Error in publish_event: {e}") print(f"Error: Failed to publish event: {e}", file=sys.stderr) async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): """ Subscribes to events based on the provided filters using ClientPool. :param filters: A list of filter dictionaries. :param handler: A callback function to handle incoming events. """ try: sub_id = str(uuid.uuid4()) self.client_pool.subscribe(handlers=handler, filters=filters, sub_id=sub_id) logger.info(f"Subscribed to events with subscription ID: {sub_id}") self.subscriptions[sub_id] = True except Exception as e: logger.error(f"Failed to subscribe: {e}") logger.error(traceback.format_exc()) print(f"Error: Failed to subscribe: {e}", file=sys.stderr) def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): """ Synchronous wrapper for subscribing to events. :param filters: A list of filter dictionaries. :param handler: A callback function to handle incoming events. """ try: asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop) except Exception as e: logger.error(f"Error in subscribe: {e}") print(f"Error: Failed to subscribe: {e}", file=sys.stderr) async def retrieve_json_from_nostr_async(self) -> Optional[str]: """ Retrieves the latest encrypted JSON event from Nostr. :return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails. """ try: filters = [{ 'authors': [self.key_manager.keys.public_key_hex()], 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], 'limit': 1 }] events = [] def my_handler(the_client, sub_id, evt: Event): logger.debug(f"Received event: {evt.serialize()}") events.append(evt) await self.subscribe_async(filters=filters, handler=my_handler) await asyncio.sleep(2) # Adjust the sleep time as needed # Unsubscribe from all subscriptions for sub_id in list(self.subscriptions.keys()): self.client_pool.unsubscribe(sub_id) del self.subscriptions[sub_id] logger.debug(f"Unsubscribed from sub_id {sub_id}") if events: event = events[0] content_base64 = event.content if event.kind == Event.KIND_ENCRYPT: nip4_encrypt = NIP4Encrypt(self.key_manager.keys) content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key) # Return the Base64-encoded content as a string logger.debug("Encrypted JSON data retrieved successfully.") return content_base64 else: logger.warning("No events found matching the filters.") print("No events found matching the filters.", file=sys.stderr) return None except Exception as e: logger.error(f"Failed to retrieve JSON from Nostr: {e}") logger.error(traceback.format_exc()) print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr) return None def retrieve_json_from_nostr(self) -> Optional[bytes]: """ Public method to retrieve encrypted JSON from Nostr. :return: The encrypted JSON data as bytes, or None if retrieval fails. """ try: future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) return future.result(timeout=10) except concurrent.futures.TimeoutError: logger.error("Timeout occurred while retrieving JSON from Nostr.") print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') return None async def do_post_async(self, text: str): """ Creates and publishes a text note event. :param text: The content of the text note. """ try: event = Event( kind=Event.KIND_TEXT_NOTE, content=text, pub_key=self.key_manager.keys.public_key_hex() ) event.created_at = int(time.time()) event.sign(self.key_manager.keys.private_key_hex()) logger.debug(f"Event data: {event.serialize()}") await self.publish_event_async(event) logger.debug("Finished do_post_async") except Exception as e: logger.error(f"An error occurred during publishing: {e}", exc_info=True) print(f"Error: An error occurred during publishing: {e}", file=sys.stderr) async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]): """ Subscribes to the feed of the client's own pubkey. :param handler: A callback function to handle incoming events. """ try: filters = [{ 'authors': [self.key_manager.keys.public_key_hex()], 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], 'limit': 100 }] await self.subscribe_async(filters=filters, handler=handler) logger.info("Subscribed to your feed.") # Removed the infinite loop to prevent blocking except Exception as e: logger.error(f"An error occurred during subscription: {e}", exc_info=True) print(f"Error: An error occurred during subscription: {e}", file=sys.stderr) async def publish_and_subscribe_async(self, text: str): """ Publishes a text note and subscribes to the feed concurrently. :param text: The content of the text note to publish. """ try: await asyncio.gather( self.do_post_async(text), self.subscribe_feed_async(self.event_handler.handle_new_event) ) except Exception as e: logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True) print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr) def publish_and_subscribe(self, text: str): """ Public method to publish a text note and subscribe to the feed. :param text: The content of the text note to publish. """ try: asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop) except Exception as e: logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True) print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr) def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None: """ Decrypts the encrypted data retrieved from Nostr and updates the local index file. :param encrypted_data: The encrypted data retrieved from Nostr. """ try: decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) data = json.loads(decrypted_data.decode('utf-8')) self.save_json_data(data) self.update_checksum() logger.info("Index file updated from Nostr successfully.") print(colored("Index file updated from Nostr successfully.", 'green')) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) def save_json_data(self, data: dict) -> None: """ Saves the JSON data to the index file in an encrypted format. :param data: The JSON data to save. """ try: encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8')) index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' with lock_file(index_file_path, fcntl.LOCK_EX): with open(index_file_path, 'wb') as f: f.write(encrypted_data) logger.debug(f"Encrypted data saved to {index_file_path}.") print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green')) except Exception as e: logger.error(f"Failed to save encrypted data: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to save encrypted data: {e}", 'red')) raise def update_checksum(self) -> None: """ Updates the checksum file for the password database. """ try: index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' decrypted_data = self.decrypt_data_from_file(index_file_path) content = decrypted_data.decode('utf-8') logger.debug("Calculating checksum of the updated file content.") checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() logger.debug(f"New checksum: {checksum}") checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt' with lock_file(checksum_file, fcntl.LOCK_EX): with open(checksum_file, 'w') as f: f.write(checksum) os.chmod(checksum_file, 0o600) logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.") print(colored(f"Checksum for '{index_file_path}' updated.", 'green')) except Exception as e: logger.error(f"Failed to update checksum: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to update checksum: {e}", 'red')) def decrypt_data_from_file(self, file_path: Path) -> bytes: """ Decrypts data directly from a file. :param file_path: Path to the encrypted file as a Path object. :return: Decrypted data as bytes. """ try: with lock_file(file_path, fcntl.LOCK_SH): with open(file_path, 'rb') as f: encrypted_data = f.read() decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) logger.debug(f"Data decrypted from file '{file_path}'.") return decrypted_data except Exception as e: logger.error(f"Failed to decrypt data from file '{file_path}': {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red')) raise def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None): """ Public method to post encrypted JSON to Nostr. :param encrypted_json: The encrypted JSON data to be sent. :param to_pubkey: (Optional) The recipient's public key for encryption. """ try: encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8') logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}") event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex()) event.created_at = int(time.time()) if to_pubkey: nip4_encrypt = NIP4Encrypt(self.key_manager.keys) event.content = nip4_encrypt.encrypt_message(event.content, to_pubkey) event.kind = Event.KIND_ENCRYPT logger.debug(f"Encrypted event content: {event.content}") event.sign(self.key_manager.keys.private_key_hex()) logger.debug("Event created and signed") self.publish_event(event) logger.debug("Event published") except Exception as e: logger.error(f"Failed to publish JSON to Nostr: {e}") logger.error(traceback.format_exc()) print(f"Error: Failed to publish JSON to Nostr: {e}", file=sys.stderr) def retrieve_json_from_nostr_sync(self) -> Optional[bytes]: """ Retrieves encrypted data from Nostr and Base64-decodes it. Returns: Optional[bytes]: The encrypted data as bytes if successful, None otherwise. """ try: future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) content_base64 = future.result(timeout=10) if not content_base64: logger.debug("No data retrieved from Nostr.") return None # Base64-decode the content encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8')) logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.") return encrypted_data except concurrent.futures.TimeoutError: logger.error("Timeout occurred while retrieving JSON from Nostr.") print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') return None def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None: """ Public method to decrypt and save data from Nostr. :param encrypted_data: The encrypted data retrieved from Nostr. """ try: self.decrypt_and_save_index_from_nostr(encrypted_data) except Exception as e: logger.error(f"Failed to decrypt and save index from Nostr: {e}") print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red') async def close_client_pool_async(self): """ Closes the ClientPool gracefully by canceling all pending tasks and stopping the event loop. """ if self.is_shutting_down: logger.debug("Shutdown already in progress.") return try: self.is_shutting_down = True logger.debug("Initiating ClientPool shutdown.") # Set the shutdown event self._shutdown_event.set() # Cancel all subscriptions for sub_id in list(self.subscriptions.keys()): try: self.client_pool.unsubscribe(sub_id) del self.subscriptions[sub_id] logger.debug(f"Unsubscribed from sub_id {sub_id}") except Exception as e: logger.warning(f"Error unsubscribing from {sub_id}: {e}") # Close all WebSocket connections if hasattr(self.client_pool, 'clients'): tasks = [self.safe_close_connection(client) for client in self.client_pool.clients] await asyncio.gather(*tasks, return_exceptions=True) # Gather and cancel all tasks current_task = asyncio.current_task() tasks = [task for task in asyncio.all_tasks(loop=self.loop) if task != current_task and not task.done()] if tasks: logger.debug(f"Cancelling {len(tasks)} pending tasks.") for task in tasks: task.cancel() # Wait for all tasks to be cancelled with a timeout try: await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5) except asyncio.TimeoutError: logger.warning("Timeout waiting for tasks to cancel") logger.debug("Stopping the event loop.") self.loop.stop() logger.info("Event loop stopped successfully.") except Exception as e: logger.error(f"Error during async shutdown: {e}") logger.error(traceback.format_exc()) finally: self.is_shutting_down = False def close_client_pool(self): """ Public method to close the ClientPool gracefully. """ if self.is_shutting_down: logger.debug("Shutdown already in progress. Skipping redundant shutdown.") return try: # Schedule the coroutine to close the client pool future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop) # Wait for the coroutine to finish with a timeout try: future.result(timeout=10) except concurrent.futures.TimeoutError: logger.warning("Initial shutdown attempt timed out, forcing cleanup...") # Additional cleanup regardless of timeout try: self.loop.call_soon_threadsafe(self.loop.stop) # Give a short grace period for the loop to stop time.sleep(0.5) if self.loop.is_running(): logger.warning("Loop still running after stop, closing forcefully") self.loop.call_soon_threadsafe(self.loop.close) # Wait for the thread with a reasonable timeout if self.loop_thread.is_alive(): self.loop_thread.join(timeout=5) if self.loop_thread.is_alive(): logger.warning("Thread still alive after join, may need to be force-killed") except Exception as cleanup_error: logger.error(f"Error during final cleanup: {cleanup_error}") logger.info("ClientPool shutdown complete") except Exception as e: logger.error(f"Error in close_client_pool: {e}") logger.error(traceback.format_exc()) finally: self.is_shutting_down = False async def safe_close_connection(self, client): try: await client.close_connection() logger.debug(f"Closed connection to relay: {client.url}") except AttributeError: logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.") except Exception as e: logger.warning(f"Error closing connection to {client.url}: {e}") ``` ## utils/fingerprint.py ```python # utils/fingerprint.py """ Fingerprint Module This module provides functionality to generate a unique, one-way hashed fingerprint from a given seed phrase. The fingerprint serves as an identifier for each seed, facilitating organized and secure storage. """ import hashlib import logging import traceback from typing import Optional # Instantiate the logger logger = logging.getLogger(__name__) def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]: """ Generates a unique fingerprint from the provided seed phrase using SHA-256. Parameters: seed_phrase (str): The BIP-39 seed phrase. length (int): The desired length of the fingerprint. Returns: Optional[str]: The generated fingerprint or None if an error occurs. """ try: # Normalize the seed phrase normalized_seed = seed_phrase.strip().lower() logger.debug(f"Normalized seed: {normalized_seed}") # Compute SHA-256 hash sha256_hash = hashlib.sha256(normalized_seed.encode('utf-8')).hexdigest() logger.debug(f"SHA-256 Hash: {sha256_hash}") # Truncate to desired length fingerprint = sha256_hash[:length].upper() logger.debug(f"Generated Fingerprint: {fingerprint}") return fingerprint except Exception as e: logger.error(f"Failed to generate fingerprint: {e}") logger.error(traceback.format_exc()) return None ``` ## utils/key_derivation.py ```python # utils/key_derivation.py """ Key Derivation Module Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. This module provides functions to derive cryptographic keys from user-provided passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption purposes. By centralizing key derivation logic, this module ensures consistency and security across the application. Ensure that all dependencies are installed and properly configured in your environment. """ import os import hashlib import base64 import unicodedata import logging import traceback from typing import Union from bip_utils import Bip39SeedGenerator from local_bip85.bip85 import BIP85 from monstr.encrypt import Keys from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend # Instantiate the logger logger = logging.getLogger(__name__) def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes: """ Derives a Fernet-compatible encryption key from the provided password using PBKDF2-HMAC-SHA256. This function normalizes the password using NFKD normalization, encodes it to UTF-8, and then applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key is then URL-safe base64-encoded to ensure compatibility with Fernet. Parameters: password (str): The user's password. iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000. Returns: bytes: A URL-safe base64-encoded encryption key suitable for Fernet. Raises: ValueError: If the password is empty or too short. """ if not password: logger.error("Password cannot be empty.") raise ValueError("Password cannot be empty.") if len(password) < 8: logger.warning("Password length is less than recommended (8 characters).") # Normalize the password to NFKD form and encode to UTF-8 normalized_password = unicodedata.normalize('NFKD', password).strip() password_bytes = normalized_password.encode('utf-8') try: # Derive the key using PBKDF2-HMAC-SHA256 logger.debug("Starting key derivation from password.") key = hashlib.pbkdf2_hmac( hash_name='sha256', password=password_bytes, salt=b'', # No salt for deterministic key derivation iterations=iterations, dklen=32 # 256-bit key for Fernet ) logger.debug(f"Derived key (hex): {key.hex()}") # Encode the key in URL-safe base64 key_b64 = base64.urlsafe_b64encode(key) logger.debug(f"Base64-encoded key: {key_b64.decode()}") return key_b64 except Exception as e: logger.error(f"Error deriving key from password: {e}") logger.error(traceback.format_exc()) # Log full traceback raise def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> bytes: """ Derives a 32-byte cryptographic key from a BIP-39 parent seed using HKDF. Optionally, include a fingerprint to differentiate key derivation per fingerprint. :param parent_seed: The 12-word BIP-39 seed phrase. :param fingerprint: An optional fingerprint to create unique keys per fingerprint. :return: A 32-byte derived key. """ try: # Generate seed bytes from mnemonic seed = Bip39SeedGenerator(parent_seed).Generate() # If a fingerprint is provided, use it to differentiate the derivation if fingerprint: # Convert fingerprint to a stable integer index index = int(hashlib.sha256(fingerprint.encode()).hexdigest(), 16) % (2**31) info = f'password-manager-{index}'.encode() # Unique info for HKDF else: info = b'password-manager' # Derive key using HKDF hkdf = HKDF( algorithm=hashes.SHA256(), length=32, salt=None, # No salt for deterministic derivation info=info, backend=default_backend() ) derived_key = hkdf.derive(seed) if len(derived_key) != 32: raise ValueError(f"Derived key length is {len(derived_key)} bytes; expected 32 bytes.") return derived_key except Exception as e: logger.error(f"Failed to derive key using HKDF: {e}") logger.error(traceback.format_exc()) raise class KeyManager: def __init__(self, parent_seed: str, fingerprint: str = None): self.parent_seed = parent_seed self.fingerprint = fingerprint self.bip85 = self.initialize_bip85() self.keys = self.generate_nostr_keys() def initialize_bip85(self): seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() bip85 = BIP85(seed_bytes) return bip85 def generate_nostr_keys(self) -> Keys: """ Derives a unique Nostr key pair for the given fingerprint using BIP-85. :return: An instance of Keys containing the Nostr key pair. """ # Use a derivation path that includes the fingerprint # Convert fingerprint to an integer index (e.g., using a hash function) index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) if self.fingerprint else 0 # Derive entropy for Nostr key (32 bytes) entropy_bytes = self.bip85.derive_entropy( app=BIP85.Applications.ENTROPY, index=index, size=32 ) # Generate Nostr key pair from entropy private_key_hex = entropy_bytes.hex() keys = Keys(priv_key=private_key_hex) return keys # utils/key_derivation.py """ Key Derivation Module Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. This module provides functions to derive cryptographic keys from user-provided passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption purposes. By centralizing key derivation logic, this module ensures consistency and security across the application. Ensure that all dependencies are installed and properly configured in your environment. """ import os import hashlib import base64 import unicodedata import logging import traceback from typing import Union from bip_utils import Bip39SeedGenerator from local_bip85.bip85 import BIP85 from monstr.encrypt import Keys from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend # Instantiate the logger logger = logging.getLogger(__name__) def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes: """ Derives a Fernet-compatible encryption key from the provided password using PBKDF2-HMAC-SHA256. This function normalizes the password using NFKD normalization, encodes it to UTF-8, and then applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key is then URL-safe base64-encoded to ensure compatibility with Fernet. Parameters: password (str): The user's password. iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000. Returns: bytes: A URL-safe base64-encoded encryption key suitable for Fernet. Raises: ValueError: If the password is empty or too short. """ if not password: logger.error("Password cannot be empty.") raise ValueError("Password cannot be empty.") if len(password) < 8: logger.warning("Password length is less than recommended (8 characters).") # Normalize the password to NFKD form and encode to UTF-8 normalized_password = unicodedata.normalize('NFKD', password).strip() password_bytes = normalized_password.encode('utf-8') try: # Derive the key using PBKDF2-HMAC-SHA256 logger.debug("Starting key derivation from password.") key = hashlib.pbkdf2_hmac( hash_name='sha256', password=password_bytes, salt=b'', # No salt for deterministic key derivation iterations=iterations, dklen=32 # 256-bit key for Fernet ) logger.debug(f"Derived key (hex): {key.hex()}") # Encode the key in URL-safe base64 key_b64 = base64.urlsafe_b64encode(key) logger.debug(f"Base64-encoded key: {key_b64.decode()}") return key_b64 except Exception as e: logger.error(f"Error deriving key from password: {e}") logger.error(traceback.format_exc()) # Log full traceback raise def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> bytes: """ Derives a 32-byte cryptographic key from a BIP-39 parent seed using HKDF. Optionally, include a fingerprint to differentiate key derivation per fingerprint. :param parent_seed: The 12-word BIP-39 seed phrase. :param fingerprint: An optional fingerprint to create unique keys per fingerprint. :return: A 32-byte derived key. """ try: # Generate seed bytes from mnemonic seed = Bip39SeedGenerator(parent_seed).Generate() # If a fingerprint is provided, use it to differentiate the derivation if fingerprint: # Convert fingerprint to a stable integer index index = int(hashlib.sha256(fingerprint.encode()).hexdigest(), 16) % (2**31) info = f'password-manager-{index}'.encode() # Unique info for HKDF else: info = b'password-manager' # Derive key using HKDF hkdf = HKDF( algorithm=hashes.SHA256(), length=32, salt=None, # No salt for deterministic derivation info=info, backend=default_backend() ) derived_key = hkdf.derive(seed) if len(derived_key) != 32: raise ValueError(f"Derived key length is {len(derived_key)} bytes; expected 32 bytes.") return derived_key except Exception as e: logger.error(f"Failed to derive key using HKDF: {e}") logger.error(traceback.format_exc()) raise class KeyManager: def __init__(self, parent_seed: str, fingerprint: str = None): self.parent_seed = parent_seed self.fingerprint = fingerprint self.bip85 = self.initialize_bip85() self.keys = self.generate_nostr_keys() def initialize_bip85(self): seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() bip85 = BIP85(seed_bytes) return bip85 def generate_nostr_keys(self) -> Keys: """ Derives a unique Nostr key pair for the given fingerprint using BIP-85. :return: An instance of Keys containing the Nostr key pair. """ # Use a derivation path that includes the fingerprint # Convert fingerprint to an integer index (e.g., using a hash function) index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) if self.fingerprint else 0 # Derive entropy for Nostr key (32 bytes) entropy_bytes = self.bip85.derive_entropy( app=BIP85.Applications.ENTROPY, index=index, size=32 ) # Generate Nostr key pair from entropy private_key_hex = entropy_bytes.hex() keys = Keys(priv_key=private_key_hex) return keys ``` ## utils/file_lock.py ```python # utils/file_lock.py """ File Lock Module This module provides a single context manager, `lock_file`, for acquiring and releasing locks on files using the `fcntl` library. It ensures that critical files are accessed safely, preventing race conditions and maintaining data integrity when multiple processes or threads attempt to read from or write to the same file concurrently. I need to change this to something that supports Windows in the future. Ensure that all dependencies are installed and properly configured in your environment. """ import os import fcntl import logging from contextlib import contextmanager from typing import Generator from pathlib import Path from termcolor import colored import sys import traceback # Instantiate the logger logger = logging.getLogger(__name__) @contextmanager def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: """ Context manager to acquire a lock on a file. Parameters: file_path (Path): The path to the file to lock. lock_type (int): The type of lock to acquire (`fcntl.LOCK_EX` for exclusive, `fcntl.LOCK_SH` for shared). Yields: None Raises: ValueError: If an invalid lock type is provided. SystemExit: Exits the program if the lock cannot be acquired. """ if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH): logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.") print(colored("Error: Invalid lock type provided.", 'red')) sys.exit(1) file = None try: # Determine the mode based on whether the file exists mode = 'rb+' if file_path.exists() else 'wb' # Open the file file = open(file_path, mode) logging.debug(f"Opened file '{file_path}' in mode '{mode}' for locking.") # Acquire the lock fcntl.flock(file, lock_type) lock_type_str = "Exclusive" if lock_type == fcntl.LOCK_EX else "Shared" logging.debug(f"{lock_type_str} lock acquired on '{file_path}'.") yield # Control is transferred to the block inside the `with` statement except IOError as e: lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red')) sys.exit(1) finally: if file: try: # Release the lock fcntl.flock(file, fcntl.LOCK_UN) logging.debug(f"Lock released on '{file_path}'.") except Exception as e: lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow')) finally: # Close the file try: file.close() logging.debug(f"File '{file_path}' closed successfully.") except Exception as e: logging.warning(f"Failed to close file '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow')) @contextmanager def exclusive_lock(file_path: Path) -> Generator[None, None, None]: """ Convenience context manager to acquire an exclusive lock on a file. Parameters: file_path (Path): The path to the file to lock. Yields: None """ with lock_file(file_path, fcntl.LOCK_EX): yield @contextmanager def shared_lock(file_path: Path) -> Generator[None, None, None]: """ Convenience context manager to acquire a shared lock on a file. Parameters: file_path (Path): The path to the file to lock. Yields: None """ with lock_file(file_path, fcntl.LOCK_SH): yield ``` ## utils/__init__.py ```python # utils/__init__.py import logging import traceback try: from .file_lock import lock_file from .key_derivation import derive_key_from_password, derive_key_from_parent_seed from .checksum import calculate_checksum, verify_checksum from .password_prompt import prompt_for_password logging.info("Modules imported successfully.") except Exception as e: logging.error(f"Failed to import one or more modules: {e}") logging.error(traceback.format_exc()) # Log full traceback __all__ = [ 'derive_key_from_password', 'derive_key_from_parent_seed', 'calculate_checksum', 'verify_checksum', 'lock_file', 'prompt_for_password' ] ``` ## utils/checksum.py ```python # utils/checksum.py """ Checksum Module This module provides functionalities to calculate and verify SHA-256 checksums for files. It ensures the integrity and authenticity of critical files within the application by comparing computed checksums against stored values. Ensure that all dependencies are installed and properly configured in your environment. """ import hashlib import logging import sys import os import traceback from typing import Optional from termcolor import colored from constants import ( APP_DIR, SCRIPT_CHECKSUM_FILE ) # Instantiate the logger logger = logging.getLogger(__name__) def calculate_checksum(file_path: str) -> Optional[str]: """ Calculates the SHA-256 checksum of the given file. Parameters: file_path (str): Path to the file. Returns: Optional[str]: Hexadecimal SHA-256 checksum if successful, None otherwise. """ hasher = hashlib.sha256() try: with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) checksum = hasher.hexdigest() logging.debug(f"Calculated checksum for '{file_path}': {checksum}") return checksum except FileNotFoundError: logging.error(f"File '{file_path}' not found for checksum calculation.") print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red')) return None except Exception as e: logging.error(f"Error calculating checksum for '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red')) return None def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool: """ Verifies the current checksum against the stored checksum. Parameters: current_checksum (str): The newly calculated checksum. checksum_file_path (str): The checksum file to verify against. Returns: bool: True if checksums match, False otherwise. """ try: with open(checksum_file_path, 'r') as f: stored_checksum = f.read().strip() if current_checksum == stored_checksum: logging.debug(f"Checksum verification passed for '{checksum_file_path}'.") return True else: logging.warning(f"Checksum mismatch for '{checksum_file_path}'.") return False except FileNotFoundError: logging.error(f"Checksum file '{checksum_file_path}' not found.") print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red')) return False except Exception as e: logging.error(f"Error reading checksum file '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red')) return False def update_checksum(content: str, checksum_file_path: str) -> bool: """ Updates the stored checksum file with the provided content's checksum. Parameters: content (str): The content to calculate the checksum for. checksum_file_path (str): The path to the checksum file to update. Returns: bool: True if the checksum was successfully updated, False otherwise. """ try: hasher = hashlib.sha256() hasher.update(content.encode('utf-8')) new_checksum = hasher.hexdigest() with open(checksum_file_path, 'w') as f: f.write(new_checksum) logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}") return True except Exception as e: logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red')) return False def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool: """ Verifies the checksum of a file against its stored checksum and updates it if necessary. Parameters: file_path (str): Path to the file to verify. checksum_file_path (str): Path to the checksum file. Returns: bool: True if verification is successful, False otherwise. """ current_checksum = calculate_checksum(file_path) if current_checksum is None: return False if verify_checksum(current_checksum, checksum_file_path): print(colored(f"Checksum verification passed for '{file_path}'.", 'green')) logging.info(f"Checksum verification passed for '{file_path}'.") return True else: print(colored(f"Checksum verification failed for '{file_path}'.", 'red')) logging.warning(f"Checksum verification failed for '{file_path}'.") return False def initialize_checksum(file_path: str, checksum_file_path: str) -> bool: """ Initializes the checksum file by calculating the checksum of the given file. Parameters: file_path (str): Path to the file to calculate checksum for. checksum_file_path (str): Path to the checksum file to create. Returns: bool: True if initialization is successful, False otherwise. """ checksum = calculate_checksum(file_path) if checksum is None: return False try: with open(checksum_file_path, 'w') as f: f.write(checksum) logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}") print(colored(f"Initialized checksum for '{file_path}'.", 'green')) return True except Exception as e: logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red')) return False ``` ## utils/fingerprint_manager.py ```python # utils/fingerprint_manager.py import os import json import logging import traceback from pathlib import Path from typing import List, Optional import shutil # Ensure shutil is imported if used within the class from utils.fingerprint import generate_fingerprint # Instantiate the logger logger = logging.getLogger(__name__) class FingerprintManager: """ FingerprintManager Class Handles operations related to fingerprints, including generation, storage, listing, selection, and removal. Ensures that each seed is uniquely identified by its fingerprint and manages the corresponding directory structure. """ def __init__(self, app_dir: Path): """ Initializes the FingerprintManager. Parameters: app_dir (Path): The root application directory (e.g., ~/.seedpass). """ self.app_dir = app_dir self.fingerprints_file = self.app_dir / 'fingerprints.json' self._ensure_app_directory() self.fingerprints = self._load_fingerprints() self.current_fingerprint: Optional[str] = None def get_current_fingerprint_dir(self) -> Optional[Path]: """ Retrieves the directory path for the current fingerprint. Returns: Optional[Path]: The Path object of the current fingerprint directory or None. """ if hasattr(self, 'current_fingerprint') and self.current_fingerprint: return self.get_fingerprint_directory(self.current_fingerprint) else: logger.error("No current fingerprint is set.") return None def _ensure_app_directory(self): """ Ensures that the application directory exists. """ try: self.app_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Application directory ensured at {self.app_dir}") except Exception as e: logger.error(f"Failed to create application directory at {self.app_dir}: {e}") logger.error(traceback.format_exc()) raise def _load_fingerprints(self) -> List[str]: """ Loads the list of fingerprints from the fingerprints.json file. Returns: List[str]: A list of fingerprint strings. """ try: if self.fingerprints_file.exists(): with open(self.fingerprints_file, 'r') as f: data = json.load(f) fingerprints = data.get('fingerprints', []) logger.debug(f"Loaded fingerprints: {fingerprints}") return fingerprints else: logger.debug("fingerprints.json not found. Initializing empty fingerprint list.") return [] except Exception as e: logger.error(f"Failed to load fingerprints: {e}") logger.error(traceback.format_exc()) return [] def _save_fingerprints(self): """ Saves the current list of fingerprints to the fingerprints.json file. """ try: with open(self.fingerprints_file, 'w') as f: json.dump({'fingerprints': self.fingerprints}, f, indent=4) logger.debug(f"Fingerprints saved: {self.fingerprints}") except Exception as e: logger.error(f"Failed to save fingerprints: {e}") logger.error(traceback.format_exc()) raise def add_fingerprint(self, seed_phrase: str) -> Optional[str]: """ Generates a fingerprint from the seed phrase and adds it to the list. Parameters: seed_phrase (str): The BIP-39 seed phrase. Returns: Optional[str]: The generated fingerprint or None if failed. """ fingerprint = generate_fingerprint(seed_phrase) if fingerprint and fingerprint not in self.fingerprints: self.fingerprints.append(fingerprint) self._save_fingerprints() logger.info(f"Fingerprint {fingerprint} added successfully.") # Create fingerprint directory fingerprint_dir = self.app_dir / fingerprint fingerprint_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Fingerprint directory created at {fingerprint_dir}") return fingerprint elif fingerprint in self.fingerprints: logger.warning(f"Fingerprint {fingerprint} already exists.") return fingerprint else: logger.error("Fingerprint generation failed.") return None def remove_fingerprint(self, fingerprint: str) -> bool: """ Removes a fingerprint and its associated directory. Parameters: fingerprint (str): The fingerprint to remove. Returns: bool: True if removed successfully, False otherwise. """ if fingerprint in self.fingerprints: try: self.fingerprints.remove(fingerprint) self._save_fingerprints() # Remove fingerprint directory fingerprint_dir = self.app_dir / fingerprint if fingerprint_dir.exists() and fingerprint_dir.is_dir(): for child in fingerprint_dir.glob('*'): if child.is_file(): child.unlink() elif child.is_dir(): shutil.rmtree(child) fingerprint_dir.rmdir() logger.info(f"Fingerprint {fingerprint} removed successfully.") return True except Exception as e: logger.error(f"Failed to remove fingerprint {fingerprint}: {e}") logger.error(traceback.format_exc()) return False else: logger.warning(f"Fingerprint {fingerprint} does not exist.") return False def list_fingerprints(self) -> List[str]: """ Lists all available fingerprints. Returns: List[str]: A list of fingerprint strings. """ logger.debug(f"Listing fingerprints: {self.fingerprints}") return self.fingerprints def select_fingerprint(self, fingerprint: str) -> bool: """ Selects a fingerprint for the current session. Parameters: fingerprint (str): The fingerprint to select. Returns: bool: True if selection is successful, False otherwise. """ if fingerprint in self.fingerprints: self.current_fingerprint = fingerprint logger.info(f"Fingerprint {fingerprint} selected.") return True else: logger.error(f"Fingerprint {fingerprint} not found.") return False def get_fingerprint_directory(self, fingerprint: str) -> Optional[Path]: """ Retrieves the directory path for a given fingerprint. Parameters: fingerprint (str): The fingerprint. Returns: Optional[Path]: The Path object of the fingerprint directory or None. """ fingerprint_dir = self.app_dir / fingerprint if fingerprint_dir.exists() and fingerprint_dir.is_dir(): return fingerprint_dir else: logger.error(f"Directory for fingerprint {fingerprint} does not exist.") return None ``` ## utils/password_prompt.py ```python # utils/password_prompt.py """ Password Prompt Module This module provides functions to securely prompt users for passwords, ensuring that passwords are entered and confirmed correctly. It handles both the creation of new passwords and the input of existing passwords for decryption purposes. By centralizing password prompting logic, this module enhances code reuse, security, and maintainability across the application. Ensure that all dependencies are installed and properly configured in your environment. """ import os import getpass import logging import sys import unicodedata import traceback from termcolor import colored from colorama import init as colorama_init from constants import MIN_PASSWORD_LENGTH # Initialize colorama for colored terminal text colorama_init() # Instantiate the logger logger = logging.getLogger(__name__) def prompt_new_password() -> str: """ Prompts the user to enter and confirm a new password for encrypting the parent seed. This function ensures that the password meets the minimum length requirement and that the password and confirmation match. It provides user-friendly messages and handles retries. Returns: str: The confirmed password entered by the user. Raises: SystemExit: If the user fails to provide a valid password after multiple attempts. """ max_retries = 5 attempts = 0 while attempts < max_retries: try: password = getpass.getpass(prompt="Enter a new password: ").strip() confirm_password = getpass.getpass(prompt="Confirm your password: ").strip() if not password: print(colored("Error: Password cannot be empty. Please try again.", 'red')) logging.warning("User attempted to enter an empty password.") attempts += 1 continue if len(password) < MIN_PASSWORD_LENGTH: print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red')) logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.") attempts += 1 continue if password != confirm_password: print(colored("Error: Passwords do not match. Please try again.", 'red')) logging.warning("User entered mismatching passwords.") attempts += 1 continue # Normalize the password to NFKD form normalized_password = unicodedata.normalize('NFKD', password) logging.debug("User entered a valid and confirmed password.") return normalized_password except KeyboardInterrupt: print(colored("\nOperation cancelled by user.", 'yellow')) logging.info("Password prompt interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during password prompt: {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: {e}", 'red')) attempts += 1 print(colored("Maximum password attempts exceeded. Exiting.", 'red')) logging.error("User failed to provide a valid password after multiple attempts.") sys.exit(1) def prompt_existing_password(prompt_message: str = "Enter your password: ") -> str: """ Prompts the user to enter an existing password, typically used for decryption purposes. This function ensures that the password is entered securely without echoing it to the terminal. Parameters: prompt_message (str): The message displayed to prompt the user. Defaults to "Enter your password: ". Returns: str: The password entered by the user. Raises: SystemExit: If the user interrupts the operation. """ try: password = getpass.getpass(prompt=prompt_message).strip() if not password: print(colored("Error: Password cannot be empty.", 'red')) logging.warning("User attempted to enter an empty password.") sys.exit(1) # Normalize the password to NFKD form normalized_password = unicodedata.normalize('NFKD', password) logging.debug("User entered an existing password for decryption.") return normalized_password except KeyboardInterrupt: print(colored("\nOperation cancelled by user.", 'yellow')) logging.info("Existing password prompt interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during existing password prompt: {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: {e}", 'red')) sys.exit(1) def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool: """ Prompts the user to confirm an action, typically used before performing critical operations. Parameters: prompt_message (str): The confirmation message displayed to the user. Defaults to "Are you sure you want to proceed? (Y/N): ". Returns: bool: True if the user confirms the action, False otherwise. Raises: SystemExit: If the user interrupts the operation. """ try: while True: response = input(colored(prompt_message, 'cyan')).strip().lower() if response in ['y', 'yes']: logging.debug("User confirmed the action.") return True elif response in ['n', 'no']: logging.debug("User declined the action.") return False else: print(colored("Please respond with 'Y' or 'N'.", 'yellow')) except KeyboardInterrupt: print(colored("\nOperation cancelled by user.", 'yellow')) logging.info("Action confirmation interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during action confirmation: {e}") logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: {e}", 'red')) sys.exit(1) def prompt_for_password() -> str: """ Prompts the user to enter a new password by invoking the prompt_new_password function. This function serves as an alias to maintain consistency with import statements in other modules. Returns: str: The confirmed password entered by the user. """ return prompt_new_password() ```