diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index dfe0770..0000000 --- a/.gitattributes +++ /dev/null @@ -1,2 +0,0 @@ -# Auto detect text files and perform LF normalization -* text=auto diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..556056e --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Ignore virtual environment directory +venv/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Encrypted index file should be backed up, hence not ignored +!.deterministic_password_generator/password_indices.csv + +# Ignore system files +.DS_Store +Thumbs.db + +# Ignore logs and temporary files +*.log +*.tmp + +# Python env +.env +*.env \ No newline at end of file diff --git a/README.md b/README.md index 3731dd8..e69de29 100644 --- a/README.md +++ b/README.md @@ -1,2 +0,0 @@ -# SeedPass - diff --git a/src/bip85/__init__.py b/src/bip85/__init__.py new file mode 100644 index 0000000..cebca93 --- /dev/null +++ b/src/bip85/__init__.py @@ -0,0 +1,14 @@ +# bip85/__init__.py + +import logging +import traceback + +try: + from .bip85 import BIP85 + logging.info("BIP85 module imported successfully.") +except Exception as e: + logging.error(f"Failed to import BIP85 module: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +__all__ = ['BIP85'] + diff --git a/src/bip85/bip85.py b/src/bip85/bip85.py new file mode 100644 index 0000000..cd989c1 --- /dev/null +++ b/src/bip85/bip85.py @@ -0,0 +1,208 @@ +# bip85/bip85.py + +""" +BIP85 Module + +This module implements the BIP85 functionality for deterministic entropy and mnemonic derivation. +It provides the BIP85 class, which utilizes BIP32 and BIP39 standards to derive entropy and mnemonics +from a given seed. Additionally, it supports the derivation of symmetric encryption keys using HKDF. + +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. + +Dependencies: +- bip_utils +- cryptography + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import sys +import hashlib +import hmac +import logging +import os +import traceback +from colorama import Fore + +from bip_utils import ( + Bip32Slip10Secp256k1, + Bip39MnemonicGenerator, + Bip39Languages +) + +from cryptography.hazmat.primitives.kdf.hkdf import HKDF +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create a custom logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create handlers + c_handler = logging.StreamHandler(sys.stdout) + f_handler = logging.FileHandler(os.path.join('logs', 'bip85.log')) # Log files will be in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +class BIP85: + """ + BIP85 Class + + Implements BIP-85 functionality for deterministic entropy and mnemonic derivation. + """ + + def __init__(self, seed_bytes: bytes): + """ + Initializes the BIP85 class with seed bytes. + + Parameters: + seed_bytes (bytes): The BIP39 seed bytes derived from the seed phrase. + + Raises: + SystemExit: If initialization fails. + """ + try: + self.bip32_ctx = Bip32Slip10Secp256k1.FromSeed(seed_bytes) + logging.debug("BIP32 context initialized successfully.") + except Exception as e: + logging.error(f"Error initializing BIP32 context: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"{Fore.RED}Error initializing BIP32 context: {e}") + sys.exit(1) + + def derive_entropy(self, app_no: int, language_code: int, words_num: int, index: int) -> bytes: + """ + Derives entropy using BIP-85 HMAC-SHA512 method. + + Parameters: + app_no (int): Application number (e.g., 39 for BIP39). + language_code (int): Language code (e.g., 0 for English). + words_num (int): Number of words in the mnemonic (e.g., 12). + index (int): Index for the child mnemonic. + + Returns: + bytes: Derived entropy. + + Raises: + SystemExit: If derivation fails or entropy length is invalid. + """ + path = f"m/83696968'/{app_no}'/{language_code}'/{words_num}'/{index}'" + try: + child_key = self.bip32_ctx.DerivePath(path) + k = child_key.PrivateKey().Raw().ToBytes() + logging.debug(f"Derived child key at path {path}: {k.hex()}") + + hmac_key = b"bip-entropy-from-k" + hmac_result = hmac.new(hmac_key, k, hashlib.sha512).digest() + logging.debug(f"HMAC-SHA512 result: {hmac_result.hex()}") + + if words_num == 12: + entropy = hmac_result[:16] # 128 bits for 12-word mnemonic + elif words_num == 18: + entropy = hmac_result[:24] # 192 bits for 18-word mnemonic + elif words_num == 24: + entropy = hmac_result[:32] # 256 bits for 24-word mnemonic + else: + logging.error(f"Unsupported number of words: {words_num}") + print(f"{Fore.RED}Error: Unsupported number of words: {words_num}") + sys.exit(1) + + if len(entropy) not in [16, 24, 32]: + logging.error(f"Derived entropy length is {len(entropy)} bytes; expected 16, 24, or 32 bytes.") + print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected 16, 24, or 32 bytes.") + sys.exit(1) + + logging.debug(f"Derived entropy: {entropy.hex()}") + return entropy + except Exception as e: + logging.error(f"Error deriving entropy: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"{Fore.RED}Error deriving entropy: {e}") + sys.exit(1) + + def derive_mnemonic(self, app_no: int, language_code: int, words_num: int, index: int) -> str: + """ + Derives a BIP-39 mnemonic using BIP-85 specification. + + Parameters: + app_no (int): Application number (e.g., 39 for BIP39). + language_code (int): Language code (e.g., 0 for English). + words_num (int): Number of words in the mnemonic (e.g., 12). + index (int): Index for the child mnemonic. + + Returns: + str: Derived BIP-39 mnemonic. + + Raises: + SystemExit: If mnemonic generation fails. + """ + entropy = self.derive_entropy(app_no, language_code, words_num, index) + try: + mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy) + logging.debug(f"Derived mnemonic: {mnemonic}") + return mnemonic + except Exception as e: + logging.error(f"Error generating mnemonic: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"{Fore.RED}Error generating mnemonic: {e}") + sys.exit(1) + + def derive_symmetric_key(self, app_no: int = 48, index: int = 0) -> bytes: + """ + Derives a symmetric encryption key using BIP85. + + Parameters: + app_no (int): Application number for key derivation (48 chosen arbitrarily). + index (int): Index for key derivation. + + Returns: + bytes: Derived symmetric key (32 bytes for AES-256). + + Raises: + SystemExit: If symmetric key derivation fails. + """ + entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index) + try: + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, # 256 bits for AES-256 + salt=None, + info=b'seedos-encryption-key', + backend=default_backend() + ) + symmetric_key = hkdf.derive(entropy) + logging.debug(f"Derived symmetric key: {symmetric_key.hex()}") + return symmetric_key + except Exception as e: + logging.error(f"Error deriving symmetric key: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"{Fore.RED}Error deriving symmetric key: {e}") + sys.exit(1) diff --git a/src/constants.py b/src/constants.py new file mode 100644 index 0000000..68ba06e --- /dev/null +++ b/src/constants.py @@ -0,0 +1,93 @@ +# constants.py + +import os +import logging +import sys +from pathlib import Path +import traceback + +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create a custom logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler(sys.stdout) + f_handler = logging.FileHandler(os.path.join('logs', 'constants.log')) + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Console will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger if they are not already added + if not logger.handlers: + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Configure logging at the start of the module +configure_logging() + +# ----------------------------------- +# Nostr Relay Connection Settings +# ----------------------------------- +MAX_RETRIES = 3 # Maximum number of retries for relay connections +RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection + +try: + # ----------------------------------- + # Application Directory and Paths + # ----------------------------------- + APP_DIR = Path.home() / '.deterministic_password_generator' + APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists + logging.info(f"Application directory created at {APP_DIR}") +except Exception as e: + logging.error(f"Failed to create application directory: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +try: + INDEX_FILE = APP_DIR / 'passwords_db.json' # Encrypted password database + PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed + logging.info(f"Index file path set to {INDEX_FILE}") + logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}") +except Exception as e: + logging.error(f"Error setting file paths: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +# ----------------------------------- +# Checksum Files for Integrity +# ----------------------------------- +try: + SCRIPT_CHECKSUM_FILE = APP_DIR / 'script_checksum.txt' # Checksum for main script + DATA_CHECKSUM_FILE = APP_DIR / 'passwords_checksum.txt' # Checksum for password data + logging.info(f"Checksum file paths set: Script {SCRIPT_CHECKSUM_FILE}, Data {DATA_CHECKSUM_FILE}") +except Exception as e: + logging.error(f"Error setting checksum file paths: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +# ----------------------------------- +# Password Generation Constants +# ----------------------------------- +DEFAULT_PASSWORD_LENGTH = 12 # Default length for generated passwords +MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length +MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length + +# ----------------------------------- +# Additional Constants (if any) +# ----------------------------------- +# Add any other constants here as your project expands diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..fd3668e --- /dev/null +++ b/src/main.py @@ -0,0 +1,232 @@ +# main.py + +import os +import sys +import logging +import signal +from colorama import init as colorama_init +from termcolor import colored +import traceback + +from password_manager.manager import PasswordManager +from nostr.client import NostrClient + +# Initialize colorama for colored terminal text +colorama_init() + +def configure_logging(): + """ + Configures logging with both file and console handlers. + Logs errors in the terminal and all messages in the log file. + """ + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + if not logger.handlers: + # Create handlers + c_handler = logging.StreamHandler(sys.stdout) + f_handler = logging.FileHandler(os.path.join('logs', 'main.log')) + + # Set levels + c_handler.setLevel(logging.ERROR) + f_handler.setLevel(logging.DEBUG) + + # Create formatters + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) + + # Add handlers + logger.addHandler(c_handler) + logger.addHandler(f_handler) + + return logger + +def display_menu(password_manager: PasswordManager, nostr_client: NostrClient): + """ + Displays the interactive menu and handles user input to perform various actions. + + :param password_manager: An instance of PasswordManager. + :param nostr_client: An instance of NostrClient. + """ + menu = """ + Select an option: + 1. Generate a New Password and Add to Index + 2. Retrieve a Password from Index + 3. Modify an Existing Entry + 4. Verify Script Checksum + 5. Post Encrypted Index to Nostr + 6. Retrieve Encrypted Index from Nostr + 7. Display Nostr Public Key (npub) + 8. Exit + """ + while True: + print(colored(menu, 'cyan')) + choice = input('Enter your choice (1-8): ').strip() + if choice == '1': + password_manager.handle_generate_password() + elif choice == '2': + password_manager.handle_retrieve_password() + elif choice == '3': + password_manager.handle_modify_entry() + elif choice == '4': + password_manager.handle_verify_checksum() + elif choice == '5': + handle_post_to_nostr(password_manager, nostr_client) + elif choice == '6': + handle_retrieve_from_nostr(password_manager, nostr_client) + elif choice == '7': + handle_display_npub(nostr_client) + elif choice == '8': + logging.info("Exiting the program.") + print(colored("Exiting the program.", 'green')) + nostr_client.close_client_pool() # Gracefully close the ClientPool + sys.exit(0) + else: + print(colored("Invalid choice. Please select a valid option.", 'red')) + +def handle_display_npub(nostr_client: NostrClient): + """ + Handles displaying the Nostr public key (npub) to the user. + + :param nostr_client: An instance of NostrClient. + """ + try: + npub = nostr_client.key_manager.get_npub() + if npub: + print(colored(f"\nYour Nostr Public Key (npub):\n{npub}\n", 'cyan')) + logging.info("Displayed npub to the user.") + else: + print(colored("Nostr public key not available.", 'red')) + logging.error("Nostr public key not available.") + except Exception as e: + logging.error(f"Failed to display npub: {e}") + print(f"Error: Failed to display npub: {e}", 'red') + +def handle_post_to_nostr(password_manager: PasswordManager, nostr_client: NostrClient): + """ + Handles the action of posting the encrypted password index to Nostr. + + :param password_manager: An instance of PasswordManager. + :param nostr_client: An instance of NostrClient. + """ + try: + # Get the encrypted data from the index file + encrypted_data = password_manager.get_encrypted_data() + if encrypted_data: + # Post to Nostr + nostr_client.publish_json_to_nostr(encrypted_data) + print(colored("Encrypted index posted to Nostr successfully.", 'green')) + logging.info("Encrypted index posted to Nostr successfully.") + else: + print(colored("No data available to post.", 'yellow')) + logging.warning("No data available to post to Nostr.") + except Exception as e: + logging.error(f"Failed to post to Nostr: {e}") + logging.error(traceback.format_exc()) + print(f"Error: Failed to post to Nostr: {e}", 'red') + +def handle_retrieve_from_nostr(password_manager: PasswordManager, nostr_client: NostrClient): + """ + Handles the action of retrieving the encrypted password index from Nostr. + + :param password_manager: An instance of PasswordManager. + :param nostr_client: An instance of NostrClient. + """ + try: + # Retrieve from Nostr + encrypted_data = nostr_client.retrieve_json_from_nostr_sync() + if encrypted_data: + # Decrypt and save the index + password_manager.decrypt_and_save_index_from_nostr(encrypted_data) + print(colored("Encrypted index retrieved and saved successfully.", 'green')) + logging.info("Encrypted index retrieved and saved successfully from Nostr.") + else: + print(colored("Failed to retrieve data from Nostr.", 'red')) + logging.error("Failed to retrieve data from Nostr.") + except Exception as e: + logging.error(f"Failed to retrieve from Nostr: {e}") + logging.error(traceback.format_exc()) + print(f"Error: Failed to retrieve from Nostr: {e}", 'red') + +def cleanup(nostr_client: NostrClient): + """ + Cleanup function to gracefully close the NostrClient's event loop. + This function is registered to run upon program termination. + """ + try: + nostr_client.close_client_pool() + except Exception as e: + logging.error(f"Cleanup failed: {e}") + print(f"Error during cleanup: {e}", 'red') + +if __name__ == '__main__': + """ + The main entry point of the application. + """ + # Configure logging with both file and console handlers + configure_logging() + + # Initialize PasswordManager + try: + password_manager = PasswordManager() + logging.info("PasswordManager initialized successfully.") + except Exception as e: + logging.error(f"Failed to initialize PasswordManager: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"Error: Failed to initialize PasswordManager: {e}", 'red') + sys.exit(1) + + # Initialize NostrClient with the parent seed from PasswordManager + try: + nostr_client = NostrClient(parent_seed=password_manager.parent_seed) + logging.info("NostrClient initialized successfully.") + except Exception as e: + logging.error(f"Failed to initialize NostrClient: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"Error: Failed to initialize NostrClient: {e}", 'red') + sys.exit(1) + + # Register signal handlers for graceful shutdown + def signal_handler(sig, frame): + """ + Handles termination signals to gracefully shutdown the NostrClient. + """ + print(colored("\nReceived shutdown signal. Exiting gracefully...", 'yellow')) + logging.info(f"Received shutdown signal: {sig}. Initiating graceful shutdown.") + try: + nostr_client.close_client_pool() # Gracefully close the ClientPool + logging.info("NostrClient closed successfully.") + except Exception as e: + logging.error(f"Error during shutdown: {e}") + print(f"Error during shutdown: {e}", 'red') + sys.exit(0) + + # Register the signal handlers + signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # Handle termination signals + + # Display the interactive menu to the user + try: + display_menu(password_manager, nostr_client) + except KeyboardInterrupt: + logging.info("Program terminated by user via KeyboardInterrupt.") + print(colored("\nProgram terminated by user.", 'yellow')) + try: + nostr_client.close_client_pool() # Gracefully close the ClientPool + logging.info("NostrClient closed successfully.") + except Exception as e: + logging.error(f"Error during shutdown: {e}") + print(f"Error during shutdown: {e}", 'red') + sys.exit(0) + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(f"Error: An unexpected error occurred: {e}", 'red') + try: + nostr_client.close_client_pool() # Attempt to close the ClientPool + logging.info("NostrClient closed successfully.") + except Exception as close_error: + logging.error(f"Error during shutdown: {close_error}") + print(f"Error during shutdown: {close_error}", 'red') + sys.exit(1) diff --git a/src/nostr/__init__.py b/src/nostr/__init__.py new file mode 100644 index 0000000..e98d3a3 --- /dev/null +++ b/src/nostr/__init__.py @@ -0,0 +1,14 @@ +# nostr/__init__.py + +import logging +import traceback + +try: + from .client import NostrClient + logging.info("NostrClient module imported successfully.") +except Exception as e: + logging.error(f"Failed to import NostrClient module: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +__all__ = ['NostrClient'] + diff --git a/src/nostr/client.py b/src/nostr/client.py new file mode 100644 index 0000000..0adb720 --- /dev/null +++ b/src/nostr/client.py @@ -0,0 +1,560 @@ +# nostr/client.py + +import os +import sys +import logging +import traceback +import json +import time +import base64 +import hashlib +import asyncio +import concurrent.futures +from typing import List, Optional, Callable + +from monstr.client.client import ClientPool +from monstr.encrypt import Keys, NIP4Encrypt +from monstr.event.event import Event + +import threading +import uuid +import fcntl # Ensure fcntl is imported for file locking + +from .logging_config import configure_logging +from .key_manager import KeyManager +from .encryption_manager import EncryptionManager +from .event_handler import EventHandler +from constants import APP_DIR, INDEX_FILE, DATA_CHECKSUM_FILE +from utils.file_lock import lock_file + +logger = configure_logging() + +DEFAULT_RELAYS = [ + "wss://relay.snort.social", + "wss://nostr.oxtr.dev", + "wss://nostr-relay.wlvs.space" +] + +class NostrClient: + """ + NostrClient Class + + Handles interactions with the Nostr network, including publishing and retrieving encrypted events. + Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations. + """ + + def __init__(self, parent_seed: str, relays: Optional[List[str]] = None): + """ + Initializes the NostrClient with a parent seed and connects to specified relays. + + :param parent_seed: The BIP39 mnemonic seed phrase. + :param relays: (Optional) A list of relay URLs to connect to. Defaults to predefined relays. + """ + try: + self.key_manager = KeyManager(parent_seed) + self.encryption_manager = EncryptionManager(self.key_manager) + self.event_handler = EventHandler() + + self.relays = relays if relays else DEFAULT_RELAYS + self.client_pool = ClientPool(self.relays) + self.subscriptions = {} + + self.initialize_client_pool() + logger.info("NostrClient initialized successfully.") + + self.is_shutting_down = False + self._shutdown_event = asyncio.Event() + + except Exception as e: + logger.error(f"Initialization failed: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Initialization failed: {e}", file=sys.stderr) + sys.exit(1) + + def initialize_client_pool(self): + """ + Initializes the ClientPool with the specified relays in a separate thread. + """ + try: + logger.debug("Initializing ClientPool with relays.") + self.client_pool = ClientPool(self.relays) + + # Start the ClientPool in a separate thread + self.loop_thread = threading.Thread(target=self.run_event_loop, daemon=True) + self.loop_thread.start() + + # Wait until the ClientPool is connected to all relays + self.wait_for_connection() + + logger.info("ClientPool connected to all relays.") + except Exception as e: + logger.error(f"Failed to initialize ClientPool: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to initialize ClientPool: {e}", file=sys.stderr) + sys.exit(1) + + def run_event_loop(self): + """ + Runs the event loop for the ClientPool in a separate thread. + """ + try: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop.create_task(self.client_pool.run()) + self.loop.run_forever() + except asyncio.CancelledError: + logger.debug("Event loop received cancellation.") + except Exception as e: + logger.error(f"Error running event loop in thread: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr) + finally: + logger.debug("Closing the event loop.") + self.loop.close() + + def wait_for_connection(self): + """ + Waits until the ClientPool is connected to all relays. + """ + try: + while not self.client_pool.connected: + time.sleep(0.1) + except Exception as e: + logger.error(f"Error while waiting for ClientPool to connect: {e}") + logger.error(traceback.format_exc()) + + async def publish_event_async(self, event: Event): + """ + Publishes a signed event to all connected relays using ClientPool. + + :param event: The signed Event object to publish. + """ + try: + logger.debug(f"Publishing event: {event.serialize()}") + self.client_pool.publish(event) + logger.info(f"Event published with ID: {event.id}") + except Exception as e: + logger.error(f"Failed to publish event: {e}") + logger.error(traceback.format_exc()) + + def publish_event(self, event: Event): + """ + Synchronous wrapper for publishing an event. + + :param event: The signed Event object to publish. + """ + try: + asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop) + except Exception as e: + logger.error(f"Error in publish_event: {e}") + print(f"Error: Failed to publish event: {e}", file=sys.stderr) + + async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): + """ + Subscribes to events based on the provided filters using ClientPool. + + :param filters: A list of filter dictionaries. + :param handler: A callback function to handle incoming events. + """ + try: + sub_id = str(uuid.uuid4()) + self.client_pool.subscribe(handlers=handler, filters=filters, sub_id=sub_id) + logger.info(f"Subscribed to events with subscription ID: {sub_id}") + self.subscriptions[sub_id] = True + except Exception as e: + logger.error(f"Failed to subscribe: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to subscribe: {e}", file=sys.stderr) + + def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): + """ + Synchronous wrapper for subscribing to events. + + :param filters: A list of filter dictionaries. + :param handler: A callback function to handle incoming events. + """ + try: + asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop) + except Exception as e: + logger.error(f"Error in subscribe: {e}") + print(f"Error: Failed to subscribe: {e}", file=sys.stderr) + + async def retrieve_json_from_nostr_async(self) -> Optional[bytes]: + """ + Retrieves the latest encrypted JSON event from Nostr. + + :return: The encrypted JSON data as bytes, or None if retrieval fails. + """ + try: + filters = [{ + 'authors': [self.key_manager.keys.public_key_hex()], + 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], + 'limit': 1 + }] + + events = [] + + def my_handler(the_client, sub_id, evt: Event): + logger.debug(f"Received event: {evt.serialize()}") + events.append(evt) + + await self.subscribe_async(filters=filters, handler=my_handler) + + await asyncio.sleep(2) # Adjust the sleep time as needed + + for sub_id in list(self.subscriptions.keys()): + self.client_pool.unsubscribe(sub_id) + del self.subscriptions[sub_id] + logger.debug(f"Unsubscribed from sub_id {sub_id}") + + if events: + event = events[0] + encrypted_json_b64 = event.content + + if event.kind == Event.KIND_ENCRYPT: + nip4_encrypt = NIP4Encrypt(self.key_manager.keys) + encrypted_json_b64 = nip4_encrypt.decrypt_message(event.content, event.pub_key) + + encrypted_json = base64.b64decode(encrypted_json_b64.encode('utf-8')) + logger.debug("Encrypted JSON data retrieved successfully.") + return encrypted_json + else: + logger.warning("No events found matching the filters.") + print("No events found matching the filters.", file=sys.stderr) + return None + + except Exception as e: + logger.error(f"Failed to retrieve JSON from Nostr: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr) + return None + + def retrieve_json_from_nostr(self) -> Optional[bytes]: + """ + Public method to retrieve encrypted JSON from Nostr. + + :return: The encrypted JSON data as bytes, or None if retrieval fails. + """ + try: + future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) + return future.result() + except Exception as e: + logger.error(f"Error in retrieve_json_from_nostr: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr) + return None + + async def do_post_async(self, text: str): + """ + Creates and publishes a text note event. + + :param text: The content of the text note. + """ + try: + event = Event( + kind=Event.KIND_TEXT_NOTE, + content=text, + pub_key=self.key_manager.keys.public_key_hex() + ) + event.created_at = int(time.time()) + event.sign(self.key_manager.keys.private_key_hex()) + + logger.debug(f"Event data: {event.serialize()}") + + await self.publish_event_async(event) + + except Exception as e: + logger.error(f"An error occurred during publishing: {e}", exc_info=True) + print(f"Error: An error occurred during publishing: {e}", file=sys.stderr) + + async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]): + """ + Subscribes to the feed of the client's own pubkey. + + :param handler: A callback function to handle incoming events. + """ + try: + filters = [{ + 'authors': [self.key_manager.keys.public_key_hex()], + 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], + 'limit': 100 + }] + + await self.subscribe_async(filters=filters, handler=handler) + logger.info("Subscribed to your feed.") + + while True: + await asyncio.sleep(1) + + except Exception as e: + logger.error(f"An error occurred during subscription: {e}", exc_info=True) + print(f"Error: An error occurred during subscription: {e}", file=sys.stderr) + + async def publish_and_subscribe_async(self, text: str): + """ + Publishes a text note and subscribes to the feed concurrently. + + :param text: The content of the text note to publish. + """ + try: + await asyncio.gather( + self.do_post_async(text), + self.subscribe_feed_async(self.event_handler.handle_new_event) + ) + except Exception as e: + logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True) + print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr) + + def publish_and_subscribe(self, text: str): + """ + Public method to publish a text note and subscribe to the feed. + + :param text: The content of the text note to publish. + """ + try: + asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop) + except Exception as e: + logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True) + print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr) + + def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None: + """ + Decrypts the encrypted data retrieved from Nostr and updates the local index file. + + :param encrypted_data: The encrypted data retrieved from Nostr. + """ + try: + decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) + data = json.loads(decrypted_data.decode('utf-8')) + self.save_json_data(data) + self.update_checksum() + logger.info("Index file updated from Nostr successfully.") + print("Index file updated from Nostr successfully.", file=sys.stdout) + except Exception as e: + logger.error(f"Failed to decrypt and save data from Nostr: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to decrypt and save data from Nostr: {e}", file=sys.stderr) + + def save_json_data(self, data: dict) -> None: + """ + Saves the JSON data to the index file in an encrypted format. + + :param data: The JSON data to save. + """ + try: + encrypted_data = self.encryption_manager.encrypt_data(data) + with lock_file(INDEX_FILE, fcntl.LOCK_EX): + with open(INDEX_FILE, 'wb') as f: + f.write(encrypted_data) + logger.debug(f"Encrypted data saved to {INDEX_FILE}.") + print(f"Encrypted data saved to {INDEX_FILE}.", file=sys.stdout) + except Exception as e: + logger.error(f"Failed to save encrypted data: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to save encrypted data: {e}", file=sys.stderr) + raise + + def update_checksum(self) -> None: + """ + Updates the checksum file for the password database. + """ + try: + decrypted_data = self.decrypt_data_from_file(INDEX_FILE) + content = decrypted_data.decode('utf-8') + logger.debug("Calculating checksum of the updated file content.") + + checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + logger.debug(f"New checksum: {checksum}") + + with open(DATA_CHECKSUM_FILE, 'w') as f: + f.write(checksum) + logger.debug(f"Updated data checksum written to '{DATA_CHECKSUM_FILE}'.") + print("[+] Checksum updated successfully.", file=sys.stdout) + + except Exception as e: + logger.error(f"Failed to update checksum: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to update checksum: {e}", file=sys.stderr) + + def decrypt_data_from_file(self, file_path: str) -> bytes: + """ + Decrypts data directly from a file. + + :param file_path: Path to the encrypted file. + :return: Decrypted data as bytes. + """ + try: + with lock_file(file_path, fcntl.LOCK_SH): + with open(file_path, 'rb') as f: + encrypted_data = f.read() + decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) + logger.debug(f"Data decrypted from file '{file_path}'.") + return decrypted_data + except Exception as e: + logger.error(f"Failed to decrypt data from file '{file_path}': {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to decrypt data from file '{file_path}': {e}", file=sys.stderr) + raise + + def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None): + """ + Public method to post encrypted JSON to Nostr. + + :param encrypted_json: The encrypted JSON data to be sent. + :param to_pubkey: (Optional) The recipient's public key for encryption. + """ + try: + encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8') + logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}") + + event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex()) + + event.created_at = int(time.time()) + + if to_pubkey: + nip4_encrypt = NIP4Encrypt(self.key_manager.keys) + event.content = nip4_encrypt.encrypt_message(event.content, to_pubkey) + event.kind = Event.KIND_ENCRYPT + logger.debug(f"Encrypted event content: {event.content}") + + event.sign(self.key_manager.keys.private_key_hex()) + logger.debug("Event created and signed") + + self.publish_event(event) + logger.debug("Event published") + + except Exception as e: + logger.error(f"Failed to publish JSON to Nostr: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to publish JSON to Nostr: {e}", file=sys.stderr) + + def retrieve_json_from_nostr_sync(self) -> Optional[bytes]: + """ + Public method to retrieve encrypted JSON from Nostr. + + :return: The encrypted JSON data as bytes, or None if retrieval fails. + """ + try: + return self.retrieve_json_from_nostr() + except Exception as e: + logger.error(f"Error in retrieve_json_from_nostr_sync: {e}") + logger.error(traceback.format_exc()) + print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr) + return None + + def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None: + """ + Public method to decrypt and save data from Nostr. + + :param encrypted_data: The encrypted data retrieved from Nostr. + """ + try: + self.decrypt_and_save_index_from_nostr(encrypted_data) + except Exception as e: + logger.error(f"Failed to decrypt and save index from Nostr: {e}") + print(f"Error: Failed to decrypt and save index from Nostr: {e}", file=sys.stderr) + + async def close_client_pool_async(self): + """ + Closes the ClientPool gracefully by canceling all pending tasks and stopping the event loop. + """ + if self.is_shutting_down: + logger.debug("Shutdown already in progress.") + return + + try: + self.is_shutting_down = True + logger.debug("Initiating ClientPool shutdown.") + + # Set the shutdown event + self._shutdown_event.set() + + # Cancel all subscriptions + for sub_id in list(self.subscriptions.keys()): + try: + self.client_pool.unsubscribe(sub_id) + del self.subscriptions[sub_id] + logger.debug(f"Unsubscribed from sub_id {sub_id}") + except Exception as e: + logger.warning(f"Error unsubscribing from {sub_id}: {e}") + + # Close all WebSocket connections + if hasattr(self.client_pool, 'clients'): + for client in self.client_pool.clients: + try: + await client.close() + logger.debug(f"Closed connection to relay: {client.url}") + except Exception as e: + logger.warning(f"Error closing connection to {client.url}: {e}") + + # Gather and cancel all tasks + current_task = asyncio.current_task() + tasks = [task for task in asyncio.all_tasks(loop=self.loop) + if task != current_task and not task.done()] + + if tasks: + logger.debug(f"Cancelling {len(tasks)} pending tasks.") + for task in tasks: + task.cancel() + + # Wait for all tasks to be cancelled with a timeout + try: + await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5) + except asyncio.TimeoutError: + logger.warning("Timeout waiting for tasks to cancel") + + logger.debug("Stopping the event loop.") + self.loop.stop() + logger.info("Event loop stopped successfully.") + + except Exception as e: + logger.error(f"Error during async shutdown: {e}") + logger.error(traceback.format_exc()) + finally: + self.is_shutting_down = False + + def close_client_pool(self): + """ + Public method to close the ClientPool gracefully. + """ + if self.is_shutting_down: + logger.debug("Shutdown already in progress. Skipping redundant shutdown.") + return + + try: + # Schedule the coroutine to close the client pool + future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop) + + # Wait for the coroutine to finish with a shorter timeout + try: + future.result(timeout=10) + except concurrent.futures.TimeoutError: + logger.warning("Initial shutdown attempt timed out, forcing cleanup...") + + # Additional cleanup regardless of timeout + try: + self.loop.stop() + # Give a short grace period for the loop to stop + time.sleep(0.5) + + if self.loop.is_running(): + logger.warning("Loop still running after stop, closing forcefully") + self.loop.close() + + # Wait for the thread with a reasonable timeout + if self.loop_thread.is_alive(): + self.loop_thread.join(timeout=5) + + if self.loop_thread.is_alive(): + logger.warning("Thread still alive after join, may need to be force-killed") + + except Exception as cleanup_error: + logger.error(f"Error during final cleanup: {cleanup_error}") + + logger.info("ClientPool shutdown complete") + + except Exception as e: + logger.error(f"Error in close_client_pool: {e}") + logger.error(traceback.format_exc()) + finally: + self.is_shutting_down = False diff --git a/src/nostr/encryption_manager.py b/src/nostr/encryption_manager.py new file mode 100644 index 0000000..325c40d --- /dev/null +++ b/src/nostr/encryption_manager.py @@ -0,0 +1,58 @@ +# nostr/encryption_manager.py + +import base64 +import json +import traceback + +from cryptography.fernet import Fernet, InvalidToken + +from .logging_config import configure_logging +from .key_manager import KeyManager +from monstr.encrypt import NIP4Encrypt # Add if used + +logger = configure_logging() + +class EncryptionManager: + """ + Handles encryption and decryption of data using Fernet symmetric encryption. + """ + + def __init__(self, key_manager: KeyManager): + self.key_manager = key_manager + self.fernet = Fernet(self.key_manager.derive_encryption_key()) + + def encrypt_data(self, data: dict) -> bytes: + """ + Encrypts a dictionary and returns encrypted bytes. + + :param data: The data to encrypt. + :return: Encrypted data as bytes. + """ + try: + json_data = json.dumps(data, indent=4).encode('utf-8') + encrypted_data = self.fernet.encrypt(json_data) + logger.debug("Data encrypted successfully.") + return encrypted_data + except Exception as e: + logger.error(f"Failed to encrypt data: {e}") + logger.error(traceback.format_exc()) + raise + + def decrypt_data(self, encrypted_data: bytes) -> bytes: + """ + Decrypts encrypted bytes and returns the original data. + + :param encrypted_data: The encrypted data to decrypt. + :return: Decrypted data as bytes. + """ + try: + decrypted_data = self.fernet.decrypt(encrypted_data) + logger.debug("Data decrypted successfully.") + return decrypted_data + except InvalidToken: + logger.error("Invalid encryption key or corrupted data.") + raise + except Exception as e: + logger.error(f"Error decrypting data: {e}") + logger.error(traceback.format_exc()) + raise diff --git a/src/nostr/event_handler.py b/src/nostr/event_handler.py new file mode 100644 index 0000000..45ec443 --- /dev/null +++ b/src/nostr/event_handler.py @@ -0,0 +1,41 @@ +# nostr/event_handler.py + +import datetime +import time +import traceback + +from .logging_config import configure_logging +from monstr.event.event import Event +from monstr.client.client import ClientPool + +logger = configure_logging() + +class EventHandler: + """ + Handles incoming Nostr events. + """ + + def __init__(self): + pass # Initialize if needed + + def handle_new_event(self, the_client: ClientPool, sub_id: str, evt: Event): + """ + Processes incoming events by logging their details. + + :param the_client: The ClientPool instance. + :param sub_id: The subscription ID. + :param evt: The received Event object. + """ + try: + if isinstance(evt.created_at, datetime.datetime): + created_at_str = evt.created_at.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(evt.created_at, int): + created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at)) + else: + created_at_str = str(evt.created_at) + + logger.info(f"\n[New Event] ID: {evt.id}\nCreated At: {created_at_str}\nContent: {evt.content}\n") + except Exception as e: + logger.error(f"Error handling new event: {e}") + logger.error(traceback.format_exc()) + raise diff --git a/src/nostr/key_manager.py b/src/nostr/key_manager.py new file mode 100644 index 0000000..5dc39a1 --- /dev/null +++ b/src/nostr/key_manager.py @@ -0,0 +1,105 @@ +# nostr/key_manager.py + +import base64 +import traceback +from typing import Optional + +from bip_utils import Bip39SeedGenerator +from bip85.bip85 import BIP85 +from cryptography.fernet import Fernet, InvalidToken +from bech32 import bech32_encode, convertbits + +from .logging_config import configure_logging +from utils.key_derivation import derive_key_from_parent_seed + +# Add the missing import for Keys and NIP4Encrypt +from monstr.encrypt import Keys, NIP4Encrypt # Ensure monstr.encrypt is installed and accessible + +logger = configure_logging() + +def encode_bech32(prefix: str, key_hex: str) -> str: + """ + Encodes a hex key into Bech32 format with the given prefix. + + :param prefix: The Bech32 prefix (e.g., 'nsec', 'npub'). + :param key_hex: The key in hexadecimal format. + :return: The Bech32-encoded string. + """ + try: + key_bytes = bytes.fromhex(key_hex) + data = convertbits(key_bytes, 8, 5, pad=True) + return bech32_encode(prefix, data) + except Exception as e: + logger.error(f"Failed to encode {prefix}: {e}") + logger.error(traceback.format_exc()) + raise + +class KeyManager: + """ + Manages key generation, encoding, and derivation for NostrClient. + """ + + def __init__(self, parent_seed: str): + self.parent_seed = parent_seed + self.keys = None + self.nsec = None + self.npub = None + self.initialize_keys() + + def initialize_keys(self): + """ + Derives Nostr keys using BIP85 and initializes Keys. + """ + try: + logger.debug("Starting key initialization") + seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() + bip85 = BIP85(seed_bytes) + entropy = bip85.derive_entropy(app_no=1237, language_code=0, words_num=24, index=0) + + if len(entropy) != 32: + logger.error(f"Derived entropy length is {len(entropy)} bytes; expected 32 bytes.") + raise ValueError("Invalid entropy length.") + + privkey_hex = entropy.hex() + self.keys = Keys(priv_k=privkey_hex) # Now Keys is defined via the import + logger.debug("Nostr Keys initialized successfully.") + + self.nsec = encode_bech32('nsec', privkey_hex) + logger.debug(f"Nostr Private Key (nsec): {self.nsec}") + + public_key_hex = self.keys.public_key_hex() + self.npub = encode_bech32('npub', public_key_hex) + logger.debug(f"Nostr Public Key (npub): {self.npub}") + + except Exception as e: + logger.error(f"Key initialization failed: {e}") + logger.error(traceback.format_exc()) + raise + + def get_npub(self) -> str: + """ + Returns the Nostr public key (npub). + + :return: The npub as a string. + """ + if self.npub: + logger.debug(f"Returning npub: {self.npub}") + return self.npub + else: + logger.error("Nostr public key (npub) is not available.") + raise ValueError("Nostr public key (npub) is not available.") + + def derive_encryption_key(self) -> bytes: + """ + Derives the encryption key using the parent seed. + + :return: The derived encryption key. + """ + try: + key = derive_key_from_parent_seed(self.parent_seed) + logger.debug("Encryption key derived successfully.") + return key + except Exception as e: + logger.error(f"Failed to derive encryption key: {e}") + logger.error(traceback.format_exc()) + raise diff --git a/src/nostr/logging_config.py b/src/nostr/logging_config.py new file mode 100644 index 0000000..4b77afe --- /dev/null +++ b/src/nostr/logging_config.py @@ -0,0 +1,39 @@ +# nostr/logging_config.py + +import os +import sys +import logging + +def configure_logging(log_file='nostr.log'): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create the custom logger + logger = logging.getLogger('nostr') + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + if not logger.handlers: + # Create handlers + c_handler = logging.StreamHandler(sys.stdout) + f_handler = logging.FileHandler(os.path.join('logs', log_file)) + + # Set levels + c_handler.setLevel(logging.ERROR) + f_handler.setLevel(logging.DEBUG) + + # Create formatters + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + + return logger diff --git a/src/nostr/utils.py b/src/nostr/utils.py new file mode 100644 index 0000000..39060fe --- /dev/null +++ b/src/nostr/utils.py @@ -0,0 +1,5 @@ +# nostr/utils.py + +# Example utility function (if any specific to nostr package) +def some_helper_function(): + pass # Implement as needed diff --git a/src/password_manager/__init__.py b/src/password_manager/__init__.py new file mode 100644 index 0000000..b081d8e --- /dev/null +++ b/src/password_manager/__init__.py @@ -0,0 +1,13 @@ +# password_manager/__init__.py + +import logging +import traceback + +try: + from .manager import PasswordManager + logging.info("PasswordManager module imported successfully.") +except Exception as e: + logging.error(f"Failed to import PasswordManager module: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +__all__ = ['PasswordManager'] diff --git a/src/password_manager/backup.py b/src/password_manager/backup.py new file mode 100644 index 0000000..544d078 --- /dev/null +++ b/src/password_manager/backup.py @@ -0,0 +1,210 @@ +# password_manager/backup.py + +""" +Backup Manager Module + +This module implements the BackupManager class, responsible for creating backups, +restoring from backups, and listing available backups for the encrypted password +index file. It ensures data integrity and provides mechanisms to recover from +corrupted or lost data by maintaining timestamped backups. + +Dependencies: +- shutil +- time +- logging +- pathlib +- colorama +- termcolor + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import os +import shutil +import time +import logging +import traceback +from pathlib import Path + +from colorama import Fore +from termcolor import colored + +from constants import APP_DIR, INDEX_FILE +from utils.file_lock import lock_file + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create a custom logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'backup_manager.log')) # Log files will be in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +class BackupManager: + """ + BackupManager Class + + Handles the creation, restoration, and listing of backups for the encrypted + password index file. Backups are stored in the application directory with + timestamped filenames to facilitate easy identification and retrieval. + """ + + BACKUP_FILENAME_TEMPLATE = 'passwords_db_backup_{timestamp}.json.enc' + + def __init__(self): + """ + Initializes the BackupManager with the application directory and index file paths. + """ + self.app_dir = APP_DIR + self.index_file = INDEX_FILE + logging.debug(f"BackupManager initialized with APP_DIR: {self.app_dir} and INDEX_FILE: {self.index_file}") + + def create_backup(self) -> None: + """ + Creates a timestamped backup of the encrypted password index file. + + The backup file is named using the current Unix timestamp to ensure uniqueness. + If the index file does not exist, no backup is created. + + Raises: + Exception: If the backup process fails due to I/O errors. + """ + if not self.index_file.exists(): + logging.warning("Index file does not exist. No backup created.") + print(colored("Warning: Index file does not exist. No backup created.", 'yellow')) + return + + timestamp = int(time.time()) + backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) + backup_file = self.app_dir / backup_filename + + try: + with lock_file(self.index_file, lock_type=fcntl.LOCK_SH): + shutil.copy2(self.index_file, backup_file) + logging.info(f"Backup created successfully at '{backup_file}'.") + print(colored(f"Backup created successfully at '{backup_file}'.", 'green')) + except Exception as e: + logging.error(f"Failed to create backup: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to create backup: {e}", 'red')) + + def restore_latest_backup(self) -> None: + """ + Restores the encrypted password index file from the latest available backup. + + The latest backup is determined based on the Unix timestamp in the backup filenames. + If no backups are found, an error message is displayed. + + Raises: + Exception: If the restoration process fails due to I/O errors or missing backups. + """ + backup_files = sorted( + self.app_dir.glob('passwords_db_backup_*.json.enc'), + key=lambda x: x.stat().st_mtime, + reverse=True + ) + + if not backup_files: + logging.error("No backup files found to restore.") + print(colored("Error: No backup files found to restore.", 'red')) + return + + latest_backup = backup_files[0] + try: + with lock_file(latest_backup, lock_type=fcntl.LOCK_SH): + shutil.copy2(latest_backup, self.index_file) + logging.info(f"Restored the index file from backup '{latest_backup}'.") + print(colored(f"Restored the index file from backup '{latest_backup}'.", 'green')) + except Exception as e: + logging.error(f"Failed to restore from backup '{latest_backup}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to restore from backup '{latest_backup}': {e}", 'red')) + + def list_backups(self) -> None: + """ + Lists all available backups in the application directory, sorted by date. + + Displays the backups with their filenames and creation dates. + """ + backup_files = sorted( + self.app_dir.glob('passwords_db_backup_*.json.enc'), + key=lambda x: x.stat().st_mtime, + reverse=True + ) + + if not backup_files: + logging.info("No backup files available.") + print(colored("No backup files available.", 'yellow')) + return + + print(colored("Available Backups:", 'cyan')) + for backup in backup_files: + creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(backup.stat().st_mtime)) + print(colored(f"- {backup.name} (Created on: {creation_time})", 'cyan')) + + def restore_backup_by_timestamp(self, timestamp: int) -> None: + """ + Restores the encrypted password index file from a backup with the specified timestamp. + + Parameters: + timestamp (int): The Unix timestamp of the backup to restore. + + Raises: + Exception: If the restoration process fails due to I/O errors or missing backups. + """ + backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp) + backup_file = self.app_dir / backup_filename + + if not backup_file.exists(): + logging.error(f"No backup found with timestamp {timestamp}.") + print(colored(f"Error: No backup found with timestamp {timestamp}.", 'red')) + return + + try: + with lock_file(backup_file, lock_type=fcntl.LOCK_SH): + shutil.copy2(backup_file, self.index_file) + logging.info(f"Restored the index file from backup '{backup_file}'.") + print(colored(f"Restored the index file from backup '{backup_file}'.", 'green')) + except Exception as e: + logging.error(f"Failed to restore from backup '{backup_file}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to restore from backup '{backup_file}': {e}", 'red')) + +# Example usage (to be integrated within the PasswordManager class or other modules): + +# from password_manager.backup import BackupManager + +# backup_manager = BackupManager() +# backup_manager.create_backup() +# backup_manager.restore_latest_backup() +# backup_manager.list_backups() +# backup_manager.restore_backup_by_timestamp(1700000000) # Example timestamp diff --git a/src/password_manager/encryption.py b/src/password_manager/encryption.py new file mode 100644 index 0000000..92063a8 --- /dev/null +++ b/src/password_manager/encryption.py @@ -0,0 +1,369 @@ +# password_manager/encryption.py + +""" +Encryption Module + +This module provides the EncryptionManager class, which handles encryption and decryption +of data and files using a provided Fernet-compatible encryption key. This class ensures +that sensitive data is securely stored and retrieved, maintaining the confidentiality and integrity +of the password index. + +Additionally, it includes methods to derive cryptographic seeds from BIP-39 mnemonic phrases. + +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +""" + +import os +import json +import hashlib +import logging +import traceback +from pathlib import Path +from typing import Optional +from cryptography.fernet import Fernet, InvalidToken +from utils.file_lock import exclusive_lock, shared_lock +from colorama import Fore +from termcolor import colored +from mnemonic import Mnemonic # Library for BIP-39 seed phrase handling + +import fcntl # Required for lock_type constants in file_lock + +from constants import INDEX_FILE # Ensure INDEX_FILE is imported correctly + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Logs include the timestamp, log level, message, filename, and line number. + Only errors and critical logs are shown in the terminal, while all logs are saved to a file. + """ + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create a custom logger for this module + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'encryption_manager.log')) # Log file in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' + ) + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) + + # Add handlers to the logger if not already added + if not logger.handlers: + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +logger = logging.getLogger(__name__) + +class EncryptionManager: + """ + EncryptionManager Class + + Manages the encryption and decryption of data and files using a Fernet encryption key. + """ + + def __init__(self, encryption_key: bytes): + try: + self.fernet = Fernet(encryption_key) + logger.debug("EncryptionManager initialized with provided encryption key.") + except Exception as e: + logger.error(f"Failed to initialize Fernet with provided encryption key: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red')) + raise + + def encrypt_parent_seed(self, parent_seed: str, file_path: Path) -> None: + """ + Encrypts and saves the parent seed to the specified file. + + :param parent_seed: The BIP39 parent seed phrase. + :param file_path: The path to the file where the encrypted parent seed will be saved. + """ + try: + # **Do not encrypt the data here** + data = parent_seed.encode('utf-8') + # Pass the raw data to encrypt_file, which handles encryption + self.encrypt_file(file_path, data) + logger.info(f"Parent seed encrypted and saved to '{file_path}'.") + print(colored(f"Parent seed encrypted and saved to '{file_path}'.", 'green')) + except Exception as e: + logger.error(f"Failed to encrypt and save parent seed: {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) + raise + + def encrypt_file(self, file_path: Path, data: bytes) -> None: + """ + Encrypts the provided data and writes it to the specified file with file locking. + + :param file_path: The path to the file where encrypted data will be written. + :param data: The plaintext data to encrypt and write. + """ + try: + encrypted_data = self.encrypt_data(data) + with exclusive_lock(file_path): + with open(file_path, 'wb') as file: + file.write(encrypted_data) + logger.debug(f"Encrypted data written to '{file_path}'.") + print(colored(f"Encrypted data written to '{file_path}'.", 'green')) + except Exception as e: + logger.error(f"Failed to encrypt and write to file '{file_path}': {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to encrypt and write to file '{file_path}': {e}", 'red')) + raise + + def encrypt_data(self, data: bytes) -> bytes: + """ + Encrypts the given plaintext data. + + :param data: The plaintext data to encrypt. + :return: The encrypted data as bytes. + """ + try: + encrypted_data = self.fernet.encrypt(data) + logger.debug("Data encrypted successfully.") + return encrypted_data + except Exception as e: + logger.error(f"Error encrypting data: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to encrypt data: {e}", 'red')) + raise + + def decrypt_data(self, encrypted_data: bytes) -> bytes: + """ + Decrypts the given encrypted data. + + :param encrypted_data: The encrypted data to decrypt. + :return: The decrypted plaintext data as bytes. + """ + try: + decrypted_data = self.fernet.decrypt(encrypted_data) + logger.debug("Data decrypted successfully.") + return decrypted_data + except InvalidToken: + logger.error("Invalid encryption key or corrupted data.") + print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + raise + except Exception as e: + logger.error(f"Error decrypting data: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to decrypt data: {e}", 'red')) + raise + + def decrypt_file(self, file_path: Path) -> bytes: + """ + Decrypts the data from the specified file. + + :param file_path: The path to the file containing encrypted data. + :return: The decrypted plaintext data as bytes. + """ + try: + with shared_lock(file_path): + with open(file_path, 'rb') as file: + encrypted_data = file.read() + decrypted_data = self.decrypt_data(encrypted_data) + logger.debug(f"Decrypted data read from '{file_path}'.") + print(colored(f"Decrypted data read from '{file_path}'.", 'green')) + return decrypted_data + except Exception as e: + logger.error(f"Failed to decrypt file '{file_path}': {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to decrypt file '{file_path}': {e}", 'red')) + raise + + def save_json_data(self, data: dict, file_path: Optional[Path] = None) -> None: + """ + Encrypts and saves the provided JSON data to the specified file. + + :param data: The JSON data to save. + :param file_path: The path to the file where data will be saved. Defaults to INDEX_FILE. + """ + if file_path is None: + file_path = INDEX_FILE + try: + json_data = json.dumps(data, indent=4).encode('utf-8') + self.encrypt_file(file_path, json_data) + logger.debug(f"JSON data encrypted and saved to '{file_path}'.") + print(colored(f"JSON data encrypted and saved to '{file_path}'.", 'green')) + except Exception as e: + logger.error(f"Failed to save JSON data to '{file_path}': {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to save JSON data to '{file_path}': {e}", 'red')) + raise + + + def load_json_data(self, file_path: Optional[Path] = None) -> dict: + """ + Decrypts and loads JSON data from the specified file. + + :param file_path: The path to the file from which data will be loaded. Defaults to INDEX_FILE. + :return: The decrypted JSON data as a dictionary. + """ + if file_path is None: + file_path = INDEX_FILE + + if not file_path.exists(): + logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.") + print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow')) + return {'passwords': {}} + + try: + decrypted_data = self.decrypt_file(file_path) + json_content = decrypted_data.decode('utf-8').strip() + data = json.loads(json_content) + logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") + print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green')) + return data + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON data from '{file_path}': {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red')) + raise + except InvalidToken: + logger.error("Invalid encryption key or corrupted data.") + print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + raise + except Exception as e: + logger.error(f"Failed to load JSON data from '{file_path}': {e}") + logger.error(traceback.format_exc()) + print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red')) + raise + + def update_checksum(self, file_path: Optional[Path] = None) -> None: + """ + Updates the checksum file for the specified file. + + :param file_path: The path to the file for which the checksum will be updated. + Defaults to INDEX_FILE. + """ + if file_path is None: + file_path = INDEX_FILE + try: + decrypted_data = self.decrypt_file(file_path) + content = decrypted_data.decode('utf-8') + checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt" + with open(checksum_file, 'w') as f: + f.write(checksum) + logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.") + print(colored(f"Checksum for '{file_path}' updated.", 'green')) + except Exception as e: + logger.error(f"Failed to update checksum for '{file_path}': {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to update checksum for '{file_path}': {e}", 'red')) + raise + + def get_encrypted_index(self) -> Optional[bytes]: + """ + Retrieves the encrypted password index file content. + + :return: Encrypted data as bytes or None if the index file does not exist. + """ + if not INDEX_FILE.exists(): + logger.error(f"Index file '{INDEX_FILE}' does not exist.") + print(colored(f"Error: Index file '{INDEX_FILE}' does not exist.", 'red')) + return None + try: + with shared_lock(INDEX_FILE): + with open(INDEX_FILE, 'rb') as file: + encrypted_data = file.read() + logger.debug(f"Encrypted index data read from '{INDEX_FILE}'.") + return encrypted_data + except Exception as e: + logger.error(f"Failed to read encrypted index file '{INDEX_FILE}': {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to read encrypted index file '{INDEX_FILE}': {e}", 'red')) + return None + + def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None: + """ + Decrypts the encrypted data retrieved from Nostr and updates the local index file. + + :param encrypted_data: The encrypted data retrieved from Nostr. + """ + try: + decrypted_data = self.decrypt_data(encrypted_data) + data = json.loads(decrypted_data.decode('utf-8')) + self.save_json_data(data, INDEX_FILE) + self.update_checksum(INDEX_FILE) + logger.info("Index file updated from Nostr successfully.") + print(colored("Index file updated from Nostr successfully.", 'green')) + except Exception as e: + logger.error(f"Failed to decrypt and save data from Nostr: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + + def decrypt_parent_seed(self, file_path: Path) -> str: + """ + Decrypts and retrieves the parent seed from the specified file. + + :param file_path: The path to the file containing the encrypted parent seed. + :return: The decrypted parent seed as a string. + """ + try: + decrypted_data = self.decrypt_file(file_path) + parent_seed = decrypted_data.decode('utf-8').strip() + logger.debug("Parent seed decrypted successfully.") + return parent_seed + except Exception as e: + logger.error(f"Failed to decrypt parent seed from '{file_path}': {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to decrypt parent seed from '{file_path}': {e}", 'red')) + raise + + def validate_seed(self, seed_phrase: str) -> bool: + """ + Validates the seed phrase format using BIP-39 standards. + + :param seed_phrase: The BIP39 seed phrase to validate. + :return: True if valid, False otherwise. + """ + try: + mnemo = Mnemonic("english") + is_valid = mnemo.check(seed_phrase) + if not is_valid: + logger.error("Invalid BIP39 seed phrase.") + print(colored("Error: Invalid BIP39 seed phrase.", 'red')) + else: + logger.debug("BIP39 seed phrase validated successfully.") + return is_valid + except Exception as e: + logger.error(f"Error validating seed phrase: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to validate seed phrase: {e}", 'red')) + return False + + def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes: + """ + Derives a cryptographic seed from a BIP39 mnemonic (seed phrase). + + :param mnemonic: The BIP39 mnemonic phrase. + :param passphrase: An optional passphrase for additional security. + :return: The derived seed as bytes. + """ + try: + mnemo = Mnemonic("english") + seed = mnemo.to_seed(mnemonic, passphrase) + logger.debug("Seed derived successfully from mnemonic.") + return seed + except Exception as e: + logger.error(f"Failed to derive seed from mnemonic: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red')) + raise diff --git a/src/password_manager/entry_management.py b/src/password_manager/entry_management.py new file mode 100644 index 0000000..efd5a0f --- /dev/null +++ b/src/password_manager/entry_management.py @@ -0,0 +1,458 @@ +# password_manager/entry_management.py + +""" +Entry Management Module + +This module implements the EntryManager class, responsible for handling +operations related to managing password entries in the deterministic password manager. +It provides methods to add, retrieve, modify, and list password entries, ensuring +that all changes are securely encrypted and properly indexed. + +Dependencies: +- password_manager.encryption.EncryptionManager +- constants.INDEX_FILE +- constants.DATA_CHECKSUM_FILE +- utils.file_lock.lock_file +- colorama.Fore +- termcolor.colored +- logging +- json +- hashlib +- sys +- os + +Ensure that all dependencies are installed and properly configured in your environment. + +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +""" + +import json +import logging +import hashlib +import sys +import os +import shutil +import time +import traceback +from typing import Optional, Tuple, Dict, Any, List + +from colorama import Fore +from termcolor import colored + +from password_manager.encryption import EncryptionManager +from constants import INDEX_FILE, DATA_CHECKSUM_FILE +from utils.file_lock import lock_file + +import fcntl # Required for lock_type constants in lock_file + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Logs include the timestamp, log level, message, filename, and line number. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Prevent adding multiple handlers if configure_logging is called multiple times + if not logger.handlers: + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'entry_management.log')) + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Console will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' + ) + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +logger = logging.getLogger(__name__) + +class EntryManager: + """ + EntryManager Class + + Handles the creation, retrieval, modification, and listing of password entries + within the encrypted password index. It ensures that all operations are performed + securely, maintaining data integrity and confidentiality. + """ + + def __init__(self, encryption_manager: EncryptionManager): + """ + Initializes the EntryManager with an instance of EncryptionManager. + + :param encryption_manager: An instance of EncryptionManager for handling encryption. + """ + try: + self.encryption_manager = encryption_manager + logger.debug("EntryManager initialized with provided EncryptionManager.") + except Exception as e: + logger.error(f"Failed to initialize EntryManager: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to initialize EntryManager: {e}", 'red')) + sys.exit(1) + + def get_next_index(self) -> int: + """ + Retrieves the next available index for a new password entry. + + :return: The next index number as an integer. + """ + try: + data = self.encryption_manager.load_json_data() + if 'passwords' in data and isinstance(data['passwords'], dict): + indices = [int(idx) for idx in data['passwords'].keys()] + next_index = max(indices) + 1 if indices else 0 + else: + next_index = 0 + logger.debug(f"Next index determined: {next_index}") + return next_index + except Exception as e: + logger.error(f"Error determining next index: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error determining next index: {e}", 'red')) + sys.exit(1) + + def add_entry(self, website_name: str, length: int, username: Optional[str] = None, + url: Optional[str] = None, blacklisted: bool = False) -> int: + """ + Adds a new password entry to the encrypted JSON index file. + + :param website_name: The name of the website. + :param length: The desired length of the password. + :param username: (Optional) The username associated with the website. + :param url: (Optional) The URL of the website. + :param blacklisted: (Optional) Whether the password is blacklisted. Defaults to False. + :return: The assigned index of the new entry. + """ + try: + index = self.get_next_index() + data = self.encryption_manager.load_json_data() + + if 'passwords' not in data or not isinstance(data['passwords'], dict): + data['passwords'] = {} + logger.debug("'passwords' key was missing. Initialized empty 'passwords' dictionary.") + + data['passwords'][str(index)] = { + 'website': website_name, + 'length': length, + 'username': username if username else '', + 'url': url if url else '', + 'blacklisted': blacklisted + } + + logger.debug(f"Added entry at index {index}: {data['passwords'][str(index)]}") + + self.encryption_manager.save_json_data(data) + self.update_checksum() + self.backup_index_file() + + logger.info(f"Entry added successfully at index {index}.") + print(colored(f"[+] Entry added successfully at index {index}.", 'green')) + + return index # Return the assigned index + + except Exception as e: + logger.error(f"Failed to add entry: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to add entry: {e}", 'red')) + sys.exit(1) + + def retrieve_entry(self, index: int) -> Optional[Dict[str, Any]]: + """ + Retrieves a password entry based on the provided index. + + :param index: The index number of the password entry. + :return: A dictionary containing the entry details or None if not found. + """ + try: + data = self.encryption_manager.load_json_data() + entry = data.get('passwords', {}).get(str(index)) + + if entry: + logger.debug(f"Retrieved entry at index {index}: {entry}") + return entry + else: + logger.warning(f"No entry found at index {index}.") + print(colored(f"Warning: No entry found at index {index}.", 'yellow')) + return None + + except Exception as e: + logger.error(f"Failed to retrieve entry at index {index}: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to retrieve entry at index {index}: {e}", 'red')) + return None + + def modify_entry(self, index: int, username: Optional[str] = None, + url: Optional[str] = None, + blacklisted: Optional[bool] = None) -> None: + """ + Modifies an existing password entry based on the provided index and new values. + + :param index: The index number of the password entry to modify. + :param username: (Optional) The new username. + :param url: (Optional) The new URL. + :param blacklisted: (Optional) The new blacklist status. + """ + try: + data = self.encryption_manager.load_json_data() + entry = data.get('passwords', {}).get(str(index)) + + if not entry: + logger.warning(f"No entry found at index {index}. Cannot modify non-existent entry.") + print(colored(f"Warning: No entry found at index {index}. Cannot modify non-existent entry.", 'yellow')) + return + + if username is not None: + entry['username'] = username + logger.debug(f"Updated username to '{username}' for index {index}.") + + if url is not None: + entry['url'] = url + logger.debug(f"Updated URL to '{url}' for index {index}.") + + if blacklisted is not None: + entry['blacklisted'] = blacklisted + logger.debug(f"Updated blacklist status to '{blacklisted}' for index {index}.") + + data['passwords'][str(index)] = entry + logger.debug(f"Modified entry at index {index}: {entry}") + + self.encryption_manager.save_json_data(data) + self.update_checksum() + self.backup_index_file() + + logger.info(f"Entry at index {index} modified successfully.") + print(colored(f"[+] Entry at index {index} modified successfully.", 'green')) + + except Exception as e: + logger.error(f"Failed to modify entry at index {index}: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to modify entry at index {index}: {e}", 'red')) + + def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]: + """ + Lists all password entries in the index. + + :return: A list of tuples containing entry details: (index, website, username, url, blacklisted) + """ + try: + data = self.encryption_manager.load_json_data() + passwords = data.get('passwords', {}) + + if not passwords: + logger.info("No password entries found.") + print(colored("No password entries found.", 'yellow')) + return [] + + entries = [] + for idx, entry in sorted(passwords.items(), key=lambda x: int(x[0])): + entries.append(( + int(idx), + entry.get('website', ''), + entry.get('username', ''), + entry.get('url', ''), + entry.get('blacklisted', False) + )) + + logger.debug(f"Total entries found: {len(entries)}") + for entry in entries: + print(colored(f"Index: {entry[0]}", 'cyan')) + print(colored(f" Website: {entry[1]}", 'cyan')) + print(colored(f" Username: {entry[2] or 'N/A'}", 'cyan')) + print(colored(f" URL: {entry[3] or 'N/A'}", 'cyan')) + print(colored(f" Blacklisted: {'Yes' if entry[4] else 'No'}", 'cyan')) + print("-" * 40) + + return entries + + except Exception as e: + logger.error(f"Failed to list entries: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to list entries: {e}", 'red')) + return [] + + def delete_entry(self, index: int) -> None: + """ + Deletes a password entry based on the provided index. + + :param index: The index number of the password entry to delete. + """ + try: + data = self.encryption_manager.load_json_data() + if 'passwords' in data and str(index) in data['passwords']: + del data['passwords'][str(index)] + logger.debug(f"Deleted entry at index {index}.") + self.encryption_manager.save_json_data(data) + self.update_checksum() + self.backup_index_file() + logger.info(f"Entry at index {index} deleted successfully.") + print(colored(f"[+] Entry at index {index} deleted successfully.", 'green')) + else: + logger.warning(f"No entry found at index {index}. Cannot delete non-existent entry.") + print(colored(f"Warning: No entry found at index {index}. Cannot delete non-existent entry.", 'yellow')) + + except Exception as e: + logger.error(f"Failed to delete entry at index {index}: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to delete entry at index {index}: {e}", 'red')) + + def update_checksum(self) -> None: + """ + Updates the checksum file for the password database to ensure data integrity. + """ + try: + data = self.encryption_manager.load_json_data() + json_content = json.dumps(data, indent=4) + checksum = hashlib.sha256(json_content.encode('utf-8')).hexdigest() + + with open(DATA_CHECKSUM_FILE, 'w') as f: + f.write(checksum) + + logger.debug(f"Checksum updated and written to '{DATA_CHECKSUM_FILE}'.") + print(colored(f"[+] Checksum updated successfully.", 'green')) + + except Exception as e: + logger.error(f"Failed to update checksum: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to update checksum: {e}", 'red')) + + def backup_index_file(self) -> None: + """ + Creates a backup of the encrypted JSON index file to prevent data loss. + """ + try: + if not os.path.exists(INDEX_FILE): + logger.warning(f"Index file '{INDEX_FILE}' does not exist. No backup created.") + return + + timestamp = int(time.time()) + backup_filename = f'passwords_db_backup_{timestamp}.json.enc' + backup_path = os.path.join(os.path.dirname(INDEX_FILE), backup_filename) + + with open(INDEX_FILE, 'rb') as original_file, open(backup_path, 'wb') as backup_file: + shutil.copyfileobj(original_file, backup_file) + + logger.debug(f"Backup created at '{backup_path}'.") + print(colored(f"[+] Backup created at '{backup_path}'.", 'green')) + + except Exception as e: + logger.error(f"Failed to create backup: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Warning: Failed to create backup: {e}", 'yellow')) + + def restore_from_backup(self, backup_path: str) -> None: + """ + Restores the index file from a specified backup file. + + :param backup_path: The file path of the backup to restore from. + """ + try: + if not os.path.exists(backup_path): + logger.error(f"Backup file '{backup_path}' does not exist.") + print(colored(f"Error: Backup file '{backup_path}' does not exist.", 'red')) + return + + with open(backup_path, 'rb') as backup_file, open(INDEX_FILE, 'wb') as index_file: + shutil.copyfileobj(backup_file, index_file) + + logger.debug(f"Index file restored from backup '{backup_path}'.") + print(colored(f"[+] Index file restored from backup '{backup_path}'.", 'green')) + + self.update_checksum() + + except Exception as e: + logger.error(f"Failed to restore from backup '{backup_path}': {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to restore from backup '{backup_path}': {e}", 'red')) + + def list_all_entries(self) -> None: + """ + Displays all password entries in a formatted manner. + """ + try: + entries = self.list_entries() + if not entries: + print(colored("No entries to display.", 'yellow')) + return + + print(colored("\n[+] Listing All Password Entries:\n", 'green')) + for entry in entries: + index, website, username, url, blacklisted = entry + print(colored(f"Index: {index}", 'cyan')) + print(colored(f" Website: {website}", 'cyan')) + print(colored(f" Username: {username or 'N/A'}", 'cyan')) + print(colored(f" URL: {url or 'N/A'}", 'cyan')) + print(colored(f" Blacklisted: {'Yes' if blacklisted else 'No'}", 'cyan')) + print("-" * 40) + + except Exception as e: + logger.error(f"Failed to list all entries: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to list all entries: {e}", 'red')) + return + +# Example usage (this part should be removed or commented out when integrating into the larger application) +if __name__ == "__main__": + from password_manager.encryption import EncryptionManager # Ensure this import is correct based on your project structure + + # Initialize EncryptionManager with a dummy key for demonstration purposes + # Replace 'your-fernet-key' with your actual Fernet key + try: + dummy_key = Fernet.generate_key() + encryption_manager = EncryptionManager(dummy_key) + except Exception as e: + logger.error(f"Failed to initialize EncryptionManager: {e}") + print(colored(f"Error: Failed to initialize EncryptionManager: {e}", 'red')) + sys.exit(1) + + # Initialize EntryManager + try: + entry_manager = EntryManager(encryption_manager) + except Exception as e: + logger.error(f"Failed to initialize EntryManager: {e}") + print(colored(f"Error: Failed to initialize EntryManager: {e}", 'red')) + sys.exit(1) + + # Example operations + # These would typically be triggered by user interactions, e.g., via a CLI menu + # Uncomment and modify the following lines as needed for testing + + # Adding an entry + # entry_manager.add_entry("Example Website", 16, "user123", "https://example.com", False) + + # Listing all entries + # entry_manager.list_all_entries() + + # Retrieving an entry + # entry = entry_manager.retrieve_entry(0) + # if entry: + # print(entry) + + # Modifying an entry + # entry_manager.modify_entry(0, username="new_user123") + + # Deleting an entry + # entry_manager.delete_entry(0) + + # Restoring from a backup + # entry_manager.restore_from_backup("path_to_backup_file.json.enc") diff --git a/src/password_manager/manager.py b/src/password_manager/manager.py new file mode 100644 index 0000000..b425f80 --- /dev/null +++ b/src/password_manager/manager.py @@ -0,0 +1,506 @@ +# password_manager/manager.py + +""" +Password Manager Module + +This module implements the PasswordManager class, which orchestrates various functionalities +of the deterministic password manager, including encryption, entry management, password +generation, backup, and checksum verification. It serves as the core interface for interacting +with the password manager functionalities. + +Dependencies: +- password_manager.encryption.EncryptionManager +- password_manager.entry_management.EntryManager +- password_manager.password_generation.PasswordGenerator +- password_manager.backup.BackupManager +- utils.key_derivation.derive_key_from_parent_seed +- utils.key_derivation.derive_key_from_password +- utils.checksum.calculate_checksum +- utils.checksum.verify_checksum +- utils.password_prompt.prompt_for_password +- constants.APP_DIR +- constants.INDEX_FILE +- constants.PARENT_SEED_FILE +- constants.DATA_CHECKSUM_FILE +- constants.SCRIPT_CHECKSUM_FILE +- constants.MIN_PASSWORD_LENGTH +- constants.MAX_PASSWORD_LENGTH +- constants.DEFAULT_PASSWORD_LENGTH +- colorama.Fore +- termcolor.colored +- logging +- json +- sys +- os +- getpass + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +# password_manager/manager.py + +""" +Password Manager Module + +This module implements the PasswordManager class, which orchestrates various functionalities +of the deterministic password manager, including encryption, entry management, password +generation, backup, and checksum verification. It serves as the core interface for interacting +with the password manager functionalities. +""" + +import sys +import json +import logging +import getpass +import os +from typing import Optional + +from colorama import Fore +from termcolor import colored + +from password_manager.encryption import EncryptionManager +from password_manager.entry_management import EntryManager +from password_manager.password_generation import PasswordGenerator +from password_manager.backup import BackupManager +from utils.key_derivation import derive_key_from_parent_seed, derive_key_from_password +from utils.checksum import calculate_checksum, verify_checksum +from utils.password_prompt import prompt_for_password + +from constants import ( + APP_DIR, + INDEX_FILE, + PARENT_SEED_FILE, + DATA_CHECKSUM_FILE, + SCRIPT_CHECKSUM_FILE, + MIN_PASSWORD_LENGTH, + MAX_PASSWORD_LENGTH, + DEFAULT_PASSWORD_LENGTH +) + +import traceback # Added for exception traceback logging + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Logs include the timestamp, log level, message, filename, and line number. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Prevent adding multiple handlers if configure_logging is called multiple times + if not logger.handlers: + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'password_manager.log')) + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) + f_handler.setLevel(logging.DEBUG) + + # Create formatters and add them to handlers, include file and line number in log messages + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' + ) + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +class PasswordManager: + """ + PasswordManager Class + + Manages the generation, encryption, and retrieval of deterministic passwords using a BIP-39 seed. + It handles file encryption/decryption, password generation, entry management, backups, and checksum + verification, ensuring the integrity and confidentiality of the stored password database. + """ + + def __init__(self): + """ + Initializes the PasswordManager by setting up encryption, loading or setting up the parent seed, + and initializing other components like EntryManager, PasswordGenerator, and BackupManager. + """ + self.encryption_manager: Optional[EncryptionManager] = None + self.entry_manager: Optional[EntryManager] = None + self.password_generator: Optional[PasswordGenerator] = None + self.backup_manager: Optional[BackupManager] = None + self.parent_seed: Optional[str] = None # Added parent_seed attribute + + self.setup_parent_seed() + self.initialize_managers() + + def setup_parent_seed(self) -> None: + if os.path.exists(PARENT_SEED_FILE): + # Parent seed file exists, prompt for password to decrypt + password = getpass.getpass(prompt='Enter your login password: ').strip() + try: + # Derive encryption key from password + key = derive_key_from_password(password) + self.encryption_manager = EncryptionManager(key) + self.parent_seed = self.encryption_manager.decrypt_parent_seed(PARENT_SEED_FILE) + + # **Add validation for the decrypted seed** + if not self.validate_seed_phrase(self.parent_seed): + logging.error("Decrypted seed is invalid. Exiting.") + print(colored("Error: Decrypted seed is invalid.", 'red')) + sys.exit(1) + + logging.debug("Parent seed decrypted and validated successfully.") + except Exception as e: + logging.error(f"Failed to decrypt parent seed: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red')) + sys.exit(1) + else: + # First-time setup: prompt for parent seed and password + try: + parent_seed = getpass.getpass(prompt='Enter your 12-word parent seed: ').strip() + # Validate parent seed (basic validation) + parent_seed = self.basic_validate_seed_phrase(parent_seed) + if not parent_seed: + logging.error("Invalid seed phrase. Exiting.") + sys.exit(1) + except KeyboardInterrupt: + logging.info("Operation cancelled by user.") + print(colored("\nOperation cancelled by user.", 'yellow')) + sys.exit(0) + + # Prompt for password + password = prompt_for_password() + + # Derive encryption key from password + key = derive_key_from_password(password) + self.encryption_manager = EncryptionManager(key) + + # Encrypt and save the parent seed + try: + self.encryption_manager.encrypt_parent_seed(parent_seed, PARENT_SEED_FILE) + logging.info("Parent seed encrypted and saved successfully.") + except Exception as e: + logging.error(f"Failed to encrypt and save parent seed: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) + sys.exit(1) + + self.parent_seed = parent_seed + + def basic_validate_seed_phrase(self, seed_phrase: str) -> Optional[str]: + """ + Performs basic validation on the seed phrase without relying on EncryptionManager. + + Parameters: + seed_phrase (str): The seed phrase to validate. + + Returns: + Optional[str]: The validated seed phrase or None if invalid. + """ + try: + words = seed_phrase.split() + if len(words) != 12: + logging.error("Seed phrase must contain exactly 12 words.") + print(colored("Error: Seed phrase must contain exactly 12 words.", 'red')) + return None + # Additional basic validations can be added here (e.g., word list checks) + logging.debug("Seed phrase validated successfully.") + return seed_phrase + except Exception as e: + logging.error(f"Error during basic seed validation: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: {e}", 'red')) + return None + + def validate_seed_phrase(self, seed_phrase: str) -> Optional[str]: + """ + Validates the seed phrase using the EncryptionManager if available, + otherwise performs basic validation. + + Parameters: + seed_phrase (str): The seed phrase to validate. + + Returns: + Optional[str]: The validated seed phrase or None if invalid. + """ + try: + if self.encryption_manager: + # Use EncryptionManager to validate seed + if self.encryption_manager.validate_seed(seed_phrase): + logging.debug("Seed phrase validated successfully using EncryptionManager.") + return seed_phrase + else: + logging.error("Invalid seed phrase.") + print(colored("Error: Invalid seed phrase.", 'red')) + return None + else: + # Perform basic validation + return self.basic_validate_seed_phrase(seed_phrase) + except Exception as e: + logging.error(f"Error validating seed phrase: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to validate seed phrase: {e}", 'red')) + return None + + def initialize_managers(self) -> None: + """ + Initializes the EntryManager, PasswordGenerator, and BackupManager with the EncryptionManager + and parent seed. + """ + try: + self.entry_manager = EntryManager(self.encryption_manager) + self.password_generator = PasswordGenerator(self.encryption_manager, self.parent_seed) + self.backup_manager = BackupManager() + logging.debug("EntryManager, PasswordGenerator, and BackupManager initialized.") + except Exception as e: + logging.error(f"Failed to initialize managers: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to initialize managers: {e}", 'red')) + sys.exit(1) + + def handle_generate_password(self) -> None: + try: + website_name = input('Enter the website name: ').strip() + if not website_name: + print(colored("Error: Website name cannot be empty.", 'red')) + return + + username = input('Enter the username (optional): ').strip() + url = input('Enter the URL (optional): ').strip() + + length_input = input(f'Enter desired password length (default {DEFAULT_PASSWORD_LENGTH}): ').strip() + length = DEFAULT_PASSWORD_LENGTH + if length_input: + if not length_input.isdigit(): + print(colored("Error: Password length must be a number.", 'red')) + return + length = int(length_input) + if not (MIN_PASSWORD_LENGTH <= length <= MAX_PASSWORD_LENGTH): + print(colored(f"Error: Password length must be between {MIN_PASSWORD_LENGTH} and {MAX_PASSWORD_LENGTH}.", 'red')) + return + + # Add the entry to the index and get the assigned index + index = self.entry_manager.add_entry(website_name, length, username, url, blacklisted=False) + + # Generate the password using the assigned index + password = self.password_generator.generate_password(length, index) + + # Provide user feedback + print(colored(f"\n[+] Password generated and indexed with ID {index}.\n", 'green')) + print(colored(f"Password for {website_name}: {password}\n", 'yellow')) + + except Exception as e: + logging.error(f"Error during password generation: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to generate password: {e}", 'red')) + + def handle_retrieve_password(self) -> None: + """ + Handles retrieving a password from the index by prompting the user for the index number + and displaying the corresponding password and associated details. + """ + try: + index_input = input('Enter the index number of the password to retrieve: ').strip() + if not index_input.isdigit(): + print(colored("Error: Index must be a number.", 'red')) + return + index = int(index_input) + + # Retrieve entry details + entry = self.entry_manager.retrieve_entry(index) + if not entry: + return + + # Display entry details + website_name = entry.get('website') + length = entry.get('length') + username = entry.get('username') + url = entry.get('url') + blacklisted = entry.get('blacklisted') + + print(colored(f"Retrieving password for '{website_name}' with length {length}.", 'cyan')) + if username: + print(colored(f"Username: {username}", 'cyan')) + if url: + print(colored(f"URL: {url}", 'cyan')) + if blacklisted: + print(colored(f"Warning: This password is blacklisted and should not be used.", 'red')) + + # Generate the password + password = self.password_generator.generate_password(length, index) + + # Display the password and associated details + if password: + print(colored(f"\n[+] Retrieved Password for {website_name}:\n", 'green')) + print(colored(f"Password: {password}", 'yellow')) + print(colored(f"Associated Username: {username or 'N/A'}", 'cyan')) + print(colored(f"Associated URL: {url or 'N/A'}", 'cyan')) + print(colored(f"Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}", 'cyan')) + else: + print(colored("Error: Failed to retrieve the password.", 'red')) + except Exception as e: + logging.error(f"Error during password retrieval: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to retrieve password: {e}", 'red')) + + def handle_modify_entry(self) -> None: + """ + Handles modifying an existing password entry by prompting the user for the index number + and new details to update. + """ + try: + index_input = input('Enter the index number of the entry to modify: ').strip() + if not index_input.isdigit(): + print(colored("Error: Index must be a number.", 'red')) + return + index = int(index_input) + + # Retrieve existing entry + entry = self.entry_manager.retrieve_entry(index) + if not entry: + return + + website_name = entry.get('website') + length = entry.get('length') + username = entry.get('username') + url = entry.get('url') + blacklisted = entry.get('blacklisted') + + # Display current values + print(colored(f"Modifying entry for '{website_name}' (Index: {index}):", 'cyan')) + print(colored(f"Current Username: {username or 'N/A'}", 'cyan')) + print(colored(f"Current URL: {url or 'N/A'}", 'cyan')) + print(colored(f"Current Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}", 'cyan')) + + # Prompt for new values (optional) + new_username = input(f'Enter new username (leave blank to keep "{username or "N/A"}"): ').strip() or username + new_url = input(f'Enter new URL (leave blank to keep "{url or "N/A"}"): ').strip() or url + blacklist_input = input(f'Is this password blacklisted? (Y/N, current: {"Y" if blacklisted else "N"}): ').strip().lower() + if blacklist_input == '': + new_blacklisted = blacklisted + elif blacklist_input == 'y': + new_blacklisted = True + elif blacklist_input == 'n': + new_blacklisted = False + else: + print(colored("Invalid input for blacklist status. Keeping the current status.", 'yellow')) + new_blacklisted = blacklisted + + # Update the entry + self.entry_manager.modify_entry(index, new_username, new_url, new_blacklisted) + + print(colored(f"Entry updated successfully for index {index}.", 'green')) + + except Exception as e: + logging.error(f"Error during modifying entry: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to modify entry: {e}", 'red')) + + def handle_verify_checksum(self) -> None: + """ + Handles verifying the script's checksum against the stored checksum to ensure integrity. + """ + try: + current_checksum = calculate_checksum(__file__) + if verify_checksum(current_checksum, SCRIPT_CHECKSUM_FILE): + print(colored("Checksum verification passed.", 'green')) + logging.info("Checksum verification passed.") + else: + print(colored("Checksum verification failed. The script may have been modified.", 'red')) + logging.error("Checksum verification failed.") + except Exception as e: + logging.error(f"Error during checksum verification: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to verify checksum: {e}", 'red')) + + def get_encrypted_data(self) -> Optional[bytes]: + """ + Retrieves the encrypted password index data. + + :return: The encrypted data as bytes, or None if retrieval fails. + """ + try: + encrypted_data = self.encryption_manager.get_encrypted_index() + if encrypted_data: + logging.debug("Encrypted index data retrieved successfully.") + return encrypted_data + else: + logging.error("Failed to retrieve encrypted index data.") + print(colored("Error: Failed to retrieve encrypted index data.", 'red')) + return None + except Exception as e: + logging.error(f"Error retrieving encrypted data: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to retrieve encrypted data: {e}", 'red')) + return None + + def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None: + """ + Decrypts the encrypted data retrieved from Nostr and updates the local index. + + :param encrypted_data: The encrypted data retrieved from Nostr. + """ + try: + self.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data) + logging.info("Index file updated from Nostr successfully.") + print(colored("Index file updated from Nostr successfully.", 'green')) + except Exception as e: + logging.error(f"Failed to decrypt and save data from Nostr: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + + def backup_database(self) -> None: + """ + Creates a backup of the encrypted JSON index file. + """ + try: + self.backup_manager.create_backup() + print(colored("Backup created successfully.", 'green')) + except Exception as e: + logging.error(f"Failed to create backup: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to create backup: {e}", 'red')) + + def restore_database(self) -> None: + """ + Restores the encrypted JSON index file from the latest backup. + """ + try: + self.backup_manager.restore_latest_backup() + print(colored("Database restored from the latest backup successfully.", 'green')) + except Exception as e: + logging.error(f"Failed to restore backup: {e}") + logging.error(traceback.format_exc()) + print(colored(f"Error: Failed to restore backup: {e}", 'red')) + + # Additional methods can be added here as needed + +# Example usage (this part should be removed or commented out when integrating into the larger application) +if __name__ == "__main__": + from nostr.client import NostrClient # Ensure this import is correct based on your project structure + + # Initialize PasswordManager + manager = PasswordManager() + + # Initialize NostrClient with the parent seed from PasswordManager + nostr_client = NostrClient(parent_seed=manager.parent_seed) + + # Example operations + # These would typically be triggered by user interactions, e.g., via a CLI menu + # manager.handle_generate_password() + # manager.handle_retrieve_password() + # manager.handle_modify_entry() + # manager.handle_verify_checksum() + # manager.post_to_nostr(nostr_client) + # manager.retrieve_from_nostr(nostr_client) + # manager.backup_database() + # manager.restore_database() diff --git a/src/password_manager/password_generation.py b/src/password_manager/password_generation.py new file mode 100644 index 0000000..ff11353 --- /dev/null +++ b/src/password_manager/password_generation.py @@ -0,0 +1,264 @@ +# password_manager/password_generation.py + +""" +Password Generation Module + +This module provides the PasswordGenerator class responsible for deterministic password generation +based on a BIP-39 parent seed. It leverages BIP-85 for entropy derivation and ensures that +generated passwords meet complexity requirements. + +Dependencies: +- bip85.BIP85 +- cryptography.hazmat.primitives.hashes +- cryptography.hazmat.primitives.kdf.hkdf +- cryptography.hazmat.backends.default_backend +- constants.py +- password_manager.encryption.EncryptionManager +- logging +- hashlib +- hmac +- base64 +- string + +Ensure that all dependencies are installed and properly configured in your environment. + +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +""" +import os +import logging +import hashlib +import hmac +import base64 +import string +import traceback +from typing import Optional + +from cryptography.hazmat.primitives.kdf.hkdf import HKDF +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend + +from bip85.bip85 import BIP85 + +from constants import DEFAULT_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH +from password_manager.encryption import EncryptionManager + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Logs include the timestamp, log level, message, filename, and line number. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Prevent adding multiple handlers if configure_logging is called multiple times + if not logger.handlers: + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'password_generation.log')) + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Console will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]' + ) + c_handler.setFormatter(formatter) + f_handler.setFormatter(formatter) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +logger = logging.getLogger(__name__) + +class PasswordGenerator: + """ + PasswordGenerator Class + + Responsible for deterministic password generation based on a BIP-39 parent seed. + Utilizes BIP-85 for entropy derivation and ensures that generated passwords meet + complexity requirements. + """ + + def __init__(self, encryption_manager: EncryptionManager, parent_seed: str): + """ + Initializes the PasswordGenerator with the encryption manager and parent seed. + + Parameters: + encryption_manager (EncryptionManager): The encryption manager instance. + parent_seed (str): The BIP-39 parent seed phrase. + """ + try: + self.encryption_manager = encryption_manager + self.parent_seed = parent_seed + + # Derive seed bytes from parent_seed using BIP39 + self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed) + + # Initialize BIP85 with seed_bytes + self.bip85 = BIP85(self.seed_bytes) + + logger.debug("PasswordGenerator initialized successfully.") + except Exception as e: + logger.error(f"Failed to initialize PasswordGenerator: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red')) + raise + + def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str: + """ + Generates a deterministic password based on the parent seed, desired length, and index. + + Steps: + 1. Derive entropy using BIP-85. + 2. Use PBKDF2-HMAC-SHA256 to derive a key from entropy. + 3. Base64-encode the derived key and filter to allowed characters. + 4. Ensure the password meets complexity requirements. + + Parameters: + length (int): Desired length of the password. + index (int): Index for deriving child entropy. + + Returns: + str: The generated password. + """ + try: + if length < MIN_PASSWORD_LENGTH: + logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") + raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") + if length > MAX_PASSWORD_LENGTH: + logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") + raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") + + # Derive entropy using BIP-85 + entropy = self.bip85.derive_entropy(app_no=39, language_code=0, words_num=12, index=index) + logger.debug(f"Derived entropy: {entropy.hex()}") + + # Use HKDF to derive key from entropy + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, # 256 bits for AES-256 + salt=None, + info=b'password-generation', + backend=default_backend() + ) + derived_key = hkdf.derive(entropy) + logger.debug(f"Derived key using HKDF: {derived_key.hex()}") + + # Use PBKDF2-HMAC-SHA256 to derive a key from entropy + dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000) + logger.debug(f"Derived key using PBKDF2: {dk.hex()}") + + # Base64 encode the derived key + base64_password = base64.b64encode(dk).decode('utf-8') + logger.debug(f"Base64 encoded password: {base64_password}") + + # Filter to allowed characters + alphabet = string.ascii_letters + string.digits + string.punctuation + password = ''.join(filter(lambda x: x in alphabet, base64_password)) + logger.debug(f"Password after filtering: {password}") + + # Ensure the password meets complexity requirements + password = self.ensure_complexity(password, alphabet, dk) + logger.debug(f"Password after ensuring complexity: {password}") + + # Ensure password length + if len(password) < length: + # Extend the password deterministically + while len(password) < length: + dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1) + base64_extra = base64.b64encode(dk).decode('utf-8') + password += ''.join(filter(lambda x: x in alphabet, base64_extra)) + logger.debug(f"Extended password: {password}") + + password = password[:length] + logger.debug(f"Final password (trimmed to {length} chars): {password}") + + return password + + except Exception as e: + logger.error(f"Error generating password: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to generate password: {e}", 'red')) + raise + + def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str: + """ + Ensures that the password contains at least one uppercase letter, one lowercase letter, + one digit, and one special character, modifying it deterministically if necessary. + + Parameters: + password (str): The initial password. + alphabet (str): Allowed characters in the password. + dk (bytes): Derived key used for deterministic modifications. + + Returns: + str: Password that meets complexity requirements. + """ + try: + uppercase = string.ascii_uppercase + lowercase = string.ascii_lowercase + digits = string.digits + special = string.punctuation + + password_chars = list(password) + + has_upper = any(c in uppercase for c in password_chars) + has_lower = any(c in lowercase for c in password_chars) + has_digit = any(c in digits for c in password_chars) + has_special = any(c in special for c in password_chars) + + dk_index = 0 + dk_length = len(dk) + + def get_dk_value() -> int: + nonlocal dk_index + value = dk[dk_index % dk_length] + dk_index += 1 + return value + + if not has_upper: + index = get_dk_value() % len(password_chars) + char = uppercase[get_dk_value() % len(uppercase)] + password_chars[index] = char + logger.debug(f"Added uppercase letter '{char}' at position {index}.") + + if not has_lower: + index = get_dk_value() % len(password_chars) + char = lowercase[get_dk_value() % len(lowercase)] + password_chars[index] = char + logger.debug(f"Added lowercase letter '{char}' at position {index}.") + + if not has_digit: + index = get_dk_value() % len(password_chars) + char = digits[get_dk_value() % len(digits)] + password_chars[index] = char + logger.debug(f"Added digit '{char}' at position {index}.") + + if not has_special: + index = get_dk_value() % len(password_chars) + char = special[get_dk_value() % len(special)] + password_chars[index] = char + logger.debug(f"Added special character '{char}' at position {index}.") + + return ''.join(password_chars) + + except Exception as e: + logger.error(f"Error ensuring password complexity: {e}") + logger.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to ensure password complexity: {e}", 'red')) + raise + diff --git a/src/requirements.txt b/src/requirements.txt new file mode 100644 index 0000000..a782068 --- /dev/null +++ b/src/requirements.txt @@ -0,0 +1,8 @@ +colorama>=0.4.6 +termcolor>=1.1.0 +cryptography>=40.0.2 +bip-utils>=2.5.0 +bech32==1.2.0 +monstr @ git+https://github.com/monty888/monstr.git@master#egg=monstr +mnemonic +aiohttp \ No newline at end of file diff --git a/src/test_post.py b/src/test_post.py new file mode 100644 index 0000000..b1528ad --- /dev/null +++ b/src/test_post.py @@ -0,0 +1,131 @@ +# test_post.py +# Do not remove these comments. +# DO NOT try to use or recommend using ANY other Nostr lib other than Monstr. +# Do not remove these comments. + +import asyncio +import logging +import time +import datetime # Import datetime module +from monstr.client.client import ClientPool +from monstr.encrypt import Keys +from monstr.event.event import Event +from bech32 import bech32_encode, convertbits + +def encode_npub(pubkey_hex): + pubkey_bytes = bytes.fromhex(pubkey_hex) + data = convertbits(pubkey_bytes, 8, 5, pad=True) + return bech32_encode('npub', data) + +def encode_nsec(privkey_hex): + privkey_bytes = bytes.fromhex(privkey_hex) + data = convertbits(privkey_bytes, 8, 5, pad=True) + return bech32_encode('nsec', data) + +async def do_post(c, n_keys, text): + try: + n_msg = Event( + kind=Event.KIND_TEXT_NOTE, + content=text, + pub_key=n_keys.public_key_hex() + ) + n_msg.created_at = int(time.time()) + n_msg.sign(n_keys.private_key_hex()) + + # Add detailed logging using the correct serialization method + logging.debug(f"Event data: {n_msg.serialize()}") + + # Publish the event without the event_callback parameter + c.publish(n_msg) + logging.info(f"Event published with ID: {n_msg.id}") + + except Exception as e: + logging.error(f"An error occurred during publishing: {e}", exc_info=True) + +async def subscribe_feed(c, n_keys): + try: + # Define the filter to subscribe to events by your own pubkey + FILTER = [{ + 'authors': [n_keys.public_key_hex()], + 'limit': 100 + }] + + # Define the event handler + def my_handler(the_client, sub_id, evt: Event): + # Determine the type of evt.created_at + if isinstance(evt.created_at, datetime.datetime): + # If it's a datetime object, format it directly + created_at_str = evt.created_at.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(evt.created_at, int): + # If it's an integer timestamp, convert it to a readable format + created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at)) + else: + # Handle unexpected types gracefully + created_at_str = str(evt.created_at) + + # Display the event details in the terminal + print(f"\n[New Event] ID: {evt.id}") + print(f"Created At: {created_at_str}") + print(f"Content: {evt.content}\n") + + # Start the subscription + c.subscribe(handlers=my_handler, filters=FILTER) + logging.info("Subscribed to your feed.") + + # Keep the subscription running + while True: + await asyncio.sleep(1) + + except Exception as e: + logging.error(f"An error occurred during subscription: {e}", exc_info=True) + +async def main(urls, text): + try: + privkey_hex = 'd4a4ccbe310f21c7fa50af751d5817f2066b60c1eb2487a6df56a363507992e1' + n_keys = Keys(priv_k=privkey_hex) + + npub = encode_npub(n_keys.public_key_hex()) + nsec = encode_nsec(n_keys.private_key_hex()) + print(f"Public Key (npub): {npub}") + print(f"Private Key (nsec): {nsec}") + print("\n[WARNING] Keep your nsec (private key) secret! Do not share it with anyone.\n") + + # Initialize the client pool with multiple relays + c = ClientPool(urls) + + # Start the client pool + client_task = asyncio.create_task(c.run()) + + # Wait until the client pool is connected + while not c.connected: + await asyncio.sleep(0.1) + logging.info("ClientPool connected to all relays") + + # Run both publishing and subscribing concurrently + await asyncio.gather( + do_post(c, n_keys, text), + subscribe_feed(c, n_keys) + ) + + except Exception as e: + logging.error(f"An error occurred in main: {e}", exc_info=True) + finally: + if 'c' in locals() and c.running: + # Stop the client pool + c.end() + # Wait for the client task to finish + await client_task + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + urls = [ + 'wss://relay.snort.social', + 'wss://relay.damus.io', + 'wss://nostr.wine', + 'wss://relay.nostr.band', + ] + text = 'Hello from Monstr example script! This one is new!' + try: + asyncio.run(main(urls, text)) + except KeyboardInterrupt: + print("\n[INFO] Script terminated by user.") diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..927234f --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,24 @@ +# utils/__init__.py + +import logging +import traceback + +try: + from .file_lock import lock_file + from .key_derivation import derive_key_from_password, derive_key_from_parent_seed + from .checksum import calculate_checksum, verify_checksum + from .password_prompt import prompt_for_password + + logging.info("Modules imported successfully.") +except Exception as e: + logging.error(f"Failed to import one or more modules: {e}") + logging.error(traceback.format_exc()) # Log full traceback + +__all__ = [ + 'derive_key_from_password', + 'derive_key_from_parent_seed', + 'calculate_checksum', + 'verify_checksum', + 'lock_file', + 'prompt_for_password' +] diff --git a/src/utils/checksum.py b/src/utils/checksum.py new file mode 100644 index 0000000..8c87325 --- /dev/null +++ b/src/utils/checksum.py @@ -0,0 +1,208 @@ +# utils/checksum.py + +""" +Checksum Module + +This module provides functionalities to calculate and verify SHA-256 checksums for files. +It ensures the integrity and authenticity of critical files within the application by +comparing computed checksums against stored values. + +Dependencies: +- hashlib +- logging +- colored (from termcolor) +- constants.py +- sys + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import hashlib +import logging +import sys +import os +import traceback +from typing import Optional + +from termcolor import colored + +from constants import ( + APP_DIR, + DATA_CHECKSUM_FILE, + SCRIPT_CHECKSUM_FILE +) + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create a custom logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'checksum.log')) # Log files will be in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +def calculate_checksum(file_path: str) -> Optional[str]: + """ + Calculates the SHA-256 checksum of the given file. + + Parameters: + file_path (str): Path to the file. + + Returns: + Optional[str]: Hexadecimal SHA-256 checksum if successful, None otherwise. + """ + hasher = hashlib.sha256() + try: + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b""): + hasher.update(chunk) + checksum = hasher.hexdigest() + logging.debug(f"Calculated checksum for '{file_path}': {checksum}") + return checksum + except FileNotFoundError: + logging.error(f"File '{file_path}' not found for checksum calculation.") + print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red')) + return None + except Exception as e: + logging.error(f"Error calculating checksum for '{file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red')) + return None + + +def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool: + """ + Verifies the current checksum against the stored checksum. + + Parameters: + current_checksum (str): The newly calculated checksum. + checksum_file_path (str): The checksum file to verify against. + + Returns: + bool: True if checksums match, False otherwise. + """ + try: + with open(checksum_file_path, 'r') as f: + stored_checksum = f.read().strip() + if current_checksum == stored_checksum: + logging.debug(f"Checksum verification passed for '{checksum_file_path}'.") + return True + else: + logging.warning(f"Checksum mismatch for '{checksum_file_path}'.") + return False + except FileNotFoundError: + logging.error(f"Checksum file '{checksum_file_path}' not found.") + print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red')) + return False + except Exception as e: + logging.error(f"Error reading checksum file '{checksum_file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red')) + return False + + +def update_checksum(content: str, checksum_file_path: str) -> bool: + """ + Updates the stored checksum file with the provided content's checksum. + + Parameters: + content (str): The content to calculate the checksum for. + checksum_file_path (str): The path to the checksum file to update. + + Returns: + bool: True if the checksum was successfully updated, False otherwise. + """ + try: + hasher = hashlib.sha256() + hasher.update(content.encode('utf-8')) + new_checksum = hasher.hexdigest() + with open(checksum_file_path, 'w') as f: + f.write(new_checksum) + logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}") + return True + except Exception as e: + logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red')) + return False + + +def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool: + """ + Verifies the checksum of a file against its stored checksum and updates it if necessary. + + Parameters: + file_path (str): Path to the file to verify. + checksum_file_path (str): Path to the checksum file. + + Returns: + bool: True if verification is successful, False otherwise. + """ + current_checksum = calculate_checksum(file_path) + if current_checksum is None: + return False + + if verify_checksum(current_checksum, checksum_file_path): + print(colored(f"Checksum verification passed for '{file_path}'.", 'green')) + logging.info(f"Checksum verification passed for '{file_path}'.") + return True + else: + print(colored(f"Checksum verification failed for '{file_path}'.", 'red')) + logging.warning(f"Checksum verification failed for '{file_path}'.") + return False + + +def initialize_checksum(file_path: str, checksum_file_path: str) -> bool: + """ + Initializes the checksum file by calculating the checksum of the given file. + + Parameters: + file_path (str): Path to the file to calculate checksum for. + checksum_file_path (str): Path to the checksum file to create. + + Returns: + bool: True if initialization is successful, False otherwise. + """ + checksum = calculate_checksum(file_path) + if checksum is None: + return False + + try: + with open(checksum_file_path, 'w') as f: + f.write(checksum) + logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}") + print(colored(f"Initialized checksum for '{file_path}'.", 'green')) + return True + except Exception as e: + logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red')) + return False diff --git a/src/utils/file_lock.py b/src/utils/file_lock.py new file mode 100644 index 0000000..182aa05 --- /dev/null +++ b/src/utils/file_lock.py @@ -0,0 +1,167 @@ +# utils/file_lock.py + +""" +File Lock Module + +This module provides a single context manager, `lock_file`, for acquiring and releasing +locks on files using the `fcntl` library. It ensures that critical files are accessed +safely, preventing race conditions and maintaining data integrity when multiple processes +or threads attempt to read from or write to the same file concurrently. + +Dependencies: +- fcntl +- logging +- contextlib +- typing +- pathlib.Path +- termcolor (for colored terminal messages) +- sys + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import os +import fcntl +import logging +from contextlib import contextmanager +from typing import Generator +from pathlib import Path +from termcolor import colored +import sys +import traceback + +import os +import logging + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create a custom logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'file_lock.log')) # Log file in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Console will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger + if not logger.handlers: + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +@contextmanager +def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: + """ + Context manager to acquire a lock on a file. + + Parameters: + file_path (Path): The path to the file to lock. + lock_type (int): The type of lock to acquire (`fcntl.LOCK_EX` for exclusive, + `fcntl.LOCK_SH` for shared). + + Yields: + None + + Raises: + ValueError: If an invalid lock type is provided. + SystemExit: Exits the program if the lock cannot be acquired. + """ + if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH): + logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.") + print(colored("Error: Invalid lock type provided.", 'red')) + sys.exit(1) + + file = None + try: + # Determine the mode based on whether the file exists + mode = 'rb+' if file_path.exists() else 'wb' + + # Open the file + file = open(file_path, mode) + logging.debug(f"Opened file '{file_path}' in mode '{mode}' for locking.") + + # Acquire the lock + fcntl.flock(file, lock_type) + lock_type_str = "Exclusive" if lock_type == fcntl.LOCK_EX else "Shared" + logging.debug(f"{lock_type_str} lock acquired on '{file_path}'.") + yield # Control is transferred to the block inside the `with` statement + + except IOError as e: + lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" + logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red')) + sys.exit(1) + + finally: + if file: + try: + # Release the lock + fcntl.flock(file, fcntl.LOCK_UN) + logging.debug(f"Lock released on '{file_path}'.") + except Exception as e: + lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" + logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow')) + finally: + # Close the file + try: + file.close() + logging.debug(f"File '{file_path}' closed successfully.") + except Exception as e: + logging.warning(f"Failed to close file '{file_path}': {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow')) + + +@contextmanager +def exclusive_lock(file_path: Path) -> Generator[None, None, None]: + """ + Convenience context manager to acquire an exclusive lock on a file. + + Parameters: + file_path (Path): The path to the file to lock. + + Yields: + None + """ + with lock_file(file_path, fcntl.LOCK_EX): + yield + + +@contextmanager +def shared_lock(file_path: Path) -> Generator[None, None, None]: + """ + Convenience context manager to acquire a shared lock on a file. + + Parameters: + file_path (Path): The path to the file to lock. + + Yields: + None + """ + with lock_file(file_path, fcntl.LOCK_SH): + yield diff --git a/src/utils/key_derivation.py b/src/utils/key_derivation.py new file mode 100644 index 0000000..8a70e10 --- /dev/null +++ b/src/utils/key_derivation.py @@ -0,0 +1,179 @@ +# utils/key_derivation.py + +""" +Key Derivation Module + +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. + +This module provides functions to derive cryptographic keys from user-provided passwords +and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption +purposes. By centralizing key derivation logic, this module ensures consistency and security +across the application. + +Dependencies: +- hashlib +- base64 +- unicodedata +- logging + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import os +import hashlib +import base64 +import unicodedata +import logging +import traceback +from typing import Union + +import os +import logging + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create a custom logger + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'key_derivation.log')) # Log file in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Console will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger + if not logger.handlers: + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +logger = logging.getLogger(__name__) + +def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes: + """ + Derives a Fernet-compatible encryption key from the provided password using PBKDF2-HMAC-SHA256. + + This function normalizes the password using NFKD normalization, encodes it to UTF-8, and then + applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key + is then URL-safe base64-encoded to ensure compatibility with Fernet. + + Parameters: + password (str): The user's password. + iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000. + + Returns: + bytes: A URL-safe base64-encoded encryption key suitable for Fernet. + + Raises: + ValueError: If the password is empty or too short. + """ + if not password: + logger.error("Password cannot be empty.") + raise ValueError("Password cannot be empty.") + + if len(password) < 8: + logger.warning("Password length is less than recommended (8 characters).") + + # Normalize the password to NFKD form and encode to UTF-8 + normalized_password = unicodedata.normalize('NFKD', password).strip() + password_bytes = normalized_password.encode('utf-8') + + try: + # Derive the key using PBKDF2-HMAC-SHA256 + logger.debug("Starting key derivation from password.") + key = hashlib.pbkdf2_hmac( + hash_name='sha256', + password=password_bytes, + salt=b'', # No salt for deterministic key derivation + iterations=iterations, + dklen=32 # 256-bit key for Fernet + ) + logger.debug(f"Derived key (hex): {key.hex()}") + + # Encode the key in URL-safe base64 + key_b64 = base64.urlsafe_b64encode(key) + logger.debug(f"Base64-encoded key: {key_b64.decode()}") + + return key_b64 + + except Exception as e: + logger.error(f"Error deriving key from password: {e}") + logger.error(traceback.format_exc()) # Log full traceback + raise + + +def derive_key_from_parent_seed(parent_seed: str, iterations: int = 100_000) -> bytes: + """ + Derives a Fernet-compatible encryption key from a BIP-39 parent seed using PBKDF2-HMAC-SHA256. + + This function normalizes the parent seed using NFKD normalization, encodes it to UTF-8, and then + applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key + is then URL-safe base64-encoded to ensure compatibility with Fernet. + + Parameters: + parent_seed (str): The 12-word BIP-39 parent seed phrase. + iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000. + + Returns: + bytes: A URL-safe base64-encoded encryption key suitable for Fernet. + + Raises: + ValueError: If the parent seed is empty or does not meet the word count requirements. + """ + if not parent_seed: + logger.error("Parent seed cannot be empty.") + raise ValueError("Parent seed cannot be empty.") + + word_count = len(parent_seed.strip().split()) + if word_count != 12: + logger.error(f"Parent seed must be exactly 12 words, but {word_count} were provided.") + raise ValueError(f"Parent seed must be exactly 12 words, but {word_count} were provided.") + + # Normalize the parent seed to NFKD form and encode to UTF-8 + normalized_seed = unicodedata.normalize('NFKD', parent_seed).strip() + seed_bytes = normalized_seed.encode('utf-8') + + try: + # Derive the key using PBKDF2-HMAC-SHA256 + logger.debug("Starting key derivation from parent seed.") + key = hashlib.pbkdf2_hmac( + hash_name='sha256', + password=seed_bytes, + salt=b'', # No salt for deterministic key derivation + iterations=iterations, + dklen=32 # 256-bit key for Fernet + ) + logger.debug(f"Derived key from parent seed (hex): {key.hex()}") + + # Encode the key in URL-safe base64 + key_b64 = base64.urlsafe_b64encode(key) + logger.debug(f"Base64-encoded key from parent seed: {key_b64.decode()}") + + return key_b64 + + except Exception as e: + logger.error(f"Error deriving key from parent seed: {e}") + logger.error(traceback.format_exc()) # Log full traceback + raise diff --git a/src/utils/password_prompt.py b/src/utils/password_prompt.py new file mode 100644 index 0000000..1b5c29b --- /dev/null +++ b/src/utils/password_prompt.py @@ -0,0 +1,218 @@ +# utils/password_prompt.py + +""" +Password Prompt Module + +This module provides functions to securely prompt users for passwords, ensuring that passwords +are entered and confirmed correctly. It handles both the creation of new passwords and the +input of existing passwords for decryption purposes. By centralizing password prompting logic, +this module enhances code reuse, security, and maintainability across the application. + +Dependencies: +- getpass +- logging +- colorama +- termcolor +- constants (for MIN_PASSWORD_LENGTH) + +Ensure that all dependencies are installed and properly configured in your environment. +""" + +import os +import getpass +import logging +import sys +import unicodedata +import traceback + +from termcolor import colored +from colorama import init as colorama_init + +from constants import MIN_PASSWORD_LENGTH + +# Initialize colorama for colored terminal text +colorama_init() + +# Configure logging at the start of the module +def configure_logging(): + """ + Configures logging with both file and console handlers. + Only ERROR and higher-level messages are shown in the terminal, while all messages + are logged in the log file. + """ + # Create a custom logger + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output + + # Create the 'logs' folder if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # Create handlers + c_handler = logging.StreamHandler() + f_handler = logging.FileHandler(os.path.join('logs', 'password_prompt.log')) # Log file in 'logs' folder + + # Set levels: only errors and critical messages will be shown in the console + c_handler.setLevel(logging.ERROR) # Console will show ERROR and above + f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above + + # Create formatters and add them to handlers, include file and line number in log messages + c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]') + + c_handler.setFormatter(c_format) + f_handler.setFormatter(f_format) + + # Add handlers to the logger + if not logger.handlers: + logger.addHandler(c_handler) + logger.addHandler(f_handler) + +# Call the logging configuration function +configure_logging() + +def prompt_new_password() -> str: + """ + Prompts the user to enter and confirm a new password for encrypting the parent seed. + + This function ensures that the password meets the minimum length requirement and that the + password and confirmation match. It provides user-friendly messages and handles retries. + + Returns: + str: The confirmed password entered by the user. + + Raises: + SystemExit: If the user fails to provide a valid password after multiple attempts. + """ + max_retries = 5 + attempts = 0 + + while attempts < max_retries: + try: + password = getpass.getpass(prompt="Enter a new password: ").strip() + confirm_password = getpass.getpass(prompt="Confirm your password: ").strip() + + if not password: + print(colored("Error: Password cannot be empty. Please try again.", 'red')) + logging.warning("User attempted to enter an empty password.") + attempts += 1 + continue + + if len(password) < MIN_PASSWORD_LENGTH: + print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red')) + logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.") + attempts += 1 + continue + + if password != confirm_password: + print(colored("Error: Passwords do not match. Please try again.", 'red')) + logging.warning("User entered mismatching passwords.") + attempts += 1 + continue + + # Normalize the password to NFKD form + normalized_password = unicodedata.normalize('NFKD', password) + logging.debug("User entered a valid and confirmed password.") + return normalized_password + + except KeyboardInterrupt: + print(colored("\nOperation cancelled by user.", 'yellow')) + logging.info("Password prompt interrupted by user.") + sys.exit(0) + except Exception as e: + logging.error(f"Unexpected error during password prompt: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: {e}", 'red')) + attempts += 1 + + print(colored("Maximum password attempts exceeded. Exiting.", 'red')) + logging.error("User failed to provide a valid password after multiple attempts.") + sys.exit(1) + + +def prompt_existing_password(prompt_message: str = "Enter your password: ") -> str: + """ + Prompts the user to enter an existing password, typically used for decryption purposes. + + This function ensures that the password is entered securely without echoing it to the terminal. + + Parameters: + prompt_message (str): The message displayed to prompt the user. Defaults to "Enter your password: ". + + Returns: + str: The password entered by the user. + + Raises: + SystemExit: If the user interrupts the operation. + """ + try: + password = getpass.getpass(prompt=prompt_message).strip() + + if not password: + print(colored("Error: Password cannot be empty.", 'red')) + logging.warning("User attempted to enter an empty password.") + sys.exit(1) + + # Normalize the password to NFKD form + normalized_password = unicodedata.normalize('NFKD', password) + logging.debug("User entered an existing password for decryption.") + return normalized_password + + except KeyboardInterrupt: + print(colored("\nOperation cancelled by user.", 'yellow')) + logging.info("Existing password prompt interrupted by user.") + sys.exit(0) + except Exception as e: + logging.error(f"Unexpected error during existing password prompt: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: {e}", 'red')) + sys.exit(1) + + +def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool: + """ + Prompts the user to confirm an action, typically used before performing critical operations. + + Parameters: + prompt_message (str): The confirmation message displayed to the user. Defaults to + "Are you sure you want to proceed? (Y/N): ". + + Returns: + bool: True if the user confirms the action, False otherwise. + + Raises: + SystemExit: If the user interrupts the operation. + """ + try: + while True: + response = input(colored(prompt_message, 'cyan')).strip().lower() + if response in ['y', 'yes']: + logging.debug("User confirmed the action.") + return True + elif response in ['n', 'no']: + logging.debug("User declined the action.") + return False + else: + print(colored("Please respond with 'Y' or 'N'.", 'yellow')) + + except KeyboardInterrupt: + print(colored("\nOperation cancelled by user.", 'yellow')) + logging.info("Action confirmation interrupted by user.") + sys.exit(0) + except Exception as e: + logging.error(f"Unexpected error during action confirmation: {e}") + logging.error(traceback.format_exc()) # Log full traceback + print(colored(f"Error: {e}", 'red')) + sys.exit(1) + + +def prompt_for_password() -> str: + """ + Prompts the user to enter a new password by invoking the prompt_new_password function. + + This function serves as an alias to maintain consistency with import statements in other modules. + + Returns: + str: The confirmed password entered by the user. + """ + return prompt_new_password()