diff --git a/README.md b/README.md index fd1afe3..0ef5ccb 100644 --- a/README.md +++ b/README.md @@ -125,17 +125,16 @@ python src/main.py 1. Generate Password 2. Retrieve Password 3. Modify an Existing Entry - 4. Backup to Nostr - 5. Restore from Nostr - 6. Backup/Reveal Parent Seed - 7. Switch Seed Profile - 8. Add a New Seed Profile - 9. Remove an Existing Seed Profile - 10. List All Seed Profiles - 11. Settings - 12. Exit + 4. Backup to Nostr + 5. Restore from Nostr + 6. Switch Seed Profile + 7. Add a New Seed Profile + 8. Remove an Existing Seed Profile + 9. List All Seed Profiles + 10. Settings + 11. Exit - Enter your choice (1-12): + Enter your choice (1-11): ``` ### Managing Multiple Seeds @@ -170,7 +169,7 @@ wss://relay.primal.net You can manage the relay list or change the PIN through the **Settings** menu: -1. From the main menu, choose option `11` (**Settings**). +1. From the main menu, choose option `10` (**Settings**). 2. Select `1` to view your current relays. 3. Choose `2` to add a new relay URL. 4. Select `3` to remove a relay by number. @@ -178,7 +177,8 @@ You can manage the relay list or change the PIN through the **Settings** menu: 6. Select `5` to change the settings PIN. 7. Choose `6` to display your Nostr public key. 8. Select `7` to verify the script checksum. -9. Choose `8` to return to the main menu. +9. Choose `8` to back up the parent seed. +10. Select `9` to return to the main menu. ## Running Tests diff --git a/landing/_pgbackup/index_1730048875.html b/landing/_pgbackup/index_1730048875.html index 4338213..1f72ff4 100644 --- a/landing/_pgbackup/index_1730048875.html +++ b/landing/_pgbackup/index_1730048875.html @@ -126,14 +126,14 @@ Fingerprint 31DD880A523B9759 selected and managers initialized. 3. Modify an Existing Entry 4. Backup to Nostr 5. Restore from Nostr - 6. Backup/Reveal Parent Seed - 7. Switch Fingerprint - 8. Add a New Fingerprint - 9. Remove an Existing Fingerprint - 10. List All Fingerprints + 6. Switch Fingerprint + 7. Add a New Fingerprint + 8. Remove an Existing Fingerprint + 9. List All Fingerprints + 10. Settings 11. Exit -Enter your choice (1-13): 1 +Enter your choice (1-11): 1 Enter the website name: newsitename Enter the username (optional): Enter the URL (optional): diff --git a/landing/_pgbackup/index_1730049869.html b/landing/_pgbackup/index_1730049869.html index 1b36db7..a0ef4cb 100644 --- a/landing/_pgbackup/index_1730049869.html +++ b/landing/_pgbackup/index_1730049869.html @@ -123,11 +123,11 @@ Fingerprint 31DD880A523B9759 selected and managers initialized. 3. Modify an Existing Entry 4. Backup to Nostr 5. Restore from Nostr - 6. Backup/Reveal Parent Seed - 7. Switch Fingerprint - 8. Add a New Fingerprint - 9. Remove an Existing Fingerprint - 10. List All Fingerprints + 6. Switch Fingerprint + 7. Add a New Fingerprint + 8. Remove an Existing Fingerprint + 9. List All Fingerprints + 10. Settings 11. Exit Enter your choice (1-11): 1 diff --git a/landing/index.html b/landing/index.html index 02ccf70..b5c992d 100644 --- a/landing/index.html +++ b/landing/index.html @@ -118,11 +118,11 @@ Fingerprint 31DD880A523B9759 selected and managers initialized. 3. Modify an Existing Entry 4. Backup to Nostr 5. Restore from Nostr - 6. Backup/Reveal Parent Seed - 7. Switch Fingerprint - 8. Add a New Fingerprint - 9. Remove an Existing Fingerprint - 10. List All Fingerprints + 6. Switch Fingerprint + 7. Add a New Fingerprint + 8. Remove an Existing Fingerprint + 9. List All Fingerprints + 10. Settings 11. Exit Enter your choice (1-11): 1 diff --git a/src/constants.py b/src/constants.py index c38488a..577236d 100644 --- a/src/constants.py +++ b/src/constants.py @@ -12,14 +12,14 @@ logger = logging.getLogger(__name__) # ----------------------------------- # Nostr Relay Connection Settings # ----------------------------------- -MAX_RETRIES = 3 # Maximum number of retries for relay connections -RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection +MAX_RETRIES = 3 # Maximum number of retries for relay connections +RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection try: # ----------------------------------- # Application Directory and Paths # ----------------------------------- - APP_DIR = Path.home() / '.seedpass' + APP_DIR = Path.home() / ".seedpass" APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists logging.info(f"Application directory created at {APP_DIR}") except Exception as e: @@ -27,7 +27,7 @@ except Exception as e: logging.error(traceback.format_exc()) # Log full traceback try: - PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed + PARENT_SEED_FILE = APP_DIR / "parent_seed.enc" # Encrypted parent seed logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}") except Exception as e: logging.error(f"Error setting file paths: {e}") @@ -37,7 +37,9 @@ except Exception as e: # Checksum Files for Integrity # ----------------------------------- try: - SCRIPT_CHECKSUM_FILE = APP_DIR / 'seedpass_script_checksum.txt' # Checksum for main script + SCRIPT_CHECKSUM_FILE = ( + APP_DIR / "seedpass_script_checksum.txt" + ) # Checksum for main script logging.info(f"Checksum file path set: Script {SCRIPT_CHECKSUM_FILE}") except Exception as e: logging.error(f"Error setting checksum file paths: {e}") @@ -46,12 +48,12 @@ except Exception as e: # ----------------------------------- # Password Generation Constants # ----------------------------------- -DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords -MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length -MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length +DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords +MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length +MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length # ----------------------------------- # Additional Constants (if any) # ----------------------------------- # Add any other constants here as your project expands -DEFAULT_SEED_BACKUP_FILENAME = 'parent_seed_backup.enc' +DEFAULT_SEED_BACKUP_FILENAME = "parent_seed_backup.enc" diff --git a/src/local_bip85/__init__.py b/src/local_bip85/__init__.py index cebca93..8823d1e 100644 --- a/src/local_bip85/__init__.py +++ b/src/local_bip85/__init__.py @@ -5,10 +5,10 @@ import traceback try: from .bip85 import BIP85 + logging.info("BIP85 module imported successfully.") except Exception as e: logging.error(f"Failed to import BIP85 module: {e}") logging.error(traceback.format_exc()) # Log full traceback -__all__ = ['BIP85'] - +__all__ = ["BIP85"] diff --git a/src/local_bip85/bip85.py b/src/local_bip85/bip85.py index 50f15f0..b863d6d 100644 --- a/src/local_bip85/bip85.py +++ b/src/local_bip85/bip85.py @@ -7,8 +7,8 @@ This module implements the BIP85 functionality for deterministic entropy and mne It provides the BIP85 class, which utilizes BIP32 and BIP39 standards to derive entropy and mnemonics from a given seed. Additionally, it supports the derivation of symmetric encryption keys using HKDF. -Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. -This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. +Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed. +This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case. Ensure that all dependencies are installed and properly configured in your environment. """ @@ -21,11 +21,7 @@ import os import traceback from colorama import Fore -from bip_utils import ( - Bip32Slip10Secp256k1, - Bip39MnemonicGenerator, - Bip39Languages -) +from bip_utils import Bip32Slip10Secp256k1, Bip39MnemonicGenerator, Bip39Languages from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import hashes @@ -34,6 +30,7 @@ from cryptography.hazmat.backends import default_backend # Instantiate the logger logger = logging.getLogger(__name__) + class BIP85: def __init__(self, seed_bytes: bytes): try: @@ -80,8 +77,12 @@ class BIP85: entropy = hmac_result[:bytes_len] if len(entropy) != bytes_len: - logging.error(f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") - print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.") + logging.error( + f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes." + ) + print( + f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes." + ) sys.exit(1) logging.debug(f"Derived entropy: {entropy.hex()}") @@ -101,7 +102,9 @@ class BIP85: entropy = self.derive_entropy(index=index, bytes_len=bytes_len, app_no=39) try: - mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy) + mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy( + entropy + ) logging.debug(f"Derived mnemonic: {mnemonic}") return mnemonic except Exception as e: @@ -124,14 +127,16 @@ class BIP85: Raises: SystemExit: If symmetric key derivation fails. """ - entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index) + entropy = self.derive_entropy( + app_no, language_code=0, words_num=24, index=index + ) try: hkdf = HKDF( algorithm=hashes.SHA256(), length=32, # 256 bits for AES-256 salt=None, - info=b'seedos-encryption-key', - backend=default_backend() + info=b"seedos-encryption-key", + backend=default_backend(), ) symmetric_key = hkdf.derive(entropy) logging.debug(f"Derived symmetric key: {symmetric_key.hex()}") diff --git a/src/main.py b/src/main.py index 5f77fbb..26af7ed 100644 --- a/src/main.py +++ b/src/main.py @@ -378,7 +378,8 @@ def handle_settings(password_manager: PasswordManager) -> None: print("5. Change password") print("6. Display Nostr Public Key") print("7. Verify Script Checksum") - print("8. Back") + print("8. Backup Parent Seed") + print("9. Back") choice = input("Select an option: ").strip() if choice == "1": handle_view_relays(cfg_mgr) @@ -395,6 +396,8 @@ def handle_settings(password_manager: PasswordManager) -> None: elif choice == "7": password_manager.handle_verify_checksum() elif choice == "8": + password_manager.handle_backup_reveal_parent_seed() + elif choice == "9": break else: print(colored("Invalid choice.", "red")) @@ -411,24 +414,23 @@ def display_menu(password_manager: PasswordManager): 3. Modify an Existing Entry 4. Backup to Nostr 5. Restore from Nostr - 6. Backup/Reveal Parent Seed - 7. Switch Seed Profile - 8. Add a New Seed Profile - 9. Remove an Existing Seed Profile - 10. List All Seed Profiles - 11. Settings - 12. Exit + 6. Switch Seed Profile + 7. Add a New Seed Profile + 8. Remove an Existing Seed Profile + 9. List All Seed Profiles + 10. Settings + 11. Exit """ while True: # Flush logging handlers for handler in logging.getLogger().handlers: handler.flush() print(colored(menu, "cyan")) - choice = input("Enter your choice (1-12): ").strip() + choice = input("Enter your choice (1-11): ").strip() if not choice: print( colored( - "No input detected. Please enter a number between 1 and 12.", + "No input detected. Please enter a number between 1 and 11.", "yellow", ) ) @@ -444,19 +446,17 @@ def display_menu(password_manager: PasswordManager): elif choice == "5": handle_retrieve_from_nostr(password_manager) elif choice == "6": - password_manager.handle_backup_reveal_parent_seed() - elif choice == "7": if not password_manager.handle_switch_fingerprint(): print(colored("Failed to switch seed profile.", "red")) - elif choice == "8": + elif choice == "7": handle_add_new_fingerprint(password_manager) - elif choice == "9": + elif choice == "8": handle_remove_fingerprint(password_manager) - elif choice == "10": + elif choice == "9": handle_list_fingerprints(password_manager) - elif choice == "11": + elif choice == "10": handle_settings(password_manager) - elif choice == "12": + elif choice == "11": logging.info("Exiting the program.") print(colored("Exiting the program.", "green")) password_manager.nostr_client.close_client_pool() diff --git a/src/nostr/__init__.py b/src/nostr/__init__.py index d258546..5f65186 100644 --- a/src/nostr/__init__.py +++ b/src/nostr/__init__.py @@ -12,9 +12,10 @@ logger = logging.getLogger(__name__) # Correct logger initialization try: from .client import NostrClient + logger.info("NostrClient module imported successfully.") except Exception as e: logger.error(f"Failed to import NostrClient module: {e}") logger.error(traceback.format_exc()) # Log full traceback -__all__ = ['NostrClient'] +__all__ = ["NostrClient"] diff --git a/src/nostr/client.py b/src/nostr/client.py index 417abce..1912137 100644 --- a/src/nostr/client.py +++ b/src/nostr/client.py @@ -34,13 +34,14 @@ logger.setLevel(logging.WARNING) DEFAULT_RELAYS = [ "wss://relay.snort.social", "wss://nostr.oxtr.dev", - "wss://relay.primal.net" + "wss://relay.primal.net", ] # nostr/client.py # src/nostr/client.py + class NostrClient: """ NostrClient Class @@ -49,9 +50,14 @@ class NostrClient: Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations. """ - def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None): + def __init__( + self, + encryption_manager: EncryptionManager, + fingerprint: str, + relays: Optional[List[str]] = None, + ): """ - Initializes the NostrClient with an EncryptionManager, connects to specified relays, + Initializes the NostrClient with an EncryptionManager, connects to specified relays, and sets up the KeyManager with the given fingerprint. :param encryption_manager: An instance of EncryptionManager for handling encryption/decryption. @@ -62,12 +68,13 @@ class NostrClient: # Assign the encryption manager and fingerprint self.encryption_manager = encryption_manager self.fingerprint = fingerprint # Track the fingerprint - self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories + self.fingerprint_dir = ( + self.encryption_manager.fingerprint_dir + ) # If needed to manage directories # Initialize KeyManager with the decrypted parent seed and the provided fingerprint self.key_manager = KeyManager( - self.encryption_manager.decrypt_parent_seed(), - self.fingerprint + self.encryption_manager.decrypt_parent_seed(), self.fingerprint ) # Initialize event handler and client pool @@ -126,7 +133,10 @@ class NostrClient: except Exception as e: logger.error(f"Error running event loop in thread: {e}") logger.error(traceback.format_exc()) - print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr) + print( + f"Error: Event loop in ClientPool thread encountered an issue: {e}", + file=sys.stderr, + ) finally: if not self.loop.is_closed(): logger.debug("Closing the event loop.") @@ -166,14 +176,18 @@ class NostrClient: """ try: logger.debug(f"Submitting publish_event_async for event ID: {event.id}") - future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop) + future = asyncio.run_coroutine_threadsafe( + self.publish_event_async(event), self.loop + ) # Wait for the future to complete future.result(timeout=5) # Adjust the timeout as needed except Exception as e: logger.error(f"Error in publish_event: {e}") print(f"Error: Failed to publish event: {e}", file=sys.stderr) - async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): + async def subscribe_async( + self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None] + ): """ Subscribes to events based on the provided filters using ClientPool. @@ -190,7 +204,9 @@ class NostrClient: logger.error(traceback.format_exc()) print(f"Error: Failed to subscribe: {e}", file=sys.stderr) - def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]): + def subscribe( + self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None] + ): """ Synchronous wrapper for subscribing to events. @@ -198,7 +214,9 @@ class NostrClient: :param handler: A callback function to handle incoming events. """ try: - asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop) + asyncio.run_coroutine_threadsafe( + self.subscribe_async(filters, handler), self.loop + ) except Exception as e: logger.error(f"Error in subscribe: {e}") print(f"Error: Failed to subscribe: {e}", file=sys.stderr) @@ -210,11 +228,13 @@ class NostrClient: :return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails. """ try: - filters = [{ - 'authors': [self.key_manager.keys.public_key_hex()], - 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], - 'limit': 1 - }] + filters = [ + { + "authors": [self.key_manager.keys.public_key_hex()], + "kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], + "limit": 1, + } + ] events = [] @@ -238,7 +258,9 @@ class NostrClient: if event.kind == Event.KIND_ENCRYPT: nip4_encrypt = NIP4Encrypt(self.key_manager.keys) - content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key) + content_base64 = nip4_encrypt.decrypt_message( + event.content, event.pub_key + ) # Return the Base64-encoded content as a string logger.debug("Encrypted JSON data retrieved successfully.") @@ -261,16 +283,21 @@ class NostrClient: :return: The encrypted JSON data as bytes, or None if retrieval fails. """ try: - future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) + future = asyncio.run_coroutine_threadsafe( + self.retrieve_json_from_nostr_async(), self.loop + ) return future.result(timeout=10) except concurrent.futures.TimeoutError: logger.error("Timeout occurred while retrieving JSON from Nostr.") - print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) + print( + "Error: Timeout occurred while retrieving JSON from Nostr.", + file=sys.stderr, + ) return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') + print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red") return None async def do_post_async(self, text: str): @@ -283,7 +310,7 @@ class NostrClient: event = Event( kind=Event.KIND_TEXT_NOTE, content=text, - pub_key=self.key_manager.keys.public_key_hex() + pub_key=self.key_manager.keys.public_key_hex(), ) event.created_at = int(time.time()) event.sign(self.key_manager.keys.private_key_hex()) @@ -296,18 +323,22 @@ class NostrClient: logger.error(f"An error occurred during publishing: {e}", exc_info=True) print(f"Error: An error occurred during publishing: {e}", file=sys.stderr) - async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]): + async def subscribe_feed_async( + self, handler: Callable[[ClientPool, str, Event], None] + ): """ Subscribes to the feed of the client's own pubkey. :param handler: A callback function to handle incoming events. """ try: - filters = [{ - 'authors': [self.key_manager.keys.public_key_hex()], - 'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], - 'limit': 100 - }] + filters = [ + { + "authors": [self.key_manager.keys.public_key_hex()], + "kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT], + "limit": 100, + } + ] await self.subscribe_async(filters=filters, handler=handler) logger.info("Subscribed to your feed.") @@ -327,11 +358,16 @@ class NostrClient: try: await asyncio.gather( self.do_post_async(text), - self.subscribe_feed_async(self.event_handler.handle_new_event) + self.subscribe_feed_async(self.event_handler.handle_new_event), ) except Exception as e: - logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True) - print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr) + logger.error( + f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True + ) + print( + f"Error: An error occurred in publish and subscribe: {e}", + file=sys.stderr, + ) def publish_and_subscribe(self, text: str): """ @@ -340,7 +376,9 @@ class NostrClient: :param text: The content of the text note to publish. """ try: - asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop) + asyncio.run_coroutine_threadsafe( + self.publish_and_subscribe_async(text), self.loop + ) except Exception as e: logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True) print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr) @@ -353,15 +391,19 @@ class NostrClient: """ try: decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) - data = json.loads(decrypted_data.decode('utf-8')) + data = json.loads(decrypted_data.decode("utf-8")) self.save_json_data(data) self.update_checksum() logger.info("Index file updated from Nostr successfully.") - print(colored("Index file updated from Nostr successfully.", 'green')) + print(colored("Index file updated from Nostr successfully.", "green")) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt and save data from Nostr: {e}", "red" + ) + ) def save_json_data(self, data: dict) -> None: """ @@ -370,17 +412,19 @@ class NostrClient: :param data: The JSON data to save. """ try: - encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8')) - index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + encrypted_data = self.encryption_manager.encrypt_data( + json.dumps(data).encode("utf-8") + ) + index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc" with lock_file(index_file_path, fcntl.LOCK_EX): - with open(index_file_path, 'wb') as f: + with open(index_file_path, "wb") as f: f.write(encrypted_data) logger.debug(f"Encrypted data saved to {index_file_path}.") - print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green')) + print(colored(f"Encrypted data saved to '{index_file_path}'.", "green")) except Exception as e: logger.error(f"Failed to save encrypted data: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to save encrypted data: {e}", 'red')) + print(colored(f"Error: Failed to save encrypted data: {e}", "red")) raise def update_checksum(self) -> None: @@ -388,28 +432,30 @@ class NostrClient: Updates the checksum file for the password database. """ try: - index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc' + index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc" decrypted_data = self.decrypt_data_from_file(index_file_path) - content = decrypted_data.decode('utf-8') + content = decrypted_data.decode("utf-8") logger.debug("Calculating checksum of the updated file content.") - checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + checksum = hashlib.sha256(content.encode("utf-8")).hexdigest() logger.debug(f"New checksum: {checksum}") - checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt' + checksum_file = self.fingerprint_dir / "seedpass_passwords_db_checksum.txt" with lock_file(checksum_file, fcntl.LOCK_EX): - with open(checksum_file, 'w') as f: + with open(checksum_file, "w") as f: f.write(checksum) os.chmod(checksum_file, 0o600) - logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.") - print(colored(f"Checksum for '{index_file_path}' updated.", 'green')) + logger.debug( + f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'." + ) + print(colored(f"Checksum for '{index_file_path}' updated.", "green")) except Exception as e: logger.error(f"Failed to update checksum: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to update checksum: {e}", 'red')) + print(colored(f"Error: Failed to update checksum: {e}", "red")) def decrypt_data_from_file(self, file_path: Path) -> bytes: """ @@ -420,7 +466,7 @@ class NostrClient: """ try: with lock_file(file_path, fcntl.LOCK_SH): - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: encrypted_data = f.read() decrypted_data = self.encryption_manager.decrypt_data(encrypted_data) logger.debug(f"Data decrypted from file '{file_path}'.") @@ -428,7 +474,11 @@ class NostrClient: except Exception as e: logger.error(f"Failed to decrypt data from file '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt data from file '{file_path}': {e}", "red" + ) + ) raise def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None): @@ -439,10 +489,14 @@ class NostrClient: :param to_pubkey: (Optional) The recipient's public key for encryption. """ try: - encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8') + encrypted_json_b64 = base64.b64encode(encrypted_json).decode("utf-8") logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}") - event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex()) + event = Event( + kind=Event.KIND_TEXT_NOTE, + content=encrypted_json_b64, + pub_key=self.key_manager.keys.public_key_hex(), + ) event.created_at = int(time.time()) @@ -471,7 +525,9 @@ class NostrClient: Optional[bytes]: The encrypted data as bytes if successful, None otherwise. """ try: - future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop) + future = asyncio.run_coroutine_threadsafe( + self.retrieve_json_from_nostr_async(), self.loop + ) content_base64 = future.result(timeout=10) if not content_base64: @@ -479,17 +535,22 @@ class NostrClient: return None # Base64-decode the content - encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8')) - logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.") + encrypted_data = base64.urlsafe_b64decode(content_base64.encode("utf-8")) + logger.debug( + "Encrypted data retrieved and Base64-decoded successfully from Nostr." + ) return encrypted_data except concurrent.futures.TimeoutError: logger.error("Timeout occurred while retrieving JSON from Nostr.") - print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr) + print( + "Error: Timeout occurred while retrieving JSON from Nostr.", + file=sys.stderr, + ) return None except Exception as e: logger.error(f"Error in retrieve_json_from_nostr: {e}") logger.error(traceback.format_exc()) - print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red') + print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red") return None def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None: @@ -502,7 +563,7 @@ class NostrClient: self.decrypt_and_save_index_from_nostr(encrypted_data) except Exception as e: logger.error(f"Failed to decrypt and save index from Nostr: {e}") - print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red') + print(f"Error: Failed to decrypt and save index from Nostr: {e}", "red") async def close_client_pool_async(self): """ @@ -529,14 +590,20 @@ class NostrClient: logger.warning(f"Error unsubscribing from {sub_id}: {e}") # Close all WebSocket connections - if hasattr(self.client_pool, 'clients'): - tasks = [self.safe_close_connection(client) for client in self.client_pool.clients] + if hasattr(self.client_pool, "clients"): + tasks = [ + self.safe_close_connection(client) + for client in self.client_pool.clients + ] await asyncio.gather(*tasks, return_exceptions=True) # Gather and cancel all tasks current_task = asyncio.current_task() - tasks = [task for task in asyncio.all_tasks(loop=self.loop) - if task != current_task and not task.done()] + tasks = [ + task + for task in asyncio.all_tasks(loop=self.loop) + if task != current_task and not task.done() + ] if tasks: logger.debug(f"Cancelling {len(tasks)} pending tasks.") @@ -545,7 +612,9 @@ class NostrClient: # Wait for all tasks to be cancelled with a timeout try: - await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5) + await asyncio.wait_for( + asyncio.gather(*tasks, return_exceptions=True), timeout=5 + ) except asyncio.TimeoutError: logger.warning("Timeout waiting for tasks to cancel") @@ -569,36 +638,40 @@ class NostrClient: try: # Schedule the coroutine to close the client pool - future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop) - + future = asyncio.run_coroutine_threadsafe( + self.close_client_pool_async(), self.loop + ) + # Wait for the coroutine to finish with a timeout try: future.result(timeout=10) except concurrent.futures.TimeoutError: logger.warning("Initial shutdown attempt timed out, forcing cleanup...") - + # Additional cleanup regardless of timeout try: self.loop.call_soon_threadsafe(self.loop.stop) # Give a short grace period for the loop to stop time.sleep(0.5) - + if self.loop.is_running(): logger.warning("Loop still running after stop, closing forcefully") self.loop.call_soon_threadsafe(self.loop.close) - + # Wait for the thread with a reasonable timeout if self.loop_thread.is_alive(): self.loop_thread.join(timeout=5) - + if self.loop_thread.is_alive(): - logger.warning("Thread still alive after join, may need to be force-killed") - + logger.warning( + "Thread still alive after join, may need to be force-killed" + ) + except Exception as cleanup_error: logger.error(f"Error during final cleanup: {cleanup_error}") - + logger.info("ClientPool shutdown complete") - + except Exception as e: logger.error(f"Error in close_client_pool: {e}") logger.error(traceback.format_exc()) @@ -610,6 +683,8 @@ class NostrClient: await client.close_connection() logger.debug(f"Closed connection to relay: {client.url}") except AttributeError: - logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.") + logger.warning( + f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}." + ) except Exception as e: logger.warning(f"Error closing connection to {client.url}: {e}") diff --git a/src/nostr/encryption_manager.py b/src/nostr/encryption_manager.py index 891264f..e3e4055 100644 --- a/src/nostr/encryption_manager.py +++ b/src/nostr/encryption_manager.py @@ -10,11 +10,12 @@ from .key_manager import KeyManager # Instantiate the logger logger = logging.getLogger(__name__) + class EncryptionManager: """ Manages encryption and decryption using Fernet symmetric encryption. """ - + def __init__(self, key_manager: KeyManager): """ Initializes the EncryptionManager with a Fernet instance. @@ -25,24 +26,26 @@ class EncryptionManager: # Derive the raw encryption key (32 bytes) raw_key = key_manager.derive_encryption_key() logger.debug(f"Derived raw encryption key length: {len(raw_key)} bytes") - + # Ensure the raw key is exactly 32 bytes if len(raw_key) != 32: - raise ValueError(f"Derived key length is {len(raw_key)} bytes; expected 32 bytes.") - + raise ValueError( + f"Derived key length is {len(raw_key)} bytes; expected 32 bytes." + ) + # Base64-encode the raw key to make it URL-safe b64_key = base64.urlsafe_b64encode(raw_key) logger.debug(f"Base64-encoded encryption key length: {len(b64_key)} bytes") - + # Initialize Fernet with the base64-encoded key self.fernet = Fernet(b64_key) logger.info("Fernet encryption manager initialized successfully.") - + except Exception as e: logger.error(f"EncryptionManager initialization failed: {e}") logger.error(traceback.format_exc()) raise - + def encrypt_parent_seed(self, seed: str, file_path: str) -> None: """ Encrypts the parent seed and saves it to the specified file. @@ -51,15 +54,15 @@ class EncryptionManager: :param file_path: The file path to save the encrypted seed. """ try: - encrypted_seed = self.fernet.encrypt(seed.encode('utf-8')) - with open(file_path, 'wb') as f: + encrypted_seed = self.fernet.encrypt(seed.encode("utf-8")) + with open(file_path, "wb") as f: f.write(encrypted_seed) logger.debug(f"Parent seed encrypted and saved to '{file_path}'.") except Exception as e: logger.error(f"Failed to encrypt and save parent seed: {e}") logger.error(traceback.format_exc()) raise - + def decrypt_parent_seed(self, file_path: str) -> str: """ Decrypts the parent seed from the specified file. @@ -68,19 +71,23 @@ class EncryptionManager: :return: The decrypted parent seed as a string. """ try: - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: encrypted_seed = f.read() - decrypted_seed = self.fernet.decrypt(encrypted_seed).decode('utf-8') + decrypted_seed = self.fernet.decrypt(encrypted_seed).decode("utf-8") logger.debug(f"Parent seed decrypted successfully from '{file_path}'.") return decrypted_seed except InvalidToken: - logger.error("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.") - raise ValueError("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.") + logger.error( + "Decryption failed: Invalid token. Possibly incorrect password or corrupted file." + ) + raise ValueError( + "Decryption failed: Invalid token. Possibly incorrect password or corrupted file." + ) except Exception as e: logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) raise - + def encrypt_data(self, data: dict) -> bytes: """ Encrypts a dictionary by serializing it to JSON and then encrypting it. @@ -89,7 +96,7 @@ class EncryptionManager: :return: Encrypted data as bytes. """ try: - json_data = json.dumps(data).encode('utf-8') + json_data = json.dumps(data).encode("utf-8") encrypted = self.fernet.encrypt(json_data) logger.debug("Data encrypted successfully.") return encrypted @@ -97,7 +104,7 @@ class EncryptionManager: logger.error(f"Data encryption failed: {e}") logger.error(traceback.format_exc()) raise - + def decrypt_data(self, encrypted_data: bytes) -> bytes: """ Decrypts encrypted data. diff --git a/src/nostr/event_handler.py b/src/nostr/event_handler.py index 6f8e494..8ff02d5 100644 --- a/src/nostr/event_handler.py +++ b/src/nostr/event_handler.py @@ -8,6 +8,7 @@ from monstr.event.event import Event # Instantiate the logger logger = logging.getLogger(__name__) + class EventHandler: """ Handles incoming Nostr events. @@ -25,7 +26,9 @@ class EventHandler: try: # Assuming evt.created_at is always an integer Unix timestamp if isinstance(evt.created_at, int): - created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at)) + created_at_str = time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(evt.created_at) + ) else: # Handle unexpected types gracefully created_at_str = str(evt.created_at) diff --git a/src/nostr/key_manager.py b/src/nostr/key_manager.py index cc89072..2aab346 100644 --- a/src/nostr/key_manager.py +++ b/src/nostr/key_manager.py @@ -11,6 +11,7 @@ from monstr.encrypt import Keys logger = logging.getLogger(__name__) + class KeyManager: """ Manages key generation, encoding, and derivation for NostrClient. @@ -26,9 +27,13 @@ class KeyManager: """ try: if not isinstance(parent_seed, str): - raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}") + raise TypeError( + f"Parent seed must be a string, got {type(parent_seed)}" + ) if not isinstance(fingerprint, str): - raise TypeError(f"Fingerprint must be a string, got {type(fingerprint)}") + raise TypeError( + f"Fingerprint must be a string, got {type(fingerprint)}" + ) self.parent_seed = parent_seed self.fingerprint = fingerprint @@ -72,12 +77,14 @@ class KeyManager: """ try: # Convert fingerprint to an integer index (using a hash function) - index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31) + index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % ( + 2**31 + ) # Derive entropy for Nostr key (32 bytes) entropy_bytes = self.bip85.derive_entropy( index=index, - bytes_len=32 # Adjust parameter name and value as per your method signature + bytes_len=32, # Adjust parameter name and value as per your method signature ) # Generate Nostr key pair from entropy @@ -107,7 +114,7 @@ class KeyManager: str: The private key in hex. """ return self.keys.private_key_hex() - + def get_npub(self) -> str: """ Returns the npub (Bech32 encoded public key). @@ -119,7 +126,7 @@ class KeyManager: pub_key_hex = self.get_public_key_hex() pub_key_bytes = bytes.fromhex(pub_key_hex) data = convertbits(pub_key_bytes, 8, 5, True) - npub = bech32_encode('npub', data) + npub = bech32_encode("npub", data) return npub except Exception as e: logger.error(f"Failed to generate npub: {e}") diff --git a/src/nostr/utils.py b/src/nostr/utils.py index 33b82d0..6485a82 100644 --- a/src/nostr/utils.py +++ b/src/nostr/utils.py @@ -2,6 +2,7 @@ import logging + # Example utility function (if any specific to nostr package) def some_helper_function(): pass # Implement as needed diff --git a/src/password_manager/encryption.py b/src/password_manager/encryption.py index 24f3817..8d3866e 100644 --- a/src/password_manager/encryption.py +++ b/src/password_manager/encryption.py @@ -30,12 +30,14 @@ import fcntl # For file locking # Instantiate the logger logger = logging.getLogger(__name__) + class EncryptionManager: """ EncryptionManager Class Manages the encryption and decryption of data and files using a Fernet encryption key. """ + def __init__(self, encryption_key: bytes, fingerprint_dir: Path): """ Initializes the EncryptionManager with the provided encryption key and fingerprint directory. @@ -45,16 +47,20 @@ class EncryptionManager: fingerprint_dir (Path): The directory corresponding to the fingerprint. """ self.fingerprint_dir = fingerprint_dir - self.parent_seed_file = self.fingerprint_dir / 'parent_seed.enc' + self.parent_seed_file = self.fingerprint_dir / "parent_seed.enc" self.key = encryption_key try: self.fernet = Fernet(self.key) logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}") except Exception as e: - logger.error(f"Failed to initialize Fernet with provided encryption key: {e}") + logger.error( + f"Failed to initialize Fernet with provided encryption key: {e}" + ) logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red')) + print( + colored(f"Error: Failed to initialize encryption manager: {e}", "red") + ) raise def encrypt_parent_seed(self, parent_seed: str) -> None: @@ -65,25 +71,32 @@ class EncryptionManager: """ try: # Convert seed to bytes - data = parent_seed.encode('utf-8') + data = parent_seed.encode("utf-8") # Encrypt the data encrypted_data = self.encrypt_data(data) # Write the encrypted data to the file with locking with lock_file(self.parent_seed_file, fcntl.LOCK_EX): - with open(self.parent_seed_file, 'wb') as f: + with open(self.parent_seed_file, "wb") as f: f.write(encrypted_data) # Set file permissions to read/write for the user only os.chmod(self.parent_seed_file, 0o600) - logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.") - print(colored(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", 'green')) + logger.info( + f"Parent seed encrypted and saved to '{self.parent_seed_file}'." + ) + print( + colored( + f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", + "green", + ) + ) except Exception as e: logger.error(f"Failed to encrypt and save parent seed: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red')) + print(colored(f"Error: Failed to encrypt and save parent seed: {e}", "red")) raise def decrypt_parent_seed(self) -> str: @@ -93,24 +106,28 @@ class EncryptionManager: :return: The decrypted parent seed. """ try: - parent_seed_path = self.fingerprint_dir / 'parent_seed.enc' + parent_seed_path = self.fingerprint_dir / "parent_seed.enc" with lock_file(parent_seed_path, fcntl.LOCK_SH): - with open(parent_seed_path, 'rb') as f: + with open(parent_seed_path, "rb") as f: encrypted_data = f.read() decrypted_data = self.decrypt_data(encrypted_data) - parent_seed = decrypted_data.decode('utf-8').strip() + parent_seed = decrypted_data.decode("utf-8").strip() - logger.debug(f"Parent seed decrypted successfully from '{parent_seed_path}'.") + logger.debug( + f"Parent seed decrypted successfully from '{parent_seed_path}'." + ) return parent_seed except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting parent seed.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting parent seed." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to decrypt parent seed: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red')) + print(colored(f"Error: Failed to decrypt parent seed: {e}", "red")) raise def encrypt_data(self, data: bytes) -> bytes: @@ -127,7 +144,7 @@ class EncryptionManager: except Exception as e: logger.error(f"Failed to encrypt data: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt data: {e}", 'red')) + print(colored(f"Error: Failed to encrypt data: {e}", "red")) raise def decrypt_data(self, encrypted_data: bytes) -> bytes: @@ -142,13 +159,15 @@ class EncryptionManager: logger.debug("Data decrypted successfully.") return decrypted_data except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting data.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting data." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to decrypt data: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt data: {e}", 'red')) + print(colored(f"Error: Failed to decrypt data: {e}", "red")) raise def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: @@ -170,18 +189,23 @@ class EncryptionManager: # Write the encrypted data to the file with locking with lock_file(file_path, fcntl.LOCK_EX): - with open(file_path, 'wb') as f: + with open(file_path, "wb") as f: f.write(encrypted_data) # Set file permissions to read/write for the user only os.chmod(file_path, 0o600) logger.info(f"Data encrypted and saved to '{file_path}'.") - print(colored(f"Data encrypted and saved to '{file_path}'.", 'green')) + print(colored(f"Data encrypted and saved to '{file_path}'.", "green")) except Exception as e: logger.error(f"Failed to encrypt and save data to '{relative_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to encrypt and save data to '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to encrypt and save data to '{relative_path}': {e}", + "red", + ) + ) raise def decrypt_file(self, relative_path: Path) -> bytes: @@ -197,7 +221,7 @@ class EncryptionManager: # Read the encrypted data with locking with lock_file(file_path, fcntl.LOCK_SH): - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: encrypted_data = f.read() # Decrypt the data @@ -205,13 +229,19 @@ class EncryptionManager: logger.debug(f"Data decrypted successfully from '{file_path}'.") return decrypted_data except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting file.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting file." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to decrypt data from '{relative_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt data from '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt data from '{relative_path}': {e}", "red" + ) + ) raise def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None: @@ -223,16 +253,22 @@ class EncryptionManager: Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") try: - json_data = json.dumps(data, indent=4).encode('utf-8') + json_data = json.dumps(data, indent=4).encode("utf-8") self.encrypt_and_save_file(json_data, relative_path) logger.debug(f"JSON data encrypted and saved to '{relative_path}'.") - print(colored(f"JSON data encrypted and saved to '{relative_path}'.", 'green')) + print( + colored(f"JSON data encrypted and saved to '{relative_path}'.", "green") + ) except Exception as e: logger.error(f"Failed to save JSON data to '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to save JSON data to '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to save JSON data to '{relative_path}': {e}", "red" + ) + ) raise def load_json_data(self, relative_path: Optional[Path] = None) -> dict: @@ -244,35 +280,54 @@ class EncryptionManager: :return: The decrypted JSON data as a dictionary. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") file_path = self.fingerprint_dir / relative_path if not file_path.exists(): - logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.") - print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow')) - return {'passwords': {}} + logger.info( + f"Index file '{file_path}' does not exist. Initializing empty data." + ) + print( + colored( + f"Info: Index file '{file_path}' not found. Initializing new password database.", + "yellow", + ) + ) + return {"passwords": {}} try: decrypted_data = self.decrypt_file(relative_path) - json_content = decrypted_data.decode('utf-8').strip() + json_content = decrypted_data.decode("utf-8").strip() data = json.loads(json_content) logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") - print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green')) + print( + colored(f"JSON data loaded and decrypted from '{file_path}'.", "green") + ) return data except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON data from '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to decode JSON data from '{file_path}': {e}", "red" + ) + ) raise except InvalidToken: - logger.error("Invalid encryption key or corrupted data while decrypting JSON data.") - print(colored("Error: Invalid encryption key or corrupted data.", 'red')) + logger.error( + "Invalid encryption key or corrupted data while decrypting JSON data." + ) + print(colored("Error: Invalid encryption key or corrupted data.", "red")) raise except Exception as e: logger.error(f"Failed to load JSON data from '{file_path}': {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to load JSON data from '{file_path}': {e}", "red" + ) + ) raise def update_checksum(self, relative_path: Optional[Path] = None) -> None: @@ -283,32 +338,39 @@ class EncryptionManager: Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") try: file_path = self.fingerprint_dir / relative_path decrypted_data = self.decrypt_file(relative_path) - content = decrypted_data.decode('utf-8') + content = decrypted_data.decode("utf-8") logger.debug("Calculating checksum of the updated file content.") - checksum = hashlib.sha256(content.encode('utf-8')).hexdigest() + checksum = hashlib.sha256(content.encode("utf-8")).hexdigest() logger.debug(f"New checksum: {checksum}") checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt" # Write the checksum to the file with locking with lock_file(checksum_file, fcntl.LOCK_EX): - with open(checksum_file, 'w') as f: + with open(checksum_file, "w") as f: f.write(checksum) # Set file permissions to read/write for the user only os.chmod(checksum_file, 0o600) - logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.") - print(colored(f"Checksum for '{file_path}' updated.", 'green')) + logger.debug( + f"Checksum for '{file_path}' updated and written to '{checksum_file}'." + ) + print(colored(f"Checksum for '{file_path}' updated.", "green")) except Exception as e: logger.error(f"Failed to update checksum for '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to update checksum for '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to update checksum for '{relative_path}': {e}", + "red", + ) + ) raise def get_encrypted_index(self) -> Optional[bytes]: @@ -318,14 +380,20 @@ class EncryptionManager: :return: Encrypted data as bytes or None if the index file does not exist. """ try: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") if not (self.fingerprint_dir / relative_path).exists(): - logger.error(f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'.") - print(colored(f"Error: Index file '{relative_path}' does not exist.", 'red')) + logger.error( + f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'." + ) + print( + colored( + f"Error: Index file '{relative_path}' does not exist.", "red" + ) + ) return None with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH): - with open(self.fingerprint_dir / relative_path, 'rb') as file: + with open(self.fingerprint_dir / relative_path, "rb") as file: encrypted_data = file.read() logger.debug(f"Encrypted index data read from '{relative_path}'.") @@ -333,10 +401,17 @@ class EncryptionManager: except Exception as e: logger.error(f"Failed to read encrypted index file '{relative_path}': {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to read encrypted index file '{relative_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to read encrypted index file '{relative_path}': {e}", + "red", + ) + ) return None - def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes, relative_path: Optional[Path] = None) -> None: + def decrypt_and_save_index_from_nostr( + self, encrypted_data: bytes, relative_path: Optional[Path] = None + ) -> None: """ Decrypts the encrypted data retrieved from Nostr and updates the local index file. @@ -345,18 +420,22 @@ class EncryptionManager: Defaults to 'seedpass_passwords_db.json.enc'. """ if relative_path is None: - relative_path = Path('seedpass_passwords_db.json.enc') + relative_path = Path("seedpass_passwords_db.json.enc") try: decrypted_data = self.decrypt_data(encrypted_data) - data = json.loads(decrypted_data.decode('utf-8')) + data = json.loads(decrypted_data.decode("utf-8")) self.save_json_data(data, relative_path) self.update_checksum(relative_path) logger.info("Index file updated from Nostr successfully.") - print(colored("Index file updated from Nostr successfully.", 'green')) + print(colored("Index file updated from Nostr successfully.", "green")) except Exception as e: logger.error(f"Failed to decrypt and save data from Nostr: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red')) + print( + colored( + f"Error: Failed to decrypt and save data from Nostr: {e}", "red" + ) + ) # Re-raise the exception to inform the calling function of the failure raise @@ -371,7 +450,9 @@ class EncryptionManager: words = seed_phrase.split() if len(words) != 12: logger.error("Seed phrase does not contain exactly 12 words.") - print(colored("Error: Seed phrase must contain exactly 12 words.", 'red')) + print( + colored("Error: Seed phrase must contain exactly 12 words.", "red") + ) return False # Additional validation can be added here (e.g., word list checks) logger.debug("Seed phrase validated successfully.") @@ -379,7 +460,7 @@ class EncryptionManager: except Exception as e: logging.error(f"Error validating seed phrase: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to validate seed phrase: {e}", 'red')) + print(colored(f"Error: Failed to validate seed phrase: {e}", "red")) return False def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes: @@ -399,11 +480,12 @@ class EncryptionManager: if not isinstance(mnemonic, str): raise TypeError("Mnemonic must be a string after conversion") from bip_utils import Bip39SeedGenerator + seed = Bip39SeedGenerator(mnemonic).Generate(passphrase) logger.debug("Seed derived successfully from mnemonic.") return seed except Exception as e: logger.error(f"Failed to derive seed from mnemonic: {e}") logger.error(traceback.format_exc()) - print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red')) + print(colored(f"Error: Failed to derive seed from mnemonic: {e}", "red")) raise diff --git a/src/password_manager/manager.py b/src/password_manager/manager.py index f9d71e6..9f3ef65 100644 --- a/src/password_manager/manager.py +++ b/src/password_manager/manager.py @@ -1003,7 +1003,7 @@ class PasswordManager: Handles the backup and reveal of the parent seed. """ try: - print(colored("\n=== Backup/Reveal Parent Seed ===", "yellow")) + print(colored("\n=== Backup Parent Seed ===", "yellow")) print( colored( "Warning: Revealing your parent seed is a highly sensitive operation.", diff --git a/src/password_manager/password_generation.py b/src/password_manager/password_generation.py index 4ab0f2c..fa3049e 100644 --- a/src/password_manager/password_generation.py +++ b/src/password_manager/password_generation.py @@ -35,6 +35,7 @@ from password_manager.encryption import EncryptionManager # Instantiate the logger logger = logging.getLogger(__name__) + class PasswordGenerator: """ PasswordGenerator Class @@ -44,7 +45,9 @@ class PasswordGenerator: complexity requirements. """ - def __init__(self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85): + def __init__( + self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85 + ): """ Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance. @@ -59,16 +62,20 @@ class PasswordGenerator: self.bip85 = bip85 # Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager) - self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed) + self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic( + self.parent_seed + ) logger.debug("PasswordGenerator initialized successfully.") except Exception as e: logger.error(f"Failed to initialize PasswordGenerator: {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red')) + print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", "red")) raise - def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str: + def generate_password( + self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0 + ) -> str: """ Generates a deterministic password based on the parent seed, desired length, and index. @@ -90,11 +97,19 @@ class PasswordGenerator: try: # Validate password length if length < MIN_PASSWORD_LENGTH: - logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") - raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.") + logger.error( + f"Password length must be at least {MIN_PASSWORD_LENGTH} characters." + ) + raise ValueError( + f"Password length must be at least {MIN_PASSWORD_LENGTH} characters." + ) if length > MAX_PASSWORD_LENGTH: - logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") - raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.") + logger.error( + f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters." + ) + raise ValueError( + f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters." + ) # Derive entropy using BIP-85 entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32) @@ -105,39 +120,43 @@ class PasswordGenerator: algorithm=hashes.SHA256(), length=32, # 256 bits for AES-256 salt=None, - info=b'password-generation', - backend=default_backend() + info=b"password-generation", + backend=default_backend(), ) derived_key = hkdf.derive(entropy) logger.debug(f"Derived key using HKDF: {derived_key.hex()}") # Use PBKDF2-HMAC-SHA256 to derive a key from entropy - dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000) + dk = hashlib.pbkdf2_hmac("sha256", entropy, b"", 100000) logger.debug(f"Derived key using PBKDF2: {dk.hex()}") # Map the derived key to all allowed characters all_allowed = string.ascii_letters + string.digits + string.punctuation - password = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) - logger.debug(f"Password after mapping to all allowed characters: {password}") + password = "".join(all_allowed[byte % len(all_allowed)] for byte in dk) + logger.debug( + f"Password after mapping to all allowed characters: {password}" + ) # Ensure the password meets complexity requirements password = self.ensure_complexity(password, all_allowed, dk) logger.debug(f"Password after ensuring complexity: {password}") # Shuffle characters deterministically based on dk - shuffle_seed = int.from_bytes(dk, 'big') + shuffle_seed = int.from_bytes(dk, "big") rng = random.Random(shuffle_seed) password_chars = list(password) rng.shuffle(password_chars) - password = ''.join(password_chars) + password = "".join(password_chars) logger.debug("Shuffled password deterministically.") # Ensure password length by extending if necessary if len(password) < length: while len(password) < length: - dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1) - base64_extra = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk) - password += ''.join(base64_extra) + dk = hashlib.pbkdf2_hmac("sha256", dk, b"", 1) + base64_extra = "".join( + all_allowed[byte % len(all_allowed)] for byte in dk + ) + password += "".join(base64_extra) logger.debug(f"Extended password: {password}") # Trim the password to the desired length @@ -149,7 +168,7 @@ class PasswordGenerator: except Exception as e: logger.error(f"Error generating password: {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to generate password: {e}", 'red')) + print(colored(f"Error: Failed to generate password: {e}", "red")) raise def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str: @@ -180,7 +199,9 @@ class PasswordGenerator: current_digits = sum(1 for c in password_chars if c in digits) current_special = sum(1 for c in password_chars if c in special) - logger.debug(f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}") + logger.debug( + f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}" + ) # Set minimum counts min_upper = 2 @@ -204,14 +225,18 @@ class PasswordGenerator: index = get_dk_value() % len(password_chars) char = uppercase[get_dk_value() % len(uppercase)] password_chars[index] = char - logger.debug(f"Added uppercase letter '{char}' at position {index}.") + logger.debug( + f"Added uppercase letter '{char}' at position {index}." + ) if current_lower < min_lower: for _ in range(min_lower - current_lower): index = get_dk_value() % len(password_chars) char = lowercase[get_dk_value() % len(lowercase)] password_chars[index] = char - logger.debug(f"Added lowercase letter '{char}' at position {index}.") + logger.debug( + f"Added lowercase letter '{char}' at position {index}." + ) if current_digits < min_digits: for _ in range(min_digits - current_digits): @@ -225,7 +250,9 @@ class PasswordGenerator: index = get_dk_value() % len(password_chars) char = special[get_dk_value() % len(special)] password_chars[index] = char - logger.debug(f"Added special character '{char}' at position {index}.") + logger.debug( + f"Added special character '{char}' at position {index}." + ) # Additional deterministic inclusion of symbols to increase score symbol_target = 3 # Increase target number of symbols @@ -253,11 +280,15 @@ class PasswordGenerator: if i == 0 and password_chars[j] not in uppercase: char = uppercase[get_dk_value() % len(uppercase)] password_chars[j] = char - logger.debug(f"Assigned uppercase letter '{char}' to position {j}.") + logger.debug( + f"Assigned uppercase letter '{char}' to position {j}." + ) elif i == 1 and password_chars[j] not in lowercase: char = lowercase[get_dk_value() % len(lowercase)] password_chars[j] = char - logger.debug(f"Assigned lowercase letter '{char}' to position {j}.") + logger.debug( + f"Assigned lowercase letter '{char}' to position {j}." + ) elif i == 2 and password_chars[j] not in digits: char = digits[get_dk_value() % len(digits)] password_chars[j] = char @@ -265,10 +296,14 @@ class PasswordGenerator: elif i == 3 and password_chars[j] not in special: char = special[get_dk_value() % len(special)] password_chars[j] = char - logger.debug(f"Assigned special character '{char}' to position {j}.") + logger.debug( + f"Assigned special character '{char}' to position {j}." + ) # Shuffle again to distribute the characters more evenly - shuffle_seed = int.from_bytes(dk, 'big') + dk_index # Modify seed to vary shuffle + shuffle_seed = ( + int.from_bytes(dk, "big") + dk_index + ) # Modify seed to vary shuffle rng = random.Random(shuffle_seed) rng.shuffle(password_chars) logger.debug(f"Shuffled password characters for balanced distribution.") @@ -278,12 +313,14 @@ class PasswordGenerator: final_lower = sum(1 for c in password_chars if c in lowercase) final_digits = sum(1 for c in password_chars if c in digits) final_special = sum(1 for c in password_chars if c in special) - logger.debug(f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}") + logger.debug( + f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}" + ) - return ''.join(password_chars) + return "".join(password_chars) except Exception as e: logger.error(f"Error ensuring password complexity: {e}") logger.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to ensure password complexity: {e}", 'red')) + print(colored(f"Error: Failed to ensure password complexity: {e}", "red")) raise diff --git a/src/repo-context.txt b/src/repo-context.txt index a9d7f9c..30f018d 100644 --- a/src/repo-context.txt +++ b/src/repo-context.txt @@ -350,7 +350,7 @@ def display_menu(password_manager: PasswordManager): 5. Post Encrypted Index to Nostr 6. Retrieve Encrypted Index from Nostr 7. Display Nostr Public Key (npub) - 8. Backup/Reveal Parent Seed + 8. Backup Parent Seed 9. Switch Fingerprint 10. Add a New Fingerprint 11. Remove an Existing Fingerprint @@ -1602,7 +1602,7 @@ class PasswordManager: Handles the backup and reveal of the parent seed. """ try: - print(colored("\n=== Backup/Reveal Parent Seed ===", 'yellow')) + print(colored("\n=== Backup Parent Seed ===", 'yellow')) print(colored("Warning: Revealing your parent seed is a highly sensitive operation.", 'red')) print(colored("Ensure you're in a secure, private environment and no one is watching your screen.", 'red')) diff --git a/src/tests/test_import.py b/src/tests/test_import.py index 8c6a743..38f0eaf 100644 --- a/src/tests/test_import.py +++ b/src/tests/test_import.py @@ -2,6 +2,7 @@ try: from bip_utils import Bip39SeedGenerator + print("Bip39SeedGenerator imported successfully.") except ImportError as e: print(f"ImportError: {e}") diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 927234f..1c3e453 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -8,17 +8,17 @@ try: from .key_derivation import derive_key_from_password, derive_key_from_parent_seed from .checksum import calculate_checksum, verify_checksum from .password_prompt import prompt_for_password - + logging.info("Modules imported successfully.") except Exception as e: logging.error(f"Failed to import one or more modules: {e}") logging.error(traceback.format_exc()) # Log full traceback __all__ = [ - 'derive_key_from_password', - 'derive_key_from_parent_seed', - 'calculate_checksum', - 'verify_checksum', - 'lock_file', - 'prompt_for_password' + "derive_key_from_password", + "derive_key_from_parent_seed", + "calculate_checksum", + "verify_checksum", + "lock_file", + "prompt_for_password", ] diff --git a/src/utils/checksum.py b/src/utils/checksum.py index 9266b3d..37f2e45 100644 --- a/src/utils/checksum.py +++ b/src/utils/checksum.py @@ -19,14 +19,12 @@ from typing import Optional from termcolor import colored -from constants import ( - APP_DIR, - SCRIPT_CHECKSUM_FILE -) +from constants import APP_DIR, SCRIPT_CHECKSUM_FILE # Instantiate the logger logger = logging.getLogger(__name__) + def calculate_checksum(file_path: str) -> Optional[str]: """ Calculates the SHA-256 checksum of the given file. @@ -39,7 +37,7 @@ def calculate_checksum(file_path: str) -> Optional[str]: """ hasher = hashlib.sha256() try: - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) checksum = hasher.hexdigest() @@ -47,12 +45,20 @@ def calculate_checksum(file_path: str) -> Optional[str]: return checksum except FileNotFoundError: logging.error(f"File '{file_path}' not found for checksum calculation.") - print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red')) + print( + colored( + f"Error: File '{file_path}' not found for checksum calculation.", "red" + ) + ) return None except Exception as e: logging.error(f"Error calculating checksum for '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to calculate checksum for '{file_path}': {e}", "red" + ) + ) return None @@ -68,7 +74,7 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool: bool: True if checksums match, False otherwise. """ try: - with open(checksum_file_path, 'r') as f: + with open(checksum_file_path, "r") as f: stored_checksum = f.read().strip() if current_checksum == stored_checksum: logging.debug(f"Checksum verification passed for '{checksum_file_path}'.") @@ -78,12 +84,17 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool: return False except FileNotFoundError: logging.error(f"Checksum file '{checksum_file_path}' not found.") - print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red')) + print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", "red")) return False except Exception as e: logging.error(f"Error reading checksum file '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to read checksum file '{checksum_file_path}': {e}", + "red", + ) + ) return False @@ -100,16 +111,21 @@ def update_checksum(content: str, checksum_file_path: str) -> bool: """ try: hasher = hashlib.sha256() - hasher.update(content.encode('utf-8')) + hasher.update(content.encode("utf-8")) new_checksum = hasher.hexdigest() - with open(checksum_file_path, 'w') as f: + with open(checksum_file_path, "w") as f: f.write(new_checksum) logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}") return True except Exception as e: logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to update checksum for '{checksum_file_path}': {e}", + "red", + ) + ) return False @@ -129,11 +145,11 @@ def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool: return False if verify_checksum(current_checksum, checksum_file_path): - print(colored(f"Checksum verification passed for '{file_path}'.", 'green')) + print(colored(f"Checksum verification passed for '{file_path}'.", "green")) logging.info(f"Checksum verification passed for '{file_path}'.") return True else: - print(colored(f"Checksum verification failed for '{file_path}'.", 'red')) + print(colored(f"Checksum verification failed for '{file_path}'.", "red")) logging.warning(f"Checksum verification failed for '{file_path}'.") return False @@ -154,13 +170,20 @@ def initialize_checksum(file_path: str, checksum_file_path: str) -> bool: return False try: - with open(checksum_file_path, 'w') as f: + with open(checksum_file_path, "w") as f: f.write(checksum) - logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}") - print(colored(f"Initialized checksum for '{file_path}'.", 'green')) + logging.debug( + f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}" + ) + print(colored(f"Initialized checksum for '{file_path}'.", "green")) return True except Exception as e: logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", + "red", + ) + ) return False diff --git a/src/utils/file_lock.py b/src/utils/file_lock.py index 118eec7..431eea3 100644 --- a/src/utils/file_lock.py +++ b/src/utils/file_lock.py @@ -26,6 +26,7 @@ import traceback # Instantiate the logger logger = logging.getLogger(__name__) + @contextmanager def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: """ @@ -44,14 +45,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: SystemExit: Exits the program if the lock cannot be acquired. """ if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH): - logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.") - print(colored("Error: Invalid lock type provided.", 'red')) + logging.error( + f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH." + ) + print(colored("Error: Invalid lock type provided.", "red")) sys.exit(1) file = None try: # Determine the mode based on whether the file exists - mode = 'rb+' if file_path.exists() else 'wb' + mode = "rb+" if file_path.exists() else "wb" # Open the file file = open(file_path, mode) @@ -67,7 +70,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red')) + print( + colored( + f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", + "red", + ) + ) sys.exit(1) finally: @@ -78,9 +86,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: logging.debug(f"Lock released on '{file_path}'.") except Exception as e: lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared" - logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}") + logging.warning( + f"Failed to release {lock_type_str} lock on '{file_path}': {e}" + ) logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow')) + print( + colored( + f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", + "yellow", + ) + ) finally: # Close the file try: @@ -89,7 +104,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]: except Exception as e: logging.warning(f"Failed to close file '{file_path}': {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow')) + print( + colored( + f"Warning: Failed to close file '{file_path}': {e}", + "yellow", + ) + ) @contextmanager diff --git a/src/utils/fingerprint.py b/src/utils/fingerprint.py index e831756..6e7046d 100644 --- a/src/utils/fingerprint.py +++ b/src/utils/fingerprint.py @@ -16,6 +16,7 @@ from typing import Optional # Instantiate the logger logger = logging.getLogger(__name__) + def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]: """ Generates a unique fingerprint from the provided seed phrase using SHA-256. @@ -33,7 +34,7 @@ def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]: logger.debug(f"Normalized seed: {normalized_seed}") # Compute SHA-256 hash - sha256_hash = hashlib.sha256(normalized_seed.encode('utf-8')).hexdigest() + sha256_hash = hashlib.sha256(normalized_seed.encode("utf-8")).hexdigest() logger.debug(f"SHA-256 Hash: {sha256_hash}") # Truncate to desired length diff --git a/src/utils/fingerprint_manager.py b/src/utils/fingerprint_manager.py index 3b9597f..84c1929 100644 --- a/src/utils/fingerprint_manager.py +++ b/src/utils/fingerprint_manager.py @@ -14,6 +14,7 @@ from utils.fingerprint import generate_fingerprint # Instantiate the logger logger = logging.getLogger(__name__) + class FingerprintManager: """ FingerprintManager Class @@ -31,7 +32,7 @@ class FingerprintManager: app_dir (Path): The root application directory (e.g., ~/.seedpass). """ self.app_dir = app_dir - self.fingerprints_file = self.app_dir / 'fingerprints.json' + self.fingerprints_file = self.app_dir / "fingerprints.json" self._ensure_app_directory() self.fingerprints = self._load_fingerprints() self.current_fingerprint: Optional[str] = None @@ -43,7 +44,7 @@ class FingerprintManager: Returns: Optional[Path]: The Path object of the current fingerprint directory or None. """ - if hasattr(self, 'current_fingerprint') and self.current_fingerprint: + if hasattr(self, "current_fingerprint") and self.current_fingerprint: return self.get_fingerprint_directory(self.current_fingerprint) else: logger.error("No current fingerprint is set.") @@ -57,7 +58,9 @@ class FingerprintManager: self.app_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Application directory ensured at {self.app_dir}") except Exception as e: - logger.error(f"Failed to create application directory at {self.app_dir}: {e}") + logger.error( + f"Failed to create application directory at {self.app_dir}: {e}" + ) logger.error(traceback.format_exc()) raise @@ -70,13 +73,15 @@ class FingerprintManager: """ try: if self.fingerprints_file.exists(): - with open(self.fingerprints_file, 'r') as f: + with open(self.fingerprints_file, "r") as f: data = json.load(f) - fingerprints = data.get('fingerprints', []) + fingerprints = data.get("fingerprints", []) logger.debug(f"Loaded fingerprints: {fingerprints}") return fingerprints else: - logger.debug("fingerprints.json not found. Initializing empty fingerprint list.") + logger.debug( + "fingerprints.json not found. Initializing empty fingerprint list." + ) return [] except Exception as e: logger.error(f"Failed to load fingerprints: {e}") @@ -88,8 +93,8 @@ class FingerprintManager: Saves the current list of fingerprints to the fingerprints.json file. """ try: - with open(self.fingerprints_file, 'w') as f: - json.dump({'fingerprints': self.fingerprints}, f, indent=4) + with open(self.fingerprints_file, "w") as f: + json.dump({"fingerprints": self.fingerprints}, f, indent=4) logger.debug(f"Fingerprints saved: {self.fingerprints}") except Exception as e: logger.error(f"Failed to save fingerprints: {e}") @@ -140,7 +145,7 @@ class FingerprintManager: # Remove fingerprint directory fingerprint_dir = self.app_dir / fingerprint if fingerprint_dir.exists() and fingerprint_dir.is_dir(): - for child in fingerprint_dir.glob('*'): + for child in fingerprint_dir.glob("*"): if child.is_file(): child.unlink() elif child.is_dir(): diff --git a/src/utils/password_prompt.py b/src/utils/password_prompt.py index 2a5be96..0b42899 100644 --- a/src/utils/password_prompt.py +++ b/src/utils/password_prompt.py @@ -29,6 +29,7 @@ colorama_init() # Instantiate the logger logger = logging.getLogger(__name__) + def prompt_new_password() -> str: """ Prompts the user to enter and confirm a new password for encrypting the parent seed. @@ -51,39 +52,50 @@ def prompt_new_password() -> str: confirm_password = getpass.getpass(prompt="Confirm your password: ").strip() if not password: - print(colored("Error: Password cannot be empty. Please try again.", 'red')) + print( + colored("Error: Password cannot be empty. Please try again.", "red") + ) logging.warning("User attempted to enter an empty password.") attempts += 1 continue if len(password) < MIN_PASSWORD_LENGTH: - print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red')) - logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.") + print( + colored( + f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", + "red", + ) + ) + logging.warning( + f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters." + ) attempts += 1 continue if password != confirm_password: - print(colored("Error: Passwords do not match. Please try again.", 'red')) + print( + colored("Error: Passwords do not match. Please try again.", "red") + ) logging.warning("User entered mismatching passwords.") attempts += 1 continue # Normalize the password to NFKD form - normalized_password = unicodedata.normalize('NFKD', password) + normalized_password = unicodedata.normalize("NFKD", password) logging.debug("User entered a valid and confirmed password.") return normalized_password except KeyboardInterrupt: - print(colored("\nOperation cancelled by user.", 'yellow')) + print(colored("\nOperation cancelled by user.", "yellow")) logging.info("Password prompt interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during password prompt: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: {e}", 'red')) + print(colored(f"Error: {e}", "red")) attempts += 1 - print(colored("Maximum password attempts exceeded. Exiting.", 'red')) + print(colored("Maximum password attempts exceeded. Exiting.", "red")) logging.error("User failed to provide a valid password after multiple attempts.") sys.exit(1) @@ -107,27 +119,29 @@ def prompt_existing_password(prompt_message: str = "Enter your password: ") -> s password = getpass.getpass(prompt=prompt_message).strip() if not password: - print(colored("Error: Password cannot be empty.", 'red')) + print(colored("Error: Password cannot be empty.", "red")) logging.warning("User attempted to enter an empty password.") sys.exit(1) # Normalize the password to NFKD form - normalized_password = unicodedata.normalize('NFKD', password) + normalized_password = unicodedata.normalize("NFKD", password) logging.debug("User entered an existing password for decryption.") return normalized_password except KeyboardInterrupt: - print(colored("\nOperation cancelled by user.", 'yellow')) + print(colored("\nOperation cancelled by user.", "yellow")) logging.info("Existing password prompt interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during existing password prompt: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: {e}", 'red')) + print(colored(f"Error: {e}", "red")) sys.exit(1) -def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool: +def confirm_action( + prompt_message: str = "Are you sure you want to proceed? (Y/N): ", +) -> bool: """ Prompts the user to confirm an action, typically used before performing critical operations. @@ -143,24 +157,24 @@ def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N """ try: while True: - response = input(colored(prompt_message, 'cyan')).strip().lower() - if response in ['y', 'yes']: + response = input(colored(prompt_message, "cyan")).strip().lower() + if response in ["y", "yes"]: logging.debug("User confirmed the action.") return True - elif response in ['n', 'no']: + elif response in ["n", "no"]: logging.debug("User declined the action.") return False else: - print(colored("Please respond with 'Y' or 'N'.", 'yellow')) + print(colored("Please respond with 'Y' or 'N'.", "yellow")) except KeyboardInterrupt: - print(colored("\nOperation cancelled by user.", 'yellow')) + print(colored("\nOperation cancelled by user.", "yellow")) logging.info("Action confirmation interrupted by user.") sys.exit(0) except Exception as e: logging.error(f"Unexpected error during action confirmation: {e}") logging.error(traceback.format_exc()) # Log full traceback - print(colored(f"Error: {e}", 'red')) + print(colored(f"Error: {e}", "red")) sys.exit(1)