diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index e962843..2404994 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -2,9 +2,9 @@ name: CI on: push: - branches: [ "main" ] + branches: [ "**" ] pull_request: - branches: [ "main" ] + branches: [ "**" ] jobs: build: @@ -14,9 +14,24 @@ jobs: - uses: actions/setup-python@v4 with: python-version: '3.11' - - name: Install dependencies + - name: Cache pip + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('src/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + - name: Set up Python dependencies + id: deps run: | python -m pip install --upgrade pip pip install -r src/requirements.txt - - name: Test with pytest - run: pytest -q src/tests + - name: Run tests with coverage + run: | + pytest --cov=src --cov-report=xml --cov-report=term-missing \ + --cov-fail-under=20 src/tests + - name: Upload coverage report + uses: actions/upload-artifact@v4 + with: + name: coverage-xml + path: coverage.xml diff --git a/.gitignore b/.gitignore index 9c52ede..cd141a1 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,7 @@ Thumbs.db # Python env .env *.env + +# Coverage files +.coverage +coverage.xml diff --git a/README.md b/README.md index 33292cf..3846529 100644 --- a/README.md +++ b/README.md @@ -122,22 +122,13 @@ python src/main.py ``` Select an option: - 1. Generate a New Password and Add to Index - 2. Retrieve a Password from Index + 1. Add Entry + 2. Retrieve Entry 3. Modify an Existing Entry - 4. Verify Script Checksum - 5. Post Encrypted Index to Nostr - 6. Retrieve Encrypted Index from Nostr - 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed - 9. Switch Seed Profile - 10. Add a New Seed Profile - 11. Remove an Existing Seed Profile - 12. List All Seed Profiles - 13. Settings - 14. Exit + 4. Settings + 5. Exit - Enter your choice (1-14): + Enter your choice (1-5): ``` ### Managing Multiple Seeds @@ -145,18 +136,18 @@ python src/main.py SeedPass allows you to manage multiple seed profiles (previously referred to as "fingerprints"). Each seed profile has its own parent seed and associated data, enabling you to compartmentalize your passwords. - **Add a New Seed Profile:** - - Select option `10` from the main menu. + - From the main menu, select **Settings** then **Profiles** and choose "Add a New Seed Profile". - Choose to enter an existing seed or generate a new one. - If generating a new seed, you'll be provided with a 12-word BIP-85 seed phrase. **Ensure you write this down and store it securely.** - **Switch Between Seed Profiles:** - - Select option `9` from the main menu. + - From the **Profiles** menu, select "Switch Seed Profile". - You'll see a list of available seed profiles. - Enter the number corresponding to the seed profile you wish to switch to. - Enter the master password associated with that seed profile. - **List All Seed Profiles:** - - Select option `12` from the main menu to view all existing seed profiles. + - In the **Profiles** menu, choose "List All Seed Profiles" to view all existing profiles. **Note:** The term "seed profile" is used to represent different sets of seeds you can manage within SeedPass. This provides an intuitive way to handle multiple identities or sets of passwords. @@ -172,13 +163,16 @@ wss://relay.primal.net You can manage the relay list or change the PIN through the **Settings** menu: -1. From the main menu, choose option `13` (**Settings**). -2. Select `1` to view your current relays. -3. Choose `2` to add a new relay URL. -4. Select `3` to remove a relay by number. -5. Choose `4` to reset to the default relay list. -6. Select `5` to change the settings PIN. -7. Choose `6` to return to the main menu. +1. From the main menu, choose option `4` (**Settings**). +2. Select `2` (**Nostr**) to open the Nostr submenu. +3. Choose `3` to view your current relays. +4. Select `4` to add a new relay URL. +5. Choose `5` to remove a relay by number. +6. Select `6` to reset to the default relay list. +7. Choose `7` to display your Nostr public key. +8. Select `8` to return to the Settings menu. +9. From the Settings menu you can select `3` to change the settings PIN. +10. Choose `4` to verify the script checksum or `5` to back up the parent seed. ## Running Tests diff --git a/landing/_pgbackup/index_1730048875.html b/landing/_pgbackup/index_1730048875.html index 9b2cf41..1f72ff4 100644 --- a/landing/_pgbackup/index_1730048875.html +++ b/landing/_pgbackup/index_1730048875.html @@ -121,21 +121,19 @@ Enter your master password: Fingerprint 31DD880A523B9759 selected and managers initialized. Select an option: - 1. Generate a New Password and Add to Index - 2. Retrieve a Password from Index + 1. Generate Password + 2. Retrieve Password 3. Modify an Existing Entry - 4. Verify Script Checksum - 5. Post Encrypted Index to Nostr - 6. Retrieve Encrypted Index from Nostr - 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed - 9. Switch Fingerprint - 10. Add a New Fingerprint - 11. Remove an Existing Fingerprint - 12. List All Fingerprints - 13. Exit + 4. Backup to Nostr + 5. Restore from Nostr + 6. Switch Fingerprint + 7. Add a New Fingerprint + 8. Remove an Existing Fingerprint + 9. List All Fingerprints + 10. Settings + 11. Exit -Enter your choice (1-13): 1 +Enter your choice (1-11): 1 Enter the website name: newsitename Enter the username (optional): Enter the URL (optional): diff --git a/landing/_pgbackup/index_1730049869.html b/landing/_pgbackup/index_1730049869.html index ecd8e40..a0ef4cb 100644 --- a/landing/_pgbackup/index_1730049869.html +++ b/landing/_pgbackup/index_1730049869.html @@ -118,21 +118,19 @@ Enter your master password: Fingerprint 31DD880A523B9759 selected and managers initialized. Select an option: - 1. Generate a New Password and Add to Index - 2. Retrieve a Password from Index + 1. Generate Password + 2. Retrieve Password 3. Modify an Existing Entry - 4. Verify Script Checksum - 5. Post Encrypted Index to Nostr - 6. Retrieve Encrypted Index from Nostr - 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed - 9. Switch Fingerprint - 10. Add a New Fingerprint - 11. Remove an Existing Fingerprint - 12. List All Fingerprints - 13. Exit + 4. Backup to Nostr + 5. Restore from Nostr + 6. Switch Fingerprint + 7. Add a New Fingerprint + 8. Remove an Existing Fingerprint + 9. List All Fingerprints + 10. Settings + 11. Exit -Enter your choice (1-13): 1 +Enter your choice (1-11): 1 Enter the website name: newsitename Enter the username (optional): Enter the URL (optional): diff --git a/landing/index.html b/landing/index.html index 353e1c6..7f8df1f 100644 --- a/landing/index.html +++ b/landing/index.html @@ -113,21 +113,19 @@ Enter your master password: Fingerprint 31DD880A523B9759 selected and managers initialized. Select an option: - 1. Generate a New Password and Add to Index - 2. Retrieve a Password from Index + 1. Add Entry + 2. Retrieve Entry 3. Modify an Existing Entry - 4. Verify Script Checksum - 5. Post Encrypted Index to Nostr - 6. Retrieve Encrypted Index from Nostr - 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed - 9. Switch Fingerprint - 10. Add a New Fingerprint - 11. Remove an Existing Fingerprint - 12. List All Fingerprints - 13. Exit + 4. Backup to Nostr + 5. Restore from Nostr + 6. Switch Fingerprint + 7. Add a New Fingerprint + 8. Remove an Existing Fingerprint + 9. List All Fingerprints + 10. Settings + 11. Exit -Enter your choice (1-13): 1 +Enter your choice (1-11): 1 Enter the website name: newsitename Enter the username (optional): Enter the URL (optional): diff --git a/src/constants.py b/src/constants.py index c38488a..577236d 100644 --- a/src/constants.py +++ b/src/constants.py @@ -12,14 +12,14 @@ logger = logging.getLogger(__name__) # ----------------------------------- # Nostr Relay Connection Settings # ----------------------------------- -MAX_RETRIES = 3 # Maximum number of retries for relay connections -RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection +MAX_RETRIES = 3 # Maximum number of retries for relay connections +RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection try: # ----------------------------------- # Application Directory and Paths # ----------------------------------- - APP_DIR = Path.home() / '.seedpass' + APP_DIR = Path.home() / ".seedpass" APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists logging.info(f"Application directory created at {APP_DIR}") except Exception as e: @@ -27,7 +27,7 @@ except Exception as e: logging.error(traceback.format_exc()) # Log full traceback try: - PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed + PARENT_SEED_FILE = APP_DIR / "parent_seed.enc" # Encrypted parent seed logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}") except Exception as e: logging.error(f"Error setting file paths: {e}") @@ -37,7 +37,9 @@ except Exception as e: # Checksum Files for Integrity # ----------------------------------- try: - SCRIPT_CHECKSUM_FILE = APP_DIR / 'seedpass_script_checksum.txt' # Checksum for main script + SCRIPT_CHECKSUM_FILE = ( + APP_DIR / "seedpass_script_checksum.txt" + ) # Checksum for main script logging.info(f"Checksum file path set: Script {SCRIPT_CHECKSUM_FILE}") except Exception as e: logging.error(f"Error setting checksum file paths: {e}") @@ -46,12 +48,12 @@ except Exception as e: # ----------------------------------- # Password Generation Constants # ----------------------------------- -DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords -MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length -MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length +DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords +MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length +MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length # ----------------------------------- # Additional Constants (if any) # ----------------------------------- # Add any other constants here as your project expands -DEFAULT_SEED_BACKUP_FILENAME = 'parent_seed_backup.enc' +DEFAULT_SEED_BACKUP_FILENAME = "parent_seed_backup.enc" diff --git a/src/local_bip85/__init__.py b/src/local_bip85/__init__.py index cebca93..8823d1e 100644 --- a/src/local_bip85/__init__.py +++ b/src/local_bip85/__init__.py @@ -5,10 +5,10 @@ import traceback try: from .bip85 import BIP85 + logging.info("BIP85 module imported successfully.") except Exception as e: logging.error(f"Failed to import BIP85 module: {e}") logging.error(traceback.format_exc()) # Log full traceback -__all__ = ['BIP85'] - +__all__ = ["BIP85"] diff --git a/src/local_bip85/bip85.py b/src/local_bip85/bip85.py index 50f15f0..b863d6d 100644 --- a/src/local_bip85/bip85.py +++ b/src/local_bip85/bip85.py @@ -7,8 +7,8 @@ This module implements the BIP85 functionality for deterministic entropy and mne It provides the BIP85 class, which utilizes BIP32 and BIP39 standards to derive entropy and mnemonics from a given seed. Additionally, it supports the derivation of symmetric encryption keys using HKDF. -Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. -This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. Ensure that all dependencies are installed and properly configured in your environment. """ @@ -21,11 +21,7 @@ import os import traceback from colorama import Fore -from bip_utils import ( - Bip32Slip10Secp256k1, - Bip39MnemonicGenerator, - Bip39Languages -) +from bip_utils import Bip32Slip10Secp256k1, Bip39MnemonicGenerator, Bip39Languages from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes @@ -34,6 +30,7 @@ from cryptography.hazmat.backends import default_backend # Instantiate the logger logger = logging.getLogger(__name__) + class BIP85: def __init__(self, seed_bytes: bytes): try: @@ -80,8 +77,12 @@ class BIP85: entropy = hmac_result[:bytes_len] if len(entropy) != bytes_len: - logging.error(f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") - print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") + logging.error( + f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes." + ) + print( + f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes." + ) sys.exit(1) logging.debug(f"Derived entropy: {entropy.hex()}") @@ -101,7 +102,9 @@ class BIP85: entropy = self.derive_entropy(index=index, bytes_len=bytes_len, app_no=39) try: - mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy) + mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy( + entropy + ) logging.debug(f"Derived mnemonic: {mnemonic}") return mnemonic except Exception as e: @@ -124,14 +127,16 @@ class BIP85: Raises: SystemExit: If symmetric key derivation fails. """ - entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index) + entropy = self.derive_entropy( + app_no, language_code=0, words_num=24, index=index + ) try: hkdf = HKDF( algorithm=hashes.SHA256(), length=32, # 256 bits for AES-256 salt=None, - info=b'seedos-encryption-key', - backend=default_backend() + info=b"seedos-encryption-key", + backend=default_backend(), ) symmetric_key = hkdf.derive(entropy) logging.debug(f"Derived symmetric key: {symmetric_key.hex()}") diff --git a/src/main.py b/src/main.py index 22889e4..782fb91 100644 --- a/src/main.py +++ b/src/main.py @@ -79,70 +79,70 @@ def handle_switch_fingerprint(password_manager: PasswordManager): if not fingerprints: print( colored( - "No fingerprints available to switch. Please add a new fingerprint first.", + "No seed profiles available to switch. Please add a new seed profile first.", "yellow", ) ) return - print(colored("Available Fingerprints:", "cyan")) + print(colored("Available Seed Profiles:", "cyan")) for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", "cyan")) - choice = input("Select a fingerprint by number to switch: ").strip() + choice = input("Select a seed profile by number to switch: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection.", "red")) return selected_fingerprint = fingerprints[int(choice) - 1] if password_manager.select_fingerprint(selected_fingerprint): - print(colored(f"Switched to fingerprint {selected_fingerprint}.", "green")) + print(colored(f"Switched to seed profile {selected_fingerprint}.", "green")) else: - print(colored("Failed to switch fingerprint.", "red")) + print(colored("Failed to switch seed profile.", "red")) except Exception as e: logging.error(f"Error during fingerprint switch: {e}") logging.error(traceback.format_exc()) - print(colored(f"Error: Failed to switch fingerprint: {e}", "red")) + print(colored(f"Error: Failed to switch seed profile: {e}", "red")) def handle_add_new_fingerprint(password_manager: PasswordManager): """ - Handles adding a new fingerprint. + Handles adding a new seed profile. :param password_manager: An instance of PasswordManager. """ try: password_manager.add_new_fingerprint() except Exception as e: - logging.error(f"Error adding new fingerprint: {e}") + logging.error(f"Error adding new seed profile: {e}") logging.error(traceback.format_exc()) - print(colored(f"Error: Failed to add new fingerprint: {e}", "red")) + print(colored(f"Error: Failed to add new seed profile: {e}", "red")) def handle_remove_fingerprint(password_manager: PasswordManager): """ - Handles removing an existing fingerprint. + Handles removing an existing seed profile. :param password_manager: An instance of PasswordManager. """ try: fingerprints = password_manager.fingerprint_manager.list_fingerprints() if not fingerprints: - print(colored("No fingerprints available to remove.", "yellow")) + print(colored("No seed profiles available to remove.", "yellow")) return - print(colored("Available Fingerprints:", "cyan")) + print(colored("Available Seed Profiles:", "cyan")) for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", "cyan")) - choice = input("Select a fingerprint by number to remove: ").strip() + choice = input("Select a seed profile by number to remove: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection.", "red")) return selected_fingerprint = fingerprints[int(choice) - 1] confirm = confirm_action( - f"Are you sure you want to remove fingerprint {selected_fingerprint}? This will delete all associated data. (Y/N): " + f"Are you sure you want to remove seed profile {selected_fingerprint}? This will delete all associated data. (Y/N): " ) if confirm: if password_manager.fingerprint_manager.remove_fingerprint( @@ -150,39 +150,39 @@ def handle_remove_fingerprint(password_manager: PasswordManager): ): print( colored( - f"Fingerprint {selected_fingerprint} removed successfully.", + f"Seed profile {selected_fingerprint} removed successfully.", "green", ) ) else: - print(colored("Failed to remove fingerprint.", "red")) + print(colored("Failed to remove seed profile.", "red")) else: - print(colored("Fingerprint removal cancelled.", "yellow")) + print(colored("Seed profile removal cancelled.", "yellow")) except Exception as e: - logging.error(f"Error removing fingerprint: {e}") + logging.error(f"Error removing seed profile: {e}") logging.error(traceback.format_exc()) - print(colored(f"Error: Failed to remove fingerprint: {e}", "red")) + print(colored(f"Error: Failed to remove seed profile: {e}", "red")) def handle_list_fingerprints(password_manager: PasswordManager): """ - Handles listing all available fingerprints. + Handles listing all available seed profiles. :param password_manager: An instance of PasswordManager. """ try: fingerprints = password_manager.fingerprint_manager.list_fingerprints() if not fingerprints: - print(colored("No fingerprints available.", "yellow")) + print(colored("No seed profiles available.", "yellow")) return - print(colored("Available Fingerprints:", "cyan")) + print(colored("Available Seed Profiles:", "cyan")) for fp in fingerprints: print(colored(f"- {fp}", "cyan")) except Exception as e: - logging.error(f"Error listing fingerprints: {e}") + logging.error(f"Error listing seed profiles: {e}") logging.error(traceback.format_exc()) - print(colored(f"Error: Failed to list fingerprints: {e}", "red")) + print(colored(f"Error: Failed to list seed profiles: {e}", "red")) def handle_display_npub(password_manager: PasswordManager): @@ -357,8 +357,33 @@ def handle_reset_relays(password_manager: PasswordManager) -> None: print(colored(f"Error: {e}", "red")) -def handle_settings(password_manager: PasswordManager) -> None: - """Interactive settings menu for relay list and password changes.""" +def handle_profiles_menu(password_manager: PasswordManager) -> None: + """Submenu for managing seed profiles.""" + while True: + print("\nProfiles:") + print("1. Switch Seed Profile") + print("2. Add a New Seed Profile") + print("3. Remove an Existing Seed Profile") + print("4. List All Seed Profiles") + print("5. Back") + choice = input("Select an option: ").strip() + if choice == "1": + if not password_manager.handle_switch_fingerprint(): + print(colored("Failed to switch seed profile.", "red")) + elif choice == "2": + handle_add_new_fingerprint(password_manager) + elif choice == "3": + handle_remove_fingerprint(password_manager) + elif choice == "4": + handle_list_fingerprints(password_manager) + elif choice == "5": + break + else: + print(colored("Invalid choice.", "red")) + + +def handle_nostr_menu(password_manager: PasswordManager) -> None: + """Submenu for Nostr-related actions and relay configuration.""" cfg_mgr = password_manager.config_manager if cfg_mgr is None: print(colored("Configuration manager unavailable.", "red")) @@ -369,25 +394,58 @@ def handle_settings(password_manager: PasswordManager) -> None: print(colored(f"Error loading settings: {e}", "red")) return + while True: + print("\nNostr Settings:") + print("1. Backup to Nostr") + print("2. Restore from Nostr") + print("3. View current relays") + print("4. Add a relay URL") + print("5. Remove a relay by number") + print("6. Reset to default relays") + print("7. Display Nostr Public Key") + print("8. Back") + choice = input("Select an option: ").strip() + if choice == "1": + handle_post_to_nostr(password_manager) + elif choice == "2": + handle_retrieve_from_nostr(password_manager) + elif choice == "3": + handle_view_relays(cfg_mgr) + elif choice == "4": + handle_add_relay(password_manager) + elif choice == "5": + handle_remove_relay(password_manager) + elif choice == "6": + handle_reset_relays(password_manager) + elif choice == "7": + handle_display_npub(password_manager) + elif choice == "8": + break + else: + print(colored("Invalid choice.", "red")) + + +def handle_settings(password_manager: PasswordManager) -> None: + """Interactive settings menu with submenus for profiles and Nostr.""" while True: print("\nSettings:") - print("1. View current relays") - print("2. Add a relay URL") - print("3. Remove a relay by number") - print("4. Reset to default relays") - print("5. Change password") + print("1. Profiles") + print("2. Nostr") + print("3. Change password") + print("4. Verify Script Checksum") + print("5. Backup Parent Seed") print("6. Back") choice = input("Select an option: ").strip() if choice == "1": - handle_view_relays(cfg_mgr) + handle_profiles_menu(password_manager) elif choice == "2": - handle_add_relay(password_manager) + handle_nostr_menu(password_manager) elif choice == "3": - handle_remove_relay(password_manager) - elif choice == "4": - handle_reset_relays(password_manager) - elif choice == "5": password_manager.change_password() + elif choice == "4": + password_manager.handle_verify_checksum() + elif choice == "5": + password_manager.handle_backup_reveal_parent_seed() elif choice == "6": break else: @@ -400,63 +458,46 @@ def display_menu(password_manager: PasswordManager): """ menu = """ Select an option: - 1. Generate a New Password and Add to Index - 2. Retrieve a Password from Index + 1. Add Entry + 2. Retrieve Entry 3. Modify an Existing Entry - 4. Verify Script Checksum - 5. Post Encrypted Index to Nostr - 6. Retrieve Encrypted Index from Nostr - 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed - 9. Switch Fingerprint - 10. Add a New Fingerprint - 11. Remove an Existing Fingerprint - 12. List All Fingerprints - 13. Settings - 14. Exit + 4. Settings + 5. Exit """ while True: # Flush logging handlers for handler in logging.getLogger().handlers: handler.flush() print(colored(menu, "cyan")) - choice = input("Enter your choice (1-14): ").strip() + choice = input("Enter your choice (1-5): ").strip() if not choice: print( colored( - "No input detected. Please enter a number between 1 and 14.", + "No input detected. Please enter a number between 1 and 5.", "yellow", ) ) continue # Re-display the menu without marking as invalid if choice == "1": - password_manager.handle_generate_password() + while True: + print("\nAdd Entry:") + print("1. Password") + print("2. Back") + sub_choice = input("Select entry type: ").strip() + if sub_choice == "1": + password_manager.handle_add_password() + break + elif sub_choice == "2": + break + else: + print(colored("Invalid choice.", "red")) elif choice == "2": - password_manager.handle_retrieve_password() + password_manager.handle_retrieve_entry() elif choice == "3": password_manager.handle_modify_entry() elif choice == "4": - password_manager.handle_verify_checksum() - elif choice == "5": - handle_post_to_nostr(password_manager) - elif choice == "6": - handle_retrieve_from_nostr(password_manager) - elif choice == "7": - handle_display_npub(password_manager) - elif choice == "8": - password_manager.handle_backup_reveal_parent_seed() - elif choice == "9": - if not password_manager.handle_switch_fingerprint(): - print(colored("Failed to switch fingerprint.", "red")) - elif choice == "10": - handle_add_new_fingerprint(password_manager) - elif choice == "11": - handle_remove_fingerprint(password_manager) - elif choice == "12": - handle_list_fingerprints(password_manager) - elif choice == "13": handle_settings(password_manager) - elif choice == "14": + elif choice == "5": logging.info("Exiting the program.") print(colored("Exiting the program.", "green")) password_manager.nostr_client.close_client_pool() diff --git a/src/nostr/__init__.py b/src/nostr/__init__.py index d258546..5f65186 100644 --- a/src/nostr/__init__.py +++ b/src/nostr/__init__.py @@ -12,9 +12,10 @@ logger = logging.getLogger(__name__) # Correct logger initialization try: from .client import NostrClient + logger.info("NostrClient module imported successfully.") except Exception as e: logger.error(f"Failed to import NostrClient module: {e}") logger.error(traceback.format_exc()) # Log full traceback -__all__ = ['NostrClient'] +__all__ = ["NostrClient"] diff --git a/src/nostr/client.py b/src/nostr/client.py index 417abce..1912137 100644 --- a/src/nostr/client.py +++ b/src/nostr/client.py @@ -34,13 +34,14 @@ logger.setLevel(logging.WARNING) DEFAULT_RELAYS = [ "wss://relay.snort.social", "wss://nostr.oxtr.dev", - "wss://relay.primal.net" + "wss://relay.primal.net", ] # nostr/client.py # src/nostr/client.py + class NostrClient: """ NostrClient Class @@ -49,9 +50,14 @@ class NostrClient: Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations. """ - def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None): + def __init__( + self, + encryption_manager: EncryptionManager, + fingerprint: str, + relays: Optional[List[str]] = None, + ): """ - Initializes the NostrClient with an EncryptionManager, connects to specified relays, + Initializes the NostrClient with an EncryptionManager, connects to specified relays, and sets up the KeyManager with the given fingerprint. :param encryption_manager: An instance of EncryptionManager for handling encryption/decryption. @@ -62,12 +68,13 @@ class NostrClient: # Assign the encryption manager and fingerprint self.encryption_manager = encryption_manager self.fingerprint = fingerprint # Track the fingerprint - self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories + self.fingerprint_dir = ( + self.encryption_manager.fingerprint_dir + ) # If needed to manage directories # Initialize KeyManager with the decrypted parent seed and the provided fingerprint self.key_manager = KeyManager( - self.encryption_manager.decrypt_parent_seed(), - self.fingerprint + self.encryption_manager.decrypt_parent_seed(), self.fingerprint ) # Initialize event handler and client pool @@ -126,7 +133,10 @@ class NostrClient: except Exception as e: logger.error(f"Error running event loop in thread: {e}") logger.error(traceback.format_exc()) - print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr) + print( + f"Error: Event loop in ClientPool thread encountered an issue: {e}", + file=sys.stderr, + ) finally: if not self.loop.is_closed(): logger.debug("Closing the event loop.") @@ -166,14 +176,18 @@ class NostrClient: """ try: logger.debug(f"Submitting publish_event_async for event ID: {event.id}") - future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop) + future = asyncio.run_coroutine_threadsafe( + self.publish_event_async(event), self.loop + ) # Wait for the future to complete future.result(timeout=5) # Adjust the timeout as needed except Exception as e: logger.error(f"Error in publish_event: {e}") print(f"Error: Failed to publish event: {e}", file=sys.stderr) - async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): + async def subscribe_async( + self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None] + ): """ Subscribes to events based on the provided filters using ClientPool. @@ -190,7 +204,9 @@ class NostrClient: logger.error(traceback.format_exc()) print(f"Error: Failed to subscribe: {e}", file=sys.stderr) - def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): + def subscribe( + self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None] + ): """ Synchronous wrapper for subscribing to events. @@ -198,7 +214,9 @@ class NostrClient: :param handler: A callback function to handle incoming events. """ try: - asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop) + asyncio.run_coroutine_threadsafe( + self.subscribe_async(filters, handler), self.loop + ) except Exception as e: logger.error(f"Error in subscribe: {e}") print(f"Error: Failed to subscribe: {e}", file=sys.stderr) @@ -210,11 +228,13 @@ class NostrClient: :return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails. """ try: - filters = [{ - 'authors': [self.key_manager.keys.public_key_hex()], - 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], - 'limit': 1 - }] + filters = [ + { + "authors": [self.key_manager.keys.public_key_hex()], + "kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], + "limit": 1, + } + ] events = [] @@ -238,7 +258,9 @@ class NostrClient: if event.kind == Event.KIND_ENCRYPT: nip4_encrypt = NIP4Encrypt(self.key_manager.keys) - content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key) + content_base64 = nip4_encrypt.decrypt_message( + event.content, event.pub_key + ) # Return the Base64-encoded content as a string logger.debug("Encrypted JSON data retrieved successfully.") @@ -261,16 +283,21 @@ class NostrClient: :return: The encrypted JSON data as bytes, or None if retrieval fails. """ try: - future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) + future = asyncio.run_coroutine_threadsafe( + self.retrieve_json_from_nostr_async(), self.loop + ) return future.result(timeout=10) except concurrent.futures.TimeoutError: logger.error("Timeout occurred while retrieving JSON from Nostr.") - print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) + print( + "Error: Timeout occurred while retrieving JSON from Nostr.", + file=sys.stderr, + ) return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') + print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red") return None async def do_post_async(self, text: str): @@ -283,7 +310,7 @@ class NostrClient: event = Event( kind=Event.KIND_TEXT_NOTE, content=text, - pub_key=self.key_manager.keys.public_key_hex() + pub_key=self.key_manager.keys.public_key_hex(), ) event.created_at = int(time.time()) event.sign(self.key_manager.keys.private_key_hex()) @@ -296,18 +323,22 @@ class NostrClient: logger.error(f"An error occurred during publishing: {e}", exc_info=True) print(f"Error: An error occurred during publishing: {e}", file=sys.stderr) - async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]): + async def subscribe_feed_async( + self, handler: Callable[[ClientPool, str, Event], None] + ): """ Subscribes to the feed of the client's own pubkey. :param handler: A callback function to handle incoming events. """ try: - filters = [{ - 'authors': [self.key_manager.keys.public_key_hex()], - 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], - 'limit': 100 - }] + filters = [ + { + "authors": [self.key_manager.keys.public_key_hex()], + "kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], + "limit": 100, + } + ] await self.subscribe_async(filters=filters, handler=handler) logger.info("Subscribed to your feed.") @@ -327,11 +358,16 @@ class NostrClient: try: await asyncio.gather( self.do_post_async(text), - self.subscribe_feed_async(self.event_handler.handle_new_event) + self.subscribe_feed_async(self.event_handler.handle_new_event), ) except Exception as e: - logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True) - print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr) + logger.error( + f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True + ) + print( + f"Error: An error occurred in publish and subscribe: {e}", + file=sys.stderr, + ) def publish_and_subscribe(self, text: str): """ @@ -340,7 +376,9 @@ class NostrClient: :param text: The content of the text note to publish. """ try: - asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop) + asyncio.run_coroutine_threadsafe( + self.publish_and_subscribe_async(text), self.loop + ) except Exception as e: logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True) print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr) @@ -353,15 +391,19 @@ class NostrClient: """ try: decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) - data = json.loads(decrypted_data.decode('utf-8')) + data = json.loads(decrypted_data.decode("utf-8")) self.save_json_data(data) self.update_checksum() logger.info("Index file updated from Nostr successfully.") - print(colored("Index file updated from Nostr successfully.", 'green')) + print(colored("Index file updated from Nostr successfully.", "green")) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt and save data from Nostr: {e}", "red" + ) + ) def save_json_data(self, data: dict) -> None: """ @@ -370,17 +412,19 @@ class NostrClient: :param data: The JSON data to save. """ try: - encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8')) - index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + encrypted_data = self.encryption_manager.encrypt_data( + json.dumps(data).encode("utf-8") + ) + index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc" with lock_file(index_file_path, fcntl.LOCK_EX): - with open(index_file_path, 'wb') as f: + with open(index_file_path, "wb") as f: f.write(encrypted_data) logger.debug(f"Encrypted data saved to {index_file_path}.") - print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green')) + print(colored(f"Encrypted data saved to '{index_file_path}'.", "green")) except Exception as e: logger.error(f"Failed to save encrypted data: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to save encrypted data: {e}", 'red')) + print(colored(f"Error: Failed to save encrypted data: {e}", "red")) raise def update_checksum(self) -> None: @@ -388,28 +432,30 @@ class NostrClient: Updates the checksum file for the password database. """ try: - index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc" decrypted_data = self.decrypt_data_from_file(index_file_path) - content = decrypted_data.decode('utf-8') + content = decrypted_data.decode("utf-8") logger.debug("Calculating checksum of the updated file content.") - checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + checksum = hashlib.sha256(content.encode("utf-8")).hexdigest() logger.debug(f"New checksum: {checksum}") - checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt' + checksum_file = self.fingerprint_dir / "seedpass_passwords_db_checksum.txt" with lock_file(checksum_file, fcntl.LOCK_EX): - with open(checksum_file, 'w') as f: + with open(checksum_file, "w") as f: f.write(checksum) os.chmod(checksum_file, 0o600) - logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.") - print(colored(f"Checksum for '{index_file_path}' updated.", 'green')) + logger.debug( + f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'." + ) + print(colored(f"Checksum for '{index_file_path}' updated.", "green")) except Exception as e: logger.error(f"Failed to update checksum: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to update checksum: {e}", 'red')) + print(colored(f"Error: Failed to update checksum: {e}", "red")) def decrypt_data_from_file(self, file_path: Path) -> bytes: """ @@ -420,7 +466,7 @@ class NostrClient: """ try: with lock_file(file_path, fcntl.LOCK_SH): - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: encrypted_data = f.read() decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) logger.debug(f"Data decrypted from file '{file_path}'.") @@ -428,7 +474,11 @@ class NostrClient: except Exception as e: logger.error(f"Failed to decrypt data from file '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt data from file '{file_path}': {e}", "red" + ) + ) raise def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None): @@ -439,10 +489,14 @@ class NostrClient: :param to_pubkey: (Optional) The recipient's public key for encryption. """ try: - encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8') + encrypted_json_b64 = base64.b64encode(encrypted_json).decode("utf-8") logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}") - event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex()) + event = Event( + kind=Event.KIND_TEXT_NOTE, + content=encrypted_json_b64, + pub_key=self.key_manager.keys.public_key_hex(), + ) event.created_at = int(time.time()) @@ -471,7 +525,9 @@ class NostrClient: Optional[bytes]: The encrypted data as bytes if successful, None otherwise. """ try: - future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) + future = asyncio.run_coroutine_threadsafe( + self.retrieve_json_from_nostr_async(), self.loop + ) content_base64 = future.result(timeout=10) if not content_base64: @@ -479,17 +535,22 @@ class NostrClient: return None # Base64-decode the content - encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8')) - logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.") + encrypted_data = base64.urlsafe_b64decode(content_base64.encode("utf-8")) + logger.debug( + "Encrypted data retrieved and Base64-decoded successfully from Nostr." + ) return encrypted_data except concurrent.futures.TimeoutError: logger.error("Timeout occurred while retrieving JSON from Nostr.") - print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) + print( + "Error: Timeout occurred while retrieving JSON from Nostr.", + file=sys.stderr, + ) return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') + print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red") return None def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None: @@ -502,7 +563,7 @@ class NostrClient: self.decrypt_and_save_index_from_nostr(encrypted_data) except Exception as e: logger.error(f"Failed to decrypt and save index from Nostr: {e}") - print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red') + print(f"Error: Failed to decrypt and save index from Nostr: {e}", "red") async def close_client_pool_async(self): """ @@ -529,14 +590,20 @@ class NostrClient: logger.warning(f"Error unsubscribing from {sub_id}: {e}") # Close all WebSocket connections - if hasattr(self.client_pool, 'clients'): - tasks = [self.safe_close_connection(client) for client in self.client_pool.clients] + if hasattr(self.client_pool, "clients"): + tasks = [ + self.safe_close_connection(client) + for client in self.client_pool.clients + ] await asyncio.gather(*tasks, return_exceptions=True) # Gather and cancel all tasks current_task = asyncio.current_task() - tasks = [task for task in asyncio.all_tasks(loop=self.loop) - if task != current_task and not task.done()] + tasks = [ + task + for task in asyncio.all_tasks(loop=self.loop) + if task != current_task and not task.done() + ] if tasks: logger.debug(f"Cancelling {len(tasks)} pending tasks.") @@ -545,7 +612,9 @@ class NostrClient: # Wait for all tasks to be cancelled with a timeout try: - await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5) + await asyncio.wait_for( + asyncio.gather(*tasks, return_exceptions=True), timeout=5 + ) except asyncio.TimeoutError: logger.warning("Timeout waiting for tasks to cancel") @@ -569,36 +638,40 @@ class NostrClient: try: # Schedule the coroutine to close the client pool - future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop) - + future = asyncio.run_coroutine_threadsafe( + self.close_client_pool_async(), self.loop + ) + # Wait for the coroutine to finish with a timeout try: future.result(timeout=10) except concurrent.futures.TimeoutError: logger.warning("Initial shutdown attempt timed out, forcing cleanup...") - + # Additional cleanup regardless of timeout try: self.loop.call_soon_threadsafe(self.loop.stop) # Give a short grace period for the loop to stop time.sleep(0.5) - + if self.loop.is_running(): logger.warning("Loop still running after stop, closing forcefully") self.loop.call_soon_threadsafe(self.loop.close) - + # Wait for the thread with a reasonable timeout if self.loop_thread.is_alive(): self.loop_thread.join(timeout=5) - + if self.loop_thread.is_alive(): - logger.warning("Thread still alive after join, may need to be force-killed") - + logger.warning( + "Thread still alive after join, may need to be force-killed" + ) + except Exception as cleanup_error: logger.error(f"Error during final cleanup: {cleanup_error}") - + logger.info("ClientPool shutdown complete") - + except Exception as e: logger.error(f"Error in close_client_pool: {e}") logger.error(traceback.format_exc()) @@ -610,6 +683,8 @@ class NostrClient: await client.close_connection() logger.debug(f"Closed connection to relay: {client.url}") except AttributeError: - logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.") + logger.warning( + f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}." + ) except Exception as e: logger.warning(f"Error closing connection to {client.url}: {e}") diff --git a/src/nostr/encryption_manager.py b/src/nostr/encryption_manager.py index 891264f..e3e4055 100644 --- a/src/nostr/encryption_manager.py +++ b/src/nostr/encryption_manager.py @@ -10,11 +10,12 @@ from .key_manager import KeyManager # Instantiate the logger logger = logging.getLogger(__name__) + class EncryptionManager: """ Manages encryption and decryption using Fernet symmetric encryption. """ - + def __init__(self, key_manager: KeyManager): """ Initializes the EncryptionManager with a Fernet instance. @@ -25,24 +26,26 @@ class EncryptionManager: # Derive the raw encryption key (32 bytes) raw_key = key_manager.derive_encryption_key() logger.debug(f"Derived raw encryption key length: {len(raw_key)} bytes") - + # Ensure the raw key is exactly 32 bytes if len(raw_key) != 32: - raise ValueError(f"Derived key length is {len(raw_key)} bytes; expected 32 bytes.") - + raise ValueError( + f"Derived key length is {len(raw_key)} bytes; expected 32 bytes." + ) + # Base64-encode the raw key to make it URL-safe b64_key = base64.urlsafe_b64encode(raw_key) logger.debug(f"Base64-encoded encryption key length: {len(b64_key)} bytes") - + # Initialize Fernet with the base64-encoded key self.fernet = Fernet(b64_key) logger.info("Fernet encryption manager initialized successfully.") - + except Exception as e: logger.error(f"EncryptionManager initialization failed: {e}") logger.error(traceback.format_exc()) raise - + def encrypt_parent_seed(self, seed: str, file_path: str) -> None: """ Encrypts the parent seed and saves it to the specified file. @@ -51,15 +54,15 @@ class EncryptionManager: :param file_path: The file path to save the encrypted seed. """ try: - encrypted_seed = self.fernet.encrypt(seed.encode('utf-8')) - with open(file_path, 'wb') as f: + encrypted_seed = self.fernet.encrypt(seed.encode("utf-8")) + with open(file_path, "wb") as f: f.write(encrypted_seed) logger.debug(f"Parent seed encrypted and saved to '{file_path}'.") except Exception as e: logger.error(f"Failed to encrypt and save parent seed: {e}") logger.error(traceback.format_exc()) raise - + def decrypt_parent_seed(self, file_path: str) -> str: """ Decrypts the parent seed from the specified file. @@ -68,19 +71,23 @@ class EncryptionManager: :return: The decrypted parent seed as a string. """ try: - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: encrypted_seed = f.read() - decrypted_seed = self.fernet.decrypt(encrypted_seed).decode('utf-8') + decrypted_seed = self.fernet.decrypt(encrypted_seed).decode("utf-8") logger.debug(f"Parent seed decrypted successfully from '{file_path}'.") return decrypted_seed except InvalidToken: - logger.error("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.") - raise ValueError("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.") + logger.error( + "Decryption failed: Invalid token. Possibly incorrect password or corrupted file." + ) + raise ValueError( + "Decryption failed: Invalid token. Possibly incorrect password or corrupted file." + ) except Exception as e: logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) raise - + def encrypt_data(self, data: dict) -> bytes: """ Encrypts a dictionary by serializing it to JSON and then encrypting it. @@ -89,7 +96,7 @@ class EncryptionManager: :return: Encrypted data as bytes. """ try: - json_data = json.dumps(data).encode('utf-8') + json_data = json.dumps(data).encode("utf-8") encrypted = self.fernet.encrypt(json_data) logger.debug("Data encrypted successfully.") return encrypted @@ -97,7 +104,7 @@ class EncryptionManager: logger.error(f"Data encryption failed: {e}") logger.error(traceback.format_exc()) raise - + def decrypt_data(self, encrypted_data: bytes) -> bytes: """ Decrypts encrypted data. diff --git a/src/nostr/event_handler.py b/src/nostr/event_handler.py index 6f8e494..8ff02d5 100644 --- a/src/nostr/event_handler.py +++ b/src/nostr/event_handler.py @@ -8,6 +8,7 @@ from monstr.event.event import Event # Instantiate the logger logger = logging.getLogger(__name__) + class EventHandler: """ Handles incoming Nostr events. @@ -25,7 +26,9 @@ class EventHandler: try: # Assuming evt.created_at is always an integer Unix timestamp if isinstance(evt.created_at, int): - created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at)) + created_at_str = time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(evt.created_at) + ) else: # Handle unexpected types gracefully created_at_str = str(evt.created_at) diff --git a/src/nostr/key_manager.py b/src/nostr/key_manager.py index cc89072..2aab346 100644 --- a/src/nostr/key_manager.py +++ b/src/nostr/key_manager.py @@ -11,6 +11,7 @@ from monstr.encrypt import Keys logger = logging.getLogger(__name__) + class KeyManager: """ Manages key generation, encoding, and derivation for NostrClient. @@ -26,9 +27,13 @@ class KeyManager: """ try: if not isinstance(parent_seed, str): - raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}") + raise TypeError( + f"Parent seed must be a string, got {type(parent_seed)}" + ) if not isinstance(fingerprint, str): - raise TypeError(f"Fingerprint must be a string, got {type(fingerprint)}") + raise TypeError( + f"Fingerprint must be a string, got {type(fingerprint)}" + ) self.parent_seed = parent_seed self.fingerprint = fingerprint @@ -72,12 +77,14 @@ class KeyManager: """ try: # Convert fingerprint to an integer index (using a hash function) - index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) + index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % ( + 2**31 + ) # Derive entropy for Nostr key (32 bytes) entropy_bytes = self.bip85.derive_entropy( index=index, - bytes_len=32 # Adjust parameter name and value as per your method signature + bytes_len=32, # Adjust parameter name and value as per your method signature ) # Generate Nostr key pair from entropy @@ -107,7 +114,7 @@ class KeyManager: str: The private key in hex. """ return self.keys.private_key_hex() - + def get_npub(self) -> str: """ Returns the npub (Bech32 encoded public key). @@ -119,7 +126,7 @@ class KeyManager: pub_key_hex = self.get_public_key_hex() pub_key_bytes = bytes.fromhex(pub_key_hex) data = convertbits(pub_key_bytes, 8, 5, True) - npub = bech32_encode('npub', data) + npub = bech32_encode("npub", data) return npub except Exception as e: logger.error(f"Failed to generate npub: {e}") diff --git a/src/nostr/utils.py b/src/nostr/utils.py index 33b82d0..6485a82 100644 --- a/src/nostr/utils.py +++ b/src/nostr/utils.py @@ -2,6 +2,7 @@ import logging + # Example utility function (if any specific to nostr package) def some_helper_function(): pass # Implement as needed diff --git a/src/password_manager/encryption.py b/src/password_manager/encryption.py index 24f3817..8d3866e 100644 --- a/src/password_manager/encryption.py +++ b/src/password_manager/encryption.py @@ -30,12 +30,14 @@ import fcntl # For file locking # Instantiate the logger logger = logging.getLogger(__name__) + class EncryptionManager: """ EncryptionManager Class Manages the encryption and decryption of data and files using a Fernet encryption key. """ + def __init__(self, encryption_key: bytes, fingerprint_dir: Path): """ Initializes the EncryptionManager with the provided encryption key and fingerprint directory. @@ -45,16 +47,20 @@ class EncryptionManager: fingerprint_dir (Path): The directory corresponding to the fingerprint. """ self.fingerprint_dir = fingerprint_dir - self.parent_seed_file = self.fingerprint_dir / 'parent_seed.enc' + self.parent_seed_file = self.fingerprint_dir / "parent_seed.enc" self.key = encryption_key try: self.fernet = Fernet(self.key) logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}") except Exception as e: - logger.error(f"Failed to initialize Fernet with provided encryption key: {e}") + logger.error( + f"Failed to initialize Fernet with provided encryption key: {e}" + ) logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red')) + print( + colored(f"Error: Failed to initialize encryption manager: {e}", "red") + ) raise def encrypt_parent_seed(self, parent_seed: str) -> None: @@ -65,25 +71,32 @@ class EncryptionManager: """ try: # Convert seed to bytes - data = parent_seed.encode('utf-8') + data = parent_seed.encode("utf-8") # Encrypt the data encrypted_data = self.encrypt_data(data) # Write the encrypted data to the file with locking with lock_file(self.parent_seed_file, fcntl.LOCK_EX): - with open(self.parent_seed_file, 'wb') as f: + with open(self.parent_seed_file, "wb") as f: f.write(encrypted_data) # Set file permissions to read/write for the user only os.chmod(self.parent_seed_file, 0o600) - logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.") - print(colored(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", 'green')) + logger.info( + f"Parent seed encrypted and saved to '{self.parent_seed_file}'." + ) + print( + colored( + f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", + "green", + ) + ) except Exception as e: logger.error(f"Failed to encrypt and save parent seed: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) + print(colored(f"Error: Failed to encrypt and save parent seed: {e}", "red")) raise def decrypt_parent_seed(self) -> str: @@ -93,24 +106,28 @@ class EncryptionManager: :return: The decrypted parent seed. """ try: - parent_seed_path = self.fingerprint_dir / 'parent_seed.enc' + parent_seed_path = self.fingerprint_dir / "parent_seed.enc" with lock_file(parent_seed_path, fcntl.LOCK_SH): - with open(parent_seed_path, 'rb') as f: + with open(parent_seed_path, "rb") as f: encrypted_data = f.read() decrypted_data = self.decrypt_data(encrypted_data) - parent_seed = decrypted_data.decode('utf-8').strip() + parent_seed = decrypted_data.decode("utf-8").strip() - logger.debug(f"Parent seed decrypted successfully from '{parent_seed_path}'.") + logger.debug( + f"Parent seed decrypted successfully from '{parent_seed_path}'." + ) return parent_seed except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting parent seed.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting parent seed." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red')) + print(colored(f"Error: Failed to decrypt parent seed: {e}", "red")) raise def encrypt_data(self, data: bytes) -> bytes: @@ -127,7 +144,7 @@ class EncryptionManager: except Exception as e: logger.error(f"Failed to encrypt data: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt data: {e}", 'red')) + print(colored(f"Error: Failed to encrypt data: {e}", "red")) raise def decrypt_data(self, encrypted_data: bytes) -> bytes: @@ -142,13 +159,15 @@ class EncryptionManager: logger.debug("Data decrypted successfully.") return decrypted_data except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting data.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting data." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to decrypt data: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt data: {e}", 'red')) + print(colored(f"Error: Failed to decrypt data: {e}", "red")) raise def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: @@ -170,18 +189,23 @@ class EncryptionManager: # Write the encrypted data to the file with locking with lock_file(file_path, fcntl.LOCK_EX): - with open(file_path, 'wb') as f: + with open(file_path, "wb") as f: f.write(encrypted_data) # Set file permissions to read/write for the user only os.chmod(file_path, 0o600) logger.info(f"Data encrypted and saved to '{file_path}'.") - print(colored(f"Data encrypted and saved to '{file_path}'.", 'green')) + print(colored(f"Data encrypted and saved to '{file_path}'.", "green")) except Exception as e: logger.error(f"Failed to encrypt and save data to '{relative_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt and save data to '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to encrypt and save data to '{relative_path}': {e}", + "red", + ) + ) raise def decrypt_file(self, relative_path: Path) -> bytes: @@ -197,7 +221,7 @@ class EncryptionManager: # Read the encrypted data with locking with lock_file(file_path, fcntl.LOCK_SH): - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: encrypted_data = f.read() # Decrypt the data @@ -205,13 +229,19 @@ class EncryptionManager: logger.debug(f"Data decrypted successfully from '{file_path}'.") return decrypted_data except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting file.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting file." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to decrypt data from '{relative_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt data from '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt data from '{relative_path}': {e}", "red" + ) + ) raise def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None: @@ -223,16 +253,22 @@ class EncryptionManager: Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") try: - json_data = json.dumps(data, indent=4).encode('utf-8') + json_data = json.dumps(data, indent=4).encode("utf-8") self.encrypt_and_save_file(json_data, relative_path) logger.debug(f"JSON data encrypted and saved to '{relative_path}'.") - print(colored(f"JSON data encrypted and saved to '{relative_path}'.", 'green')) + print( + colored(f"JSON data encrypted and saved to '{relative_path}'.", "green") + ) except Exception as e: logger.error(f"Failed to save JSON data to '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to save JSON data to '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to save JSON data to '{relative_path}': {e}", "red" + ) + ) raise def load_json_data(self, relative_path: Optional[Path] = None) -> dict: @@ -244,35 +280,54 @@ class EncryptionManager: :return: The decrypted JSON data as a dictionary. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") file_path = self.fingerprint_dir / relative_path if not file_path.exists(): - logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.") - print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow')) - return {'passwords': {}} + logger.info( + f"Index file '{file_path}' does not exist. Initializing empty data." + ) + print( + colored( + f"Info: Index file '{file_path}' not found. Initializing new password database.", + "yellow", + ) + ) + return {"passwords": {}} try: decrypted_data = self.decrypt_file(relative_path) - json_content = decrypted_data.decode('utf-8').strip() + json_content = decrypted_data.decode("utf-8").strip() data = json.loads(json_content) logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") - print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green')) + print( + colored(f"JSON data loaded and decrypted from '{file_path}'.", "green") + ) return data except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON data from '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to decode JSON data from '{file_path}': {e}", "red" + ) + ) raise except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting JSON data.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting JSON data." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to load JSON data from '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to load JSON data from '{file_path}': {e}", "red" + ) + ) raise def update_checksum(self, relative_path: Optional[Path] = None) -> None: @@ -283,32 +338,39 @@ class EncryptionManager: Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") try: file_path = self.fingerprint_dir / relative_path decrypted_data = self.decrypt_file(relative_path) - content = decrypted_data.decode('utf-8') + content = decrypted_data.decode("utf-8") logger.debug("Calculating checksum of the updated file content.") - checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + checksum = hashlib.sha256(content.encode("utf-8")).hexdigest() logger.debug(f"New checksum: {checksum}") checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt" # Write the checksum to the file with locking with lock_file(checksum_file, fcntl.LOCK_EX): - with open(checksum_file, 'w') as f: + with open(checksum_file, "w") as f: f.write(checksum) # Set file permissions to read/write for the user only os.chmod(checksum_file, 0o600) - logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.") - print(colored(f"Checksum for '{file_path}' updated.", 'green')) + logger.debug( + f"Checksum for '{file_path}' updated and written to '{checksum_file}'." + ) + print(colored(f"Checksum for '{file_path}' updated.", "green")) except Exception as e: logger.error(f"Failed to update checksum for '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to update checksum for '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to update checksum for '{relative_path}': {e}", + "red", + ) + ) raise def get_encrypted_index(self) -> Optional[bytes]: @@ -318,14 +380,20 @@ class EncryptionManager: :return: Encrypted data as bytes or None if the index file does not exist. """ try: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") if not (self.fingerprint_dir / relative_path).exists(): - logger.error(f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'.") - print(colored(f"Error: Index file '{relative_path}' does not exist.", 'red')) + logger.error( + f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'." + ) + print( + colored( + f"Error: Index file '{relative_path}' does not exist.", "red" + ) + ) return None with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH): - with open(self.fingerprint_dir / relative_path, 'rb') as file: + with open(self.fingerprint_dir / relative_path, "rb") as file: encrypted_data = file.read() logger.debug(f"Encrypted index data read from '{relative_path}'.") @@ -333,10 +401,17 @@ class EncryptionManager: except Exception as e: logger.error(f"Failed to read encrypted index file '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to read encrypted index file '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to read encrypted index file '{relative_path}': {e}", + "red", + ) + ) return None - def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes, relative_path: Optional[Path] = None) -> None: + def decrypt_and_save_index_from_nostr( + self, encrypted_data: bytes, relative_path: Optional[Path] = None + ) -> None: """ Decrypts the encrypted data retrieved from Nostr and updates the local index file. @@ -345,18 +420,22 @@ class EncryptionManager: Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") try: decrypted_data = self.decrypt_data(encrypted_data) - data = json.loads(decrypted_data.decode('utf-8')) + data = json.loads(decrypted_data.decode("utf-8")) self.save_json_data(data, relative_path) self.update_checksum(relative_path) logger.info("Index file updated from Nostr successfully.") - print(colored("Index file updated from Nostr successfully.", 'green')) + print(colored("Index file updated from Nostr successfully.", "green")) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt and save data from Nostr: {e}", "red" + ) + ) # Re-raise the exception to inform the calling function of the failure raise @@ -371,7 +450,9 @@ class EncryptionManager: words = seed_phrase.split() if len(words) != 12: logger.error("Seed phrase does not contain exactly 12 words.") - print(colored("Error: Seed phrase must contain exactly 12 words.", 'red')) + print( + colored("Error: Seed phrase must contain exactly 12 words.", "red") + ) return False # Additional validation can be added here (e.g., word list checks) logger.debug("Seed phrase validated successfully.") @@ -379,7 +460,7 @@ class EncryptionManager: except Exception as e: logging.error(f"Error validating seed phrase: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to validate seed phrase: {e}", 'red')) + print(colored(f"Error: Failed to validate seed phrase: {e}", "red")) return False def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes: @@ -399,11 +480,12 @@ class EncryptionManager: if not isinstance(mnemonic, str): raise TypeError("Mnemonic must be a string after conversion") from bip_utils import Bip39SeedGenerator + seed = Bip39SeedGenerator(mnemonic).Generate(passphrase) logger.debug("Seed derived successfully from mnemonic.") return seed except Exception as e: logger.error(f"Failed to derive seed from mnemonic: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red')) + print(colored(f"Error: Failed to derive seed from mnemonic: {e}", "red")) raise diff --git a/src/password_manager/manager.py b/src/password_manager/manager.py index b18a335..c891417 100644 --- a/src/password_manager/manager.py +++ b/src/password_manager/manager.py @@ -122,36 +122,36 @@ class PasswordManager: Prompts the user to select an existing fingerprint or add a new one. """ try: - print(colored("\nAvailable Fingerprints:", "cyan")) + print(colored("\nAvailable Seed Profiles:", "cyan")) fingerprints = self.fingerprint_manager.list_fingerprints() for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", "cyan")) - print(colored(f"{len(fingerprints)+1}. Add a new fingerprint", "cyan")) + print(colored(f"{len(fingerprints)+1}. Add a new seed profile", "cyan")) - choice = input("Select a fingerprint by number: ").strip() + choice = input("Select a seed profile by number: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints) + 1): print(colored("Invalid selection. Exiting.", "red")) sys.exit(1) choice = int(choice) if choice == len(fingerprints) + 1: - # Add a new fingerprint + # Add a new seed profile self.add_new_fingerprint() else: - # Select existing fingerprint + # Select existing seed profile selected_fingerprint = fingerprints[choice - 1] self.select_fingerprint(selected_fingerprint) except Exception as e: - logger.error(f"Error during fingerprint selection: {e}") + logger.error(f"Error during seed profile selection: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to select fingerprint: {e}", "red")) + print(colored(f"Error: Failed to select seed profile: {e}", "red")) sys.exit(1) def add_new_fingerprint(self): """ - Adds a new fingerprint by generating it from a seed phrase. + Adds a new seed profile by generating it from a seed phrase. """ try: choice = input( @@ -169,15 +169,15 @@ class PasswordManager: self.fingerprint_manager.current_fingerprint = fingerprint print( colored( - f"New fingerprint '{fingerprint}' added and set as current.", + f"New seed profile '{fingerprint}' added and set as current.", "green", ) ) except Exception as e: - logger.error(f"Error adding new fingerprint: {e}") + logger.error(f"Error adding new seed profile: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to add new fingerprint: {e}", "red")) + print(colored(f"Error: Failed to add new seed profile: {e}", "red")) sys.exit(1) def select_fingerprint(self, fingerprint: str) -> None: @@ -189,7 +189,7 @@ class PasswordManager: if not self.fingerprint_dir: print( colored( - f"Error: Fingerprint directory for {fingerprint} not found.", + f"Error: Seed profile directory for {fingerprint} not found.", "red", ) ) @@ -203,12 +203,12 @@ class PasswordManager: self.sync_index_from_nostr_if_missing() print( colored( - f"Fingerprint {fingerprint} selected and managers initialized.", + f"Seed profile {fingerprint} selected and managers initialized.", "green", ) ) else: - print(colored(f"Error: Fingerprint {fingerprint} not found.", "red")) + print(colored(f"Error: Seed profile {fingerprint} not found.", "red")) sys.exit(1) def setup_encryption_manager( @@ -267,18 +267,18 @@ class PasswordManager: def handle_switch_fingerprint(self) -> bool: """ - Handles switching to a different fingerprint. + Handles switching to a different seed profile. Returns: bool: True if switch was successful, False otherwise. """ try: - print(colored("\nAvailable Fingerprints:", "cyan")) + print(colored("\nAvailable Seed Profiles:", "cyan")) fingerprints = self.fingerprint_manager.list_fingerprints() for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", "cyan")) - choice = input("Select a fingerprint by number to switch: ").strip() + choice = input("Select a seed profile by number to switch: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection. Returning to main menu.", "red")) return False # Return False to indicate failure @@ -294,26 +294,26 @@ class PasswordManager: if not self.fingerprint_dir: print( colored( - f"Error: Fingerprint directory for {selected_fingerprint} not found.", + f"Error: Seed profile directory for {selected_fingerprint} not found.", "red", ) ) return False # Return False to indicate failure - # Prompt for master password for the selected fingerprint + # Prompt for master password for the selected seed profile password = prompt_existing_password("Enter your master password: ") - # Set up the encryption manager with the new password and fingerprint directory + # Set up the encryption manager with the new password and seed profile directory self.setup_encryption_manager(self.fingerprint_dir, password) - # Load the parent seed for the selected fingerprint + # Load the parent seed for the selected seed profile self.load_parent_seed(self.fingerprint_dir) # Initialize BIP85 and other managers self.initialize_bip85() self.initialize_managers() self.sync_index_from_nostr_if_missing() - print(colored(f"Switched to fingerprint {selected_fingerprint}.", "green")) + print(colored(f"Switched to seed profile {selected_fingerprint}.", "green")) # Re-initialize NostrClient with the new fingerprint try: @@ -322,7 +322,7 @@ class PasswordManager: fingerprint=self.current_fingerprint, ) logging.info( - f"NostrClient re-initialized with fingerprint {self.current_fingerprint}." + f"NostrClient re-initialized with seed profile {self.current_fingerprint}." ) except Exception as e: logging.error(f"Failed to re-initialize NostrClient: {e}") @@ -334,9 +334,9 @@ class PasswordManager: return True # Return True to indicate success except Exception as e: - logging.error(f"Error during fingerprint switching: {e}") + logging.error(f"Error during seed profile switching: {e}") logging.error(traceback.format_exc()) - print(colored(f"Error: Failed to switch fingerprints: {e}", "red")) + print(colored(f"Error: Failed to switch seed profiles: {e}", "red")) return False # Return False to indicate failure def handle_existing_seed(self) -> None: @@ -355,22 +355,22 @@ class PasswordManager: if not self.fingerprint_manager: self.initialize_fingerprint_manager() - # Prompt the user to select an existing fingerprint + # Prompt the user to select an existing seed profile fingerprints = self.fingerprint_manager.list_fingerprints() if not fingerprints: print( colored( - "No fingerprints available. Please add a fingerprint first.", + "No seed profiles available. Please add a seed profile first.", "red", ) ) sys.exit(1) - print(colored("Available Fingerprints:", "cyan")) + print(colored("Available Seed Profiles:", "cyan")) for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", "cyan")) - choice = input("Select a fingerprint by number: ").strip() + choice = input("Select a seed profile by number: ").strip() if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)): print(colored("Invalid selection. Exiting.", "red")) sys.exit(1) @@ -381,7 +381,7 @@ class PasswordManager: selected_fingerprint ) if not fingerprint_dir: - print(colored("Error: Fingerprint directory not found.", "red")) + print(colored("Error: Seed profile directory not found.", "red")) sys.exit(1) # Initialize EncryptionManager with key and fingerprint_dir @@ -442,7 +442,7 @@ class PasswordManager: if not fingerprint: print( colored( - "Error: Failed to generate fingerprint for the provided seed.", + "Error: Failed to generate seed profile for the provided seed.", "red", ) ) @@ -454,7 +454,7 @@ class PasswordManager: if not fingerprint_dir: print( colored( - "Error: Failed to retrieve fingerprint directory.", "red" + "Error: Failed to retrieve seed profile directory.", "red" ) ) sys.exit(1) @@ -463,7 +463,7 @@ class PasswordManager: self.current_fingerprint = fingerprint self.fingerprint_manager.current_fingerprint = fingerprint self.fingerprint_dir = fingerprint_dir - logging.info(f"Current fingerprint set to {fingerprint}") + logging.info(f"Current seed profile set to {fingerprint}") # Initialize EncryptionManager with key and fingerprint_dir password = prompt_for_password() @@ -514,7 +514,8 @@ class PasswordManager: if not fingerprint: print( colored( - "Error: Failed to generate fingerprint for the new seed.", "red" + "Error: Failed to generate seed profile for the new seed.", + "red", ) ) sys.exit(1) @@ -524,14 +525,14 @@ class PasswordManager: ) if not fingerprint_dir: print( - colored("Error: Failed to retrieve fingerprint directory.", "red") + colored("Error: Failed to retrieve seed profile directory.", "red") ) sys.exit(1) # Set the current fingerprint in both PasswordManager and FingerprintManager self.current_fingerprint = fingerprint self.fingerprint_manager.current_fingerprint = fingerprint - logging.info(f"Current fingerprint set to {fingerprint}") + logging.info(f"Current seed profile set to {fingerprint}") # Now, save and encrypt the seed with the fingerprint_dir self.save_and_encrypt_seed(new_seed, fingerprint_dir) @@ -696,7 +697,7 @@ class PasswordManager: except Exception as e: logger.warning(f"Unable to sync index from Nostr: {e}") - def handle_generate_password(self) -> None: + def handle_add_password(self) -> None: try: website_name = input("Enter the website name: ").strip() if not website_name: @@ -735,17 +736,31 @@ class PasswordManager: # Provide user feedback print( colored( - f"\n[+] Password generated and indexed with ID {index}.\n", "green" + f"\n[+] Password generated and indexed with ID {index}.\n", + "green", ) ) print(colored(f"Password for {website_name}: {password}\n", "yellow")) + # Automatically push the updated encrypted index to Nostr so the + # latest changes are backed up remotely. + try: + encrypted_data = self.get_encrypted_data() + if encrypted_data: + self.nostr_client.publish_json_to_nostr(encrypted_data) + logging.info( + "Encrypted index posted to Nostr after entry addition." + ) + except Exception as nostr_error: + logging.error(f"Failed to post updated index to Nostr: {nostr_error}") + logging.error(traceback.format_exc()) + except Exception as e: logging.error(f"Error during password generation: {e}") logging.error(traceback.format_exc()) print(colored(f"Error: Failed to generate password: {e}", "red")) - def handle_retrieve_password(self) -> None: + def handle_retrieve_entry(self) -> None: """ Handles retrieving a password from the index by prompting the user for the index number and displaying the corresponding password and associated details. @@ -1002,7 +1017,7 @@ class PasswordManager: Handles the backup and reveal of the parent seed. """ try: - print(colored("\n=== Backup/Reveal Parent Seed ===", "yellow")) + print(colored("\n=== Backup Parent Seed ===", "yellow")) print( colored( "Warning: Revealing your parent seed is a highly sensitive operation.", @@ -1207,8 +1222,8 @@ if __name__ == "__main__": # Example operations # These would typically be triggered by user interactions, e.g., via a CLI menu - # manager.handle_generate_password() - # manager.handle_retrieve_password() + # manager.handle_add_password() + # manager.handle_retrieve_entry() # manager.handle_modify_entry() # manager.handle_verify_checksum() # manager.nostr_client.publish_and_subscribe("Sample password data") diff --git a/src/password_manager/password_generation.py b/src/password_manager/password_generation.py index 4ab0f2c..fa3049e 100644 --- a/src/password_manager/password_generation.py +++ b/src/password_manager/password_generation.py @@ -35,6 +35,7 @@ from password_manager.encryption import EncryptionManager # Instantiate the logger logger = logging.getLogger(__name__) + class PasswordGenerator: """ PasswordGenerator Class @@ -44,7 +45,9 @@ class PasswordGenerator: complexity requirements. """ - def __init__(self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85): + def __init__( + self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85 + ): """ Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance. @@ -59,16 +62,20 @@ class PasswordGenerator: self.bip85 = bip85 # Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager) - self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed) + self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic( + self.parent_seed + ) logger.debug("PasswordGenerator initialized successfully.") except Exception as e: logger.error(f"Failed to initialize PasswordGenerator: {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red')) + print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", "red")) raise - def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str: + def generate_password( + self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0 + ) -> str: """ Generates a deterministic password based on the parent seed, desired length, and index. @@ -90,11 +97,19 @@ class PasswordGenerator: try: # Validate password length if length < MIN_PASSWORD_LENGTH: - logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") - raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") + logger.error( + f"Password length must be at least {MIN_PASSWORD_LENGTH} characters." + ) + raise ValueError( + f"Password length must be at least {MIN_PASSWORD_LENGTH} characters." + ) if length > MAX_PASSWORD_LENGTH: - logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") - raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") + logger.error( + f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters." + ) + raise ValueError( + f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters." + ) # Derive entropy using BIP-85 entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32) @@ -105,39 +120,43 @@ class PasswordGenerator: algorithm=hashes.SHA256(), length=32, # 256 bits for AES-256 salt=None, - info=b'password-generation', - backend=default_backend() + info=b"password-generation", + backend=default_backend(), ) derived_key = hkdf.derive(entropy) logger.debug(f"Derived key using HKDF: {derived_key.hex()}") # Use PBKDF2-HMAC-SHA256 to derive a key from entropy - dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000) + dk = hashlib.pbkdf2_hmac("sha256", entropy, b"", 100000) logger.debug(f"Derived key using PBKDF2: {dk.hex()}") # Map the derived key to all allowed characters all_allowed = string.ascii_letters + string.digits + string.punctuation - password = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) - logger.debug(f"Password after mapping to all allowed characters: {password}") + password = "".join(all_allowed[byte % len(all_allowed)] for byte in dk) + logger.debug( + f"Password after mapping to all allowed characters: {password}" + ) # Ensure the password meets complexity requirements password = self.ensure_complexity(password, all_allowed, dk) logger.debug(f"Password after ensuring complexity: {password}") # Shuffle characters deterministically based on dk - shuffle_seed = int.from_bytes(dk, 'big') + shuffle_seed = int.from_bytes(dk, "big") rng = random.Random(shuffle_seed) password_chars = list(password) rng.shuffle(password_chars) - password = ''.join(password_chars) + password = "".join(password_chars) logger.debug("Shuffled password deterministically.") # Ensure password length by extending if necessary if len(password) < length: while len(password) < length: - dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1) - base64_extra = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) - password += ''.join(base64_extra) + dk = hashlib.pbkdf2_hmac("sha256", dk, b"", 1) + base64_extra = "".join( + all_allowed[byte % len(all_allowed)] for byte in dk + ) + password += "".join(base64_extra) logger.debug(f"Extended password: {password}") # Trim the password to the desired length @@ -149,7 +168,7 @@ class PasswordGenerator: except Exception as e: logger.error(f"Error generating password: {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to generate password: {e}", 'red')) + print(colored(f"Error: Failed to generate password: {e}", "red")) raise def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str: @@ -180,7 +199,9 @@ class PasswordGenerator: current_digits = sum(1 for c in password_chars if c in digits) current_special = sum(1 for c in password_chars if c in special) - logger.debug(f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}") + logger.debug( + f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}" + ) # Set minimum counts min_upper = 2 @@ -204,14 +225,18 @@ class PasswordGenerator: index = get_dk_value() % len(password_chars) char = uppercase[get_dk_value() % len(uppercase)] password_chars[index] = char - logger.debug(f"Added uppercase letter '{char}' at position {index}.") + logger.debug( + f"Added uppercase letter '{char}' at position {index}." + ) if current_lower < min_lower: for _ in range(min_lower - current_lower): index = get_dk_value() % len(password_chars) char = lowercase[get_dk_value() % len(lowercase)] password_chars[index] = char - logger.debug(f"Added lowercase letter '{char}' at position {index}.") + logger.debug( + f"Added lowercase letter '{char}' at position {index}." + ) if current_digits < min_digits: for _ in range(min_digits - current_digits): @@ -225,7 +250,9 @@ class PasswordGenerator: index = get_dk_value() % len(password_chars) char = special[get_dk_value() % len(special)] password_chars[index] = char - logger.debug(f"Added special character '{char}' at position {index}.") + logger.debug( + f"Added special character '{char}' at position {index}." + ) # Additional deterministic inclusion of symbols to increase score symbol_target = 3 # Increase target number of symbols @@ -253,11 +280,15 @@ class PasswordGenerator: if i == 0 and password_chars[j] not in uppercase: char = uppercase[get_dk_value() % len(uppercase)] password_chars[j] = char - logger.debug(f"Assigned uppercase letter '{char}' to position {j}.") + logger.debug( + f"Assigned uppercase letter '{char}' to position {j}." + ) elif i == 1 and password_chars[j] not in lowercase: char = lowercase[get_dk_value() % len(lowercase)] password_chars[j] = char - logger.debug(f"Assigned lowercase letter '{char}' to position {j}.") + logger.debug( + f"Assigned lowercase letter '{char}' to position {j}." + ) elif i == 2 and password_chars[j] not in digits: char = digits[get_dk_value() % len(digits)] password_chars[j] = char @@ -265,10 +296,14 @@ class PasswordGenerator: elif i == 3 and password_chars[j] not in special: char = special[get_dk_value() % len(special)] password_chars[j] = char - logger.debug(f"Assigned special character '{char}' to position {j}.") + logger.debug( + f"Assigned special character '{char}' to position {j}." + ) # Shuffle again to distribute the characters more evenly - shuffle_seed = int.from_bytes(dk, 'big') + dk_index # Modify seed to vary shuffle + shuffle_seed = ( + int.from_bytes(dk, "big") + dk_index + ) # Modify seed to vary shuffle rng = random.Random(shuffle_seed) rng.shuffle(password_chars) logger.debug(f"Shuffled password characters for balanced distribution.") @@ -278,12 +313,14 @@ class PasswordGenerator: final_lower = sum(1 for c in password_chars if c in lowercase) final_digits = sum(1 for c in password_chars if c in digits) final_special = sum(1 for c in password_chars if c in special) - logger.debug(f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}") + logger.debug( + f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}" + ) - return ''.join(password_chars) + return "".join(password_chars) except Exception as e: logger.error(f"Error ensuring password complexity: {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to ensure password complexity: {e}", 'red')) + print(colored(f"Error: Failed to ensure password complexity: {e}", "red")) raise diff --git a/src/repo-context.txt b/src/repo-context.txt index a9d7f9c..30f018d 100644 --- a/src/repo-context.txt +++ b/src/repo-context.txt @@ -350,7 +350,7 @@ def display_menu(password_manager: PasswordManager): 5. Post Encrypted Index to Nostr 6. Retrieve Encrypted Index from Nostr 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed + 8. Backup Parent Seed 9. Switch Fingerprint 10. Add a New Fingerprint 11. Remove an Existing Fingerprint @@ -1602,7 +1602,7 @@ class PasswordManager: Handles the backup and reveal of the parent seed. """ try: - print(colored("\n=== Backup/Reveal Parent Seed ===", 'yellow')) + print(colored("\n=== Backup Parent Seed ===", 'yellow')) print(colored("Warning: Revealing your parent seed is a highly sensitive operation.", 'red')) print(colored("Ensure you're in a secure, private environment and no one is watching your screen.", 'red')) diff --git a/src/requirements.txt b/src/requirements.txt index 5042da5..b195427 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -9,4 +9,5 @@ aiohttp bcrypt bip85 pytest>=7.0 +pytest-cov diff --git a/src/tests/test_encryption_files.py b/src/tests/test_encryption_files.py new file mode 100644 index 0000000..3b93e02 --- /dev/null +++ b/src/tests/test_encryption_files.py @@ -0,0 +1,41 @@ +import json +import sys +from pathlib import Path +from tempfile import TemporaryDirectory + +from cryptography.fernet import Fernet + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.encryption import EncryptionManager + + +def test_json_save_and_load_round_trip(): + with TemporaryDirectory() as tmpdir: + key = Fernet.generate_key() + manager = EncryptionManager(key, Path(tmpdir)) + + data = {"hello": "world", "nums": [1, 2, 3]} + manager.save_json_data(data) + loaded = manager.load_json_data() + assert loaded == data + + file_path = Path(tmpdir) / "seedpass_passwords_db.json.enc" + raw = file_path.read_bytes() + assert raw != json.dumps(data, indent=4).encode("utf-8") + + +def test_encrypt_and_decrypt_file_binary_round_trip(): + with TemporaryDirectory() as tmpdir: + key = Fernet.generate_key() + manager = EncryptionManager(key, Path(tmpdir)) + + payload = b"binary secret" + rel = Path("payload.bin.enc") + manager.encrypt_and_save_file(payload, rel) + decrypted = manager.decrypt_file(rel) + assert decrypted == payload + + file_path = Path(tmpdir) / rel + raw = file_path.read_bytes() + assert raw != payload diff --git a/src/tests/test_import.py b/src/tests/test_import.py index 8c6a743..38f0eaf 100644 --- a/src/tests/test_import.py +++ b/src/tests/test_import.py @@ -2,6 +2,7 @@ try: from bip_utils import Bip39SeedGenerator + print("Bip39SeedGenerator imported successfully.") except ImportError as e: print(f"ImportError: {e}") diff --git a/src/tests/test_seed_generation.py b/src/tests/test_seed_generation.py new file mode 100644 index 0000000..d8bd2a8 --- /dev/null +++ b/src/tests/test_seed_generation.py @@ -0,0 +1,41 @@ +import sys +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch + +sys.path.append(str(Path(__file__).resolve().parents[1])) + + +def setup_password_manager(): + """Instantiate PasswordManager using a temporary APP_DIR without running __init__.""" + import constants + import password_manager.manager as manager_module + + # Reload modules so constants use the mocked home directory + importlib.reload(constants) + importlib.reload(manager_module) + + pm = manager_module.PasswordManager.__new__(manager_module.PasswordManager) + pm.fingerprint_manager = manager_module.FingerprintManager(constants.APP_DIR) + pm.current_fingerprint = None + pm.save_and_encrypt_seed = lambda seed, fingerprint_dir: None + return pm, constants + + +def test_generate_bip85_and_new_seed(monkeypatch): + with TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + + pm, const = setup_password_manager() + + mnemonic = pm.generate_bip85_seed() + assert len(mnemonic.split()) == 12 + + with patch("password_manager.manager.confirm_action", return_value=True): + fingerprint = pm.generate_new_seed() + + expected_dir = const.APP_DIR / fingerprint + assert expected_dir.exists() + assert expected_dir.is_dir() diff --git a/src/tests/test_settings_menu.py b/src/tests/test_settings_menu.py new file mode 100644 index 0000000..8416df6 --- /dev/null +++ b/src/tests/test_settings_menu.py @@ -0,0 +1,88 @@ +import sys +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory +from types import SimpleNamespace +from unittest.mock import patch + +from cryptography.fernet import Fernet + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +import main +from nostr.client import DEFAULT_RELAYS +from password_manager.encryption import EncryptionManager +from password_manager.config_manager import ConfigManager +from utils.fingerprint_manager import FingerprintManager + + +def setup_pm(tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + import constants + + importlib.reload(constants) + importlib.reload(main) + + fp_dir = constants.APP_DIR / "fp" + fp_dir.mkdir(parents=True) + enc_mgr = EncryptionManager(Fernet.generate_key(), fp_dir) + cfg_mgr = ConfigManager(enc_mgr, fp_dir) + fp_mgr = FingerprintManager(constants.APP_DIR) + + nostr_stub = SimpleNamespace( + relays=list(DEFAULT_RELAYS), + close_client_pool=lambda: None, + initialize_client_pool=lambda: None, + publish_json_to_nostr=lambda data: None, + key_manager=SimpleNamespace(get_npub=lambda: "npub"), + ) + + pm = SimpleNamespace( + config_manager=cfg_mgr, + fingerprint_manager=fp_mgr, + nostr_client=nostr_stub, + ) + return pm, cfg_mgr, fp_mgr + + +def test_relay_and_profile_actions(monkeypatch, capsys): + with TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + pm, cfg_mgr, fp_mgr = setup_pm(tmp_path, monkeypatch) + + # Add two fingerprints for listing + fp1 = fp_mgr.add_fingerprint( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" + ) + fp2 = fp_mgr.add_fingerprint( + "legal winner thank year wave sausage worth useful legal winner thank yellow" + ) + + # Add a relay + with patch("builtins.input", return_value="wss://new"), patch( + "main.handle_post_to_nostr" + ), patch("main._reload_relays"): + main.handle_add_relay(pm) + cfg = cfg_mgr.load_config(require_pin=False) + assert "wss://new" in cfg["relays"] + + # Remove the relay + idx = cfg["relays"].index("wss://new") + 1 + with patch("builtins.input", return_value=str(idx)), patch( + "main._reload_relays" + ): + main.handle_remove_relay(pm) + cfg = cfg_mgr.load_config(require_pin=False) + assert "wss://new" not in cfg["relays"] + + # Reset to defaults + with patch("main._reload_relays"): + main.handle_reset_relays(pm) + cfg = cfg_mgr.load_config(require_pin=False) + assert cfg["relays"] == list(DEFAULT_RELAYS) + + # List profiles + main.handle_list_fingerprints(pm) + out = capsys.readouterr().out + assert fp1 in out + assert fp2 in out diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 927234f..1c3e453 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -8,17 +8,17 @@ try: from .key_derivation import derive_key_from_password, derive_key_from_parent_seed from .checksum import calculate_checksum, verify_checksum from .password_prompt import prompt_for_password - + logging.info("Modules imported successfully.") except Exception as e: logging.error(f"Failed to import one or more modules: {e}") logging.error(traceback.format_exc()) # Log full traceback __all__ = [ - 'derive_key_from_password', - 'derive_key_from_parent_seed', - 'calculate_checksum', - 'verify_checksum', - 'lock_file', - 'prompt_for_password' + "derive_key_from_password", + "derive_key_from_parent_seed", + "calculate_checksum", + "verify_checksum", + "lock_file", + "prompt_for_password", ] diff --git a/src/utils/checksum.py b/src/utils/checksum.py index 9266b3d..37f2e45 100644 --- a/src/utils/checksum.py +++ b/src/utils/checksum.py @@ -19,14 +19,12 @@ from typing import Optional from termcolor import colored -from constants import ( - APP_DIR, - SCRIPT_CHECKSUM_FILE -) +from constants import APP_DIR, SCRIPT_CHECKSUM_FILE # Instantiate the logger logger = logging.getLogger(__name__) + def calculate_checksum(file_path: str) -> Optional[str]: """ Calculates the SHA-256 checksum of the given file. @@ -39,7 +37,7 @@ def calculate_checksum(file_path: str) -> Optional[str]: """ hasher = hashlib.sha256() try: - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) checksum = hasher.hexdigest() @@ -47,12 +45,20 @@ def calculate_checksum(file_path: str) -> Optional[str]: return checksum except FileNotFoundError: logging.error(f"File '{file_path}' not found for checksum calculation.") - print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red')) + print( + colored( + f"Error: File '{file_path}' not found for checksum calculation.", "red" + ) + ) return None except Exception as e: logging.error(f"Error calculating checksum for '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to calculate checksum for '{file_path}': {e}", "red" + ) + ) return None @@ -68,7 +74,7 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool: bool: True if checksums match, False otherwise. """ try: - with open(checksum_file_path, 'r') as f: + with open(checksum_file_path, "r") as f: stored_checksum = f.read().strip() if current_checksum == stored_checksum: logging.debug(f"Checksum verification passed for '{checksum_file_path}'.") @@ -78,12 +84,17 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool: return False except FileNotFoundError: logging.error(f"Checksum file '{checksum_file_path}' not found.") - print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red')) + print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", "red")) return False except Exception as e: logging.error(f"Error reading checksum file '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to read checksum file '{checksum_file_path}': {e}", + "red", + ) + ) return False @@ -100,16 +111,21 @@ def update_checksum(content: str, checksum_file_path: str) -> bool: """ try: hasher = hashlib.sha256() - hasher.update(content.encode('utf-8')) + hasher.update(content.encode("utf-8")) new_checksum = hasher.hexdigest() - with open(checksum_file_path, 'w') as f: + with open(checksum_file_path, "w") as f: f.write(new_checksum) logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}") return True except Exception as e: logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to update checksum for '{checksum_file_path}': {e}", + "red", + ) + ) return False @@ -129,11 +145,11 @@ def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool: return False if verify_checksum(current_checksum, checksum_file_path): - print(colored(f"Checksum verification passed for '{file_path}'.", 'green')) + print(colored(f"Checksum verification passed for '{file_path}'.", "green")) logging.info(f"Checksum verification passed for '{file_path}'.") return True else: - print(colored(f"Checksum verification failed for '{file_path}'.", 'red')) + print(colored(f"Checksum verification failed for '{file_path}'.", "red")) logging.warning(f"Checksum verification failed for '{file_path}'.") return False @@ -154,13 +170,20 @@ def initialize_checksum(file_path: str, checksum_file_path: str) -> bool: return False try: - with open(checksum_file_path, 'w') as f: + with open(checksum_file_path, "w") as f: f.write(checksum) - logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}") - print(colored(f"Initialized checksum for '{file_path}'.", 'green')) + logging.debug( + f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}" + ) + print(colored(f"Initialized checksum for '{file_path}'.", "green")) return True except Exception as e: logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", + "red", + ) + ) return False diff --git a/src/utils/file_lock.py b/src/utils/file_lock.py index 118eec7..431eea3 100644 --- a/src/utils/file_lock.py +++ b/src/utils/file_lock.py @@ -26,6 +26,7 @@ import traceback # Instantiate the logger logger = logging.getLogger(__name__) + @contextmanager def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: """ @@ -44,14 +45,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: SystemExit: Exits the program if the lock cannot be acquired. """ if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH): - logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.") - print(colored("Error: Invalid lock type provided.", 'red')) + logging.error( + f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH." + ) + print(colored("Error: Invalid lock type provided.", "red")) sys.exit(1) file = None try: # Determine the mode based on whether the file exists - mode = 'rb+' if file_path.exists() else 'wb' + mode = "rb+" if file_path.exists() else "wb" # Open the file file = open(file_path, mode) @@ -67,7 +70,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", + "red", + ) + ) sys.exit(1) finally: @@ -78,9 +86,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: logging.debug(f"Lock released on '{file_path}'.") except Exception as e: lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" - logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}") + logging.warning( + f"Failed to release {lock_type_str} lock on '{file_path}': {e}" + ) logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow')) + print( + colored( + f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", + "yellow", + ) + ) finally: # Close the file try: @@ -89,7 +104,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: except Exception as e: logging.warning(f"Failed to close file '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow')) + print( + colored( + f"Warning: Failed to close file '{file_path}': {e}", + "yellow", + ) + ) @contextmanager diff --git a/src/utils/fingerprint.py b/src/utils/fingerprint.py index e831756..6e7046d 100644 --- a/src/utils/fingerprint.py +++ b/src/utils/fingerprint.py @@ -16,6 +16,7 @@ from typing import Optional # Instantiate the logger logger = logging.getLogger(__name__) + def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]: """ Generates a unique fingerprint from the provided seed phrase using SHA-256. @@ -33,7 +34,7 @@ def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]: logger.debug(f"Normalized seed: {normalized_seed}") # Compute SHA-256 hash - sha256_hash = hashlib.sha256(normalized_seed.encode('utf-8')).hexdigest() + sha256_hash = hashlib.sha256(normalized_seed.encode("utf-8")).hexdigest() logger.debug(f"SHA-256 Hash: {sha256_hash}") # Truncate to desired length diff --git a/src/utils/fingerprint_manager.py b/src/utils/fingerprint_manager.py index 3b9597f..84c1929 100644 --- a/src/utils/fingerprint_manager.py +++ b/src/utils/fingerprint_manager.py @@ -14,6 +14,7 @@ from utils.fingerprint import generate_fingerprint # Instantiate the logger logger = logging.getLogger(__name__) + class FingerprintManager: """ FingerprintManager Class @@ -31,7 +32,7 @@ class FingerprintManager: app_dir (Path): The root application directory (e.g., ~/.seedpass). """ self.app_dir = app_dir - self.fingerprints_file = self.app_dir / 'fingerprints.json' + self.fingerprints_file = self.app_dir / "fingerprints.json" self._ensure_app_directory() self.fingerprints = self._load_fingerprints() self.current_fingerprint: Optional[str] = None @@ -43,7 +44,7 @@ class FingerprintManager: Returns: Optional[Path]: The Path object of the current fingerprint directory or None. """ - if hasattr(self, 'current_fingerprint') and self.current_fingerprint: + if hasattr(self, "current_fingerprint") and self.current_fingerprint: return self.get_fingerprint_directory(self.current_fingerprint) else: logger.error("No current fingerprint is set.") @@ -57,7 +58,9 @@ class FingerprintManager: self.app_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Application directory ensured at {self.app_dir}") except Exception as e: - logger.error(f"Failed to create application directory at {self.app_dir}: {e}") + logger.error( + f"Failed to create application directory at {self.app_dir}: {e}" + ) logger.error(traceback.format_exc()) raise @@ -70,13 +73,15 @@ class FingerprintManager: """ try: if self.fingerprints_file.exists(): - with open(self.fingerprints_file, 'r') as f: + with open(self.fingerprints_file, "r") as f: data = json.load(f) - fingerprints = data.get('fingerprints', []) + fingerprints = data.get("fingerprints", []) logger.debug(f"Loaded fingerprints: {fingerprints}") return fingerprints else: - logger.debug("fingerprints.json not found. Initializing empty fingerprint list.") + logger.debug( + "fingerprints.json not found. Initializing empty fingerprint list." + ) return [] except Exception as e: logger.error(f"Failed to load fingerprints: {e}") @@ -88,8 +93,8 @@ class FingerprintManager: Saves the current list of fingerprints to the fingerprints.json file. """ try: - with open(self.fingerprints_file, 'w') as f: - json.dump({'fingerprints': self.fingerprints}, f, indent=4) + with open(self.fingerprints_file, "w") as f: + json.dump({"fingerprints": self.fingerprints}, f, indent=4) logger.debug(f"Fingerprints saved: {self.fingerprints}") except Exception as e: logger.error(f"Failed to save fingerprints: {e}") @@ -140,7 +145,7 @@ class FingerprintManager: # Remove fingerprint directory fingerprint_dir = self.app_dir / fingerprint if fingerprint_dir.exists() and fingerprint_dir.is_dir(): - for child in fingerprint_dir.glob('*'): + for child in fingerprint_dir.glob("*"): if child.is_file(): child.unlink() elif child.is_dir(): diff --git a/src/utils/password_prompt.py b/src/utils/password_prompt.py index 2a5be96..0b42899 100644 --- a/src/utils/password_prompt.py +++ b/src/utils/password_prompt.py @@ -29,6 +29,7 @@ colorama_init() # Instantiate the logger logger = logging.getLogger(__name__) + def prompt_new_password() -> str: """ Prompts the user to enter and confirm a new password for encrypting the parent seed. @@ -51,39 +52,50 @@ def prompt_new_password() -> str: confirm_password = getpass.getpass(prompt="Confirm your password: ").strip() if not password: - print(colored("Error: Password cannot be empty. Please try again.", 'red')) + print( + colored("Error: Password cannot be empty. Please try again.", "red") + ) logging.warning("User attempted to enter an empty password.") attempts += 1 continue if len(password) < MIN_PASSWORD_LENGTH: - print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red')) - logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.") + print( + colored( + f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", + "red", + ) + ) + logging.warning( + f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters." + ) attempts += 1 continue if password != confirm_password: - print(colored("Error: Passwords do not match. Please try again.", 'red')) + print( + colored("Error: Passwords do not match. Please try again.", "red") + ) logging.warning("User entered mismatching passwords.") attempts += 1 continue # Normalize the password to NFKD form - normalized_password = unicodedata.normalize('NFKD', password) + normalized_password = unicodedata.normalize("NFKD", password) logging.debug("User entered a valid and confirmed password.") return normalized_password except KeyboardInterrupt: - print(colored("\nOperation cancelled by user.", 'yellow')) + print(colored("\nOperation cancelled by user.", "yellow")) logging.info("Password prompt interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during password prompt: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: {e}", 'red')) + print(colored(f"Error: {e}", "red")) attempts += 1 - print(colored("Maximum password attempts exceeded. Exiting.", 'red')) + print(colored("Maximum password attempts exceeded. Exiting.", "red")) logging.error("User failed to provide a valid password after multiple attempts.") sys.exit(1) @@ -107,27 +119,29 @@ def prompt_existing_password(prompt_message: str = "Enter your password: ") -> s password = getpass.getpass(prompt=prompt_message).strip() if not password: - print(colored("Error: Password cannot be empty.", 'red')) + print(colored("Error: Password cannot be empty.", "red")) logging.warning("User attempted to enter an empty password.") sys.exit(1) # Normalize the password to NFKD form - normalized_password = unicodedata.normalize('NFKD', password) + normalized_password = unicodedata.normalize("NFKD", password) logging.debug("User entered an existing password for decryption.") return normalized_password except KeyboardInterrupt: - print(colored("\nOperation cancelled by user.", 'yellow')) + print(colored("\nOperation cancelled by user.", "yellow")) logging.info("Existing password prompt interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during existing password prompt: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: {e}", 'red')) + print(colored(f"Error: {e}", "red")) sys.exit(1) -def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool: +def confirm_action( + prompt_message: str = "Are you sure you want to proceed? (Y/N): ", +) -> bool: """ Prompts the user to confirm an action, typically used before performing critical operations. @@ -143,24 +157,24 @@ def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N """ try: while True: - response = input(colored(prompt_message, 'cyan')).strip().lower() - if response in ['y', 'yes']: + response = input(colored(prompt_message, "cyan")).strip().lower() + if response in ["y", "yes"]: logging.debug("User confirmed the action.") return True - elif response in ['n', 'no']: + elif response in ["n", "no"]: logging.debug("User declined the action.") return False else: - print(colored("Please respond with 'Y' or 'N'.", 'yellow')) + print(colored("Please respond with 'Y' or 'N'.", "yellow")) except KeyboardInterrupt: - print(colored("\nOperation cancelled by user.", 'yellow')) + print(colored("\nOperation cancelled by user.", "yellow")) logging.info("Action confirmation interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during action confirmation: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: {e}", 'red')) + print(colored(f"Error: {e}", "red")) sys.exit(1) diff --git a/tests/test_entries_empty.py b/tests/test_entries_empty.py new file mode 100644 index 0000000..d10556a --- /dev/null +++ b/tests/test_entries_empty.py @@ -0,0 +1,19 @@ +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from cryptography.fernet import Fernet + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.encryption import EncryptionManager +from password_manager.entry_management import EntryManager + + +def test_list_entries_empty(): + with TemporaryDirectory() as tmpdir: + key = Fernet.generate_key() + enc_mgr = EncryptionManager(key, Path(tmpdir)) + entry_mgr = EntryManager(enc_mgr, Path(tmpdir)) + + entries = entry_mgr.list_entries() + assert entries == [] diff --git a/tests/test_entry_add.py b/tests/test_entry_add.py new file mode 100644 index 0000000..27f4fb5 --- /dev/null +++ b/tests/test_entry_add.py @@ -0,0 +1,31 @@ +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from cryptography.fernet import Fernet + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.encryption import EncryptionManager +from password_manager.entry_management import EntryManager + + +def test_add_and_retrieve_entry(): + with TemporaryDirectory() as tmpdir: + key = Fernet.generate_key() + enc_mgr = EncryptionManager(key, Path(tmpdir)) + entry_mgr = EntryManager(enc_mgr, Path(tmpdir)) + + index = entry_mgr.add_entry("example.com", 12, "user") + entry = entry_mgr.retrieve_entry(index) + + assert entry == { + "website": "example.com", + "length": 12, + "username": "user", + "url": "", + "blacklisted": False, + } + + data = enc_mgr.load_json_data(entry_mgr.index_file) + assert str(index) in data.get("passwords", {}) + assert data["passwords"][str(index)] == entry diff --git a/tests/test_nostr_backup.py b/tests/test_nostr_backup.py new file mode 100644 index 0000000..d1926ea --- /dev/null +++ b/tests/test_nostr_backup.py @@ -0,0 +1,39 @@ +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch +from cryptography.fernet import Fernet + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.encryption import EncryptionManager +from password_manager.entry_management import EntryManager +from nostr.client import NostrClient + + +def test_backup_and_publish_to_nostr(): + with TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + key = Fernet.generate_key() + enc_mgr = EncryptionManager(key, tmp_path) + entry_mgr = EntryManager(enc_mgr, tmp_path) + + # create an index by adding an entry + entry_mgr.add_entry("example.com", 12) + encrypted_index = entry_mgr.get_encrypted_index() + assert encrypted_index is not None + + with patch( + "nostr.client.NostrClient.publish_json_to_nostr" + ) as mock_publish, patch("nostr.client.ClientPool"), patch( + "nostr.client.KeyManager" + ), patch.object( + NostrClient, "initialize_client_pool" + ), patch.object( + enc_mgr, "decrypt_parent_seed", return_value="seed" + ): + nostr_client = NostrClient(enc_mgr, "fp") + entry_mgr.backup_index_file() + nostr_client.publish_json_to_nostr(encrypted_index) + + mock_publish.assert_called_with(encrypted_index) diff --git a/tests/test_profiles.py b/tests/test_profiles.py new file mode 100644 index 0000000..634f7d8 --- /dev/null +++ b/tests/test_profiles.py @@ -0,0 +1,52 @@ +import sys +from pathlib import Path +from tempfile import TemporaryDirectory + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from utils.fingerprint_manager import FingerprintManager +from password_manager.manager import PasswordManager + + +VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" + + +def test_add_and_switch_fingerprint(monkeypatch): + with TemporaryDirectory() as tmpdir: + app_dir = Path(tmpdir) + fm = FingerprintManager(app_dir) + + fingerprint = fm.add_fingerprint(VALID_SEED) + assert fingerprint in fm.list_fingerprints() + expected_dir = app_dir / fingerprint + assert expected_dir.exists() + + pm = PasswordManager.__new__(PasswordManager) + pm.fingerprint_manager = fm + pm.encryption_manager = object() + pm.current_fingerprint = None + + monkeypatch.setattr("builtins.input", lambda *_args, **_kwargs: "1") + monkeypatch.setattr( + "password_manager.manager.prompt_existing_password", + lambda *_a, **_k: "pass", + ) + monkeypatch.setattr( + PasswordManager, + "setup_encryption_manager", + lambda self, d, password=None: None, + ) + monkeypatch.setattr(PasswordManager, "load_parent_seed", lambda self, d: None) + monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None) + monkeypatch.setattr(PasswordManager, "initialize_managers", lambda self: None) + monkeypatch.setattr( + PasswordManager, "sync_index_from_nostr_if_missing", lambda self: None + ) + monkeypatch.setattr( + "password_manager.manager.NostrClient", lambda *a, **kw: object() + ) + + assert pm.handle_switch_fingerprint() + assert pm.current_fingerprint == fingerprint + assert fm.current_fingerprint == fingerprint + assert pm.fingerprint_dir == expected_dir diff --git a/tests/test_seed_import.py b/tests/test_seed_import.py new file mode 100644 index 0000000..b88263d --- /dev/null +++ b/tests/test_seed_import.py @@ -0,0 +1,24 @@ +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from cryptography.fernet import Fernet +from mnemonic import Mnemonic + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.encryption import EncryptionManager +from password_manager.manager import PasswordManager + + +def test_seed_encryption_round_trip(): + with TemporaryDirectory() as tmpdir: + key = Fernet.generate_key() + enc_mgr = EncryptionManager(key, Path(tmpdir)) + + seed = Mnemonic("english").generate(strength=128) + enc_mgr.encrypt_parent_seed(seed) + decrypted = enc_mgr.decrypt_parent_seed() + + assert decrypted == seed + pm = PasswordManager.__new__(PasswordManager) + assert pm.validate_bip85_seed(seed)