diff --git a/src/constants.py b/src/constants.py index b8655f1..c38488a 100644 --- a/src/constants.py +++ b/src/constants.py @@ -6,42 +6,8 @@ import sys from pathlib import Path import traceback -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create a custom logger - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler(sys.stdout) - f_handler = logging.FileHandler(os.path.join('logs', 'constants.log')) - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Console will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger if they are not already added - if not logger.handlers: - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Configure logging at the start of the module -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) # ----------------------------------- # Nostr Relay Connection Settings @@ -61,9 +27,7 @@ except Exception as e: logging.error(traceback.format_exc()) # Log full traceback try: - INDEX_FILE = APP_DIR / 'seedpass_passwords_db.json' # Encrypted password database PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed - logging.info(f"Index file path set to {INDEX_FILE}") logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}") except Exception as e: logging.error(f"Error setting file paths: {e}") @@ -74,8 +38,7 @@ except Exception as e: # ----------------------------------- try: SCRIPT_CHECKSUM_FILE = APP_DIR / 'seedpass_script_checksum.txt' # Checksum for main script - DATA_CHECKSUM_FILE = APP_DIR / 'seedpass_passwords_checksum.txt' # Checksum for password data - logging.info(f"Checksum file paths set: Script {SCRIPT_CHECKSUM_FILE}, Data {DATA_CHECKSUM_FILE}") + logging.info(f"Checksum file path set: Script {SCRIPT_CHECKSUM_FILE}") except Exception as e: logging.error(f"Error setting checksum file paths: {e}") logging.error(traceback.format_exc()) # Log full traceback @@ -91,5 +54,4 @@ MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length # Additional Constants (if any) # ----------------------------------- # Add any other constants here as your project expands -HASHED_PASSWORD_FILE = APP_DIR / 'hashed_password.enc' DEFAULT_SEED_BACKUP_FILENAME = 'parent_seed_backup.enc' diff --git a/src/bip85/__init__.py b/src/local_bip85/__init__.py similarity index 100% rename from src/bip85/__init__.py rename to src/local_bip85/__init__.py diff --git a/src/bip85/bip85.py b/src/local_bip85/bip85.py similarity index 54% rename from src/bip85/bip85.py rename to src/local_bip85/bip85.py index edab3c7..50f15f0 100644 --- a/src/bip85/bip85.py +++ b/src/local_bip85/bip85.py @@ -31,60 +31,11 @@ from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create a custom logger - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create handlers - c_handler = logging.StreamHandler(sys.stdout) - f_handler = logging.FileHandler(os.path.join('logs', 'bip85.log')) # Log files will be in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) class BIP85: - """ - BIP85 Class - - Implements BIP-85 functionality for deterministic entropy and mnemonic derivation. - """ - def __init__(self, seed_bytes: bytes): - """ - Initializes the BIP85 class with seed bytes. - - Parameters: - seed_bytes (bytes): The BIP39 seed bytes derived from the seed phrase. - - Raises: - SystemExit: If initialization fails. - """ try: self.bip32_ctx = Bip32Slip10Secp256k1.FromSeed(seed_bytes) logging.debug("BIP32 context initialized successfully.") @@ -94,15 +45,14 @@ class BIP85: print(f"{Fore.RED}Error initializing BIP32 context: {e}") sys.exit(1) - def derive_entropy(self, app_no: int, language_code: int, words_num: int, index: int) -> bytes: + def derive_entropy(self, index: int, bytes_len: int, app_no: int = 39) -> bytes: """ Derives entropy using BIP-85 HMAC-SHA512 method. Parameters: - app_no (int): Application number (e.g., 39 for BIP39). - language_code (int): Language code (e.g., 0 for English). - words_num (int): Number of words in the mnemonic (e.g., 12). - index (int): Index for the child mnemonic. + index (int): Index for the child entropy. + bytes_len (int): Number of bytes to derive for the entropy. + app_no (int): Application number (default 39 for BIP39) Returns: bytes: Derived entropy. @@ -110,7 +60,14 @@ class BIP85: Raises: SystemExit: If derivation fails or entropy length is invalid. """ - path = f"m/83696968'/{app_no}'/{language_code}'/{words_num}'/{index}'" + if app_no == 39: + path = f"m/83696968'/{app_no}'/0'/{bytes_len}'/{index}'" + elif app_no == 32: + path = f"m/83696968'/{app_no}'/{index}'" + else: + # Handle other app_no if necessary + path = f"m/83696968'/{app_no}'/{index}'" + try: child_key = self.bip32_ctx.DerivePath(path) k = child_key.PrivateKey().Raw().ToBytes() @@ -120,20 +77,11 @@ class BIP85: hmac_result = hmac.new(hmac_key, k, hashlib.sha512).digest() logging.debug(f"HMAC-SHA512 result: {hmac_result.hex()}") - if words_num == 12: - entropy = hmac_result[:16] # 128 bits for 12-word mnemonic - elif words_num == 18: - entropy = hmac_result[:24] # 192 bits for 18-word mnemonic - elif words_num == 24: - entropy = hmac_result[:32] # 256 bits for 24-word mnemonic - else: - logging.error(f"Unsupported number of words: {words_num}") - print(f"{Fore.RED}Error: Unsupported number of words: {words_num}") - sys.exit(1) + entropy = hmac_result[:bytes_len] - if len(entropy) not in [16, 24, 32]: - logging.error(f"Derived entropy length is {len(entropy)} bytes; expected 16, 24, or 32 bytes.") - print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected 16, 24, or 32 bytes.") + if len(entropy) != bytes_len: + logging.error(f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") + print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") sys.exit(1) logging.debug(f"Derived entropy: {entropy.hex()}") @@ -144,23 +92,14 @@ class BIP85: print(f"{Fore.RED}Error deriving entropy: {e}") sys.exit(1) - def derive_mnemonic(self, app_no: int, language_code: int, words_num: int, index: int) -> str: - """ - Derives a BIP-39 mnemonic using BIP-85 specification. + def derive_mnemonic(self, index: int, words_num: int) -> str: + bytes_len = {12: 16, 18: 24, 24: 32}.get(words_num) + if not bytes_len: + logging.error(f"Unsupported number of words: {words_num}") + print(f"{Fore.RED}Error: Unsupported number of words: {words_num}") + sys.exit(1) - Parameters: - app_no (int): Application number (e.g., 39 for BIP39). - language_code (int): Language code (e.g., 0 for English). - words_num (int): Number of words in the mnemonic (e.g., 12). - index (int): Index for the child mnemonic. - - Returns: - str: Derived BIP-39 mnemonic. - - Raises: - SystemExit: If mnemonic generation fails. - """ - entropy = self.derive_entropy(app_no, language_code, words_num, index) + entropy = self.derive_entropy(index=index, bytes_len=bytes_len, app_no=39) try: mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy) logging.debug(f"Derived mnemonic: {mnemonic}") diff --git a/src/main.py b/src/main.py index c1f4136..457a597 100644 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,4 @@ # main.py - import os import sys import logging @@ -14,39 +13,214 @@ from nostr.client import NostrClient colorama_init() def configure_logging(): - """ - Configures logging with both file and console handlers. - Logs errors in the terminal and all messages in the log file. - """ logger = logging.getLogger() - logger.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) # Keep this as DEBUG to capture all logs - if not logger.handlers: - # Create handlers - c_handler = logging.StreamHandler(sys.stdout) - f_handler = logging.FileHandler(os.path.join('logs', 'main.log')) + # Remove all handlers associated with the root logger object + for handler in logger.handlers[:]: + logger.removeHandler(handler) - # Set levels - c_handler.setLevel(logging.ERROR) - f_handler.setLevel(logging.DEBUG) + # Ensure the 'logs' directory exists + log_directory = 'logs' + if not os.path.exists(log_directory): + os.makedirs(log_directory) - # Create formatters - formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - c_handler.setFormatter(formatter) - f_handler.setFormatter(formatter) + # Create handlers + c_handler = logging.StreamHandler(sys.stdout) + f_handler = logging.FileHandler(os.path.join(log_directory, 'main.log')) - # Add handlers - logger.addHandler(c_handler) - logger.addHandler(f_handler) + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) + f_handler.setLevel(logging.DEBUG) - return logger + # Create formatters and add them to handlers + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) -def display_menu(password_manager: PasswordManager, nostr_client: NostrClient): + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + + # Set logging level for third-party libraries to WARNING to suppress their debug logs + logging.getLogger('monstr').setLevel(logging.WARNING) + logging.getLogger('nostr').setLevel(logging.WARNING) + +def confirm_action(prompt: str) -> bool: """ - Displays the interactive menu and handles user input to perform various actions. + Prompts the user for confirmation. + + :param prompt: The confirmation message to display. + :return: True if user confirms, False otherwise. + """ + while True: + choice = input(colored(prompt, 'yellow')).strip().lower() + if choice in ['y', 'yes']: + return True + elif choice in ['n', 'no']: + return False + else: + print(colored("Please enter 'Y' or 'N'.", 'red')) + +def handle_switch_fingerprint(password_manager: PasswordManager): + """ + Handles switching the active fingerprint. :param password_manager: An instance of PasswordManager. - :param nostr_client: An instance of NostrClient. + """ + try: + fingerprints = password_manager.fingerprint_manager.list_fingerprints() + if not fingerprints: + print(colored("No fingerprints available to switch. Please add a new fingerprint first.", 'yellow')) + return + + print(colored("Available Fingerprints:", 'cyan')) + for idx, fp in enumerate(fingerprints, start=1): + print(colored(f"{idx}. {fp}", 'cyan')) + + choice = input("Select a fingerprint by number to switch: ").strip() + if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): + print(colored("Invalid selection.", 'red')) + return + + selected_fingerprint = fingerprints[int(choice)-1] + if password_manager.select_fingerprint(selected_fingerprint): + print(colored(f"Switched to fingerprint {selected_fingerprint}.", 'green')) + else: + print(colored("Failed to switch fingerprint.", 'red')) + except Exception as e: + logging.error(f"Error during fingerprint switch: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to switch fingerprint: {e}", 'red')) + +def handle_add_new_fingerprint(password_manager: PasswordManager): + """ + Handles adding a new fingerprint. + + :param password_manager: An instance of PasswordManager. + """ + try: + password_manager.add_new_fingerprint() + except Exception as e: + logging.error(f"Error adding new fingerprint: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to add new fingerprint: {e}", 'red')) + +def handle_remove_fingerprint(password_manager: PasswordManager): + """ + Handles removing an existing fingerprint. + + :param password_manager: An instance of PasswordManager. + """ + try: + fingerprints = password_manager.fingerprint_manager.list_fingerprints() + if not fingerprints: + print(colored("No fingerprints available to remove.", 'yellow')) + return + + print(colored("Available Fingerprints:", 'cyan')) + for idx, fp in enumerate(fingerprints, start=1): + print(colored(f"{idx}. {fp}", 'cyan')) + + choice = input("Select a fingerprint by number to remove: ").strip() + if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): + print(colored("Invalid selection.", 'red')) + return + + selected_fingerprint = fingerprints[int(choice)-1] + confirm = confirm_action(f"Are you sure you want to remove fingerprint {selected_fingerprint}? This will delete all associated data. (Y/N): ") + if confirm: + if password_manager.fingerprint_manager.remove_fingerprint(selected_fingerprint): + print(colored(f"Fingerprint {selected_fingerprint} removed successfully.", 'green')) + else: + print(colored("Failed to remove fingerprint.", 'red')) + else: + print(colored("Fingerprint removal cancelled.", 'yellow')) + except Exception as e: + logging.error(f"Error removing fingerprint: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to remove fingerprint: {e}", 'red')) + +def handle_list_fingerprints(password_manager: PasswordManager): + """ + Handles listing all available fingerprints. + + :param password_manager: An instance of PasswordManager. + """ + try: + fingerprints = password_manager.fingerprint_manager.list_fingerprints() + if not fingerprints: + print(colored("No fingerprints available.", 'yellow')) + return + + print(colored("Available Fingerprints:", 'cyan')) + for fp in fingerprints: + print(colored(f"- {fp}", 'cyan')) + except Exception as e: + logging.error(f"Error listing fingerprints: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to list fingerprints: {e}", 'red')) + +def handle_display_npub(password_manager: PasswordManager): + """ + Handles displaying the Nostr public key (npub) to the user. + """ + try: + npub = password_manager.nostr_client.key_manager.get_npub() + if npub: + print(colored(f"\nYour Nostr Public Key (npub):\n{npub}\n", 'cyan')) + logging.info("Displayed npub to the user.") + else: + print(colored("Nostr public key not available.", 'red')) + logging.error("Nostr public key not available.") + except Exception as e: + logging.error(f"Failed to display npub: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to display npub: {e}", 'red')) + +def handle_post_to_nostr(password_manager: PasswordManager): + """ + Handles the action of posting the encrypted password index to Nostr. + """ + try: + # Get the encrypted data from the index file + encrypted_data = password_manager.get_encrypted_data() + if encrypted_data: + # Post to Nostr + password_manager.nostr_client.publish_json_to_nostr(encrypted_data) + print(colored("Encrypted index posted to Nostr successfully.", 'green')) + logging.info("Encrypted index posted to Nostr successfully.") + else: + print(colored("No data available to post.", 'yellow')) + logging.warning("No data available to post to Nostr.") + except Exception as e: + logging.error(f"Failed to post to Nostr: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to post to Nostr: {e}", 'red')) + +def handle_retrieve_from_nostr(password_manager: PasswordManager): + """ + Handles the action of retrieving the encrypted password index from Nostr. + """ + try: + # Use the Nostr client from the password_manager + encrypted_data = password_manager.nostr_client.retrieve_json_from_nostr_sync() + if encrypted_data: + # Decrypt and save the index + password_manager.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data) + print(colored("Encrypted index retrieved and saved successfully.", 'green')) + logging.info("Encrypted index retrieved and saved successfully from Nostr.") + else: + print(colored("Failed to retrieve data from Nostr.", 'red')) + logging.error("Failed to retrieve data from Nostr.") + except Exception as e: + logging.error(f"Failed to retrieve from Nostr: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to retrieve from Nostr: {e}", 'red')) + +def display_menu(password_manager: PasswordManager): + """ + Displays the interactive menu and handles user input to perform various actions. """ menu = """ Select an option: @@ -58,11 +232,21 @@ def display_menu(password_manager: PasswordManager, nostr_client: NostrClient): 6. Retrieve Encrypted Index from Nostr 7. Display Nostr Public Key (npub) 8. Backup/Reveal Parent Seed - 9. Exit + 9. Switch Fingerprint + 10. Add a New Fingerprint + 11. Remove an Existing Fingerprint + 12. List All Fingerprints + 13. Exit """ while True: + # Flush logging handlers + for handler in logging.getLogger().handlers: + handler.flush() print(colored(menu, 'cyan')) - choice = input('Enter your choice (1-9): ').strip() # Updated to include option 9 + choice = input('Enter your choice (1-13): ').strip() + if not choice: + print(colored("No input detected. Please enter a number between 1 and 13.", 'yellow')) + continue # Re-display the menu without marking as invalid if choice == '1': password_manager.handle_generate_password() elif choice == '2': @@ -72,123 +256,46 @@ def display_menu(password_manager: PasswordManager, nostr_client: NostrClient): elif choice == '4': password_manager.handle_verify_checksum() elif choice == '5': - handle_post_to_nostr(password_manager, nostr_client) + handle_post_to_nostr(password_manager) elif choice == '6': - handle_retrieve_from_nostr(password_manager, nostr_client) + handle_retrieve_from_nostr(password_manager) elif choice == '7': - handle_display_npub(nostr_client) + handle_display_npub(password_manager) elif choice == '8': - password_manager.handle_backup_reveal_parent_seed() # Corrected variable name + password_manager.handle_backup_reveal_parent_seed() elif choice == '9': + if not password_manager.handle_switch_fingerprint(): + print(colored("Failed to switch fingerprint.", 'red')) + elif choice == '10': + handle_add_new_fingerprint(password_manager) + elif choice == '11': + handle_remove_fingerprint(password_manager) + elif choice == '12': + handle_list_fingerprints(password_manager) + elif choice == '13': logging.info("Exiting the program.") print(colored("Exiting the program.", 'green')) - nostr_client.close_client_pool() # Gracefully close the ClientPool + password_manager.nostr_client.close_client_pool() sys.exit(0) else: print(colored("Invalid choice. Please select a valid option.", 'red')) -def handle_display_npub(nostr_client: NostrClient): - """ - Handles displaying the Nostr public key (npub) to the user. - - :param nostr_client: An instance of NostrClient. - """ - try: - npub = nostr_client.key_manager.get_npub() - if npub: - print(colored(f"\nYour Nostr Public Key (npub):\n{npub}\n", 'cyan')) - logging.info("Displayed npub to the user.") - else: - print(colored("Nostr public key not available.", 'red')) - logging.error("Nostr public key not available.") - except Exception as e: - logging.error(f"Failed to display npub: {e}") - print(f"Error: Failed to display npub: {e}", 'red') - -def handle_post_to_nostr(password_manager: PasswordManager, nostr_client: NostrClient): - """ - Handles the action of posting the encrypted password index to Nostr. - - :param password_manager: An instance of PasswordManager. - :param nostr_client: An instance of NostrClient. - """ - try: - # Get the encrypted data from the index file - encrypted_data = password_manager.get_encrypted_data() - if encrypted_data: - # Post to Nostr - nostr_client.publish_json_to_nostr(encrypted_data) - print(colored("Encrypted index posted to Nostr successfully.", 'green')) - logging.info("Encrypted index posted to Nostr successfully.") - else: - print(colored("No data available to post.", 'yellow')) - logging.warning("No data available to post to Nostr.") - except Exception as e: - logging.error(f"Failed to post to Nostr: {e}") - logging.error(traceback.format_exc()) - print(f"Error: Failed to post to Nostr: {e}", 'red') - -def handle_retrieve_from_nostr(password_manager: PasswordManager, nostr_client: NostrClient): - """ - Handles the action of retrieving the encrypted password index from Nostr. - - :param password_manager: An instance of PasswordManager. - :param nostr_client: An instance of NostrClient. - """ - try: - # Retrieve from Nostr - encrypted_data = nostr_client.retrieve_json_from_nostr_sync() - if encrypted_data: - # Decrypt and save the index - password_manager.decrypt_and_save_index_from_nostr(encrypted_data) - print(colored("Encrypted index retrieved and saved successfully.", 'green')) - logging.info("Encrypted index retrieved and saved successfully from Nostr.") - else: - print(colored("Failed to retrieve data from Nostr.", 'red')) - logging.error("Failed to retrieve data from Nostr.") - except Exception as e: - logging.error(f"Failed to retrieve from Nostr: {e}") - logging.error(traceback.format_exc()) - print(f"Error: Failed to retrieve from Nostr: {e}", 'red') - -def cleanup(nostr_client: NostrClient): - """ - Cleanup function to gracefully close the NostrClient's event loop. - This function is registered to run upon program termination. - """ - try: - nostr_client.close_client_pool() - except Exception as e: - logging.error(f"Cleanup failed: {e}") - print(f"Error during cleanup: {e}", 'red') - if __name__ == '__main__': - """ - The main entry point of the application. - """ # Configure logging with both file and console handlers configure_logging() - - # Initialize PasswordManager + logger = logging.getLogger(__name__) + logger.info("Starting SeedPass Password Manager") + + # Initialize PasswordManager and proceed with application logic try: password_manager = PasswordManager() - logging.info("PasswordManager initialized successfully.") + logger.info("PasswordManager initialized successfully.") except Exception as e: - logging.error(f"Failed to initialize PasswordManager: {e}") - logging.error(traceback.format_exc()) # Log full traceback - print(f"Error: Failed to initialize PasswordManager: {e}", 'red') + logger.error(f"Failed to initialize PasswordManager: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to initialize PasswordManager: {e}", 'red')) sys.exit(1) - - # Initialize NostrClient with the parent seed from PasswordManager - try: - nostr_client = NostrClient(parent_seed=password_manager.parent_seed) - logging.info("NostrClient initialized successfully.") - except Exception as e: - logging.error(f"Failed to initialize NostrClient: {e}") - logging.error(traceback.format_exc()) # Log full traceback - print(f"Error: Failed to initialize NostrClient: {e}", 'red') - sys.exit(1) - + # Register signal handlers for graceful shutdown def signal_handler(sig, frame): """ @@ -197,38 +304,38 @@ if __name__ == '__main__': print(colored("\nReceived shutdown signal. Exiting gracefully...", 'yellow')) logging.info(f"Received shutdown signal: {sig}. Initiating graceful shutdown.") try: - nostr_client.close_client_pool() # Gracefully close the ClientPool + password_manager.nostr_client.close_client_pool() # Gracefully close the ClientPool logging.info("NostrClient closed successfully.") except Exception as e: logging.error(f"Error during shutdown: {e}") - print(f"Error during shutdown: {e}", 'red') + print(colored(f"Error during shutdown: {e}", 'red')) sys.exit(0) # Register the signal handlers signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # Handle termination signals - + # Display the interactive menu to the user try: - display_menu(password_manager, nostr_client) + display_menu(password_manager) except KeyboardInterrupt: - logging.info("Program terminated by user via KeyboardInterrupt.") + logger.info("Program terminated by user via KeyboardInterrupt.") print(colored("\nProgram terminated by user.", 'yellow')) try: - nostr_client.close_client_pool() # Gracefully close the ClientPool + password_manager.nostr_client.close_client_pool() # Gracefully close the ClientPool logging.info("NostrClient closed successfully.") except Exception as e: logging.error(f"Error during shutdown: {e}") - print(f"Error during shutdown: {e}", 'red') + print(colored(f"Error during shutdown: {e}", 'red')) sys.exit(0) except Exception as e: - logging.error(f"An unexpected error occurred: {e}") - logging.error(traceback.format_exc()) # Log full traceback - print(f"Error: An unexpected error occurred: {e}", 'red') + logger.error(f"An unexpected error occurred: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: An unexpected error occurred: {e}", 'red')) try: - nostr_client.close_client_pool() # Attempt to close the ClientPool + password_manager.nostr_client.close_client_pool() # Attempt to close the ClientPool logging.info("NostrClient closed successfully.") except Exception as close_error: logging.error(f"Error during shutdown: {close_error}") - print(f"Error during shutdown: {close_error}", 'red') - sys.exit(1) + print(colored(f"Error during shutdown: {close_error}", 'red')) + sys.exit(1) \ No newline at end of file diff --git a/src/nostr/__init__.py b/src/nostr/__init__.py index 3972ae9..d258546 100644 --- a/src/nostr/__init__.py +++ b/src/nostr/__init__.py @@ -2,11 +2,10 @@ import logging import traceback +from .client import NostrClient -from .logging_config import configure_logging - -# Configure logging at the start of the module -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) # Initialize the logger for this module logger = logging.getLogger(__name__) # Correct logger initialization diff --git a/src/nostr/client.py b/src/nostr/client.py index 3367f45..417abce 100644 --- a/src/nostr/client.py +++ b/src/nostr/client.py @@ -1,5 +1,3 @@ -# nostr/client.py - import os import sys import logging @@ -11,6 +9,7 @@ import hashlib import asyncio import concurrent.futures from typing import List, Optional, Callable +from pathlib import Path from monstr.client.client import ClientPool from monstr.encrypt import Keys, NIP4Encrypt @@ -18,24 +17,30 @@ from monstr.event.event import Event import threading import uuid -import fcntl # Ensure fcntl is imported for file locking +import fcntl -from .logging_config import configure_logging from .key_manager import KeyManager from .encryption_manager import EncryptionManager from .event_handler import EventHandler -from constants import APP_DIR, INDEX_FILE, DATA_CHECKSUM_FILE +from constants import APP_DIR from utils.file_lock import lock_file -configure_logging() +# Get the logger for this module logger = logging.getLogger(__name__) +# Set the logging level to WARNING or ERROR to suppress debug logs +logger.setLevel(logging.WARNING) + DEFAULT_RELAYS = [ "wss://relay.snort.social", "wss://nostr.oxtr.dev", - "wss://nostr-relay.wlvs.space" + "wss://relay.primal.net" ] +# nostr/client.py + +# src/nostr/client.py + class NostrClient: """ NostrClient Class @@ -44,25 +49,38 @@ class NostrClient: Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations. """ - def __init__(self, parent_seed: str, relays: Optional[List[str]] = None): + def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None): """ - Initializes the NostrClient with a parent seed and connects to specified relays. + Initializes the NostrClient with an EncryptionManager, connects to specified relays, + and sets up the KeyManager with the given fingerprint. - :param parent_seed: The BIP39 mnemonic seed phrase. + :param encryption_manager: An instance of EncryptionManager for handling encryption/decryption. + :param fingerprint: The fingerprint to differentiate key derivations for unique identities. :param relays: (Optional) A list of relay URLs to connect to. Defaults to predefined relays. """ try: - self.key_manager = KeyManager(parent_seed) - self.encryption_manager = EncryptionManager(self.key_manager) - self.event_handler = EventHandler() + # Assign the encryption manager and fingerprint + self.encryption_manager = encryption_manager + self.fingerprint = fingerprint # Track the fingerprint + self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories + # Initialize KeyManager with the decrypted parent seed and the provided fingerprint + self.key_manager = KeyManager( + self.encryption_manager.decrypt_parent_seed(), + self.fingerprint + ) + + # Initialize event handler and client pool + self.event_handler = EventHandler() self.relays = relays if relays else DEFAULT_RELAYS self.client_pool = ClientPool(self.relays) self.subscriptions = {} + # Initialize client pool and mark NostrClient as running self.initialize_client_pool() logger.info("NostrClient initialized successfully.") + # For shutdown handling self.is_shutting_down = False self._shutdown_event = asyncio.Event() @@ -110,8 +128,9 @@ class NostrClient: logger.error(traceback.format_exc()) print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr) finally: - logger.debug("Closing the event loop.") - self.loop.close() + if not self.loop.is_closed(): + logger.debug("Closing the event loop.") + self.loop.close() def wait_for_connection(self): """ @@ -134,6 +153,7 @@ class NostrClient: logger.debug(f"Publishing event: {event.serialize()}") self.client_pool.publish(event) logger.info(f"Event published with ID: {event.id}") + logger.debug(f"Finished publishing event: {event.id}") except Exception as e: logger.error(f"Failed to publish event: {e}") logger.error(traceback.format_exc()) @@ -145,7 +165,10 @@ class NostrClient: :param event: The signed Event object to publish. """ try: - asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop) + logger.debug(f"Submitting publish_event_async for event ID: {event.id}") + future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop) + # Wait for the future to complete + future.result(timeout=5) # Adjust the timeout as needed except Exception as e: logger.error(f"Error in publish_event: {e}") print(f"Error: Failed to publish event: {e}", file=sys.stderr) @@ -180,11 +203,11 @@ class NostrClient: logger.error(f"Error in subscribe: {e}") print(f"Error: Failed to subscribe: {e}", file=sys.stderr) - async def retrieve_json_from_nostr_async(self) -> Optional[bytes]: + async def retrieve_json_from_nostr_async(self) -> Optional[str]: """ Retrieves the latest encrypted JSON event from Nostr. - :return: The encrypted JSON data as bytes, or None if retrieval fails. + :return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails. """ try: filters = [{ @@ -203,6 +226,7 @@ class NostrClient: await asyncio.sleep(2) # Adjust the sleep time as needed + # Unsubscribe from all subscriptions for sub_id in list(self.subscriptions.keys()): self.client_pool.unsubscribe(sub_id) del self.subscriptions[sub_id] @@ -210,15 +234,15 @@ class NostrClient: if events: event = events[0] - encrypted_json_b64 = event.content + content_base64 = event.content if event.kind == Event.KIND_ENCRYPT: nip4_encrypt = NIP4Encrypt(self.key_manager.keys) - encrypted_json_b64 = nip4_encrypt.decrypt_message(event.content, event.pub_key) + content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key) - encrypted_json = base64.b64decode(encrypted_json_b64.encode('utf-8')) + # Return the Base64-encoded content as a string logger.debug("Encrypted JSON data retrieved successfully.") - return encrypted_json + return content_base64 else: logger.warning("No events found matching the filters.") print("No events found matching the filters.", file=sys.stderr) @@ -238,11 +262,15 @@ class NostrClient: """ try: future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) - return future.result() + return future.result(timeout=10) + except concurrent.futures.TimeoutError: + logger.error("Timeout occurred while retrieving JSON from Nostr.") + print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) + return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr) + print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') return None async def do_post_async(self, text: str): @@ -263,7 +291,7 @@ class NostrClient: logger.debug(f"Event data: {event.serialize()}") await self.publish_event_async(event) - + logger.debug("Finished do_post_async") except Exception as e: logger.error(f"An error occurred during publishing: {e}", exc_info=True) print(f"Error: An error occurred during publishing: {e}", file=sys.stderr) @@ -284,8 +312,7 @@ class NostrClient: await self.subscribe_async(filters=filters, handler=handler) logger.info("Subscribed to your feed.") - while True: - await asyncio.sleep(1) + # Removed the infinite loop to prevent blocking except Exception as e: logger.error(f"An error occurred during subscription: {e}", exc_info=True) @@ -330,11 +357,11 @@ class NostrClient: self.save_json_data(data) self.update_checksum() logger.info("Index file updated from Nostr successfully.") - print("Index file updated from Nostr successfully.", file=sys.stdout) + print(colored("Index file updated from Nostr successfully.", 'green')) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to decrypt and save data from Nostr: {e}", file=sys.stderr) + print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) def save_json_data(self, data: dict) -> None: """ @@ -343,16 +370,17 @@ class NostrClient: :param data: The JSON data to save. """ try: - encrypted_data = self.encryption_manager.encrypt_data(data) - with lock_file(INDEX_FILE, fcntl.LOCK_EX): - with open(INDEX_FILE, 'wb') as f: + encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8')) + index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + with lock_file(index_file_path, fcntl.LOCK_EX): + with open(index_file_path, 'wb') as f: f.write(encrypted_data) - logger.debug(f"Encrypted data saved to {INDEX_FILE}.") - print(f"Encrypted data saved to {INDEX_FILE}.", file=sys.stdout) + logger.debug(f"Encrypted data saved to {index_file_path}.") + print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green')) except Exception as e: logger.error(f"Failed to save encrypted data: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to save encrypted data: {e}", file=sys.stderr) + print(colored(f"Error: Failed to save encrypted data: {e}", 'red')) raise def update_checksum(self) -> None: @@ -360,28 +388,34 @@ class NostrClient: Updates the checksum file for the password database. """ try: - decrypted_data = self.decrypt_data_from_file(INDEX_FILE) + index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + decrypted_data = self.decrypt_data_from_file(index_file_path) content = decrypted_data.decode('utf-8') logger.debug("Calculating checksum of the updated file content.") checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() logger.debug(f"New checksum: {checksum}") - with open(DATA_CHECKSUM_FILE, 'w') as f: - f.write(checksum) - logger.debug(f"Updated data checksum written to '{DATA_CHECKSUM_FILE}'.") - print("[+] Checksum updated successfully.", file=sys.stdout) + checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt' + with lock_file(checksum_file, fcntl.LOCK_EX): + with open(checksum_file, 'w') as f: + f.write(checksum) + + os.chmod(checksum_file, 0o600) + + logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.") + print(colored(f"Checksum for '{index_file_path}' updated.", 'green')) except Exception as e: logger.error(f"Failed to update checksum: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to update checksum: {e}", file=sys.stderr) + print(colored(f"Error: Failed to update checksum: {e}", 'red')) - def decrypt_data_from_file(self, file_path: str) -> bytes: + def decrypt_data_from_file(self, file_path: Path) -> bytes: """ Decrypts data directly from a file. - :param file_path: Path to the encrypted file. + :param file_path: Path to the encrypted file as a Path object. :return: Decrypted data as bytes. """ try: @@ -394,7 +428,7 @@ class NostrClient: except Exception as e: logger.error(f"Failed to decrypt data from file '{file_path}': {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to decrypt data from file '{file_path}': {e}", file=sys.stderr) + print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red')) raise def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None): @@ -431,16 +465,31 @@ class NostrClient: def retrieve_json_from_nostr_sync(self) -> Optional[bytes]: """ - Public method to retrieve encrypted JSON from Nostr. + Retrieves encrypted data from Nostr and Base64-decodes it. - :return: The encrypted JSON data as bytes, or None if retrieval fails. + Returns: + Optional[bytes]: The encrypted data as bytes if successful, None otherwise. """ try: - return self.retrieve_json_from_nostr() + future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) + content_base64 = future.result(timeout=10) + + if not content_base64: + logger.debug("No data retrieved from Nostr.") + return None + + # Base64-decode the content + encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8')) + logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.") + return encrypted_data + except concurrent.futures.TimeoutError: + logger.error("Timeout occurred while retrieving JSON from Nostr.") + print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) + return None except Exception as e: - logger.error(f"Error in retrieve_json_from_nostr_sync: {e}") + logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr) + print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') return None def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None: @@ -453,7 +502,7 @@ class NostrClient: self.decrypt_and_save_index_from_nostr(encrypted_data) except Exception as e: logger.error(f"Failed to decrypt and save index from Nostr: {e}") - print(f"Error: Failed to decrypt and save index from Nostr: {e}", file=sys.stderr) + print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red') async def close_client_pool_async(self): """ @@ -481,12 +530,8 @@ class NostrClient: # Close all WebSocket connections if hasattr(self.client_pool, 'clients'): - for client in self.client_pool.clients: - try: - await client.close() - logger.debug(f"Closed connection to relay: {client.url}") - except Exception as e: - logger.warning(f"Error closing connection to {client.url}: {e}") + tasks = [self.safe_close_connection(client) for client in self.client_pool.clients] + await asyncio.gather(*tasks, return_exceptions=True) # Gather and cancel all tasks current_task = asyncio.current_task() @@ -526,7 +571,7 @@ class NostrClient: # Schedule the coroutine to close the client pool future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop) - # Wait for the coroutine to finish with a shorter timeout + # Wait for the coroutine to finish with a timeout try: future.result(timeout=10) except concurrent.futures.TimeoutError: @@ -534,13 +579,13 @@ class NostrClient: # Additional cleanup regardless of timeout try: - self.loop.stop() + self.loop.call_soon_threadsafe(self.loop.stop) # Give a short grace period for the loop to stop time.sleep(0.5) if self.loop.is_running(): logger.warning("Loop still running after stop, closing forcefully") - self.loop.close() + self.loop.call_soon_threadsafe(self.loop.close) # Wait for the thread with a reasonable timeout if self.loop_thread.is_alive(): @@ -559,3 +604,12 @@ class NostrClient: logger.error(traceback.format_exc()) finally: self.is_shutting_down = False + + async def safe_close_connection(self, client): + try: + await client.close_connection() + logger.debug(f"Closed connection to relay: {client.url}") + except AttributeError: + logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.") + except Exception as e: + logger.warning(f"Error closing connection to {client.url}: {e}") diff --git a/src/nostr/encryption_manager.py b/src/nostr/encryption_manager.py index 6e313dc..891264f 100644 --- a/src/nostr/encryption_manager.py +++ b/src/nostr/encryption_manager.py @@ -5,13 +5,9 @@ import logging import traceback from cryptography.fernet import Fernet, InvalidToken -from .logging_config import configure_logging from .key_manager import KeyManager -# Configure logging at the start of the module -configure_logging() - -# Initialize the logger for this module +# Instantiate the logger logger = logging.getLogger(__name__) class EncryptionManager: diff --git a/src/nostr/event_handler.py b/src/nostr/event_handler.py index 45ec443..6f8e494 100644 --- a/src/nostr/event_handler.py +++ b/src/nostr/event_handler.py @@ -1,14 +1,12 @@ # nostr/event_handler.py -import datetime import time +import logging import traceback - -from .logging_config import configure_logging from monstr.event.event import Event -from monstr.client.client import ClientPool -logger = configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) class EventHandler: """ @@ -18,24 +16,26 @@ class EventHandler: def __init__(self): pass # Initialize if needed - def handle_new_event(self, the_client: ClientPool, sub_id: str, evt: Event): + def handle_new_event(self, evt: Event): """ Processes incoming events by logging their details. - :param the_client: The ClientPool instance. - :param sub_id: The subscription ID. :param evt: The received Event object. """ try: - if isinstance(evt.created_at, datetime.datetime): - created_at_str = evt.created_at.strftime('%Y-%m-%d %H:%M:%S') - elif isinstance(evt.created_at, int): + # Assuming evt.created_at is always an integer Unix timestamp + if isinstance(evt.created_at, int): created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at)) else: + # Handle unexpected types gracefully created_at_str = str(evt.created_at) - logger.info(f"\n[New Event] ID: {evt.id}\nCreated At: {created_at_str}\nContent: {evt.content}\n") + # Log the event details without extra newlines + logger.info( + f"[New Event] ID: {evt.id} | Created At: {created_at_str} | Content: {evt.content}" + ) except Exception as e: logger.error(f"Error handling new event: {e}") logger.error(traceback.format_exc()) - raise + # Optionally, handle the exception without re-raising + # For example, continue processing other events diff --git a/src/nostr/key_manager.py b/src/nostr/key_manager.py index 956fbd2..cc89072 100644 --- a/src/nostr/key_manager.py +++ b/src/nostr/key_manager.py @@ -1,105 +1,127 @@ # nostr/key_manager.py +import hashlib import logging import traceback -from bip_utils import Bip39SeedGenerator -from cryptography.fernet import Fernet, InvalidToken from bech32 import bech32_encode, convertbits -from .logging_config import configure_logging -from utils.key_derivation import derive_key_from_parent_seed +from local_bip85.bip85 import BIP85 +from bip_utils import Bip39SeedGenerator +from monstr.encrypt import Keys -from monstr.encrypt import Keys, NIP4Encrypt # Ensure monstr.encrypt is installed and accessible - -# Configure logging at the start of the module -configure_logging() - -# Initialize the logger for this module logger = logging.getLogger(__name__) -def encode_bech32(prefix: str, key_hex: str) -> str: - try: - key_bytes = bytes.fromhex(key_hex) - data = convertbits(key_bytes, 8, 5, pad=True) - return bech32_encode(prefix, data) - except Exception as e: - logger.error(f"Failed to encode {prefix}: {e}") - logger.error(traceback.format_exc()) - raise - class KeyManager: """ Manages key generation, encoding, and derivation for NostrClient. """ - def __init__(self, parent_seed: str): + def __init__(self, parent_seed: str, fingerprint: str): """ - Initializes the KeyManager with the provided parent_seed. - + Initializes the KeyManager with the provided parent_seed and fingerprint. + Parameters: parent_seed (str): The parent seed used for key derivation. + fingerprint (str): The fingerprint to differentiate key derivations. """ try: if not isinstance(parent_seed, str): raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}") - + if not isinstance(fingerprint, str): + raise TypeError(f"Fingerprint must be a string, got {type(fingerprint)}") + self.parent_seed = parent_seed - logger.debug(f"KeyManager initialized with parent_seed: {self.parent_seed} (type: {type(self.parent_seed)})") - - # Derive the encryption key from parent_seed - derived_key = self.derive_encryption_key() - derived_key_hex = derived_key.hex() - logger.debug(f"Derived encryption key (hex): {derived_key_hex}") - - # Initialize Keys with the derived hexadecimal key - self.keys = Keys(priv_k=derived_key_hex) # Pass hex string + self.fingerprint = fingerprint + logger.debug(f"KeyManager initialized with parent_seed and fingerprint.") + + # Initialize BIP85 + self.bip85 = self.initialize_bip85() + + # Generate Nostr keys using the fingerprint + self.keys = self.generate_nostr_keys() logger.debug("Nostr Keys initialized successfully.") - # Generate bech32-encoded keys - self.nsec = encode_bech32('nsec', self.keys.private_key_hex()) - logger.debug(f"Nostr Private Key (nsec): {self.nsec}") - - public_key_hex = self.keys.public_key_hex() - self.npub = encode_bech32('npub', public_key_hex) - logger.debug(f"Nostr Public Key (npub): {self.npub}") - except Exception as e: logger.error(f"Key initialization failed: {e}") logger.error(traceback.format_exc()) raise - def derive_encryption_key(self) -> bytes: + def initialize_bip85(self): """ - Derives the encryption key using the parent seed. + Initializes BIP85 with the parent seed. Returns: - bytes: The derived encryption key. - - Raises: - Exception: If key derivation fails. + BIP85: An instance of the BIP85 class. """ try: - key = derive_key_from_parent_seed(self.parent_seed) - logger.debug("Encryption key derived successfully.") - return key # Now returns raw bytes + seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() + bip85 = BIP85(seed_bytes) + logger.debug("BIP85 initialized successfully.") + return bip85 except Exception as e: - logger.error(f"Failed to derive encryption key: {e}") + logger.error(f"Failed to initialize BIP85: {e}") logger.error(traceback.format_exc()) raise - def get_npub(self) -> str: + def generate_nostr_keys(self) -> Keys: """ - Returns the Nostr public key (npub). + Derives a unique Nostr key pair for the given fingerprint using BIP-85. Returns: - str: The npub as a string. - - Raises: - ValueError: If npub is not available. + Keys: An instance of Keys containing the Nostr key pair. """ - if self.npub: - logger.debug(f"Returning npub: {self.npub}") - return self.npub - else: - logger.error("Nostr public key (npub) is not available.") - raise ValueError("Nostr public key (npub) is not available.") + try: + # Convert fingerprint to an integer index (using a hash function) + index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) + + # Derive entropy for Nostr key (32 bytes) + entropy_bytes = self.bip85.derive_entropy( + index=index, + bytes_len=32 # Adjust parameter name and value as per your method signature + ) + + # Generate Nostr key pair from entropy + private_key_hex = entropy_bytes.hex() + keys = Keys(priv_k=private_key_hex) + logger.debug(f"Nostr keys generated for fingerprint {self.fingerprint}.") + return keys + except Exception as e: + logger.error(f"Failed to generate Nostr keys: {e}") + logger.error(traceback.format_exc()) + raise + + def get_public_key_hex(self) -> str: + """ + Returns the public key in hexadecimal format. + + Returns: + str: The public key in hex. + """ + return self.keys.public_key_hex() + + def get_private_key_hex(self) -> str: + """ + Returns the private key in hexadecimal format. + + Returns: + str: The private key in hex. + """ + return self.keys.private_key_hex() + + def get_npub(self) -> str: + """ + Returns the npub (Bech32 encoded public key). + + Returns: + str: The npub string. + """ + try: + pub_key_hex = self.get_public_key_hex() + pub_key_bytes = bytes.fromhex(pub_key_hex) + data = convertbits(pub_key_bytes, 8, 5, True) + npub = bech32_encode('npub', data) + return npub + except Exception as e: + logger.error(f"Failed to generate npub: {e}") + logger.error(traceback.format_exc()) + raise diff --git a/src/nostr/logging_config.py b/src/nostr/logging_config.py index 0501c1d..def3e1a 100644 --- a/src/nostr/logging_config.py +++ b/src/nostr/logging_config.py @@ -3,38 +3,39 @@ import logging import os -def configure_logging(): - """ - Configures logging with both file and console handlers. - Logs include the timestamp, log level, message, filename, and line number. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) # Set root logger to DEBUG - - # Prevent adding multiple handlers if configure_logging is called multiple times - if not logger.handlers: - # Create the 'logs' folder if it doesn't exist - log_directory = 'logs' - if not os.path.exists(log_directory): - os.makedirs(log_directory) - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join(log_directory, 'app.log')) - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) - f_handler.setLevel(logging.DEBUG) - - # Create formatters and add them to handlers, include file and line number in log messages - formatter = logging.Formatter( - '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' - ) - c_handler.setFormatter(formatter) - f_handler.setFormatter(formatter) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) +# Comment out or remove the configure_logging function to avoid conflicts +# def configure_logging(): +# """ +# Configures logging with both file and console handlers. +# Logs include the timestamp, log level, message, filename, and line number. +# Only ERROR and higher-level messages are shown in the terminal, while all messages +# are logged in the log file. +# """ +# logger = logging.getLogger() +# logger.setLevel(logging.DEBUG) # Set root logger to DEBUG +# +# # Prevent adding multiple handlers if configure_logging is called multiple times +# if not logger.handlers: +# # Create the 'logs' folder if it doesn't exist +# log_directory = 'logs' +# if not os.path.exists(log_directory): +# os.makedirs(log_directory) +# +# # Create handlers +# c_handler = logging.StreamHandler() +# f_handler = logging.FileHandler(os.path.join(log_directory, 'app.log')) +# +# # Set levels: only errors and critical messages will be shown in the console +# c_handler.setLevel(logging.ERROR) +# f_handler.setLevel(logging.DEBUG) +# +# # Create formatters and add them to handlers, include file and line number in log messages +# formatter = logging.Formatter( +# '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' +# ) +# c_handler.setFormatter(formatter) +# f_handler.setFormatter(formatter) +# +# # Add handlers to the logger +# logger.addHandler(c_handler) +# logger.addHandler(f_handler) diff --git a/src/nostr/utils.py b/src/nostr/utils.py index 39060fe..33b82d0 100644 --- a/src/nostr/utils.py +++ b/src/nostr/utils.py @@ -1,5 +1,7 @@ # nostr/utils.py +import logging + # Example utility function (if any specific to nostr package) def some_helper_function(): pass # Implement as needed diff --git a/src/password_manager/backup.py b/src/password_manager/backup.py index eaa7baf..b35d62a 100644 --- a/src/password_manager/backup.py +++ b/src/password_manager/backup.py @@ -11,192 +11,125 @@ corrupted or lost data by maintaining timestamped backups. Ensure that all dependencies are installed and properly configured in your environment. """ +import logging import os import shutil import time -import logging import traceback from pathlib import Path - from colorama import Fore from termcolor import colored -from constants import APP_DIR, INDEX_FILE from utils.file_lock import lock_file +from constants import APP_DIR -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create a custom logger - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'backup_manager.log')) # Log files will be in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) class BackupManager: """ BackupManager Class - Handles the creation, restoration, and listing of backups for the encrypted - password index file. Backups are stored in the application directory with + Handles the creation, restoration, and listing of backups for the encrypted password + index file. Backups are stored in the application directory with timestamped filenames to facilitate easy identification and retrieval. """ BACKUP_FILENAME_TEMPLATE = 'passwords_db_backup_{timestamp}.json.enc' - def __init__(self): + def __init__(self, fingerprint_dir: Path): """ - Initializes the BackupManager with the application directory and index file paths. + Initializes the BackupManager with the fingerprint directory. + + Parameters: + fingerprint_dir (Path): The directory corresponding to the fingerprint. """ - self.app_dir = APP_DIR - self.index_file = INDEX_FILE - logging.debug(f"BackupManager initialized with APP_DIR: {self.app_dir} and INDEX_FILE: {self.index_file}") + self.fingerprint_dir = fingerprint_dir + self.backup_dir = self.fingerprint_dir / 'backups' + self.backup_dir.mkdir(parents=True, exist_ok=True) + self.index_file = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + logger.debug(f"BackupManager initialized with backup directory at {self.backup_dir}") def create_backup(self) -> None: - """ - Creates a timestamped backup of the encrypted password index file. - - The backup file is named using the current Unix timestamp to ensure uniqueness. - If the index file does not exist, no backup is created. - - Raises: - Exception: If the backup process fails due to I/O errors. - """ - if not self.index_file.exists(): - logging.warning("Index file does not exist. No backup created.") - print(colored("Warning: Index file does not exist. No backup created.", 'yellow')) - return - - timestamp = int(time.time()) - backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) - backup_file = self.app_dir / backup_filename - try: - with lock_file(self.index_file, lock_type=fcntl.LOCK_SH): - shutil.copy2(self.index_file, backup_file) - logging.info(f"Backup created successfully at '{backup_file}'.") + index_file = self.index_file + if not index_file.exists(): + logger.warning("Index file does not exist. No backup created.") + print(colored("Warning: Index file does not exist. No backup created.", 'yellow')) + return + + timestamp = int(time.time()) + backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) + backup_file = self.backup_dir / backup_filename + + shutil.copy2(index_file, backup_file) + logger.info(f"Backup created successfully at '{backup_file}'.") print(colored(f"Backup created successfully at '{backup_file}'.", 'green')) except Exception as e: - logging.error(f"Failed to create backup: {e}") - logging.error(traceback.format_exc()) # Log full traceback + logger.error(f"Failed to create backup: {e}") + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to create backup: {e}", 'red')) def restore_latest_backup(self) -> None: - """ - Restores the encrypted password index file from the latest available backup. - - The latest backup is determined based on the Unix timestamp in the backup filenames. - If no backups are found, an error message is displayed. - - Raises: - Exception: If the restoration process fails due to I/O errors or missing backups. - """ - backup_files = sorted( - self.app_dir.glob('passwords_db_backup_*.json.enc'), - key=lambda x: x.stat().st_mtime, - reverse=True - ) - - if not backup_files: - logging.error("No backup files found to restore.") - print(colored("Error: No backup files found to restore.", 'red')) - return - - latest_backup = backup_files[0] try: - with lock_file(latest_backup, lock_type=fcntl.LOCK_SH): - shutil.copy2(latest_backup, self.index_file) - logging.info(f"Restored the index file from backup '{latest_backup}'.") + backup_files = sorted( + self.backup_dir.glob('passwords_db_backup_*.json.enc'), + key=lambda x: x.stat().st_mtime, + reverse=True + ) + + if not backup_files: + logger.error("No backup files found to restore.") + print(colored("Error: No backup files found to restore.", 'red')) + return + + latest_backup = backup_files[0] + index_file = self.index_file + shutil.copy2(latest_backup, index_file) + logger.info(f"Restored the index file from backup '{latest_backup}'.") print(colored(f"Restored the index file from backup '{latest_backup}'.", 'green')) except Exception as e: - logging.error(f"Failed to restore from backup '{latest_backup}': {e}") - logging.error(traceback.format_exc()) # Log full traceback + logger.error(f"Failed to restore from backup '{latest_backup}': {e}") + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to restore from backup '{latest_backup}': {e}", 'red')) def list_backups(self) -> None: - """ - Lists all available backups in the application directory, sorted by date. + try: + backup_files = sorted( + self.backup_dir.glob('passwords_db_backup_*.json.enc'), + key=lambda x: x.stat().st_mtime, + reverse=True + ) - Displays the backups with their filenames and creation dates. - """ - backup_files = sorted( - self.app_dir.glob('passwords_db_backup_*.json.enc'), - key=lambda x: x.stat().st_mtime, - reverse=True - ) + if not backup_files: + logger.info("No backup files available.") + print(colored("No backup files available.", 'yellow')) + return - if not backup_files: - logging.info("No backup files available.") - print(colored("No backup files available.", 'yellow')) - return - - print(colored("Available Backups:", 'cyan')) - for backup in backup_files: - creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(backup.stat().st_mtime)) - print(colored(f"- {backup.name} (Created on: {creation_time})", 'cyan')) + print(colored("Available Backups:", 'cyan')) + for backup in backup_files: + creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(backup.stat().st_mtime)) + print(colored(f"- {backup.name} (Created on: {creation_time})", 'cyan')) + except Exception as e: + logger.error(f"Failed to list backups: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to list backups: {e}", 'red')) def restore_backup_by_timestamp(self, timestamp: int) -> None: - """ - Restores the encrypted password index file from a backup with the specified timestamp. - - Parameters: - timestamp (int): The Unix timestamp of the backup to restore. - - Raises: - Exception: If the restoration process fails due to I/O errors or missing backups. - """ backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) - backup_file = self.app_dir / backup_filename + backup_file = self.backup_dir / backup_filename if not backup_file.exists(): - logging.error(f"No backup found with timestamp {timestamp}.") + logger.error(f"No backup found with timestamp {timestamp}.") print(colored(f"Error: No backup found with timestamp {timestamp}.", 'red')) return try: with lock_file(backup_file, lock_type=fcntl.LOCK_SH): shutil.copy2(backup_file, self.index_file) - logging.info(f"Restored the index file from backup '{backup_file}'.") + logger.info(f"Restored the index file from backup '{backup_file}'.") print(colored(f"Restored the index file from backup '{backup_file}'.", 'green')) except Exception as e: - logging.error(f"Failed to restore from backup '{backup_file}': {e}") - logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to restore from backup '{backup_file}': {e}", 'red')) - -# Example usage (to be integrated within the PasswordManager class or other modules): - -# from password_manager.backup import BackupManager - -# backup_manager = BackupManager() -# backup_manager.create_backup() -# backup_manager.restore_latest_backup() -# backup_manager.list_backups() -# backup_manager.restore_backup_by_timestamp(1700000000) # Example timestamp + logger.error(f"Failed to restore from backup '{backup_file}': {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to restore from backup '{backup_file}': {e}", 'red')) \ No newline at end of file diff --git a/src/password_manager/encryption.py b/src/password_manager/encryption.py index f89e54e..24f3817 100644 --- a/src/password_manager/encryption.py +++ b/src/password_manager/encryption.py @@ -10,66 +10,24 @@ of the password index. Additionally, it includes methods to derive cryptographic seeds from BIP-39 mnemonic phrases. -Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. -This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and are not appropriate for this software's use case. """ -import os -import json -import stat -import hashlib import logging import traceback +import json +import hashlib +import os from pathlib import Path from typing import Optional + from cryptography.fernet import Fernet, InvalidToken -from utils.file_lock import exclusive_lock, shared_lock -from colorama import Fore from termcolor import colored -from mnemonic import Mnemonic # Library for BIP-39 seed phrase handling - -import fcntl # Required for lock_type constants in file_lock - -from constants import INDEX_FILE # Ensure INDEX_FILE is imported correctly - -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Logs include the timestamp, log level, message, filename, and line number. - Only errors and critical logs are shown in the terminal, while all logs are saved to a file. - """ - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create a custom logger for this module - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'encryption_manager.log')) # Log file in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - formatter = logging.Formatter( - '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' - ) - c_handler.setFormatter(formatter) - f_handler.setFormatter(formatter) - - # Add handlers to the logger if not already added - if not logger.handlers: - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +from utils.file_lock import lock_file # Ensure this utility is correctly implemented +import fcntl # For file locking +# Instantiate the logger logger = logging.getLogger(__name__) class EncryptionManager: @@ -78,153 +36,217 @@ class EncryptionManager: Manages the encryption and decryption of data and files using a Fernet encryption key. """ + def __init__(self, encryption_key: bytes, fingerprint_dir: Path): + """ + Initializes the EncryptionManager with the provided encryption key and fingerprint directory. + + Parameters: + encryption_key (bytes): The Fernet encryption key. + fingerprint_dir (Path): The directory corresponding to the fingerprint. + """ + self.fingerprint_dir = fingerprint_dir + self.parent_seed_file = self.fingerprint_dir / 'parent_seed.enc' + self.key = encryption_key - def __init__(self, encryption_key: bytes): try: - self.fernet = Fernet(encryption_key) - logger.debug("EncryptionManager initialized with provided encryption key.") + self.fernet = Fernet(self.key) + logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}") except Exception as e: logger.error(f"Failed to initialize Fernet with provided encryption key: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red')) raise - def encrypt_parent_seed(self, parent_seed, file_path: Path) -> None: + def encrypt_parent_seed(self, parent_seed: str) -> None: """ - Encrypts and saves the parent seed to the specified file. + Encrypts and saves the parent seed to 'parent_seed.enc' within the fingerprint directory. - :param parent_seed: The BIP39 parent seed phrase or Bip39Mnemonic object. - :param file_path: The path to the file where the encrypted parent seed will be saved. + :param parent_seed: The BIP39 parent seed phrase. """ try: - # Convert Bip39Mnemonic to string if necessary - if hasattr(parent_seed, 'ToStr'): - parent_seed = parent_seed.ToStr() - - # Now encode the string + # Convert seed to bytes data = parent_seed.encode('utf-8') - - # Encrypt and save the data + + # Encrypt the data encrypted_data = self.encrypt_data(data) - with open(file_path, 'wb') as f: - f.write(encrypted_data) - logging.info(f"Parent seed encrypted and saved to '{file_path}'.") - print(colored(f"Parent seed encrypted and saved to '{file_path}'.", 'green')) + + # Write the encrypted data to the file with locking + with lock_file(self.parent_seed_file, fcntl.LOCK_EX): + with open(self.parent_seed_file, 'wb') as f: + f.write(encrypted_data) + + # Set file permissions to read/write for the user only + os.chmod(self.parent_seed_file, 0o600) + + logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.") + print(colored(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", 'green')) except Exception as e: - logging.error(f"Failed to encrypt and save parent seed: {e}") - logging.error(traceback.format_exc()) + logger.error(f"Failed to encrypt and save parent seed: {e}") + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) raise - def encrypt_file(self, file_path: Path, data: bytes) -> None: + def decrypt_parent_seed(self) -> str: """ - Encrypts the provided data and writes it to the specified file with file locking. + Decrypts and returns the parent seed from 'parent_seed.enc' within the fingerprint directory. - :param file_path: The path to the file where encrypted data will be written. - :param data: The plaintext data to encrypt and write. + :return: The decrypted parent seed. """ try: - encrypted_data = self.encrypt_data(data) - with exclusive_lock(file_path): - with open(file_path, 'wb') as file: - file.write(encrypted_data) - logger.debug(f"Encrypted data written to '{file_path}'.") - print(colored(f"Encrypted data written to '{file_path}'.", 'green')) + parent_seed_path = self.fingerprint_dir / 'parent_seed.enc' + with lock_file(parent_seed_path, fcntl.LOCK_SH): + with open(parent_seed_path, 'rb') as f: + encrypted_data = f.read() + + decrypted_data = self.decrypt_data(encrypted_data) + parent_seed = decrypted_data.decode('utf-8').strip() + + logger.debug(f"Parent seed decrypted successfully from '{parent_seed_path}'.") + return parent_seed + except InvalidToken: + logger.error("Invalid encryption key or corrupted data while decrypting parent seed.") + print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + raise except Exception as e: - logger.error(f"Failed to encrypt and write to file '{file_path}': {e}") + logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt and write to file '{file_path}': {e}", 'red')) + print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red')) raise def encrypt_data(self, data: bytes) -> bytes: """ - Encrypts the given plaintext data. + Encrypts the given data using Fernet. - :param data: The plaintext data to encrypt. - :return: The encrypted data as bytes. + :param data: Data to encrypt. + :return: Encrypted data. """ try: encrypted_data = self.fernet.encrypt(data) logger.debug("Data encrypted successfully.") return encrypted_data except Exception as e: - logger.error(f"Error encrypting data: {e}") + logger.error(f"Failed to encrypt data: {e}") logger.error(traceback.format_exc()) print(colored(f"Error: Failed to encrypt data: {e}", 'red')) raise def decrypt_data(self, encrypted_data: bytes) -> bytes: """ - Decrypts the given encrypted data. + Decrypts the provided encrypted data using the derived key. :param encrypted_data: The encrypted data to decrypt. - :return: The decrypted plaintext data as bytes. + :return: The decrypted data as bytes. """ try: decrypted_data = self.fernet.decrypt(encrypted_data) logger.debug("Data decrypted successfully.") return decrypted_data except InvalidToken: - logger.error("Invalid encryption key or corrupted data.") + logger.error("Invalid encryption key or corrupted data while decrypting data.") print(colored("Error: Invalid encryption key or corrupted data.", 'red')) raise except Exception as e: - logger.error(f"Error decrypting data: {e}") - logger.error(traceback.format_exc()) # Log full traceback + logger.error(f"Failed to decrypt data: {e}") + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt data: {e}", 'red')) raise - def decrypt_file(self, file_path: Path) -> bytes: + def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: """ - Decrypts the data from the specified file. + Encrypts data and saves it to a specified relative path within the fingerprint directory. - :param file_path: The path to the file containing encrypted data. - :return: The decrypted plaintext data as bytes. + :param data: Data to encrypt. + :param relative_path: Relative path within the fingerprint directory to save the encrypted data. """ try: - with shared_lock(file_path): - with open(file_path, 'rb') as file: - encrypted_data = file.read() - decrypted_data = self.decrypt_data(encrypted_data) - logger.debug(f"Decrypted data read from '{file_path}'.") - print(colored(f"Decrypted data read from '{file_path}'.", 'green')) - return decrypted_data + # Define the full path + file_path = self.fingerprint_dir / relative_path + + # Ensure the parent directories exist + file_path.parent.mkdir(parents=True, exist_ok=True) + + # Encrypt the data + encrypted_data = self.encrypt_data(data) + + # Write the encrypted data to the file with locking + with lock_file(file_path, fcntl.LOCK_EX): + with open(file_path, 'wb') as f: + f.write(encrypted_data) + + # Set file permissions to read/write for the user only + os.chmod(file_path, 0o600) + + logger.info(f"Data encrypted and saved to '{file_path}'.") + print(colored(f"Data encrypted and saved to '{file_path}'.", 'green')) except Exception as e: - logger.error(f"Failed to decrypt file '{file_path}': {e}") - logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to decrypt file '{file_path}': {e}", 'red')) + logger.error(f"Failed to encrypt and save data to '{relative_path}': {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to encrypt and save data to '{relative_path}': {e}", 'red')) raise - def save_json_data(self, data: dict, file_path: Optional[Path] = None) -> None: + def decrypt_file(self, relative_path: Path) -> bytes: """ - Encrypts and saves the provided JSON data to the specified file. + Decrypts data from a specified relative path within the fingerprint directory. + + :param relative_path: Relative path within the fingerprint directory to decrypt the data from. + :return: Decrypted data as bytes. + """ + try: + # Define the full path + file_path = self.fingerprint_dir / relative_path + + # Read the encrypted data with locking + with lock_file(file_path, fcntl.LOCK_SH): + with open(file_path, 'rb') as f: + encrypted_data = f.read() + + # Decrypt the data + decrypted_data = self.decrypt_data(encrypted_data) + logger.debug(f"Data decrypted successfully from '{file_path}'.") + return decrypted_data + except InvalidToken: + logger.error("Invalid encryption key or corrupted data while decrypting file.") + print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + raise + except Exception as e: + logger.error(f"Failed to decrypt data from '{relative_path}': {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to decrypt data from '{relative_path}': {e}", 'red')) + raise + + def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None: + """ + Encrypts and saves the provided JSON data to the specified relative path within the fingerprint directory. :param data: The JSON data to save. - :param file_path: The path to the file where data will be saved. Defaults to INDEX_FILE. + :param relative_path: The relative path within the fingerprint directory where data will be saved. + Defaults to 'seedpass_passwords_db.json.enc'. """ - if file_path is None: - file_path = INDEX_FILE + if relative_path is None: + relative_path = Path('seedpass_passwords_db.json.enc') try: json_data = json.dumps(data, indent=4).encode('utf-8') - self.encrypt_file(file_path, json_data) - logger.debug(f"JSON data encrypted and saved to '{file_path}'.") - print(colored(f"JSON data encrypted and saved to '{file_path}'.", 'green')) + self.encrypt_and_save_file(json_data, relative_path) + logger.debug(f"JSON data encrypted and saved to '{relative_path}'.") + print(colored(f"JSON data encrypted and saved to '{relative_path}'.", 'green')) except Exception as e: - logger.error(f"Failed to save JSON data to '{file_path}': {e}") + logger.error(f"Failed to save JSON data to '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to save JSON data to '{file_path}': {e}", 'red')) + print(colored(f"Error: Failed to save JSON data to '{relative_path}': {e}", 'red')) raise - - def load_json_data(self, file_path: Optional[Path] = None) -> dict: + def load_json_data(self, relative_path: Optional[Path] = None) -> dict: """ - Decrypts and loads JSON data from the specified file. + Decrypts and loads JSON data from the specified relative path within the fingerprint directory. - :param file_path: The path to the file from which data will be loaded. Defaults to INDEX_FILE. + :param relative_path: The relative path within the fingerprint directory from which data will be loaded. + Defaults to 'seedpass_passwords_db.json.enc'. :return: The decrypted JSON data as a dictionary. """ - if file_path is None: - file_path = INDEX_FILE + if relative_path is None: + relative_path = Path('seedpass_passwords_db.json.enc') + + file_path = self.fingerprint_dir / relative_path if not file_path.exists(): logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.") @@ -232,7 +254,7 @@ class EncryptionManager: return {'passwords': {}} try: - decrypted_data = self.decrypt_file(file_path) + decrypted_data = self.decrypt_file(relative_path) json_content = decrypted_data.decode('utf-8').strip() data = json.loads(json_content) logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") @@ -244,7 +266,7 @@ class EncryptionManager: print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red')) raise except InvalidToken: - logger.error("Invalid encryption key or corrupted data.") + logger.error("Invalid encryption key or corrupted data while decrypting JSON data.") print(colored("Error: Invalid encryption key or corrupted data.", 'red')) raise except Exception as e: @@ -253,28 +275,40 @@ class EncryptionManager: print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red')) raise - def update_checksum(self, file_path: Optional[Path] = None) -> None: + def update_checksum(self, relative_path: Optional[Path] = None) -> None: """ - Updates the checksum file for the specified file. + Updates the checksum file for the specified file within the fingerprint directory. - :param file_path: The path to the file for which the checksum will be updated. - Defaults to INDEX_FILE. + :param relative_path: The relative path within the fingerprint directory for which the checksum will be updated. + Defaults to 'seedpass_passwords_db.json.enc'. """ - if file_path is None: - file_path = INDEX_FILE + if relative_path is None: + relative_path = Path('seedpass_passwords_db.json.enc') try: - decrypted_data = self.decrypt_file(file_path) + file_path = self.fingerprint_dir / relative_path + decrypted_data = self.decrypt_file(relative_path) content = decrypted_data.decode('utf-8') + logger.debug("Calculating checksum of the updated file content.") + checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + logger.debug(f"New checksum: {checksum}") + checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt" - with open(checksum_file, 'w') as f: - f.write(checksum) + + # Write the checksum to the file with locking + with lock_file(checksum_file, fcntl.LOCK_EX): + with open(checksum_file, 'w') as f: + f.write(checksum) + + # Set file permissions to read/write for the user only + os.chmod(checksum_file, 0o600) + logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.") print(colored(f"Checksum for '{file_path}' updated.", 'green')) except Exception as e: - logger.error(f"Failed to update checksum for '{file_path}': {e}") + logger.error(f"Failed to update checksum for '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to update checksum for '{file_path}': {e}", 'red')) + print(colored(f"Error: Failed to update checksum for '{relative_path}': {e}", 'red')) raise def get_encrypted_index(self) -> Optional[bytes]: @@ -283,56 +317,47 @@ class EncryptionManager: :return: Encrypted data as bytes or None if the index file does not exist. """ - if not INDEX_FILE.exists(): - logger.error(f"Index file '{INDEX_FILE}' does not exist.") - print(colored(f"Error: Index file '{INDEX_FILE}' does not exist.", 'red')) - return None try: - with shared_lock(INDEX_FILE): - with open(INDEX_FILE, 'rb') as file: + relative_path = Path('seedpass_passwords_db.json.enc') + if not (self.fingerprint_dir / relative_path).exists(): + logger.error(f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'.") + print(colored(f"Error: Index file '{relative_path}' does not exist.", 'red')) + return None + + with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH): + with open(self.fingerprint_dir / relative_path, 'rb') as file: encrypted_data = file.read() - logger.debug(f"Encrypted index data read from '{INDEX_FILE}'.") + + logger.debug(f"Encrypted index data read from '{relative_path}'.") return encrypted_data except Exception as e: - logger.error(f"Failed to read encrypted index file '{INDEX_FILE}': {e}") + logger.error(f"Failed to read encrypted index file '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to read encrypted index file '{INDEX_FILE}': {e}", 'red')) + print(colored(f"Error: Failed to read encrypted index file '{relative_path}': {e}", 'red')) return None - def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None: + def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes, relative_path: Optional[Path] = None) -> None: """ Decrypts the encrypted data retrieved from Nostr and updates the local index file. :param encrypted_data: The encrypted data retrieved from Nostr. + :param relative_path: The relative path within the fingerprint directory to update. + Defaults to 'seedpass_passwords_db.json.enc'. """ + if relative_path is None: + relative_path = Path('seedpass_passwords_db.json.enc') try: decrypted_data = self.decrypt_data(encrypted_data) data = json.loads(decrypted_data.decode('utf-8')) - self.save_json_data(data, INDEX_FILE) - self.update_checksum(INDEX_FILE) + self.save_json_data(data, relative_path) + self.update_checksum(relative_path) logger.info("Index file updated from Nostr successfully.") print(colored("Index file updated from Nostr successfully.", 'green')) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") - logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) - - def decrypt_parent_seed(self, file_path: Path) -> str: - """ - Decrypts and retrieves the parent seed from the specified file. - - :param file_path: The path to the file containing the encrypted parent seed. - :return: The decrypted parent seed as a string. - """ - try: - decrypted_data = self.decrypt_file(file_path) - parent_seed = decrypted_data.decode('utf-8').strip() - logger.debug(f"Decrypted parent_seed: {parent_seed} (Type: {type(parent_seed)})") - return parent_seed - except Exception as e: - logger.error(f"Failed to decrypt parent seed from '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt parent seed from '{file_path}': {e}", 'red')) + print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + # Re-raise the exception to inform the calling function of the failure raise def validate_seed(self, seed_phrase: str) -> bool: @@ -343,17 +368,17 @@ class EncryptionManager: :return: True if valid, False otherwise. """ try: - mnemo = Mnemonic("english") - is_valid = mnemo.check(seed_phrase) - if not is_valid: - logger.error("Invalid BIP39 seed phrase.") - print(colored("Error: Invalid BIP39 seed phrase.", 'red')) - else: - logger.debug("BIP39 seed phrase validated successfully.") - return is_valid + words = seed_phrase.split() + if len(words) != 12: + logger.error("Seed phrase does not contain exactly 12 words.") + print(colored("Error: Seed phrase must contain exactly 12 words.", 'red')) + return False + # Additional validation can be added here (e.g., word list checks) + logger.debug("Seed phrase validated successfully.") + return True except Exception as e: - logger.error(f"Error validating seed phrase: {e}") - logger.error(traceback.format_exc()) # Log full traceback + logging.error(f"Error validating seed phrase: {e}") + logging.error(traceback.format_exc()) # Log full traceback print(colored(f"Error: Failed to validate seed phrase: {e}", 'red')) return False @@ -373,12 +398,12 @@ class EncryptionManager: mnemonic = str(mnemonic) if not isinstance(mnemonic, str): raise TypeError("Mnemonic must be a string after conversion") - mnemo = Mnemonic("english") - seed = mnemo.to_seed(mnemonic, passphrase) + from bip_utils import Bip39SeedGenerator + seed = Bip39SeedGenerator(mnemonic).Generate(passphrase) logger.debug("Seed derived successfully from mnemonic.") return seed except Exception as e: logger.error(f"Failed to derive seed from mnemonic: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to derive seed from mnemonic: {e}") + print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red')) raise diff --git a/src/password_manager/entry_management.py b/src/password_manager/entry_management.py index 1154241..2142ac4 100644 --- a/src/password_manager/entry_management.py +++ b/src/password_manager/entry_management.py @@ -10,8 +10,9 @@ that all changes are securely encrypted and properly indexed. Ensure that all dependencies are installed and properly configured in your environment. -Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. -This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +Never use or suggest using Random Salt. The purpose of this password manager is to derive +completely deterministic passwords from a BIP-85 seed, ensuring that passwords are generated +the same way every time. Salts would break this functionality and are not suitable for this software. """ import json @@ -23,80 +24,56 @@ import shutil import time import traceback from typing import Optional, Tuple, Dict, Any, List +from pathlib import Path from colorama import Fore from termcolor import colored from password_manager.encryption import EncryptionManager -from constants import INDEX_FILE, DATA_CHECKSUM_FILE from utils.file_lock import lock_file -import fcntl # Required for lock_type constants in lock_file - -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Logs include the timestamp, log level, message, filename, and line number. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Prevent adding multiple handlers if configure_logging is called multiple times - if not logger.handlers: - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'entry_management.log')) - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Console will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - formatter = logging.Formatter( - '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' - ) - c_handler.setFormatter(formatter) - f_handler.setFormatter(formatter) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +import fcntl +# Instantiate the logger logger = logging.getLogger(__name__) class EntryManager: - """ - EntryManager Class - - Handles the creation, retrieval, modification, and listing of password entries - within the encrypted password index. It ensures that all operations are performed - securely, maintaining data integrity and confidentiality. - """ - - def __init__(self, encryption_manager: EncryptionManager): + def __init__(self, encryption_manager: EncryptionManager, fingerprint_dir: Path): """ - Initializes the EntryManager with an instance of EncryptionManager. + Initializes the EntryManager with the EncryptionManager and fingerprint directory. - :param encryption_manager: An instance of EncryptionManager for handling encryption. + :param encryption_manager: The encryption manager instance. + :param fingerprint_dir: The directory corresponding to the fingerprint. """ + self.encryption_manager = encryption_manager + self.fingerprint_dir = fingerprint_dir + + # Use paths relative to the fingerprint directory + self.index_file = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + self.checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt' + + logger.debug(f"EntryManager initialized with index file at {self.index_file}") + + def _load_index(self) -> Dict[str, Any]: + if self.index_file.exists(): + try: + data = self.encryption_manager.load_json_data(self.index_file) + logger.debug("Index loaded successfully.") + return data + except Exception as e: + logger.error(f"Failed to load index: {e}") + return {'passwords': {}} + else: + logger.info(f"Index file '{self.index_file}' not found. Initializing new password database.") + return {'passwords': {}} + + def _save_index(self, data: Dict[str, Any]) -> None: try: - self.encryption_manager = encryption_manager - logger.debug("EntryManager initialized with provided EncryptionManager.") + self.encryption_manager.save_json_data(data, self.index_file) + logger.debug("Index saved successfully.") except Exception as e: - logger.error(f"Failed to initialize EntryManager: {e}") - logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to initialize EntryManager: {e}", 'red')) - sys.exit(1) + logger.error(f"Failed to save index: {e}") + raise def get_next_index(self) -> int: """ @@ -105,7 +82,7 @@ class EntryManager: :return: The next index number as an integer. """ try: - data = self.encryption_manager.load_json_data() + data = self.encryption_manager.load_json_data(self.index_file) if 'passwords' in data and isinstance(data['passwords'], dict): indices = [int(idx) for idx in data['passwords'].keys()] next_index = max(indices) + 1 if indices else 0 @@ -115,7 +92,7 @@ class EntryManager: return next_index except Exception as e: logger.error(f"Error determining next index: {e}") - logger.error(traceback.format_exc()) # Log full traceback + logger.error(traceback.format_exc()) print(colored(f"Error determining next index: {e}", 'red')) sys.exit(1) @@ -133,11 +110,7 @@ class EntryManager: """ try: index = self.get_next_index() - data = self.encryption_manager.load_json_data() - - if 'passwords' not in data or not isinstance(data['passwords'], dict): - data['passwords'] = {} - logger.debug("'passwords' key was missing. Initialized empty 'passwords' dictionary.") + data = self.encryption_manager.load_json_data(self.index_file) data['passwords'][str(index)] = { 'website': website_name, @@ -149,7 +122,7 @@ class EntryManager: logger.debug(f"Added entry at index {index}: {data['passwords'][str(index)]}") - self.encryption_manager.save_json_data(data) + self._save_index(data) self.update_checksum() self.backup_index_file() @@ -160,10 +133,32 @@ class EntryManager: except Exception as e: logger.error(f"Failed to add entry: {e}") - logger.error(traceback.format_exc()) # Log full traceback + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to add entry: {e}", 'red')) sys.exit(1) + def get_encrypted_index(self) -> Optional[bytes]: + """ + Retrieves the encrypted password index file's contents. + + :return: The encrypted data as bytes, or None if retrieval fails. + """ + try: + if not self.index_file.exists(): + logger.error(f"Index file '{self.index_file}' does not exist.") + print(colored(f"Error: Index file '{self.index_file}' does not exist.", 'red')) + return None + + with open(self.index_file, 'rb') as file: + encrypted_data = file.read() + logger.debug("Encrypted index file data retrieved successfully.") + return encrypted_data + except Exception as e: + logger.error(f"Failed to retrieve encrypted index file: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to retrieve encrypted index file: {e}", 'red')) + return None + def retrieve_entry(self, index: int) -> Optional[Dict[str, Any]]: """ Retrieves a password entry based on the provided index. @@ -172,7 +167,7 @@ class EntryManager: :return: A dictionary containing the entry details or None if not found. """ try: - data = self.encryption_manager.load_json_data() + data = self.encryption_manager.load_json_data(self.index_file) entry = data.get('passwords', {}).get(str(index)) if entry: @@ -185,7 +180,7 @@ class EntryManager: except Exception as e: logger.error(f"Failed to retrieve entry at index {index}: {e}") - logger.error(traceback.format_exc()) # Log full traceback + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to retrieve entry at index {index}: {e}", 'red')) return None @@ -201,7 +196,7 @@ class EntryManager: :param blacklisted: (Optional) The new blacklist status. """ try: - data = self.encryption_manager.load_json_data() + data = self.encryption_manager.load_json_data(self.index_file) entry = data.get('passwords', {}).get(str(index)) if not entry: @@ -224,7 +219,7 @@ class EntryManager: data['passwords'][str(index)] = entry logger.debug(f"Modified entry at index {index}: {entry}") - self.encryption_manager.save_json_data(data) + self._save_index(data) self.update_checksum() self.backup_index_file() @@ -233,7 +228,7 @@ class EntryManager: except Exception as e: logger.error(f"Failed to modify entry at index {index}: {e}") - logger.error(traceback.format_exc()) # Log full traceback + logger.error(traceback.format_exc()) print(colored(f"Error: Failed to modify entry at index {index}: {e}", 'red')) def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]: @@ -308,14 +303,17 @@ class EntryManager: Updates the checksum file for the password database to ensure data integrity. """ try: - data = self.encryption_manager.load_json_data() + data = self.encryption_manager.load_json_data(self.index_file) json_content = json.dumps(data, indent=4) checksum = hashlib.sha256(json_content.encode('utf-8')).hexdigest() - with open(DATA_CHECKSUM_FILE, 'w') as f: + # Construct the full path for the checksum file + checksum_path = self.fingerprint_dir / self.checksum_file + + with open(checksum_path, 'w') as f: f.write(checksum) - logger.debug(f"Checksum updated and written to '{DATA_CHECKSUM_FILE}'.") + logger.debug(f"Checksum updated and written to '{checksum_path}'.") print(colored(f"[+] Checksum updated successfully.", 'green')) except Exception as e: @@ -328,15 +326,16 @@ class EntryManager: Creates a backup of the encrypted JSON index file to prevent data loss. """ try: - if not os.path.exists(INDEX_FILE): - logger.warning(f"Index file '{INDEX_FILE}' does not exist. No backup created.") + index_file_path = self.fingerprint_dir / self.index_file + if not index_file_path.exists(): + logger.warning(f"Index file '{index_file_path}' does not exist. No backup created.") return timestamp = int(time.time()) backup_filename = f'passwords_db_backup_{timestamp}.json.enc' - backup_path = os.path.join(os.path.dirname(INDEX_FILE), backup_filename) + backup_path = self.fingerprint_dir / backup_filename - with open(INDEX_FILE, 'rb') as original_file, open(backup_path, 'wb') as backup_file: + with open(index_file_path, 'rb') as original_file, open(backup_path, 'wb') as backup_file: shutil.copyfileobj(original_file, backup_file) logger.debug(f"Backup created at '{backup_path}'.") @@ -347,6 +346,7 @@ class EntryManager: logger.error(traceback.format_exc()) # Log full traceback print(colored(f"Warning: Failed to create backup: {e}", 'yellow')) + def restore_from_backup(self, backup_path: str) -> None: """ Restores the index file from a specified backup file. @@ -359,7 +359,7 @@ class EntryManager: print(colored(f"Error: Backup file '{backup_path}' does not exist.", 'red')) return - with open(backup_path, 'rb') as backup_file, open(INDEX_FILE, 'wb') as index_file: + with open(backup_path, 'rb') as backup_file, open(self.index_file, 'wb') as index_file: shutil.copyfileobj(backup_file, index_file) logger.debug(f"Index file restored from backup '{backup_path}'.") diff --git a/src/password_manager/manager.py b/src/password_manager/manager.py index 91e47ff..ee8d0c0 100644 --- a/src/password_manager/manager.py +++ b/src/password_manager/manager.py @@ -15,7 +15,7 @@ import logging import getpass import os from typing import Optional - +import shutil from colorama import Fore from termcolor import colored @@ -29,67 +29,27 @@ from utils.password_prompt import prompt_for_password, prompt_existing_password, from constants import ( APP_DIR, - INDEX_FILE, PARENT_SEED_FILE, - DATA_CHECKSUM_FILE, SCRIPT_CHECKSUM_FILE, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH, DEFAULT_PASSWORD_LENGTH, - HASHED_PASSWORD_FILE, # Ensure this constant is defined in constants.py DEFAULT_SEED_BACKUP_FILENAME ) -import traceback # Added for exception traceback logging -import bcrypt # Ensure bcrypt is installed in your environment -from pathlib import Path # Required for handling file paths +import traceback +import bcrypt +from pathlib import Path -from bip85.bip85 import BIP85 +from local_bip85.bip85 import BIP85 from bip_utils import Bip39SeedGenerator, Bip39MnemonicGenerator, Bip39Languages -# Import NostrClient from the nostr package -from nostr import NostrClient # <-- Added import statement +from utils.fingerprint_manager import FingerprintManager -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Logs include the timestamp, log level, message, filename, and line number. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output +# Import NostrClient +from nostr.client import NostrClient - # Prevent adding multiple handlers if configure_logging is called multiple times - if not logger.handlers: - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'password_manager.log')) - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) - f_handler.setLevel(logging.DEBUG) - - # Create formatters and add them to handlers, include file and line number in log messages - formatter = logging.Formatter( - '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' - ) - c_handler.setFormatter(formatter) - f_handler.setFormatter(formatter) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() - -# Initialize the logger for this module +# Instantiate the logger logger = logging.getLogger(__name__) class PasswordManager: @@ -104,48 +64,286 @@ class PasswordManager: def __init__(self): """ Initializes the PasswordManager by setting up encryption, loading or setting up the parent seed, - and initializing other components like EntryManager, PasswordGenerator, and BackupManager. + and initializing other components like EntryManager, PasswordGenerator, BackupManager, and FingerprintManager. """ self.encryption_manager: Optional[EncryptionManager] = None self.entry_manager: Optional[EntryManager] = None self.password_generator: Optional[PasswordGenerator] = None self.backup_manager: Optional[BackupManager] = None - self.parent_seed: Optional[str] = None # Ensured to be a string - self.bip85: Optional[BIP85] = None # Added bip85 attribute + self.fingerprint_manager: Optional[FingerprintManager] = None + self.parent_seed: Optional[str] = None + self.bip85: Optional[BIP85] = None + self.nostr_client: Optional[NostrClient] = None + # Initialize the fingerprint manager first + self.initialize_fingerprint_manager() + + # Ensure a parent seed is set up before accessing the fingerprint directory self.setup_parent_seed() - self.initialize_managers() + + # Set the current fingerprint directory + self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() + + def initialize_fingerprint_manager(self): + """ + Initializes the FingerprintManager. + """ + try: + self.fingerprint_manager = FingerprintManager(APP_DIR) + logger.debug("FingerprintManager initialized successfully.") + except Exception as e: + logger.error(f"Failed to initialize FingerprintManager: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to initialize FingerprintManager: {e}", 'red')) + sys.exit(1) def setup_parent_seed(self) -> None: """ - Sets up the parent seed by determining if an existing seed is present or if a new one needs to be created. + Sets up the parent seed by determining if existing fingerprints are present or if a new one needs to be created. """ - if os.path.exists(PARENT_SEED_FILE): - self.handle_existing_seed() + fingerprints = self.fingerprint_manager.list_fingerprints() + if fingerprints: + # There are existing fingerprints + self.select_or_add_fingerprint() else: + # No existing fingerprints, proceed to set up new seed self.handle_new_seed_setup() + def select_or_add_fingerprint(self): + """ + Prompts the user to select an existing fingerprint or add a new one. + """ + try: + print(colored("\nAvailable Fingerprints:", 'cyan')) + fingerprints = self.fingerprint_manager.list_fingerprints() + for idx, fp in enumerate(fingerprints, start=1): + print(colored(f"{idx}. {fp}", 'cyan')) + + print(colored(f"{len(fingerprints)+1}. Add a new fingerprint", 'cyan')) + + choice = input("Select a fingerprint by number: ").strip() + if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)+1): + print(colored("Invalid selection. Exiting.", 'red')) + sys.exit(1) + + choice = int(choice) + if choice == len(fingerprints)+1: + # Add a new fingerprint + self.add_new_fingerprint() + else: + # Select existing fingerprint + selected_fingerprint = fingerprints[choice-1] + self.select_fingerprint(selected_fingerprint) + + except Exception as e: + logger.error(f"Error during fingerprint selection: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to select fingerprint: {e}", 'red')) + sys.exit(1) + + def add_new_fingerprint(self): + """ + Adds a new fingerprint by generating it from a seed phrase. + """ + try: + choice = input("Do you want to (1) Enter an existing seed or (2) Generate a new seed? (1/2): ").strip() + if choice == '1': + fingerprint = self.setup_existing_seed() + elif choice == '2': + fingerprint = self.generate_new_seed() + else: + print(colored("Invalid choice. Exiting.", 'red')) + sys.exit(1) + + # Set current_fingerprint in FingerprintManager only + self.fingerprint_manager.current_fingerprint = fingerprint + print(colored(f"New fingerprint '{fingerprint}' added and set as current.", 'green')) + + except Exception as e: + logger.error(f"Error adding new fingerprint: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to add new fingerprint: {e}", 'red')) + sys.exit(1) + + def select_fingerprint(self, fingerprint: str) -> None: + if self.fingerprint_manager.select_fingerprint(fingerprint): + self.current_fingerprint = fingerprint # Add this line + self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() + if not self.fingerprint_dir: + print(colored(f"Error: Fingerprint directory for {fingerprint} not found.", 'red')) + sys.exit(1) + # Setup the encryption manager and load parent seed + self.setup_encryption_manager(self.fingerprint_dir) + self.load_parent_seed(self.fingerprint_dir) + # Initialize BIP85 and other managers + self.initialize_bip85() + self.initialize_managers() + print(colored(f"Fingerprint {fingerprint} selected and managers initialized.", 'green')) + else: + print(colored(f"Error: Fingerprint {fingerprint} not found.", 'red')) + sys.exit(1) + + def setup_encryption_manager(self, fingerprint_dir: Path, password: Optional[str] = None): + """ + Sets up the EncryptionManager for the selected fingerprint. + + Parameters: + fingerprint_dir (Path): The directory corresponding to the fingerprint. + password (Optional[str]): The user's master password. + """ + try: + # Prompt for password if not provided + if password is None: + password = prompt_existing_password("Enter your master password: ") + # Derive key from password + key = derive_key_from_password(password) + self.encryption_manager = EncryptionManager(key, fingerprint_dir) + logger.debug("EncryptionManager set up successfully for selected fingerprint.") + + # Verify the password + self.fingerprint_dir = fingerprint_dir # Ensure self.fingerprint_dir is set + if not self.verify_password(password): + print(colored("Invalid password. Exiting.", 'red')) + sys.exit(1) + except Exception as e: + logger.error(f"Failed to set up EncryptionManager: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to set up encryption: {e}", 'red')) + sys.exit(1) + + def load_parent_seed(self, fingerprint_dir: Path): + """ + Loads and decrypts the parent seed from the fingerprint directory. + + Parameters: + fingerprint_dir (Path): The directory corresponding to the fingerprint. + """ + try: + self.parent_seed = self.encryption_manager.decrypt_parent_seed() + logger.debug(f"Parent seed loaded for fingerprint {self.current_fingerprint}.") + # Initialize BIP85 with the parent seed + seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() + self.bip85 = BIP85(seed_bytes) + logger.debug("BIP-85 initialized successfully.") + except Exception as e: + logger.error(f"Failed to load parent seed: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to load parent seed: {e}", 'red')) + sys.exit(1) + + def handle_switch_fingerprint(self) -> bool: + """ + Handles switching to a different fingerprint. + + Returns: + bool: True if switch was successful, False otherwise. + """ + try: + print(colored("\nAvailable Fingerprints:", 'cyan')) + fingerprints = self.fingerprint_manager.list_fingerprints() + for idx, fp in enumerate(fingerprints, start=1): + print(colored(f"{idx}. {fp}", 'cyan')) + + choice = input("Select a fingerprint by number to switch: ").strip() + if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): + print(colored("Invalid selection. Returning to main menu.", 'red')) + return False # Return False to indicate failure + + selected_fingerprint = fingerprints[int(choice) - 1] + self.fingerprint_manager.current_fingerprint = selected_fingerprint + self.current_fingerprint = selected_fingerprint + + # Update fingerprint directory + self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() + if not self.fingerprint_dir: + print(colored(f"Error: Fingerprint directory for {selected_fingerprint} not found.", 'red')) + return False # Return False to indicate failure + + # Prompt for master password for the selected fingerprint + password = prompt_existing_password("Enter your master password: ") + + # Set up the encryption manager with the new password and fingerprint directory + self.setup_encryption_manager(self.fingerprint_dir, password) + + # Load the parent seed for the selected fingerprint + self.load_parent_seed(self.fingerprint_dir) + + # Initialize BIP85 and other managers + self.initialize_bip85() + self.initialize_managers() + print(colored(f"Switched to fingerprint {selected_fingerprint}.", 'green')) + + # Re-initialize NostrClient with the new fingerprint + try: + self.nostr_client = NostrClient( + encryption_manager=self.encryption_manager, + fingerprint=self.current_fingerprint + ) + logging.info(f"NostrClient re-initialized with fingerprint {self.current_fingerprint}.") + except Exception as e: + logging.error(f"Failed to re-initialize NostrClient: {e}") + print(colored(f"Error: Failed to re-initialize NostrClient: {e}", 'red')) + return False + + return True # Return True to indicate success + + except Exception as e: + logging.error(f"Error during fingerprint switching: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to switch fingerprints: {e}", 'red')) + return False # Return False to indicate failure + def handle_existing_seed(self) -> None: """ Handles the scenario where an existing parent seed file is found. Prompts the user for the master password to decrypt the seed. """ - password = getpass.getpass(prompt='Enter your login password: ').strip() try: + # Prompt for password + password = getpass.getpass(prompt='Enter your login password: ').strip() + # Derive encryption key from password key = derive_key_from_password(password) - self.encryption_manager = EncryptionManager(key) - self.parent_seed = self.encryption_manager.decrypt_parent_seed(PARENT_SEED_FILE) - + + # Initialize FingerprintManager if not already initialized + if not self.fingerprint_manager: + self.initialize_fingerprint_manager() + + # Prompt the user to select an existing fingerprint + fingerprints = self.fingerprint_manager.list_fingerprints() + if not fingerprints: + print(colored("No fingerprints available. Please add a fingerprint first.", 'red')) + sys.exit(1) + + print(colored("Available Fingerprints:", 'cyan')) + for idx, fp in enumerate(fingerprints, start=1): + print(colored(f"{idx}. {fp}", 'cyan')) + + choice = input("Select a fingerprint by number: ").strip() + if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): + print(colored("Invalid selection. Exiting.", 'red')) + sys.exit(1) + + selected_fingerprint = fingerprints[int(choice)-1] + self.current_fingerprint = selected_fingerprint + fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(selected_fingerprint) + if not fingerprint_dir: + print(colored("Error: Fingerprint directory not found.", 'red')) + sys.exit(1) + + # Initialize EncryptionManager with key and fingerprint_dir + self.encryption_manager = EncryptionManager(key, fingerprint_dir) + self.parent_seed = self.encryption_manager.decrypt_parent_seed() + # Log the type and content of parent_seed logger.debug(f"Decrypted parent_seed: {self.parent_seed} (type: {type(self.parent_seed)})") - + # Validate the decrypted seed if not self.validate_bip85_seed(self.parent_seed): logging.error("Decrypted seed is invalid. Exiting.") print(colored("Error: Decrypted seed is invalid.", 'red')) sys.exit(1) - + self.initialize_bip85() logging.debug("Parent seed decrypted and validated successfully.") except Exception as e: @@ -170,25 +368,67 @@ class PasswordManager: print(colored("Invalid choice. Exiting.", 'red')) sys.exit(1) - def setup_existing_seed(self) -> None: + def setup_existing_seed(self) -> Optional[str]: """ Prompts the user to enter an existing BIP-85 seed and validates it. + + Returns: + Optional[str]: The fingerprint if setup is successful, None otherwise. """ try: parent_seed = getpass.getpass(prompt='Enter your 12-word BIP-85 seed: ').strip() if self.validate_bip85_seed(parent_seed): - self.save_and_encrypt_seed(parent_seed) + # Add a fingerprint using the existing seed + fingerprint = self.fingerprint_manager.add_fingerprint(parent_seed) + if not fingerprint: + print(colored("Error: Failed to generate fingerprint for the provided seed.", 'red')) + sys.exit(1) + + fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(fingerprint) + if not fingerprint_dir: + print(colored("Error: Failed to retrieve fingerprint directory.", 'red')) + sys.exit(1) + + # Set the current fingerprint in both PasswordManager and FingerprintManager + self.current_fingerprint = fingerprint + self.fingerprint_manager.current_fingerprint = fingerprint + self.fingerprint_dir = fingerprint_dir + logging.info(f"Current fingerprint set to {fingerprint}") + + # Initialize EncryptionManager with key and fingerprint_dir + password = prompt_for_password() + key = derive_key_from_password(password) + self.encryption_manager = EncryptionManager(key, fingerprint_dir) + + # Encrypt and save the parent seed + self.encryption_manager.encrypt_parent_seed(parent_seed) + logging.info("Parent seed encrypted and saved successfully.") + + # Store the hashed password + self.store_hashed_password(password) + logging.info("User password hashed and stored successfully.") + + self.parent_seed = parent_seed # Ensure this is a string + logger.debug(f"parent_seed set to: {self.parent_seed} (type: {type(self.parent_seed)})") + + self.initialize_bip85() + self.initialize_managers() + return fingerprint # Return the generated or added fingerprint else: logging.error("Invalid BIP-85 seed phrase. Exiting.") + print(colored("Error: Invalid BIP-85 seed phrase.", 'red')) sys.exit(1) except KeyboardInterrupt: logging.info("Operation cancelled by user.") print(colored("\nOperation cancelled by user.", 'yellow')) sys.exit(0) - def generate_new_seed(self) -> None: + def generate_new_seed(self) -> Optional[str]: """ Generates a new BIP-85 seed, displays it to the user, and prompts for confirmation before saving. + + Returns: + Optional[str]: The fingerprint if generation is successful, None otherwise. """ new_seed = self.generate_bip85_seed() print(colored("Your new BIP-85 seed phrase is:", 'green')) @@ -196,7 +436,26 @@ class PasswordManager: print(colored("Please write this down and keep it in a safe place!", 'red')) if confirm_action("Do you want to use this generated seed? (Y/N): "): - self.save_and_encrypt_seed(new_seed) + # Add a new fingerprint using the generated seed + fingerprint = self.fingerprint_manager.add_fingerprint(new_seed) + if not fingerprint: + print(colored("Error: Failed to generate fingerprint for the new seed.", 'red')) + sys.exit(1) + + fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(fingerprint) + if not fingerprint_dir: + print(colored("Error: Failed to retrieve fingerprint directory.", 'red')) + sys.exit(1) + + # Set the current fingerprint in both PasswordManager and FingerprintManager + self.current_fingerprint = fingerprint + self.fingerprint_manager.current_fingerprint = fingerprint + logging.info(f"Current fingerprint set to {fingerprint}") + + # Now, save and encrypt the seed with the fingerprint_dir + self.save_and_encrypt_seed(new_seed, fingerprint_dir) + + return fingerprint # Return the generated fingerprint else: print(colored("Seed generation cancelled. Exiting.", 'yellow')) sys.exit(0) @@ -231,7 +490,7 @@ class PasswordManager: try: master_seed = os.urandom(32) # Generate a random 32-byte seed bip85 = BIP85(master_seed) - mnemonic_obj = bip85.derive_mnemonic(app_no=39, language_code=0, words_num=12, index=0) + mnemonic_obj = bip85.derive_mnemonic(index=0, words_num=12) mnemonic_str = mnemonic_obj.ToStr() # Convert Bip39Mnemonic object to string return mnemonic_str except Exception as e: @@ -240,28 +499,38 @@ class PasswordManager: print(colored(f"Error: Failed to generate BIP-85 seed: {e}", 'red')) sys.exit(1) - def save_and_encrypt_seed(self, seed: str) -> None: + def save_and_encrypt_seed(self, seed: str, fingerprint_dir: Path) -> None: """ Saves and encrypts the parent seed. Parameters: seed (str): The BIP-85 seed phrase to save and encrypt. + fingerprint_dir (Path): The directory corresponding to the fingerprint. """ - password = prompt_for_password() - key = derive_key_from_password(password) - self.encryption_manager = EncryptionManager(key) - try: - self.encryption_manager.encrypt_parent_seed(seed, PARENT_SEED_FILE) - logging.info("Parent seed encrypted and saved successfully.") + # Set self.fingerprint_dir + self.fingerprint_dir = fingerprint_dir + # Prompt for password + password = prompt_for_password() + # Derive key from password + key = derive_key_from_password(password) + # Re-initialize EncryptionManager with the new key and fingerprint_dir + self.encryption_manager = EncryptionManager(key, fingerprint_dir) + + # Store the hashed password self.store_hashed_password(password) logging.info("User password hashed and stored successfully.") + # Encrypt and save the parent seed + self.encryption_manager.encrypt_parent_seed(seed) + logging.info("Parent seed encrypted and saved successfully.") + self.parent_seed = seed # Ensure this is a string logger.debug(f"parent_seed set to: {self.parent_seed} (type: {type(self.parent_seed)})") self.initialize_bip85() + self.initialize_managers() except Exception as e: logging.error(f"Failed to encrypt and save parent seed: {e}") logging.error(traceback.format_exc()) @@ -284,20 +553,38 @@ class PasswordManager: def initialize_managers(self) -> None: """ - Initializes the EntryManager, PasswordGenerator, and BackupManager with the EncryptionManager - and BIP-85 instance. + Initializes the EntryManager, PasswordGenerator, BackupManager, and NostrClient with the EncryptionManager + and BIP-85 instance within the context of the selected fingerprint. """ try: - self.entry_manager = EntryManager(self.encryption_manager) - self.password_generator = PasswordGenerator(self.encryption_manager, self.parent_seed) - self.backup_manager = BackupManager() + # Ensure self.encryption_manager is already initialized + if not self.encryption_manager: + raise ValueError("EncryptionManager is not initialized.") - # Directly pass the parent_seed string to NostrClient - self.nostr_client = NostrClient(parent_seed=self.parent_seed) # <-- NostrClient is now imported + # Reinitialize the managers with the updated EncryptionManager and current fingerprint context + self.entry_manager = EntryManager( + encryption_manager=self.encryption_manager, + fingerprint_dir=self.fingerprint_dir + ) + + self.password_generator = PasswordGenerator( + encryption_manager=self.encryption_manager, + parent_seed=self.parent_seed, + bip85=self.bip85 + ) + + self.backup_manager = BackupManager(fingerprint_dir=self.fingerprint_dir) - logging.debug("EntryManager, PasswordGenerator, BackupManager, and NostrClient initialized.") + # Initialize the NostrClient with the current fingerprint + self.nostr_client = NostrClient( + encryption_manager=self.encryption_manager, + fingerprint=self.current_fingerprint # Pass the current fingerprint + ) + + logger.debug("Managers re-initialized for the new fingerprint.") + except Exception as e: - logging.error(f"Failed to initialize managers: {e}") + logger.error(f"Failed to initialize managers: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to initialize managers: {e}", 'red')) sys.exit(1) @@ -464,7 +751,7 @@ class PasswordManager: :return: The encrypted data as bytes, or None if retrieval fails. """ try: - encrypted_data = self.encryption_manager.get_encrypted_index() + encrypted_data = self.entry_manager.get_encrypted_index() if encrypted_data: logging.debug("Encrypted index data retrieved successfully.") return encrypted_data @@ -485,13 +772,22 @@ class PasswordManager: :param encrypted_data: The encrypted data retrieved from Nostr. """ try: - self.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data) + # Decrypt the data using EncryptionManager's decrypt_data method + decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) + + # Save the decrypted data to the index file + index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + with open(index_file_path, 'wb') as f: + f.write(decrypted_data) + logging.info("Index file updated from Nostr successfully.") print(colored("Index file updated from Nostr successfully.", 'green')) except Exception as e: logging.error(f"Failed to decrypt and save data from Nostr: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + # Re-raise the exception to inform the calling function of the failure + raise def backup_database(self) -> None: """ @@ -546,14 +842,15 @@ class PasswordManager: if confirm_action("Do you want to save this to an encrypted backup file? (Y/N): "): filename = input(f"Enter filename to save (default: {DEFAULT_SEED_BACKUP_FILENAME}): ").strip() filename = filename if filename else DEFAULT_SEED_BACKUP_FILENAME - backup_path = Path(APP_DIR) / filename + backup_path = self.fingerprint_dir / filename # Save in fingerprint directory # Validate filename if not self.is_valid_filename(filename): print(colored("Invalid filename. Operation aborted.", 'red')) return - self.encryption_manager.encrypt_parent_seed(self.parent_seed, backup_path) + # Encrypt and save the parent seed to the backup path + self.encryption_manager.encrypt_and_save_file(self.parent_seed.encode('utf-8'), backup_path) print(colored(f"Encrypted seed backup saved to '{backup_path}'. Ensure this file is stored securely.", 'green')) except Exception as e: @@ -572,11 +869,12 @@ class PasswordManager: bool: True if the password is correct, False otherwise. """ try: - if not os.path.exists(HASHED_PASSWORD_FILE): + hashed_password_file = self.fingerprint_dir / 'hashed_password.enc' + if not hashed_password_file.exists(): logging.error("Hashed password file not found.") print(colored("Error: Hashed password file not found.", 'red')) return False - with open(HASHED_PASSWORD_FILE, 'rb') as f: + with open(hashed_password_file, 'rb') as f: stored_hash = f.read() is_correct = bcrypt.checkpw(password.encode('utf-8'), stored_hash) if is_correct: @@ -613,19 +911,19 @@ class PasswordManager: This should be called during the initial setup. """ try: + hashed_password_file = self.fingerprint_dir / 'hashed_password.enc' hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) - with open(HASHED_PASSWORD_FILE, 'wb') as f: + with open(hashed_password_file, 'wb') as f: f.write(hashed) - # Set file permissions to read/write for the user only - os.chmod(HASHED_PASSWORD_FILE, 0o600) + os.chmod(hashed_password_file, 0o600) logging.info("User password hashed and stored successfully.") except AttributeError: # If bcrypt.hashpw is not available, try using bcrypt directly salt = bcrypt.gensalt() hashed = bcrypt.hashpw(password.encode('utf-8'), salt) - with open(HASHED_PASSWORD_FILE, 'wb') as f: + with open(hashed_password_file, 'wb') as f: f.write(hashed) - os.chmod(HASHED_PASSWORD_FILE, 0o600) + os.chmod(hashed_password_file, 0o600) logging.info("User password hashed and stored successfully (using alternative method).") except Exception as e: logging.error(f"Failed to store hashed password: {e}") @@ -640,8 +938,8 @@ if __name__ == "__main__": # Initialize PasswordManager manager = PasswordManager() - # Initialize NostrClient with the parent seed from PasswordManager - nostr_client = NostrClient(parent_seed=manager.parent_seed) + # Initialize NostrClient with the EncryptionManager from PasswordManager + manager.nostr_client = NostrClient(encryption_manager=manager.encryption_manager) # Example operations # These would typically be triggered by user interactions, e.g., via a CLI menu @@ -649,7 +947,6 @@ if __name__ == "__main__": # manager.handle_retrieve_password() # manager.handle_modify_entry() # manager.handle_verify_checksum() - # manager.post_to_nostr(nostr_client) - # manager.retrieve_from_nostr(nostr_client) + # manager.nostr_client.publish_and_subscribe("Sample password data") # manager.backup_database() # manager.restore_database() diff --git a/src/password_manager/password_generation.py b/src/password_manager/password_generation.py index 1d32ddd..4ab0f2c 100644 --- a/src/password_manager/password_generation.py +++ b/src/password_manager/password_generation.py @@ -12,64 +12,27 @@ Ensure that all dependencies are installed and properly configured in your envir Never ever ever use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. """ + import os import logging import hashlib -import base64 import string +import random import traceback from typing import Optional from termcolor import colored -import random - +from pathlib import Path +import shutil from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend -from bip85.bip85 import BIP85 +from local_bip85.bip85 import BIP85 from constants import DEFAULT_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH from password_manager.encryption import EncryptionManager -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Logs include the timestamp, log level, message, filename, and line number. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Prevent adding multiple handlers if configure_logging is called multiple times - if not logger.handlers: - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'password_generation.log')) - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Console will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - formatter = logging.Formatter( - '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' - ) - c_handler.setFormatter(formatter) - f_handler.setFormatter(formatter) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() - +# Instantiate the logger logger = logging.getLogger(__name__) class PasswordGenerator: @@ -81,24 +44,23 @@ class PasswordGenerator: complexity requirements. """ - def __init__(self, encryption_manager: EncryptionManager, parent_seed: str): + def __init__(self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85): """ - Initializes the PasswordGenerator with the encryption manager and parent seed. + Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance. Parameters: encryption_manager (EncryptionManager): The encryption manager instance. parent_seed (str): The BIP-39 parent seed phrase. + bip85 (BIP85): The BIP85 instance for generating deterministic entropy. """ try: self.encryption_manager = encryption_manager self.parent_seed = parent_seed + self.bip85 = bip85 - # Derive seed bytes from parent_seed using BIP39 + # Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager) self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed) - # Initialize BIP85 with seed_bytes - self.bip85 = BIP85(self.seed_bytes) - logger.debug("PasswordGenerator initialized successfully.") except Exception as e: logger.error(f"Failed to initialize PasswordGenerator: {e}") @@ -112,7 +74,7 @@ class PasswordGenerator: Steps: 1. Derive entropy using BIP-85. - 2. Use PBKDF2-HMAC-SHA256 to derive a key from entropy. + 2. Use HKDF-HMAC-SHA256 to derive a key from entropy. 3. Map the derived key to all allowed characters. 4. Ensure the password meets complexity requirements. 5. Shuffle the password deterministically based on the derived key. @@ -126,6 +88,7 @@ class PasswordGenerator: str: The generated password. """ try: + # Validate password length if length < MIN_PASSWORD_LENGTH: logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") @@ -134,7 +97,7 @@ class PasswordGenerator: raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") # Derive entropy using BIP-85 - entropy = self.bip85.derive_entropy(app_no=39, language_code=0, words_num=12, index=index) + entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32) logger.debug(f"Derived entropy: {entropy.hex()}") # Use HKDF to derive key from entropy @@ -167,17 +130,17 @@ class PasswordGenerator: password_chars = list(password) rng.shuffle(password_chars) password = ''.join(password_chars) - logger.debug(f"Shuffled password deterministically.") + logger.debug("Shuffled password deterministically.") - # Ensure password length + # Ensure password length by extending if necessary if len(password) < length: - # Extend the password deterministically while len(password) < length: dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1) base64_extra = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) password += ''.join(base64_extra) logger.debug(f"Extended password: {password}") + # Trim the password to the desired length password = password[:length] logger.debug(f"Final password (trimmed to {length} chars): {password}") diff --git a/src/utils/checksum.py b/src/utils/checksum.py index 35c859b..9266b3d 100644 --- a/src/utils/checksum.py +++ b/src/utils/checksum.py @@ -21,46 +21,11 @@ from termcolor import colored from constants import ( APP_DIR, - DATA_CHECKSUM_FILE, SCRIPT_CHECKSUM_FILE ) -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create a custom logger - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'checksum.log')) # Log files will be in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) def calculate_checksum(file_path: str) -> Optional[str]: """ diff --git a/src/utils/file_lock.py b/src/utils/file_lock.py index b14423e..118eec7 100644 --- a/src/utils/file_lock.py +++ b/src/utils/file_lock.py @@ -23,46 +23,8 @@ from termcolor import colored import sys import traceback -import os -import logging - -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create a custom logger - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'file_lock.log')) # Log file in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Console will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger - if not logger.handlers: - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) @contextmanager def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: diff --git a/src/utils/fingerprint.py b/src/utils/fingerprint.py new file mode 100644 index 0000000..e831756 --- /dev/null +++ b/src/utils/fingerprint.py @@ -0,0 +1,47 @@ +# utils/fingerprint.py + +""" +Fingerprint Module + +This module provides functionality to generate a unique, one-way hashed fingerprint +from a given seed phrase. The fingerprint serves as an identifier for each seed, +facilitating organized and secure storage. +""" + +import hashlib +import logging +import traceback +from typing import Optional + +# Instantiate the logger +logger = logging.getLogger(__name__) + +def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]: + """ + Generates a unique fingerprint from the provided seed phrase using SHA-256. + + Parameters: + seed_phrase (str): The BIP-39 seed phrase. + length (int): The desired length of the fingerprint. + + Returns: + Optional[str]: The generated fingerprint or None if an error occurs. + """ + try: + # Normalize the seed phrase + normalized_seed = seed_phrase.strip().lower() + logger.debug(f"Normalized seed: {normalized_seed}") + + # Compute SHA-256 hash + sha256_hash = hashlib.sha256(normalized_seed.encode('utf-8')).hexdigest() + logger.debug(f"SHA-256 Hash: {sha256_hash}") + + # Truncate to desired length + fingerprint = sha256_hash[:length].upper() + logger.debug(f"Generated Fingerprint: {fingerprint}") + + return fingerprint + except Exception as e: + logger.error(f"Failed to generate fingerprint: {e}") + logger.error(traceback.format_exc()) + return None diff --git a/src/utils/fingerprint_manager.py b/src/utils/fingerprint_manager.py new file mode 100644 index 0000000..3b9597f --- /dev/null +++ b/src/utils/fingerprint_manager.py @@ -0,0 +1,202 @@ +# utils/fingerprint_manager.py + +import os +import json +import logging +import traceback +from pathlib import Path +from typing import List, Optional + +import shutil # Ensure shutil is imported if used within the class + +from utils.fingerprint import generate_fingerprint + +# Instantiate the logger +logger = logging.getLogger(__name__) + +class FingerprintManager: + """ + FingerprintManager Class + + Handles operations related to fingerprints, including generation, storage, + listing, selection, and removal. Ensures that each seed is uniquely identified + by its fingerprint and manages the corresponding directory structure. + """ + + def __init__(self, app_dir: Path): + """ + Initializes the FingerprintManager. + + Parameters: + app_dir (Path): The root application directory (e.g., ~/.seedpass). + """ + self.app_dir = app_dir + self.fingerprints_file = self.app_dir / 'fingerprints.json' + self._ensure_app_directory() + self.fingerprints = self._load_fingerprints() + self.current_fingerprint: Optional[str] = None + + def get_current_fingerprint_dir(self) -> Optional[Path]: + """ + Retrieves the directory path for the current fingerprint. + + Returns: + Optional[Path]: The Path object of the current fingerprint directory or None. + """ + if hasattr(self, 'current_fingerprint') and self.current_fingerprint: + return self.get_fingerprint_directory(self.current_fingerprint) + else: + logger.error("No current fingerprint is set.") + return None + + def _ensure_app_directory(self): + """ + Ensures that the application directory exists. + """ + try: + self.app_dir.mkdir(parents=True, exist_ok=True) + logger.debug(f"Application directory ensured at {self.app_dir}") + except Exception as e: + logger.error(f"Failed to create application directory at {self.app_dir}: {e}") + logger.error(traceback.format_exc()) + raise + + def _load_fingerprints(self) -> List[str]: + """ + Loads the list of fingerprints from the fingerprints.json file. + + Returns: + List[str]: A list of fingerprint strings. + """ + try: + if self.fingerprints_file.exists(): + with open(self.fingerprints_file, 'r') as f: + data = json.load(f) + fingerprints = data.get('fingerprints', []) + logger.debug(f"Loaded fingerprints: {fingerprints}") + return fingerprints + else: + logger.debug("fingerprints.json not found. Initializing empty fingerprint list.") + return [] + except Exception as e: + logger.error(f"Failed to load fingerprints: {e}") + logger.error(traceback.format_exc()) + return [] + + def _save_fingerprints(self): + """ + Saves the current list of fingerprints to the fingerprints.json file. + """ + try: + with open(self.fingerprints_file, 'w') as f: + json.dump({'fingerprints': self.fingerprints}, f, indent=4) + logger.debug(f"Fingerprints saved: {self.fingerprints}") + except Exception as e: + logger.error(f"Failed to save fingerprints: {e}") + logger.error(traceback.format_exc()) + raise + + def add_fingerprint(self, seed_phrase: str) -> Optional[str]: + """ + Generates a fingerprint from the seed phrase and adds it to the list. + + Parameters: + seed_phrase (str): The BIP-39 seed phrase. + + Returns: + Optional[str]: The generated fingerprint or None if failed. + """ + fingerprint = generate_fingerprint(seed_phrase) + if fingerprint and fingerprint not in self.fingerprints: + self.fingerprints.append(fingerprint) + self._save_fingerprints() + logger.info(f"Fingerprint {fingerprint} added successfully.") + # Create fingerprint directory + fingerprint_dir = self.app_dir / fingerprint + fingerprint_dir.mkdir(parents=True, exist_ok=True) + logger.debug(f"Fingerprint directory created at {fingerprint_dir}") + return fingerprint + elif fingerprint in self.fingerprints: + logger.warning(f"Fingerprint {fingerprint} already exists.") + return fingerprint + else: + logger.error("Fingerprint generation failed.") + return None + + def remove_fingerprint(self, fingerprint: str) -> bool: + """ + Removes a fingerprint and its associated directory. + + Parameters: + fingerprint (str): The fingerprint to remove. + + Returns: + bool: True if removed successfully, False otherwise. + """ + if fingerprint in self.fingerprints: + try: + self.fingerprints.remove(fingerprint) + self._save_fingerprints() + # Remove fingerprint directory + fingerprint_dir = self.app_dir / fingerprint + if fingerprint_dir.exists() and fingerprint_dir.is_dir(): + for child in fingerprint_dir.glob('*'): + if child.is_file(): + child.unlink() + elif child.is_dir(): + shutil.rmtree(child) + fingerprint_dir.rmdir() + logger.info(f"Fingerprint {fingerprint} removed successfully.") + return True + except Exception as e: + logger.error(f"Failed to remove fingerprint {fingerprint}: {e}") + logger.error(traceback.format_exc()) + return False + else: + logger.warning(f"Fingerprint {fingerprint} does not exist.") + return False + + def list_fingerprints(self) -> List[str]: + """ + Lists all available fingerprints. + + Returns: + List[str]: A list of fingerprint strings. + """ + logger.debug(f"Listing fingerprints: {self.fingerprints}") + return self.fingerprints + + def select_fingerprint(self, fingerprint: str) -> bool: + """ + Selects a fingerprint for the current session. + + Parameters: + fingerprint (str): The fingerprint to select. + + Returns: + bool: True if selection is successful, False otherwise. + """ + if fingerprint in self.fingerprints: + self.current_fingerprint = fingerprint + logger.info(f"Fingerprint {fingerprint} selected.") + return True + else: + logger.error(f"Fingerprint {fingerprint} not found.") + return False + + def get_fingerprint_directory(self, fingerprint: str) -> Optional[Path]: + """ + Retrieves the directory path for a given fingerprint. + + Parameters: + fingerprint (str): The fingerprint. + + Returns: + Optional[Path]: The Path object of the fingerprint directory or None. + """ + fingerprint_dir = self.app_dir / fingerprint + if fingerprint_dir.exists() and fingerprint_dir.is_dir(): + return fingerprint_dir + else: + logger.error(f"Directory for fingerprint {fingerprint} does not exist.") + return None diff --git a/src/utils/key_derivation.py b/src/utils/key_derivation.py index f342947..8e5bb02 100644 --- a/src/utils/key_derivation.py +++ b/src/utils/key_derivation.py @@ -4,7 +4,7 @@ Key Derivation Module Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. -This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. This module provides functions to derive cryptographic keys from user-provided passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption @@ -22,49 +22,13 @@ import logging import traceback from typing import Union from bip_utils import Bip39SeedGenerator - +from local_bip85.bip85 import BIP85 +from monstr.encrypt import Keys from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create a custom logger - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'key_derivation.log')) # Log file in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Console will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger - if not logger.handlers: - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() - +# Instantiate the logger logger = logging.getLogger(__name__) def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes: @@ -119,23 +83,33 @@ def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes: logger.error(traceback.format_exc()) # Log full traceback raise -def derive_key_from_parent_seed(parent_seed: str) -> bytes: +def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> bytes: """ Derives a 32-byte cryptographic key from a BIP-39 parent seed using HKDF. + Optionally, include a fingerprint to differentiate key derivation per fingerprint. :param parent_seed: The 12-word BIP-39 seed phrase. + :param fingerprint: An optional fingerprint to create unique keys per fingerprint. :return: A 32-byte derived key. """ try: # Generate seed bytes from mnemonic seed = Bip39SeedGenerator(parent_seed).Generate() + # If a fingerprint is provided, use it to differentiate the derivation + if fingerprint: + # Convert fingerprint to a stable integer index + index = int(hashlib.sha256(fingerprint.encode()).hexdigest(), 16) % (2**31) + info = f'password-manager-{index}'.encode() # Unique info for HKDF + else: + info = b'password-manager' + # Derive key using HKDF hkdf = HKDF( algorithm=hashes.SHA256(), length=32, salt=None, # No salt for deterministic derivation - info=b'password-manager', + info=info, backend=default_backend() ) derived_key = hkdf.derive(seed) @@ -147,4 +121,196 @@ def derive_key_from_parent_seed(parent_seed: str) -> bytes: except Exception as e: logger.error(f"Failed to derive key using HKDF: {e}") logger.error(traceback.format_exc()) - raise \ No newline at end of file + raise + +class KeyManager: + def __init__(self, parent_seed: str, fingerprint: str = None): + self.parent_seed = parent_seed + self.fingerprint = fingerprint + self.bip85 = self.initialize_bip85() + self.keys = self.generate_nostr_keys() + + def initialize_bip85(self): + seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() + bip85 = BIP85(seed_bytes) + return bip85 + + def generate_nostr_keys(self) -> Keys: + """ + Derives a unique Nostr key pair for the given fingerprint using BIP-85. + + :return: An instance of Keys containing the Nostr key pair. + """ + # Use a derivation path that includes the fingerprint + # Convert fingerprint to an integer index (e.g., using a hash function) + index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) if self.fingerprint else 0 + + # Derive entropy for Nostr key (32 bytes) + entropy_bytes = self.bip85.derive_entropy( + app=BIP85.Applications.ENTROPY, + index=index, + size=32 + ) + + # Generate Nostr key pair from entropy + private_key_hex = entropy_bytes.hex() + keys = Keys(priv_key=private_key_hex) + return keys +# utils/key_derivation.py + +""" +Key Derivation Module + +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case. + +This module provides functions to derive cryptographic keys from user-provided passwords +and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption +purposes. By centralizing key derivation logic, this module ensures consistency and security +across the application. + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import os +import hashlib +import base64 +import unicodedata +import logging +import traceback +from typing import Union +from bip_utils import Bip39SeedGenerator +from local_bip85.bip85 import BIP85 +from monstr.encrypt import Keys +from cryptography.hazmat.primitives.kdf.hkdf import HKDF +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend + +# Instantiate the logger +logger = logging.getLogger(__name__) + +def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes: + """ + Derives a Fernet-compatible encryption key from the provided password using PBKDF2-HMAC-SHA256. + + This function normalizes the password using NFKD normalization, encodes it to UTF-8, and then + applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key + is then URL-safe base64-encoded to ensure compatibility with Fernet. + + Parameters: + password (str): The user's password. + iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000. + + Returns: + bytes: A URL-safe base64-encoded encryption key suitable for Fernet. + + Raises: + ValueError: If the password is empty or too short. + """ + if not password: + logger.error("Password cannot be empty.") + raise ValueError("Password cannot be empty.") + + if len(password) < 8: + logger.warning("Password length is less than recommended (8 characters).") + + # Normalize the password to NFKD form and encode to UTF-8 + normalized_password = unicodedata.normalize('NFKD', password).strip() + password_bytes = normalized_password.encode('utf-8') + + try: + # Derive the key using PBKDF2-HMAC-SHA256 + logger.debug("Starting key derivation from password.") + key = hashlib.pbkdf2_hmac( + hash_name='sha256', + password=password_bytes, + salt=b'', # No salt for deterministic key derivation + iterations=iterations, + dklen=32 # 256-bit key for Fernet + ) + logger.debug(f"Derived key (hex): {key.hex()}") + + # Encode the key in URL-safe base64 + key_b64 = base64.urlsafe_b64encode(key) + logger.debug(f"Base64-encoded key: {key_b64.decode()}") + + return key_b64 + + except Exception as e: + logger.error(f"Error deriving key from password: {e}") + logger.error(traceback.format_exc()) # Log full traceback + raise + +def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> bytes: + """ + Derives a 32-byte cryptographic key from a BIP-39 parent seed using HKDF. + Optionally, include a fingerprint to differentiate key derivation per fingerprint. + + :param parent_seed: The 12-word BIP-39 seed phrase. + :param fingerprint: An optional fingerprint to create unique keys per fingerprint. + :return: A 32-byte derived key. + """ + try: + # Generate seed bytes from mnemonic + seed = Bip39SeedGenerator(parent_seed).Generate() + + # If a fingerprint is provided, use it to differentiate the derivation + if fingerprint: + # Convert fingerprint to a stable integer index + index = int(hashlib.sha256(fingerprint.encode()).hexdigest(), 16) % (2**31) + info = f'password-manager-{index}'.encode() # Unique info for HKDF + else: + info = b'password-manager' + + # Derive key using HKDF + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=None, # No salt for deterministic derivation + info=info, + backend=default_backend() + ) + derived_key = hkdf.derive(seed) + + if len(derived_key) != 32: + raise ValueError(f"Derived key length is {len(derived_key)} bytes; expected 32 bytes.") + + return derived_key + except Exception as e: + logger.error(f"Failed to derive key using HKDF: {e}") + logger.error(traceback.format_exc()) + raise + +class KeyManager: + def __init__(self, parent_seed: str, fingerprint: str = None): + self.parent_seed = parent_seed + self.fingerprint = fingerprint + self.bip85 = self.initialize_bip85() + self.keys = self.generate_nostr_keys() + + def initialize_bip85(self): + seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() + bip85 = BIP85(seed_bytes) + return bip85 + + def generate_nostr_keys(self) -> Keys: + """ + Derives a unique Nostr key pair for the given fingerprint using BIP-85. + + :return: An instance of Keys containing the Nostr key pair. + """ + # Use a derivation path that includes the fingerprint + # Convert fingerprint to an integer index (e.g., using a hash function) + index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) if self.fingerprint else 0 + + # Derive entropy for Nostr key (32 bytes) + entropy_bytes = self.bip85.derive_entropy( + app=BIP85.Applications.ENTROPY, + index=index, + size=32 + ) + + # Generate Nostr key pair from entropy + private_key_hex = entropy_bytes.hex() + keys = Keys(priv_key=private_key_hex) + return keys diff --git a/src/utils/password_prompt.py b/src/utils/password_prompt.py index 2025765..2a5be96 100644 --- a/src/utils/password_prompt.py +++ b/src/utils/password_prompt.py @@ -26,43 +26,8 @@ from constants import MIN_PASSWORD_LENGTH # Initialize colorama for colored terminal text colorama_init() -# Configure logging at the start of the module -def configure_logging(): - """ - Configures logging with both file and console handlers. - Only ERROR and higher-level messages are shown in the terminal, while all messages - are logged in the log file. - """ - # Create a custom logger - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output - - # Create the 'logs' folder if it doesn't exist - if not os.path.exists('logs'): - os.makedirs('logs') - - # Create handlers - c_handler = logging.StreamHandler() - f_handler = logging.FileHandler(os.path.join('logs', 'password_prompt.log')) # Log file in 'logs' folder - - # Set levels: only errors and critical messages will be shown in the console - c_handler.setLevel(logging.ERROR) # Console will show ERROR and above - f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above - - # Create formatters and add them to handlers, include file and line number in log messages - c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') - - c_handler.setFormatter(c_format) - f_handler.setFormatter(f_format) - - # Add handlers to the logger - if not logger.handlers: - logger.addHandler(c_handler) - logger.addHandler(f_handler) - -# Call the logging configuration function -configure_logging() +# Instantiate the logger +logger = logging.getLogger(__name__) def prompt_new_password() -> str: """