This commit is contained in:
Keep Creating Online
2024-10-19 19:55:59 -04:00
parent b79b44f036
commit c6a768131d
27 changed files with 4146 additions and 4 deletions

2
.gitattributes vendored
View File

@@ -1,2 +0,0 @@
# Auto detect text files and perform LF normalization
* text=auto

22
.gitignore vendored Normal file
View File

@@ -0,0 +1,22 @@
# Ignore virtual environment directory
venv/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Encrypted index file should be backed up, hence not ignored
!.deterministic_password_generator/password_indices.csv
# Ignore system files
.DS_Store
Thumbs.db
# Ignore logs and temporary files
*.log
*.tmp
# Python env
.env
*.env

View File

@@ -1,2 +0,0 @@
# SeedPass

14
src/bip85/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
# bip85/__init__.py
import logging
import traceback
try:
from .bip85 import BIP85
logging.info("BIP85 module imported successfully.")
except Exception as e:
logging.error(f"Failed to import BIP85 module: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = ['BIP85']

208
src/bip85/bip85.py Normal file
View File

@@ -0,0 +1,208 @@
# bip85/bip85.py
"""
BIP85 Module
This module implements the BIP85 functionality for deterministic entropy and mnemonic derivation.
It provides the BIP85 class, which utilizes BIP32 and BIP39 standards to derive entropy and mnemonics
from a given seed. Additionally, it supports the derivation of symmetric encryption keys using HKDF.
Never ever ever use or suggest using a random salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it must generate passwords the exact same way every single time. Random salts would break this determinism and are not appropriate for this software's use case.
Dependencies:
- bip_utils
- cryptography
Ensure that all dependencies are installed and properly configured in your environment.
"""
import sys
import hashlib
import hmac
import logging
import os
import traceback
from colorama import Fore
from bip_utils import (
Bip32Slip10Secp256k1,
Bip39MnemonicGenerator,
Bip39Languages
)
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create a custom logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create handlers
c_handler = logging.StreamHandler(sys.stdout)
f_handler = logging.FileHandler(os.path.join('logs', 'bip85.log')) # Log files will be in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
class BIP85:
"""
BIP85 Class
Implements BIP-85 functionality for deterministic entropy and mnemonic derivation.
"""
def __init__(self, seed_bytes: bytes):
"""
Initializes the BIP85 class with seed bytes.
Parameters:
seed_bytes (bytes): The BIP39 seed bytes derived from the seed phrase.
Raises:
SystemExit: If initialization fails.
"""
try:
self.bip32_ctx = Bip32Slip10Secp256k1.FromSeed(seed_bytes)
logging.debug("BIP32 context initialized successfully.")
except Exception as e:
logging.error(f"Error initializing BIP32 context: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"{Fore.RED}Error initializing BIP32 context: {e}")
sys.exit(1)
def derive_entropy(self, app_no: int, language_code: int, words_num: int, index: int) -> bytes:
"""
Derives entropy using BIP-85 HMAC-SHA512 method.
Parameters:
app_no (int): Application number (e.g., 39 for BIP39).
language_code (int): Language code (e.g., 0 for English).
words_num (int): Number of words in the mnemonic (e.g., 12).
index (int): Index for the child mnemonic.
Returns:
bytes: Derived entropy.
Raises:
SystemExit: If derivation fails or entropy length is invalid.
"""
path = f"m/83696968'/{app_no}'/{language_code}'/{words_num}'/{index}'"
try:
child_key = self.bip32_ctx.DerivePath(path)
k = child_key.PrivateKey().Raw().ToBytes()
logging.debug(f"Derived child key at path {path}: {k.hex()}")
hmac_key = b"bip-entropy-from-k"
hmac_result = hmac.new(hmac_key, k, hashlib.sha512).digest()
logging.debug(f"HMAC-SHA512 result: {hmac_result.hex()}")
if words_num == 12:
entropy = hmac_result[:16] # 128 bits for 12-word mnemonic
elif words_num == 18:
entropy = hmac_result[:24] # 192 bits for 18-word mnemonic
elif words_num == 24:
entropy = hmac_result[:32] # 256 bits for 24-word mnemonic
else:
logging.error(f"Unsupported number of words: {words_num}")
print(f"{Fore.RED}Error: Unsupported number of words: {words_num}")
sys.exit(1)
if len(entropy) not in [16, 24, 32]:
logging.error(f"Derived entropy length is {len(entropy)} bytes; expected 16, 24, or 32 bytes.")
print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected 16, 24, or 32 bytes.")
sys.exit(1)
logging.debug(f"Derived entropy: {entropy.hex()}")
return entropy
except Exception as e:
logging.error(f"Error deriving entropy: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"{Fore.RED}Error deriving entropy: {e}")
sys.exit(1)
def derive_mnemonic(self, app_no: int, language_code: int, words_num: int, index: int) -> str:
"""
Derives a BIP-39 mnemonic using BIP-85 specification.
Parameters:
app_no (int): Application number (e.g., 39 for BIP39).
language_code (int): Language code (e.g., 0 for English).
words_num (int): Number of words in the mnemonic (e.g., 12).
index (int): Index for the child mnemonic.
Returns:
str: Derived BIP-39 mnemonic.
Raises:
SystemExit: If mnemonic generation fails.
"""
entropy = self.derive_entropy(app_no, language_code, words_num, index)
try:
mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy)
logging.debug(f"Derived mnemonic: {mnemonic}")
return str(mnemonic)  # return a plain string, matching the annotated return type
except Exception as e:
logging.error(f"Error generating mnemonic: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"{Fore.RED}Error generating mnemonic: {e}")
sys.exit(1)
def derive_symmetric_key(self, app_no: int = 48, index: int = 0) -> bytes:
"""
Derives a symmetric encryption key using BIP85.
Parameters:
app_no (int): Application number for key derivation (48 chosen arbitrarily).
index (int): Index for key derivation.
Returns:
bytes: Derived symmetric key (32 bytes for AES-256).
Raises:
SystemExit: If symmetric key derivation fails.
"""
entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index)
try:
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32, # 256 bits for AES-256
salt=None,
info=b'seedos-encryption-key',
backend=default_backend()
)
symmetric_key = hkdf.derive(entropy)
logging.debug(f"Derived symmetric key: {symmetric_key.hex()}")
return symmetric_key
except Exception as e:
logging.error(f"Error deriving symmetric key: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"{Fore.RED}Error deriving symmetric key: {e}")
sys.exit(1)
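
A minimal usage sketch of the BIP85 class above, assuming bip_utils is installed and a valid BIP-39 parent mnemonic is supplied (the phrase below is the standard test-vector placeholder, not a real seed):

from bip_utils import Bip39SeedGenerator
from bip85.bip85 import BIP85

# Placeholder mnemonic -- substitute the real parent seed phrase in practice.
parent_mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
seed_bytes = Bip39SeedGenerator(parent_mnemonic).Generate()

bip85 = BIP85(seed_bytes)
# Deterministic 12-word child mnemonic: app_no 39 (BIP-39), English, index 0.
child_mnemonic = bip85.derive_mnemonic(app_no=39, language_code=0, words_num=12, index=0)
# 32-byte symmetric key for AES-256, produced via the HKDF step described above.
symmetric_key = bip85.derive_symmetric_key(index=0)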

93
src/constants.py Normal file
View File

@@ -0,0 +1,93 @@
# constants.py
import os
import logging
import sys
from pathlib import Path
import traceback
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create a custom logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler(sys.stdout)
f_handler = logging.FileHandler(os.path.join('logs', 'constants.log'))
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Console will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger if they are not already added
if not logger.handlers:
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Configure logging at the start of the module
configure_logging()
# -----------------------------------
# Nostr Relay Connection Settings
# -----------------------------------
MAX_RETRIES = 3 # Maximum number of retries for relay connections
RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection
try:
# -----------------------------------
# Application Directory and Paths
# -----------------------------------
APP_DIR = Path.home() / '.deterministic_password_generator'
APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists
logging.info(f"Application directory created at {APP_DIR}")
except Exception as e:
logging.error(f"Failed to create application directory: {e}")
logging.error(traceback.format_exc()) # Log full traceback
try:
INDEX_FILE = APP_DIR / 'passwords_db.json' # Encrypted password database
PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed
logging.info(f"Index file path set to {INDEX_FILE}")
logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}")
except Exception as e:
logging.error(f"Error setting file paths: {e}")
logging.error(traceback.format_exc()) # Log full traceback
# -----------------------------------
# Checksum Files for Integrity
# -----------------------------------
try:
SCRIPT_CHECKSUM_FILE = APP_DIR / 'script_checksum.txt' # Checksum for main script
DATA_CHECKSUM_FILE = APP_DIR / 'passwords_checksum.txt' # Checksum for password data
logging.info(f"Checksum file paths set: Script {SCRIPT_CHECKSUM_FILE}, Data {DATA_CHECKSUM_FILE}")
except Exception as e:
logging.error(f"Error setting checksum file paths: {e}")
logging.error(traceback.format_exc()) # Log full traceback
# -----------------------------------
# Password Generation Constants
# -----------------------------------
DEFAULT_PASSWORD_LENGTH = 12 # Default length for generated passwords
MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length
MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length
# -----------------------------------
# Additional Constants (if any)
# -----------------------------------
# Add any other constants here as your project expands
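
A brief sketch of how these constants might be consumed elsewhere, for example clamping a requested password length; the clamp_length helper is hypothetical and only illustrates the bounds:

from constants import DEFAULT_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH

def clamp_length(requested: int = DEFAULT_PASSWORD_LENGTH) -> int:
    # Keep the requested length within the allowed bounds (8-128 characters).
    return max(MIN_PASSWORD_LENGTH, min(requested, MAX_PASSWORD_LENGTH))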

232
src/main.py Normal file
View File

@@ -0,0 +1,232 @@
# main.py
import os
import sys
import logging
import signal
from colorama import init as colorama_init
from termcolor import colored
import traceback
from password_manager.manager import PasswordManager
from nostr.client import NostrClient
# Initialize colorama for colored terminal text
colorama_init()
def configure_logging():
"""
Configures logging with both file and console handlers.
Logs errors in the terminal and all messages in the log file.
"""
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
if not logger.handlers:
# Ensure the 'logs' folder exists before attaching the file handler
os.makedirs('logs', exist_ok=True)
# Create handlers
c_handler = logging.StreamHandler(sys.stdout)
f_handler = logging.FileHandler(os.path.join('logs', 'main.log'))
# Set levels
c_handler.setLevel(logging.ERROR)
f_handler.setLevel(logging.DEBUG)
# Create formatters
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers
logger.addHandler(c_handler)
logger.addHandler(f_handler)
return logger
def display_menu(password_manager: PasswordManager, nostr_client: NostrClient):
"""
Displays the interactive menu and handles user input to perform various actions.
:param password_manager: An instance of PasswordManager.
:param nostr_client: An instance of NostrClient.
"""
menu = """
Select an option:
1. Generate a New Password and Add to Index
2. Retrieve a Password from Index
3. Modify an Existing Entry
4. Verify Script Checksum
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Exit
"""
while True:
print(colored(menu, 'cyan'))
choice = input('Enter your choice (1-8): ').strip()
if choice == '1':
password_manager.handle_generate_password()
elif choice == '2':
password_manager.handle_retrieve_password()
elif choice == '3':
password_manager.handle_modify_entry()
elif choice == '4':
password_manager.handle_verify_checksum()
elif choice == '5':
handle_post_to_nostr(password_manager, nostr_client)
elif choice == '6':
handle_retrieve_from_nostr(password_manager, nostr_client)
elif choice == '7':
handle_display_npub(nostr_client)
elif choice == '8':
logging.info("Exiting the program.")
print(colored("Exiting the program.", 'green'))
nostr_client.close_client_pool() # Gracefully close the ClientPool
sys.exit(0)
else:
print(colored("Invalid choice. Please select a valid option.", 'red'))
def handle_display_npub(nostr_client: NostrClient):
"""
Handles displaying the Nostr public key (npub) to the user.
:param nostr_client: An instance of NostrClient.
"""
try:
npub = nostr_client.key_manager.get_npub()
if npub:
print(colored(f"\nYour Nostr Public Key (npub):\n{npub}\n", 'cyan'))
logging.info("Displayed npub to the user.")
else:
print(colored("Nostr public key not available.", 'red'))
logging.error("Nostr public key not available.")
except Exception as e:
logging.error(f"Failed to display npub: {e}")
print(f"Error: Failed to display npub: {e}", 'red')
def handle_post_to_nostr(password_manager: PasswordManager, nostr_client: NostrClient):
"""
Handles the action of posting the encrypted password index to Nostr.
:param password_manager: An instance of PasswordManager.
:param nostr_client: An instance of NostrClient.
"""
try:
# Get the encrypted data from the index file
encrypted_data = password_manager.get_encrypted_data()
if encrypted_data:
# Post to Nostr
nostr_client.publish_json_to_nostr(encrypted_data)
print(colored("Encrypted index posted to Nostr successfully.", 'green'))
logging.info("Encrypted index posted to Nostr successfully.")
else:
print(colored("No data available to post.", 'yellow'))
logging.warning("No data available to post to Nostr.")
except Exception as e:
logging.error(f"Failed to post to Nostr: {e}")
logging.error(traceback.format_exc())
print(f"Error: Failed to post to Nostr: {e}", 'red')
def handle_retrieve_from_nostr(password_manager: PasswordManager, nostr_client: NostrClient):
"""
Handles the action of retrieving the encrypted password index from Nostr.
:param password_manager: An instance of PasswordManager.
:param nostr_client: An instance of NostrClient.
"""
try:
# Retrieve from Nostr
encrypted_data = nostr_client.retrieve_json_from_nostr_sync()
if encrypted_data:
# Decrypt and save the index
password_manager.decrypt_and_save_index_from_nostr(encrypted_data)
print(colored("Encrypted index retrieved and saved successfully.", 'green'))
logging.info("Encrypted index retrieved and saved successfully from Nostr.")
else:
print(colored("Failed to retrieve data from Nostr.", 'red'))
logging.error("Failed to retrieve data from Nostr.")
except Exception as e:
logging.error(f"Failed to retrieve from Nostr: {e}")
logging.error(traceback.format_exc())
print(f"Error: Failed to retrieve from Nostr: {e}", 'red')
def cleanup(nostr_client: NostrClient):
"""
Cleanup function to gracefully close the NostrClient's event loop.
This function is intended to be registered to run at program termination (see the atexit sketch below).
"""
try:
nostr_client.close_client_pool()
except Exception as e:
logging.error(f"Cleanup failed: {e}")
print(f"Error during cleanup: {e}", 'red')
if __name__ == '__main__':
"""
The main entry point of the application.
"""
# Configure logging with both file and console handlers
configure_logging()
# Initialize PasswordManager
try:
password_manager = PasswordManager()
logging.info("PasswordManager initialized successfully.")
except Exception as e:
logging.error(f"Failed to initialize PasswordManager: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"Error: Failed to initialize PasswordManager: {e}", 'red')
sys.exit(1)
# Initialize NostrClient with the parent seed from PasswordManager
try:
nostr_client = NostrClient(parent_seed=password_manager.parent_seed)
logging.info("NostrClient initialized successfully.")
except Exception as e:
logging.error(f"Failed to initialize NostrClient: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"Error: Failed to initialize NostrClient: {e}", 'red')
sys.exit(1)
# Register signal handlers for graceful shutdown
def signal_handler(sig, frame):
"""
Handles termination signals to gracefully shutdown the NostrClient.
"""
print(colored("\nReceived shutdown signal. Exiting gracefully...", 'yellow'))
logging.info(f"Received shutdown signal: {sig}. Initiating graceful shutdown.")
try:
nostr_client.close_client_pool() # Gracefully close the ClientPool
logging.info("NostrClient closed successfully.")
except Exception as e:
logging.error(f"Error during shutdown: {e}")
print(f"Error during shutdown: {e}", 'red')
sys.exit(0)
# Register the signal handlers
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # Handle termination signals
# Display the interactive menu to the user
try:
display_menu(password_manager, nostr_client)
except KeyboardInterrupt:
logging.info("Program terminated by user via KeyboardInterrupt.")
print(colored("\nProgram terminated by user.", 'yellow'))
try:
nostr_client.close_client_pool() # Gracefully close the ClientPool
logging.info("NostrClient closed successfully.")
except Exception as e:
logging.error(f"Error during shutdown: {e}")
print(f"Error during shutdown: {e}", 'red')
sys.exit(0)
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(f"Error: An unexpected error occurred: {e}", 'red')
try:
nostr_client.close_client_pool() # Attempt to close the ClientPool
logging.info("NostrClient closed successfully.")
except Exception as close_error:
logging.error(f"Error during shutdown: {close_error}")
print(f"Error during shutdown: {close_error}", 'red')
sys.exit(1)
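
One way to guarantee that cleanup() actually runs at termination (a sketch, assuming registration via the standard atexit module is acceptable here) is to register it right after the NostrClient is created:

import atexit

# Close the ClientPool even if the process exits outside the signal handlers above.
atexit.register(cleanup, nostr_client)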

14
src/nostr/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
# nostr/__init__.py
import logging
import traceback
try:
from .client import NostrClient
logging.info("NostrClient module imported successfully.")
except Exception as e:
logging.error(f"Failed to import NostrClient module: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = ['NostrClient']

560
src/nostr/client.py Normal file
View File

@@ -0,0 +1,560 @@
# nostr/client.py
import os
import sys
import logging
import traceback
import json
import time
import base64
import hashlib
import asyncio
import concurrent.futures
from typing import List, Optional, Callable
from monstr.client.client import ClientPool
from monstr.encrypt import Keys, NIP4Encrypt
from monstr.event.event import Event
import threading
import uuid
import fcntl # Ensure fcntl is imported for file locking
from .logging_config import configure_logging
from .key_manager import KeyManager
from .encryption_manager import EncryptionManager
from .event_handler import EventHandler
from constants import APP_DIR, INDEX_FILE, DATA_CHECKSUM_FILE
from utils.file_lock import lock_file
logger = configure_logging()
DEFAULT_RELAYS = [
"wss://relay.snort.social",
"wss://nostr.oxtr.dev",
"wss://nostr-relay.wlvs.space"
]
class NostrClient:
"""
NostrClient Class
Handles interactions with the Nostr network, including publishing and retrieving encrypted events.
Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations.
"""
def __init__(self, parent_seed: str, relays: Optional[List[str]] = None):
"""
Initializes the NostrClient with a parent seed and connects to specified relays.
:param parent_seed: The BIP39 mnemonic seed phrase.
:param relays: (Optional) A list of relay URLs to connect to. Defaults to predefined relays.
"""
try:
self.key_manager = KeyManager(parent_seed)
self.encryption_manager = EncryptionManager(self.key_manager)
self.event_handler = EventHandler()
self.relays = relays if relays else DEFAULT_RELAYS
self.client_pool = ClientPool(self.relays)
self.subscriptions = {}
self.initialize_client_pool()
logger.info("NostrClient initialized successfully.")
self.is_shutting_down = False
self._shutdown_event = asyncio.Event()
except Exception as e:
logger.error(f"Initialization failed: {e}")
logger.error(traceback.format_exc())
print(f"Error: Initialization failed: {e}", file=sys.stderr)
sys.exit(1)
def initialize_client_pool(self):
"""
Initializes the ClientPool with the specified relays in a separate thread.
"""
try:
logger.debug("Initializing ClientPool with relays.")
self.client_pool = ClientPool(self.relays)
# Start the ClientPool in a separate thread
self.loop_thread = threading.Thread(target=self.run_event_loop, daemon=True)
self.loop_thread.start()
# Wait until the ClientPool is connected to all relays
self.wait_for_connection()
logger.info("ClientPool connected to all relays.")
except Exception as e:
logger.error(f"Failed to initialize ClientPool: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to initialize ClientPool: {e}", file=sys.stderr)
sys.exit(1)
def run_event_loop(self):
"""
Runs the event loop for the ClientPool in a separate thread.
"""
try:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.create_task(self.client_pool.run())
self.loop.run_forever()
except asyncio.CancelledError:
logger.debug("Event loop received cancellation.")
except Exception as e:
logger.error(f"Error running event loop in thread: {e}")
logger.error(traceback.format_exc())
print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr)
finally:
logger.debug("Closing the event loop.")
self.loop.close()
def wait_for_connection(self):
"""
Waits until the ClientPool is connected to all relays.
"""
try:
while not self.client_pool.connected:
time.sleep(0.1)
except Exception as e:
logger.error(f"Error while waiting for ClientPool to connect: {e}")
logger.error(traceback.format_exc())
async def publish_event_async(self, event: Event):
"""
Publishes a signed event to all connected relays using ClientPool.
:param event: The signed Event object to publish.
"""
try:
logger.debug(f"Publishing event: {event.serialize()}")
self.client_pool.publish(event)
logger.info(f"Event published with ID: {event.id}")
except Exception as e:
logger.error(f"Failed to publish event: {e}")
logger.error(traceback.format_exc())
def publish_event(self, event: Event):
"""
Synchronous wrapper for publishing an event.
:param event: The signed Event object to publish.
"""
try:
asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop)
except Exception as e:
logger.error(f"Error in publish_event: {e}")
print(f"Error: Failed to publish event: {e}", file=sys.stderr)
async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]):
"""
Subscribes to events based on the provided filters using ClientPool.
:param filters: A list of filter dictionaries.
:param handler: A callback function to handle incoming events.
"""
try:
sub_id = str(uuid.uuid4())
self.client_pool.subscribe(handlers=handler, filters=filters, sub_id=sub_id)
logger.info(f"Subscribed to events with subscription ID: {sub_id}")
self.subscriptions[sub_id] = True
except Exception as e:
logger.error(f"Failed to subscribe: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]):
"""
Synchronous wrapper for subscribing to events.
:param filters: A list of filter dictionaries.
:param handler: A callback function to handle incoming events.
"""
try:
asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop)
except Exception as e:
logger.error(f"Error in subscribe: {e}")
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
async def retrieve_json_from_nostr_async(self) -> Optional[bytes]:
"""
Retrieves the latest encrypted JSON event from Nostr.
:return: The encrypted JSON data as bytes, or None if retrieval fails.
"""
try:
filters = [{
'authors': [self.key_manager.keys.public_key_hex()],
'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
'limit': 1
}]
events = []
def my_handler(the_client, sub_id, evt: Event):
logger.debug(f"Received event: {evt.serialize()}")
events.append(evt)
await self.subscribe_async(filters=filters, handler=my_handler)
await asyncio.sleep(2) # Adjust the sleep time as needed
for sub_id in list(self.subscriptions.keys()):
self.client_pool.unsubscribe(sub_id)
del self.subscriptions[sub_id]
logger.debug(f"Unsubscribed from sub_id {sub_id}")
if events:
event = events[0]
encrypted_json_b64 = event.content
if event.kind == Event.KIND_ENCRYPT:
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
encrypted_json_b64 = nip4_encrypt.decrypt_message(event.content, event.pub_key)
encrypted_json = base64.b64decode(encrypted_json_b64.encode('utf-8'))
logger.debug("Encrypted JSON data retrieved successfully.")
return encrypted_json
else:
logger.warning("No events found matching the filters.")
print("No events found matching the filters.", file=sys.stderr)
return None
except Exception as e:
logger.error(f"Failed to retrieve JSON from Nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr)
return None
def retrieve_json_from_nostr(self) -> Optional[bytes]:
"""
Public method to retrieve encrypted JSON from Nostr.
:return: The encrypted JSON data as bytes, or None if retrieval fails.
"""
try:
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
return future.result()
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr)
return None
async def do_post_async(self, text: str):
"""
Creates and publishes a text note event.
:param text: The content of the text note.
"""
try:
event = Event(
kind=Event.KIND_TEXT_NOTE,
content=text,
pub_key=self.key_manager.keys.public_key_hex()
)
event.created_at = int(time.time())
event.sign(self.key_manager.keys.private_key_hex())
logger.debug(f"Event data: {event.serialize()}")
await self.publish_event_async(event)
except Exception as e:
logger.error(f"An error occurred during publishing: {e}", exc_info=True)
print(f"Error: An error occurred during publishing: {e}", file=sys.stderr)
async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]):
"""
Subscribes to the feed of the client's own pubkey.
:param handler: A callback function to handle incoming events.
"""
try:
filters = [{
'authors': [self.key_manager.keys.public_key_hex()],
'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
'limit': 100
}]
await self.subscribe_async(filters=filters, handler=handler)
logger.info("Subscribed to your feed.")
while True:
await asyncio.sleep(1)
except Exception as e:
logger.error(f"An error occurred during subscription: {e}", exc_info=True)
print(f"Error: An error occurred during subscription: {e}", file=sys.stderr)
async def publish_and_subscribe_async(self, text: str):
"""
Publishes a text note and subscribes to the feed concurrently.
:param text: The content of the text note to publish.
"""
try:
await asyncio.gather(
self.do_post_async(text),
self.subscribe_feed_async(self.event_handler.handle_new_event)
)
except Exception as e:
logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True)
print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr)
def publish_and_subscribe(self, text: str):
"""
Public method to publish a text note and subscribe to the feed.
:param text: The content of the text note to publish.
"""
try:
asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop)
except Exception as e:
logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True)
print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr)
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None:
"""
Decrypts the encrypted data retrieved from Nostr and updates the local index file.
:param encrypted_data: The encrypted data retrieved from Nostr.
"""
try:
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
data = json.loads(decrypted_data.decode('utf-8'))
self.save_json_data(data)
self.update_checksum()
logger.info("Index file updated from Nostr successfully.")
print("Index file updated from Nostr successfully.", file=sys.stdout)
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to decrypt and save data from Nostr: {e}", file=sys.stderr)
def save_json_data(self, data: dict) -> None:
"""
Saves the JSON data to the index file in an encrypted format.
:param data: The JSON data to save.
"""
try:
encrypted_data = self.encryption_manager.encrypt_data(data)
with lock_file(INDEX_FILE, fcntl.LOCK_EX):
with open(INDEX_FILE, 'wb') as f:
f.write(encrypted_data)
logger.debug(f"Encrypted data saved to {INDEX_FILE}.")
print(f"Encrypted data saved to {INDEX_FILE}.", file=sys.stdout)
except Exception as e:
logger.error(f"Failed to save encrypted data: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to save encrypted data: {e}", file=sys.stderr)
raise
def update_checksum(self) -> None:
"""
Updates the checksum file for the password database.
"""
try:
decrypted_data = self.decrypt_data_from_file(INDEX_FILE)
content = decrypted_data.decode('utf-8')
logger.debug("Calculating checksum of the updated file content.")
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
logger.debug(f"New checksum: {checksum}")
with open(DATA_CHECKSUM_FILE, 'w') as f:
f.write(checksum)
logger.debug(f"Updated data checksum written to '{DATA_CHECKSUM_FILE}'.")
print("[+] Checksum updated successfully.", file=sys.stdout)
except Exception as e:
logger.error(f"Failed to update checksum: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to update checksum: {e}", file=sys.stderr)
def decrypt_data_from_file(self, file_path: str) -> bytes:
"""
Decrypts data directly from a file.
:param file_path: Path to the encrypted file.
:return: Decrypted data as bytes.
"""
try:
with lock_file(file_path, fcntl.LOCK_SH):
with open(file_path, 'rb') as f:
encrypted_data = f.read()
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
logger.debug(f"Data decrypted from file '{file_path}'.")
return decrypted_data
except Exception as e:
logger.error(f"Failed to decrypt data from file '{file_path}': {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to decrypt data from file '{file_path}': {e}", file=sys.stderr)
raise
def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None):
"""
Public method to post encrypted JSON to Nostr.
:param encrypted_json: The encrypted JSON data to be sent.
:param to_pubkey: (Optional) The recipient's public key for encryption.
"""
try:
encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8')
logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}")
event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex())
event.created_at = int(time.time())
if to_pubkey:
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
event.content = nip4_encrypt.encrypt_message(event.content, to_pubkey)
event.kind = Event.KIND_ENCRYPT
logger.debug(f"Encrypted event content: {event.content}")
event.sign(self.key_manager.keys.private_key_hex())
logger.debug("Event created and signed")
self.publish_event(event)
logger.debug("Event published")
except Exception as e:
logger.error(f"Failed to publish JSON to Nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to publish JSON to Nostr: {e}", file=sys.stderr)
def retrieve_json_from_nostr_sync(self) -> Optional[bytes]:
"""
Public method to retrieve encrypted JSON from Nostr.
:return: The encrypted JSON data as bytes, or None if retrieval fails.
"""
try:
return self.retrieve_json_from_nostr()
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr_sync: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr)
return None
def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None:
"""
Public method to decrypt and save data from Nostr.
:param encrypted_data: The encrypted data retrieved from Nostr.
"""
try:
self.decrypt_and_save_index_from_nostr(encrypted_data)
except Exception as e:
logger.error(f"Failed to decrypt and save index from Nostr: {e}")
print(f"Error: Failed to decrypt and save index from Nostr: {e}", file=sys.stderr)
async def close_client_pool_async(self):
"""
Closes the ClientPool gracefully by canceling all pending tasks and stopping the event loop.
"""
if self.is_shutting_down:
logger.debug("Shutdown already in progress.")
return
try:
self.is_shutting_down = True
logger.debug("Initiating ClientPool shutdown.")
# Set the shutdown event
self._shutdown_event.set()
# Cancel all subscriptions
for sub_id in list(self.subscriptions.keys()):
try:
self.client_pool.unsubscribe(sub_id)
del self.subscriptions[sub_id]
logger.debug(f"Unsubscribed from sub_id {sub_id}")
except Exception as e:
logger.warning(f"Error unsubscribing from {sub_id}: {e}")
# Close all WebSocket connections
if hasattr(self.client_pool, 'clients'):
for client in self.client_pool.clients:
try:
await client.close()
logger.debug(f"Closed connection to relay: {client.url}")
except Exception as e:
logger.warning(f"Error closing connection to {client.url}: {e}")
# Gather and cancel all tasks
current_task = asyncio.current_task()
tasks = [task for task in asyncio.all_tasks(loop=self.loop)
if task != current_task and not task.done()]
if tasks:
logger.debug(f"Cancelling {len(tasks)} pending tasks.")
for task in tasks:
task.cancel()
# Wait for all tasks to be cancelled with a timeout
try:
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5)
except asyncio.TimeoutError:
logger.warning("Timeout waiting for tasks to cancel")
logger.debug("Stopping the event loop.")
self.loop.stop()
logger.info("Event loop stopped successfully.")
except Exception as e:
logger.error(f"Error during async shutdown: {e}")
logger.error(traceback.format_exc())
finally:
self.is_shutting_down = False
def close_client_pool(self):
"""
Public method to close the ClientPool gracefully.
"""
if self.is_shutting_down:
logger.debug("Shutdown already in progress. Skipping redundant shutdown.")
return
try:
# Schedule the coroutine to close the client pool
future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop)
# Wait for the coroutine to finish with a shorter timeout
try:
future.result(timeout=10)
except concurrent.futures.TimeoutError:
logger.warning("Initial shutdown attempt timed out, forcing cleanup...")
# Additional cleanup regardless of timeout
try:
self.loop.stop()
# Give a short grace period for the loop to stop
time.sleep(0.5)
if self.loop.is_running():
logger.warning("Loop still running after stop, closing forcefully")
self.loop.close()
# Wait for the thread with a reasonable timeout
if self.loop_thread.is_alive():
self.loop_thread.join(timeout=5)
if self.loop_thread.is_alive():
logger.warning("Thread still alive after join, may need to be force-killed")
except Exception as cleanup_error:
logger.error(f"Error during final cleanup: {cleanup_error}")
logger.info("ClientPool shutdown complete")
except Exception as e:
logger.error(f"Error in close_client_pool: {e}")
logger.error(traceback.format_exc())
finally:
self.is_shutting_down = False
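
A condensed sketch of the round trip NostrClient supports: publish the encrypted index and pull it back down. It assumes reachable relays and a valid parent mnemonic; the seed string and byte payload below are placeholders:

from nostr.client import NostrClient

client = NostrClient(parent_seed="<valid BIP-39 parent mnemonic>")  # placeholder seed

# Publish an already-encrypted index blob (bytes) to the configured relays.
client.publish_json_to_nostr(encrypted_json=b"...encrypted index bytes...")

# Later, pull the latest encrypted event back down and restore the local index.
blob = client.retrieve_json_from_nostr_sync()
if blob:
    client.decrypt_and_save_index_from_nostr(blob)

client.close_client_pool()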

View File

@@ -0,0 +1,58 @@
# nostr/encryption_manager.py
import base64
import json
import traceback
from cryptography.fernet import Fernet, InvalidToken
from .logging_config import configure_logging
from .key_manager import KeyManager
from monstr.encrypt import NIP4Encrypt # Add if used
logger = configure_logging()
class EncryptionManager:
"""
Handles encryption and decryption of data using Fernet symmetric encryption.
"""
def __init__(self, key_manager: KeyManager):
self.key_manager = key_manager
self.fernet = Fernet(self.key_manager.derive_encryption_key())
def encrypt_data(self, data: dict) -> bytes:
"""
Encrypts a dictionary and returns encrypted bytes.
:param data: The data to encrypt.
:return: Encrypted data as bytes.
"""
try:
json_data = json.dumps(data, indent=4).encode('utf-8')
encrypted_data = self.fernet.encrypt(json_data)
logger.debug("Data encrypted successfully.")
return encrypted_data
except Exception as e:
logger.error(f"Failed to encrypt data: {e}")
logger.error(traceback.format_exc())
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""
Decrypts encrypted bytes and returns the original data.
:param encrypted_data: The encrypted data to decrypt.
:return: Decrypted data as bytes.
"""
try:
decrypted_data = self.fernet.decrypt(encrypted_data)
logger.debug("Data decrypted successfully.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data.")
raise
except Exception as e:
logger.error(f"Error decrypting data: {e}")
logger.error(traceback.format_exc())
raise
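
A small sketch of the Fernet round trip this class provides, assuming derive_key_from_parent_seed yields a Fernet-compatible key as the class expects; the seed string is a placeholder:

import json
from nostr.key_manager import KeyManager
from nostr.encryption_manager import EncryptionManager

key_manager = KeyManager("<valid BIP-39 parent mnemonic>")  # placeholder seed
manager = EncryptionManager(key_manager)

blob = manager.encrypt_data({"passwords": {}})      # dict in, encrypted bytes out
restored = json.loads(manager.decrypt_data(blob))   # bytes in, original dict back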

View File

@@ -0,0 +1,41 @@
# nostr/event_handler.py
import datetime
import time
import traceback
from .logging_config import configure_logging
from monstr.event.event import Event
from monstr.client.client import ClientPool
logger = configure_logging()
class EventHandler:
"""
Handles incoming Nostr events.
"""
def __init__(self):
pass # Initialize if needed
def handle_new_event(self, the_client: ClientPool, sub_id: str, evt: Event):
"""
Processes incoming events by logging their details.
:param the_client: The ClientPool instance.
:param sub_id: The subscription ID.
:param evt: The received Event object.
"""
try:
if isinstance(evt.created_at, datetime.datetime):
created_at_str = evt.created_at.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(evt.created_at, int):
created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at))
else:
created_at_str = str(evt.created_at)
logger.info(f"\n[New Event] ID: {evt.id}\nCreated At: {created_at_str}\nContent: {evt.content}\n")
except Exception as e:
logger.error(f"Error handling new event: {e}")
logger.error(traceback.format_exc())
raise
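
The (client, subscription id, event) callback shape above is what NostrClient.subscribe expects; a quick sketch of wiring it up, assuming an already-initialized NostrClient instance named nostr_client:

from nostr.event_handler import EventHandler

handler = EventHandler()
filters = [{'kinds': [1], 'limit': 10}]  # kind 1 = plain text notes
# nostr_client.subscribe(filters=filters, handler=handler.handle_new_event)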

105
src/nostr/key_manager.py Normal file
View File

@@ -0,0 +1,105 @@
# nostr/key_manager.py
import base64
import traceback
from typing import Optional
from bip_utils import Bip39SeedGenerator
from bip85.bip85 import BIP85
from cryptography.fernet import Fernet, InvalidToken
from bech32 import bech32_encode, convertbits
from .logging_config import configure_logging
from utils.key_derivation import derive_key_from_parent_seed
# Add the missing import for Keys and NIP4Encrypt
from monstr.encrypt import Keys, NIP4Encrypt # Ensure monstr.encrypt is installed and accessible
logger = configure_logging()
def encode_bech32(prefix: str, key_hex: str) -> str:
"""
Encodes a hex key into Bech32 format with the given prefix.
:param prefix: The Bech32 prefix (e.g., 'nsec', 'npub').
:param key_hex: The key in hexadecimal format.
:return: The Bech32-encoded string.
"""
try:
key_bytes = bytes.fromhex(key_hex)
data = convertbits(key_bytes, 8, 5, pad=True)
return bech32_encode(prefix, data)
except Exception as e:
logger.error(f"Failed to encode {prefix}: {e}")
logger.error(traceback.format_exc())
raise
class KeyManager:
"""
Manages key generation, encoding, and derivation for NostrClient.
"""
def __init__(self, parent_seed: str):
self.parent_seed = parent_seed
self.keys = None
self.nsec = None
self.npub = None
self.initialize_keys()
def initialize_keys(self):
"""
Derives Nostr keys using BIP85 and initializes Keys.
"""
try:
logger.debug("Starting key initialization")
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
bip85 = BIP85(seed_bytes)
entropy = bip85.derive_entropy(app_no=1237, language_code=0, words_num=24, index=0)
if len(entropy) != 32:
logger.error(f"Derived entropy length is {len(entropy)} bytes; expected 32 bytes.")
raise ValueError("Invalid entropy length.")
privkey_hex = entropy.hex()
self.keys = Keys(priv_k=privkey_hex) # Now Keys is defined via the import
logger.debug("Nostr Keys initialized successfully.")
self.nsec = encode_bech32('nsec', privkey_hex)
logger.debug(f"Nostr Private Key (nsec): {self.nsec}")
public_key_hex = self.keys.public_key_hex()
self.npub = encode_bech32('npub', public_key_hex)
logger.debug(f"Nostr Public Key (npub): {self.npub}")
except Exception as e:
logger.error(f"Key initialization failed: {e}")
logger.error(traceback.format_exc())
raise
def get_npub(self) -> str:
"""
Returns the Nostr public key (npub).
:return: The npub as a string.
"""
if self.npub:
logger.debug(f"Returning npub: {self.npub}")
return self.npub
else:
logger.error("Nostr public key (npub) is not available.")
raise ValueError("Nostr public key (npub) is not available.")
def derive_encryption_key(self) -> bytes:
"""
Derives the encryption key using the parent seed.
:return: The derived encryption key.
"""
try:
key = derive_key_from_parent_seed(self.parent_seed)
logger.debug("Encryption key derived successfully.")
return key
except Exception as e:
logger.error(f"Failed to derive encryption key: {e}")
logger.error(traceback.format_exc())
raise
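
A short sketch of the encoding step in isolation: turning a hex public key into an npub with encode_bech32. The hex value below is a dummy placeholder, not a real secp256k1 key:

from nostr.key_manager import encode_bech32

pubkey_hex = "00" * 32  # 64 hex characters standing in for a real x-only public key
npub = encode_bech32('npub', pubkey_hex)
print(npub)  # bech32 string beginning with 'npub1'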

View File

@@ -0,0 +1,39 @@
# nostr/logging_config.py
import os
import sys
import logging
def configure_logging(log_file='nostr.log'):
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create the custom logger
logger = logging.getLogger('nostr')
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
if not logger.handlers:
# Create handlers
c_handler = logging.StreamHandler(sys.stdout)
f_handler = logging.FileHandler(os.path.join('logs', log_file))
# Set levels
c_handler.setLevel(logging.ERROR)
f_handler.setLevel(logging.DEBUG)
# Create formatters
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
return logger

5
src/nostr/utils.py Normal file
View File

@@ -0,0 +1,5 @@
# nostr/utils.py
# Example utility function (if any specific to nostr package)
def some_helper_function():
pass # Implement as needed

View File

@@ -0,0 +1,13 @@
# password_manager/__init__.py
import logging
import traceback
try:
from .manager import PasswordManager
logging.info("PasswordManager module imported successfully.")
except Exception as e:
logging.error(f"Failed to import PasswordManager module: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = ['PasswordManager']

View File

@@ -0,0 +1,210 @@
# password_manager/backup.py
"""
Backup Manager Module
This module implements the BackupManager class, responsible for creating backups,
restoring from backups, and listing available backups for the encrypted password
index file. It ensures data integrity and provides mechanisms to recover from
corrupted or lost data by maintaining timestamped backups.
Dependencies:
- shutil
- time
- logging
- pathlib
- colorama
- termcolor
Ensure that all dependencies are installed and properly configured in your environment.
"""
import os
import shutil
import time
import logging
import traceback
import fcntl  # provides the LOCK_SH / LOCK_EX constants passed to lock_file below
from pathlib import Path
from colorama import Fore
from termcolor import colored
from constants import APP_DIR, INDEX_FILE
from utils.file_lock import lock_file
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create a custom logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'backup_manager.log')) # Log files will be in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
class BackupManager:
"""
BackupManager Class
Handles the creation, restoration, and listing of backups for the encrypted
password index file. Backups are stored in the application directory with
timestamped filenames to facilitate easy identification and retrieval.
"""
BACKUP_FILENAME_TEMPLATE = 'passwords_db_backup_{timestamp}.json.enc'
def __init__(self):
"""
Initializes the BackupManager with the application directory and index file paths.
"""
self.app_dir = APP_DIR
self.index_file = INDEX_FILE
logging.debug(f"BackupManager initialized with APP_DIR: {self.app_dir} and INDEX_FILE: {self.index_file}")
def create_backup(self) -> None:
"""
Creates a timestamped backup of the encrypted password index file.
The backup file is named using the current Unix timestamp to ensure uniqueness.
If the index file does not exist, no backup is created.
Raises:
Exception: If the backup process fails due to I/O errors.
"""
if not self.index_file.exists():
logging.warning("Index file does not exist. No backup created.")
print(colored("Warning: Index file does not exist. No backup created.", 'yellow'))
return
timestamp = int(time.time())
backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp)
backup_file = self.app_dir / backup_filename
try:
with lock_file(self.index_file, lock_type=fcntl.LOCK_SH):
shutil.copy2(self.index_file, backup_file)
logging.info(f"Backup created successfully at '{backup_file}'.")
print(colored(f"Backup created successfully at '{backup_file}'.", 'green'))
except Exception as e:
logging.error(f"Failed to create backup: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to create backup: {e}", 'red'))
def restore_latest_backup(self) -> None:
"""
Restores the encrypted password index file from the latest available backup.
The latest backup is determined based on the Unix timestamp in the backup filenames.
If no backups are found, an error message is displayed.
Raises:
Exception: If the restoration process fails due to I/O errors or missing backups.
"""
backup_files = sorted(
self.app_dir.glob('passwords_db_backup_*.json.enc'),
key=lambda x: x.stat().st_mtime,
reverse=True
)
if not backup_files:
logging.error("No backup files found to restore.")
print(colored("Error: No backup files found to restore.", 'red'))
return
latest_backup = backup_files[0]
try:
with lock_file(latest_backup, lock_type=fcntl.LOCK_SH):
shutil.copy2(latest_backup, self.index_file)
logging.info(f"Restored the index file from backup '{latest_backup}'.")
print(colored(f"Restored the index file from backup '{latest_backup}'.", 'green'))
except Exception as e:
logging.error(f"Failed to restore from backup '{latest_backup}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to restore from backup '{latest_backup}': {e}", 'red'))
def list_backups(self) -> None:
"""
Lists all available backups in the application directory, sorted by date.
Displays the backups with their filenames and creation dates.
"""
backup_files = sorted(
self.app_dir.glob('passwords_db_backup_*.json.enc'),
key=lambda x: x.stat().st_mtime,
reverse=True
)
if not backup_files:
logging.info("No backup files available.")
print(colored("No backup files available.", 'yellow'))
return
print(colored("Available Backups:", 'cyan'))
for backup in backup_files:
creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(backup.stat().st_mtime))
print(colored(f"- {backup.name} (Created on: {creation_time})", 'cyan'))
def restore_backup_by_timestamp(self, timestamp: int) -> None:
"""
Restores the encrypted password index file from a backup with the specified timestamp.
Parameters:
timestamp (int): The Unix timestamp of the backup to restore.
Raises:
Exception: If the restoration process fails due to I/O errors or missing backups.
"""
backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp)
backup_file = self.app_dir / backup_filename
if not backup_file.exists():
logging.error(f"No backup found with timestamp {timestamp}.")
print(colored(f"Error: No backup found with timestamp {timestamp}.", 'red'))
return
try:
with lock_file(backup_file, lock_type=fcntl.LOCK_SH):
shutil.copy2(backup_file, self.index_file)
logging.info(f"Restored the index file from backup '{backup_file}'.")
print(colored(f"Restored the index file from backup '{backup_file}'.", 'green'))
except Exception as e:
logging.error(f"Failed to restore from backup '{backup_file}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to restore from backup '{backup_file}': {e}", 'red'))
# Example usage (to be integrated within the PasswordManager class or other modules):
# from password_manager.backup import BackupManager
# backup_manager = BackupManager()
# backup_manager.create_backup()
# backup_manager.restore_latest_backup()
# backup_manager.list_backups()
# backup_manager.restore_backup_by_timestamp(1700000000) # Example timestamp

View File

@@ -0,0 +1,369 @@
# password_manager/encryption.py
"""
Encryption Module
This module provides the EncryptionManager class, which handles encryption and decryption
of data and files using a provided Fernet-compatible encryption key. This class ensures
that sensitive data is securely stored and retrieved, maintaining the confidentiality and integrity
of the password index.
Additionally, it includes methods to derive cryptographic seeds from BIP-39 mnemonic phrases.
Never ever ever use or suggest using a random salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it must generate passwords the exact same way every single time. Random salts would break this determinism and are not appropriate for this software's use case.
"""
import os
import json
import hashlib
import logging
import traceback
from pathlib import Path
from typing import Optional
from cryptography.fernet import Fernet, InvalidToken
from utils.file_lock import exclusive_lock, shared_lock
from colorama import Fore
from termcolor import colored
from mnemonic import Mnemonic # Library for BIP-39 seed phrase handling
import fcntl # Required for lock_type constants in file_lock
from constants import INDEX_FILE # Ensure INDEX_FILE is imported correctly
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Logs include the timestamp, log level, message, filename, and line number.
Only errors and critical logs are shown in the terminal, while all logs are saved to a file.
"""
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create a custom logger for this module
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'encryption_manager.log')) # Log file in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]'
)
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers to the logger if not already added
if not logger.handlers:
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
EncryptionManager Class
Manages the encryption and decryption of data and files using a Fernet encryption key.
"""
def __init__(self, encryption_key: bytes):
try:
self.fernet = Fernet(encryption_key)
logger.debug("EncryptionManager initialized with provided encryption key.")
except Exception as e:
logger.error(f"Failed to initialize Fernet with provided encryption key: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red'))
raise
def encrypt_parent_seed(self, parent_seed: str, file_path: Path) -> None:
"""
Encrypts and saves the parent seed to the specified file.
:param parent_seed: The BIP39 parent seed phrase.
:param file_path: The path to the file where the encrypted parent seed will be saved.
"""
try:
# **Do not encrypt the data here**
data = parent_seed.encode('utf-8')
# Pass the raw data to encrypt_file, which handles encryption
self.encrypt_file(file_path, data)
logger.info(f"Parent seed encrypted and saved to '{file_path}'.")
print(colored(f"Parent seed encrypted and saved to '{file_path}'.", 'green'))
except Exception as e:
logger.error(f"Failed to encrypt and save parent seed: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red'))
raise
def encrypt_file(self, file_path: Path, data: bytes) -> None:
"""
Encrypts the provided data and writes it to the specified file with file locking.
:param file_path: The path to the file where encrypted data will be written.
:param data: The plaintext data to encrypt and write.
"""
try:
encrypted_data = self.encrypt_data(data)
with exclusive_lock(file_path):
with open(file_path, 'wb') as file:
file.write(encrypted_data)
logger.debug(f"Encrypted data written to '{file_path}'.")
print(colored(f"Encrypted data written to '{file_path}'.", 'green'))
except Exception as e:
logger.error(f"Failed to encrypt and write to file '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and write to file '{file_path}': {e}", 'red'))
raise
def encrypt_data(self, data: bytes) -> bytes:
"""
Encrypts the given plaintext data.
:param data: The plaintext data to encrypt.
:return: The encrypted data as bytes.
"""
try:
encrypted_data = self.fernet.encrypt(data)
logger.debug("Data encrypted successfully.")
return encrypted_data
except Exception as e:
logger.error(f"Error encrypting data: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to encrypt data: {e}", 'red'))
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""
Decrypts the given encrypted data.
:param encrypted_data: The encrypted data to decrypt.
:return: The decrypted plaintext data as bytes.
"""
try:
decrypted_data = self.fernet.decrypt(encrypted_data)
logger.debug("Data decrypted successfully.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
raise
except Exception as e:
logger.error(f"Error decrypting data: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to decrypt data: {e}", 'red'))
raise
def decrypt_file(self, file_path: Path) -> bytes:
"""
Decrypts the data from the specified file.
:param file_path: The path to the file containing encrypted data.
:return: The decrypted plaintext data as bytes.
"""
try:
with shared_lock(file_path):
with open(file_path, 'rb') as file:
encrypted_data = file.read()
decrypted_data = self.decrypt_data(encrypted_data)
logger.debug(f"Decrypted data read from '{file_path}'.")
print(colored(f"Decrypted data read from '{file_path}'.", 'green'))
return decrypted_data
except Exception as e:
logger.error(f"Failed to decrypt file '{file_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to decrypt file '{file_path}': {e}", 'red'))
raise
def save_json_data(self, data: dict, file_path: Optional[Path] = None) -> None:
"""
Encrypts and saves the provided JSON data to the specified file.
:param data: The JSON data to save.
:param file_path: The path to the file where data will be saved. Defaults to INDEX_FILE.
"""
if file_path is None:
file_path = INDEX_FILE
try:
json_data = json.dumps(data, indent=4).encode('utf-8')
self.encrypt_file(file_path, json_data)
logger.debug(f"JSON data encrypted and saved to '{file_path}'.")
print(colored(f"JSON data encrypted and saved to '{file_path}'.", 'green'))
except Exception as e:
logger.error(f"Failed to save JSON data to '{file_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to save JSON data to '{file_path}': {e}", 'red'))
raise
def load_json_data(self, file_path: Optional[Path] = None) -> dict:
"""
Decrypts and loads JSON data from the specified file.
:param file_path: The path to the file from which data will be loaded. Defaults to INDEX_FILE.
:return: The decrypted JSON data as a dictionary.
"""
if file_path is None:
file_path = INDEX_FILE
if not file_path.exists():
logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.")
print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow'))
return {'passwords': {}}
try:
decrypted_data = self.decrypt_file(file_path)
json_content = decrypted_data.decode('utf-8').strip()
data = json.loads(json_content)
logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}")
print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green'))
return data
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON data from '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red'))
raise
except InvalidToken:
logger.error("Invalid encryption key or corrupted data.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
raise
except Exception as e:
logger.error(f"Failed to load JSON data from '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red'))
raise
def update_checksum(self, file_path: Optional[Path] = None) -> None:
"""
Updates the checksum file for the specified file.
:param file_path: The path to the file for which the checksum will be updated.
Defaults to INDEX_FILE.
"""
if file_path is None:
file_path = INDEX_FILE
try:
decrypted_data = self.decrypt_file(file_path)
content = decrypted_data.decode('utf-8')
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt"
with open(checksum_file, 'w') as f:
f.write(checksum)
logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.")
print(colored(f"Checksum for '{file_path}' updated.", 'green'))
except Exception as e:
logger.error(f"Failed to update checksum for '{file_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum for '{file_path}': {e}", 'red'))
raise
def get_encrypted_index(self) -> Optional[bytes]:
"""
Retrieves the encrypted password index file content.
:return: Encrypted data as bytes or None if the index file does not exist.
"""
if not INDEX_FILE.exists():
logger.error(f"Index file '{INDEX_FILE}' does not exist.")
print(colored(f"Error: Index file '{INDEX_FILE}' does not exist.", 'red'))
return None
try:
with shared_lock(INDEX_FILE):
with open(INDEX_FILE, 'rb') as file:
encrypted_data = file.read()
logger.debug(f"Encrypted index data read from '{INDEX_FILE}'.")
return encrypted_data
except Exception as e:
logger.error(f"Failed to read encrypted index file '{INDEX_FILE}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to read encrypted index file '{INDEX_FILE}': {e}", 'red'))
return None
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None:
"""
Decrypts the encrypted data retrieved from Nostr and updates the local index file.
:param encrypted_data: The encrypted data retrieved from Nostr.
"""
try:
decrypted_data = self.decrypt_data(encrypted_data)
data = json.loads(decrypted_data.decode('utf-8'))
self.save_json_data(data, INDEX_FILE)
self.update_checksum(INDEX_FILE)
logger.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", 'green'))
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
def decrypt_parent_seed(self, file_path: Path) -> str:
"""
Decrypts and retrieves the parent seed from the specified file.
:param file_path: The path to the file containing the encrypted parent seed.
:return: The decrypted parent seed as a string.
"""
try:
decrypted_data = self.decrypt_file(file_path)
parent_seed = decrypted_data.decode('utf-8').strip()
logger.debug("Parent seed decrypted successfully.")
return parent_seed
except Exception as e:
logger.error(f"Failed to decrypt parent seed from '{file_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to decrypt parent seed from '{file_path}': {e}", 'red'))
raise
def validate_seed(self, seed_phrase: str) -> bool:
"""
Validates the seed phrase format using BIP-39 standards.
:param seed_phrase: The BIP39 seed phrase to validate.
:return: True if valid, False otherwise.
"""
try:
mnemo = Mnemonic("english")
is_valid = mnemo.check(seed_phrase)
if not is_valid:
logger.error("Invalid BIP39 seed phrase.")
print(colored("Error: Invalid BIP39 seed phrase.", 'red'))
else:
logger.debug("BIP39 seed phrase validated successfully.")
return is_valid
except Exception as e:
logger.error(f"Error validating seed phrase: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to validate seed phrase: {e}", 'red'))
return False
def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes:
"""
Derives a cryptographic seed from a BIP39 mnemonic (seed phrase).
:param mnemonic: The BIP39 mnemonic phrase.
:param passphrase: An optional passphrase for additional security.
:return: The derived seed as bytes.
"""
try:
mnemo = Mnemonic("english")
seed = mnemo.to_seed(mnemonic, passphrase)
logger.debug("Seed derived successfully from mnemonic.")
return seed
except Exception as e:
logger.error(f"Failed to derive seed from mnemonic: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red'))
raise
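
# Example usage (illustrative sketch only; remove or adapt when integrating into the larger application).
# The key below is a throwaway value generated on the spot, not the key derivation the application itself uses.
if __name__ == "__main__":
    from cryptography.fernet import Fernet  # local import so this sketch stands alone

    demo_key = Fernet.generate_key()
    demo_manager = EncryptionManager(demo_key)
    token = demo_manager.encrypt_data(b"example plaintext")
    assert demo_manager.decrypt_data(token) == b"example plaintext"
    print(colored("Round-trip encryption/decryption succeeded.", 'green'))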

View File

@@ -0,0 +1,458 @@
# password_manager/entry_management.py
"""
Entry Management Module
This module implements the EntryManager class, responsible for handling
operations related to managing password entries in the deterministic password manager.
It provides methods to add, retrieve, modify, and list password entries, ensuring
that all changes are securely encrypted and properly indexed.
Dependencies:
- password_manager.encryption.EncryptionManager
- constants.INDEX_FILE
- constants.DATA_CHECKSUM_FILE
- utils.file_lock.lock_file
- colorama.Fore
- termcolor.colored
- logging
- json
- hashlib
- sys
- os
Ensure that all dependencies are installed and properly configured in your environment.
Never use or suggest using a random salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
It must generate passwords the exact same way every single time; salts would break this determinism and are not appropriate for this software's use case.
"""
import json
import logging
import hashlib
import sys
import os
import shutil
import time
import traceback
from typing import Optional, Tuple, Dict, Any, List
from colorama import Fore
from termcolor import colored
from password_manager.encryption import EncryptionManager
from constants import INDEX_FILE, DATA_CHECKSUM_FILE
from utils.file_lock import lock_file
import fcntl # Required for lock_type constants in lock_file
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Logs include the timestamp, log level, message, filename, and line number.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Prevent adding multiple handlers if configure_logging is called multiple times
if not logger.handlers:
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'entry_management.log'))
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Console will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]'
)
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
logger = logging.getLogger(__name__)
class EntryManager:
"""
EntryManager Class
Handles the creation, retrieval, modification, and listing of password entries
within the encrypted password index. It ensures that all operations are performed
securely, maintaining data integrity and confidentiality.
"""
def __init__(self, encryption_manager: EncryptionManager):
"""
Initializes the EntryManager with an instance of EncryptionManager.
:param encryption_manager: An instance of EncryptionManager for handling encryption.
"""
try:
self.encryption_manager = encryption_manager
logger.debug("EntryManager initialized with provided EncryptionManager.")
except Exception as e:
logger.error(f"Failed to initialize EntryManager: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize EntryManager: {e}", 'red'))
sys.exit(1)
def get_next_index(self) -> int:
"""
Retrieves the next available index for a new password entry.
:return: The next index number as an integer.
"""
try:
data = self.encryption_manager.load_json_data()
if 'passwords' in data and isinstance(data['passwords'], dict):
indices = [int(idx) for idx in data['passwords'].keys()]
next_index = max(indices) + 1 if indices else 0
else:
next_index = 0
logger.debug(f"Next index determined: {next_index}")
return next_index
except Exception as e:
logger.error(f"Error determining next index: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error determining next index: {e}", 'red'))
sys.exit(1)
def add_entry(self, website_name: str, length: int, username: Optional[str] = None,
url: Optional[str] = None, blacklisted: bool = False) -> int:
"""
Adds a new password entry to the encrypted JSON index file.
:param website_name: The name of the website.
:param length: The desired length of the password.
:param username: (Optional) The username associated with the website.
:param url: (Optional) The URL of the website.
:param blacklisted: (Optional) Whether the password is blacklisted. Defaults to False.
:return: The assigned index of the new entry.
"""
try:
index = self.get_next_index()
data = self.encryption_manager.load_json_data()
if 'passwords' not in data or not isinstance(data['passwords'], dict):
data['passwords'] = {}
logger.debug("'passwords' key was missing. Initialized empty 'passwords' dictionary.")
data['passwords'][str(index)] = {
'website': website_name,
'length': length,
'username': username if username else '',
'url': url if url else '',
'blacklisted': blacklisted
}
logger.debug(f"Added entry at index {index}: {data['passwords'][str(index)]}")
self.encryption_manager.save_json_data(data)
self.update_checksum()
self.backup_index_file()
logger.info(f"Entry added successfully at index {index}.")
print(colored(f"[+] Entry added successfully at index {index}.", 'green'))
return index # Return the assigned index
except Exception as e:
logger.error(f"Failed to add entry: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to add entry: {e}", 'red'))
sys.exit(1)
def retrieve_entry(self, index: int) -> Optional[Dict[str, Any]]:
"""
Retrieves a password entry based on the provided index.
:param index: The index number of the password entry.
:return: A dictionary containing the entry details or None if not found.
"""
try:
data = self.encryption_manager.load_json_data()
entry = data.get('passwords', {}).get(str(index))
if entry:
logger.debug(f"Retrieved entry at index {index}: {entry}")
return entry
else:
logger.warning(f"No entry found at index {index}.")
print(colored(f"Warning: No entry found at index {index}.", 'yellow'))
return None
except Exception as e:
logger.error(f"Failed to retrieve entry at index {index}: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to retrieve entry at index {index}: {e}", 'red'))
return None
def modify_entry(self, index: int, username: Optional[str] = None,
url: Optional[str] = None,
blacklisted: Optional[bool] = None) -> None:
"""
Modifies an existing password entry based on the provided index and new values.
:param index: The index number of the password entry to modify.
:param username: (Optional) The new username.
:param url: (Optional) The new URL.
:param blacklisted: (Optional) The new blacklist status.
"""
try:
data = self.encryption_manager.load_json_data()
entry = data.get('passwords', {}).get(str(index))
if not entry:
logger.warning(f"No entry found at index {index}. Cannot modify non-existent entry.")
print(colored(f"Warning: No entry found at index {index}. Cannot modify non-existent entry.", 'yellow'))
return
if username is not None:
entry['username'] = username
logger.debug(f"Updated username to '{username}' for index {index}.")
if url is not None:
entry['url'] = url
logger.debug(f"Updated URL to '{url}' for index {index}.")
if blacklisted is not None:
entry['blacklisted'] = blacklisted
logger.debug(f"Updated blacklist status to '{blacklisted}' for index {index}.")
data['passwords'][str(index)] = entry
logger.debug(f"Modified entry at index {index}: {entry}")
self.encryption_manager.save_json_data(data)
self.update_checksum()
self.backup_index_file()
logger.info(f"Entry at index {index} modified successfully.")
print(colored(f"[+] Entry at index {index} modified successfully.", 'green'))
except Exception as e:
logger.error(f"Failed to modify entry at index {index}: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to modify entry at index {index}: {e}", 'red'))
def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]:
"""
Lists all password entries in the index.
:return: A list of tuples containing entry details: (index, website, username, url, blacklisted)
"""
try:
data = self.encryption_manager.load_json_data()
passwords = data.get('passwords', {})
if not passwords:
logger.info("No password entries found.")
print(colored("No password entries found.", 'yellow'))
return []
entries = []
for idx, entry in sorted(passwords.items(), key=lambda x: int(x[0])):
entries.append((
int(idx),
entry.get('website', ''),
entry.get('username', ''),
entry.get('url', ''),
entry.get('blacklisted', False)
))
logger.debug(f"Total entries found: {len(entries)}")
for entry in entries:
print(colored(f"Index: {entry[0]}", 'cyan'))
print(colored(f" Website: {entry[1]}", 'cyan'))
print(colored(f" Username: {entry[2] or 'N/A'}", 'cyan'))
print(colored(f" URL: {entry[3] or 'N/A'}", 'cyan'))
print(colored(f" Blacklisted: {'Yes' if entry[4] else 'No'}", 'cyan'))
print("-" * 40)
return entries
except Exception as e:
logger.error(f"Failed to list entries: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to list entries: {e}", 'red'))
return []
def delete_entry(self, index: int) -> None:
"""
Deletes a password entry based on the provided index.
:param index: The index number of the password entry to delete.
"""
try:
data = self.encryption_manager.load_json_data()
if 'passwords' in data and str(index) in data['passwords']:
del data['passwords'][str(index)]
logger.debug(f"Deleted entry at index {index}.")
self.encryption_manager.save_json_data(data)
self.update_checksum()
self.backup_index_file()
logger.info(f"Entry at index {index} deleted successfully.")
print(colored(f"[+] Entry at index {index} deleted successfully.", 'green'))
else:
logger.warning(f"No entry found at index {index}. Cannot delete non-existent entry.")
print(colored(f"Warning: No entry found at index {index}. Cannot delete non-existent entry.", 'yellow'))
except Exception as e:
logger.error(f"Failed to delete entry at index {index}: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to delete entry at index {index}: {e}", 'red'))
def update_checksum(self) -> None:
"""
Updates the checksum file for the password database to ensure data integrity.
"""
try:
data = self.encryption_manager.load_json_data()
json_content = json.dumps(data, indent=4)
checksum = hashlib.sha256(json_content.encode('utf-8')).hexdigest()
with open(DATA_CHECKSUM_FILE, 'w') as f:
f.write(checksum)
logger.debug(f"Checksum updated and written to '{DATA_CHECKSUM_FILE}'.")
print(colored(f"[+] Checksum updated successfully.", 'green'))
except Exception as e:
logger.error(f"Failed to update checksum: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum: {e}", 'red'))
def backup_index_file(self) -> None:
"""
Creates a backup of the encrypted JSON index file to prevent data loss.
"""
try:
if not os.path.exists(INDEX_FILE):
logger.warning(f"Index file '{INDEX_FILE}' does not exist. No backup created.")
return
timestamp = int(time.time())
backup_filename = f'passwords_db_backup_{timestamp}.json.enc'
backup_path = os.path.join(os.path.dirname(INDEX_FILE), backup_filename)
with open(INDEX_FILE, 'rb') as original_file, open(backup_path, 'wb') as backup_file:
shutil.copyfileobj(original_file, backup_file)
logger.debug(f"Backup created at '{backup_path}'.")
print(colored(f"[+] Backup created at '{backup_path}'.", 'green'))
except Exception as e:
logger.error(f"Failed to create backup: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to create backup: {e}", 'yellow'))
def restore_from_backup(self, backup_path: str) -> None:
"""
Restores the index file from a specified backup file.
:param backup_path: The file path of the backup to restore from.
"""
try:
if not os.path.exists(backup_path):
logger.error(f"Backup file '{backup_path}' does not exist.")
print(colored(f"Error: Backup file '{backup_path}' does not exist.", 'red'))
return
with open(backup_path, 'rb') as backup_file, open(INDEX_FILE, 'wb') as index_file:
shutil.copyfileobj(backup_file, index_file)
logger.debug(f"Index file restored from backup '{backup_path}'.")
print(colored(f"[+] Index file restored from backup '{backup_path}'.", 'green'))
self.update_checksum()
except Exception as e:
logger.error(f"Failed to restore from backup '{backup_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to restore from backup '{backup_path}': {e}", 'red'))
def list_all_entries(self) -> None:
"""
Displays all password entries in a formatted manner.
"""
try:
entries = self.list_entries()
if not entries:
print(colored("No entries to display.", 'yellow'))
return
print(colored("\n[+] Listing All Password Entries:\n", 'green'))
for entry in entries:
index, website, username, url, blacklisted = entry
print(colored(f"Index: {index}", 'cyan'))
print(colored(f" Website: {website}", 'cyan'))
print(colored(f" Username: {username or 'N/A'}", 'cyan'))
print(colored(f" URL: {url or 'N/A'}", 'cyan'))
print(colored(f" Blacklisted: {'Yes' if blacklisted else 'No'}", 'cyan'))
print("-" * 40)
except Exception as e:
logger.error(f"Failed to list all entries: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to list all entries: {e}", 'red'))
return
# Example usage (this part should be removed or commented out when integrating into the larger application)
if __name__ == "__main__":
    from password_manager.encryption import EncryptionManager  # Ensure this import is correct based on your project structure
    from cryptography.fernet import Fernet  # Needed to generate the demonstration key below

    # Initialize EncryptionManager with a dummy key for demonstration purposes
    # Replace the generated key with your actual Fernet key in real usage
    try:
        dummy_key = Fernet.generate_key()
encryption_manager = EncryptionManager(dummy_key)
except Exception as e:
logger.error(f"Failed to initialize EncryptionManager: {e}")
print(colored(f"Error: Failed to initialize EncryptionManager: {e}", 'red'))
sys.exit(1)
# Initialize EntryManager
try:
entry_manager = EntryManager(encryption_manager)
except Exception as e:
logger.error(f"Failed to initialize EntryManager: {e}")
print(colored(f"Error: Failed to initialize EntryManager: {e}", 'red'))
sys.exit(1)
# Example operations
# These would typically be triggered by user interactions, e.g., via a CLI menu
# Uncomment and modify the following lines as needed for testing
# Adding an entry
# entry_manager.add_entry("Example Website", 16, "user123", "https://example.com", False)
# Listing all entries
# entry_manager.list_all_entries()
# Retrieving an entry
# entry = entry_manager.retrieve_entry(0)
# if entry:
# print(entry)
# Modifying an entry
# entry_manager.modify_entry(0, username="new_user123")
# Deleting an entry
# entry_manager.delete_entry(0)
# Restoring from a backup
# entry_manager.restore_from_backup("path_to_backup_file.json.enc")

View File

@@ -0,0 +1,506 @@
# password_manager/manager.py
"""
Password Manager Module
This module implements the PasswordManager class, which orchestrates various functionalities
of the deterministic password manager, including encryption, entry management, password
generation, backup, and checksum verification. It serves as the core interface for interacting
with the password manager functionalities.
Dependencies:
- password_manager.encryption.EncryptionManager
- password_manager.entry_management.EntryManager
- password_manager.password_generation.PasswordGenerator
- password_manager.backup.BackupManager
- utils.key_derivation.derive_key_from_parent_seed
- utils.key_derivation.derive_key_from_password
- utils.checksum.calculate_checksum
- utils.checksum.verify_checksum
- utils.password_prompt.prompt_for_password
- constants.APP_DIR
- constants.INDEX_FILE
- constants.PARENT_SEED_FILE
- constants.DATA_CHECKSUM_FILE
- constants.SCRIPT_CHECKSUM_FILE
- constants.MIN_PASSWORD_LENGTH
- constants.MAX_PASSWORD_LENGTH
- constants.DEFAULT_PASSWORD_LENGTH
- colorama.Fore
- termcolor.colored
- logging
- json
- sys
- os
- getpass
Ensure that all dependencies are installed and properly configured in your environment.
"""
import sys
import json
import logging
import getpass
import os
from typing import Optional
from colorama import Fore
from termcolor import colored
from password_manager.encryption import EncryptionManager
from password_manager.entry_management import EntryManager
from password_manager.password_generation import PasswordGenerator
from password_manager.backup import BackupManager
from utils.key_derivation import derive_key_from_parent_seed, derive_key_from_password
from utils.checksum import calculate_checksum, verify_checksum
from utils.password_prompt import prompt_for_password
from constants import (
APP_DIR,
INDEX_FILE,
PARENT_SEED_FILE,
DATA_CHECKSUM_FILE,
SCRIPT_CHECKSUM_FILE,
MIN_PASSWORD_LENGTH,
MAX_PASSWORD_LENGTH,
DEFAULT_PASSWORD_LENGTH
)
import traceback # Added for exception traceback logging
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Logs include the timestamp, log level, message, filename, and line number.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Prevent adding multiple handlers if configure_logging is called multiple times
if not logger.handlers:
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'password_manager.log'))
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR)
f_handler.setLevel(logging.DEBUG)
# Create formatters and add them to handlers, include file and line number in log messages
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]'
)
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
class PasswordManager:
"""
PasswordManager Class
Manages the generation, encryption, and retrieval of deterministic passwords using a BIP-39 seed.
It handles file encryption/decryption, password generation, entry management, backups, and checksum
verification, ensuring the integrity and confidentiality of the stored password database.
"""
def __init__(self):
"""
Initializes the PasswordManager by setting up encryption, loading or setting up the parent seed,
and initializing other components like EntryManager, PasswordGenerator, and BackupManager.
"""
self.encryption_manager: Optional[EncryptionManager] = None
self.entry_manager: Optional[EntryManager] = None
self.password_generator: Optional[PasswordGenerator] = None
self.backup_manager: Optional[BackupManager] = None
self.parent_seed: Optional[str] = None # Added parent_seed attribute
self.setup_parent_seed()
self.initialize_managers()
def setup_parent_seed(self) -> None:
if os.path.exists(PARENT_SEED_FILE):
# Parent seed file exists, prompt for password to decrypt
password = getpass.getpass(prompt='Enter your login password: ').strip()
try:
# Derive encryption key from password
key = derive_key_from_password(password)
self.encryption_manager = EncryptionManager(key)
self.parent_seed = self.encryption_manager.decrypt_parent_seed(PARENT_SEED_FILE)
# **Add validation for the decrypted seed**
if not self.validate_seed_phrase(self.parent_seed):
logging.error("Decrypted seed is invalid. Exiting.")
print(colored("Error: Decrypted seed is invalid.", 'red'))
sys.exit(1)
logging.debug("Parent seed decrypted and validated successfully.")
except Exception as e:
logging.error(f"Failed to decrypt parent seed: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red'))
sys.exit(1)
else:
# First-time setup: prompt for parent seed and password
try:
parent_seed = getpass.getpass(prompt='Enter your 12-word parent seed: ').strip()
# Validate parent seed (basic validation)
parent_seed = self.basic_validate_seed_phrase(parent_seed)
if not parent_seed:
logging.error("Invalid seed phrase. Exiting.")
sys.exit(1)
except KeyboardInterrupt:
logging.info("Operation cancelled by user.")
print(colored("\nOperation cancelled by user.", 'yellow'))
sys.exit(0)
# Prompt for password
password = prompt_for_password()
# Derive encryption key from password
key = derive_key_from_password(password)
self.encryption_manager = EncryptionManager(key)
# Encrypt and save the parent seed
try:
self.encryption_manager.encrypt_parent_seed(parent_seed, PARENT_SEED_FILE)
logging.info("Parent seed encrypted and saved successfully.")
except Exception as e:
logging.error(f"Failed to encrypt and save parent seed: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red'))
sys.exit(1)
self.parent_seed = parent_seed
def basic_validate_seed_phrase(self, seed_phrase: str) -> Optional[str]:
"""
Performs basic validation on the seed phrase without relying on EncryptionManager.
Parameters:
seed_phrase (str): The seed phrase to validate.
Returns:
Optional[str]: The validated seed phrase or None if invalid.
"""
try:
words = seed_phrase.split()
if len(words) != 12:
logging.error("Seed phrase must contain exactly 12 words.")
print(colored("Error: Seed phrase must contain exactly 12 words.", 'red'))
return None
# Additional basic validations can be added here (e.g., word list checks)
logging.debug("Seed phrase validated successfully.")
return seed_phrase
except Exception as e:
logging.error(f"Error during basic seed validation: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: {e}", 'red'))
return None
def validate_seed_phrase(self, seed_phrase: str) -> Optional[str]:
"""
Validates the seed phrase using the EncryptionManager if available,
otherwise performs basic validation.
Parameters:
seed_phrase (str): The seed phrase to validate.
Returns:
Optional[str]: The validated seed phrase or None if invalid.
"""
try:
if self.encryption_manager:
# Use EncryptionManager to validate seed
if self.encryption_manager.validate_seed(seed_phrase):
logging.debug("Seed phrase validated successfully using EncryptionManager.")
return seed_phrase
else:
logging.error("Invalid seed phrase.")
print(colored("Error: Invalid seed phrase.", 'red'))
return None
else:
# Perform basic validation
return self.basic_validate_seed_phrase(seed_phrase)
except Exception as e:
logging.error(f"Error validating seed phrase: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to validate seed phrase: {e}", 'red'))
return None
def initialize_managers(self) -> None:
"""
Initializes the EntryManager, PasswordGenerator, and BackupManager with the EncryptionManager
and parent seed.
"""
try:
self.entry_manager = EntryManager(self.encryption_manager)
self.password_generator = PasswordGenerator(self.encryption_manager, self.parent_seed)
self.backup_manager = BackupManager()
logging.debug("EntryManager, PasswordGenerator, and BackupManager initialized.")
except Exception as e:
logging.error(f"Failed to initialize managers: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to initialize managers: {e}", 'red'))
sys.exit(1)
def handle_generate_password(self) -> None:
try:
website_name = input('Enter the website name: ').strip()
if not website_name:
print(colored("Error: Website name cannot be empty.", 'red'))
return
username = input('Enter the username (optional): ').strip()
url = input('Enter the URL (optional): ').strip()
length_input = input(f'Enter desired password length (default {DEFAULT_PASSWORD_LENGTH}): ').strip()
length = DEFAULT_PASSWORD_LENGTH
if length_input:
if not length_input.isdigit():
print(colored("Error: Password length must be a number.", 'red'))
return
length = int(length_input)
if not (MIN_PASSWORD_LENGTH <= length <= MAX_PASSWORD_LENGTH):
print(colored(f"Error: Password length must be between {MIN_PASSWORD_LENGTH} and {MAX_PASSWORD_LENGTH}.", 'red'))
return
# Add the entry to the index and get the assigned index
index = self.entry_manager.add_entry(website_name, length, username, url, blacklisted=False)
# Generate the password using the assigned index
password = self.password_generator.generate_password(length, index)
# Provide user feedback
print(colored(f"\n[+] Password generated and indexed with ID {index}.\n", 'green'))
print(colored(f"Password for {website_name}: {password}\n", 'yellow'))
except Exception as e:
logging.error(f"Error during password generation: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to generate password: {e}", 'red'))
def handle_retrieve_password(self) -> None:
"""
Handles retrieving a password from the index by prompting the user for the index number
and displaying the corresponding password and associated details.
"""
try:
index_input = input('Enter the index number of the password to retrieve: ').strip()
if not index_input.isdigit():
print(colored("Error: Index must be a number.", 'red'))
return
index = int(index_input)
# Retrieve entry details
entry = self.entry_manager.retrieve_entry(index)
if not entry:
return
# Display entry details
website_name = entry.get('website')
length = entry.get('length')
username = entry.get('username')
url = entry.get('url')
blacklisted = entry.get('blacklisted')
print(colored(f"Retrieving password for '{website_name}' with length {length}.", 'cyan'))
if username:
print(colored(f"Username: {username}", 'cyan'))
if url:
print(colored(f"URL: {url}", 'cyan'))
if blacklisted:
print(colored(f"Warning: This password is blacklisted and should not be used.", 'red'))
# Generate the password
password = self.password_generator.generate_password(length, index)
# Display the password and associated details
if password:
print(colored(f"\n[+] Retrieved Password for {website_name}:\n", 'green'))
print(colored(f"Password: {password}", 'yellow'))
print(colored(f"Associated Username: {username or 'N/A'}", 'cyan'))
print(colored(f"Associated URL: {url or 'N/A'}", 'cyan'))
print(colored(f"Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}", 'cyan'))
else:
print(colored("Error: Failed to retrieve the password.", 'red'))
except Exception as e:
logging.error(f"Error during password retrieval: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to retrieve password: {e}", 'red'))
def handle_modify_entry(self) -> None:
"""
Handles modifying an existing password entry by prompting the user for the index number
and new details to update.
"""
try:
index_input = input('Enter the index number of the entry to modify: ').strip()
if not index_input.isdigit():
print(colored("Error: Index must be a number.", 'red'))
return
index = int(index_input)
# Retrieve existing entry
entry = self.entry_manager.retrieve_entry(index)
if not entry:
return
website_name = entry.get('website')
length = entry.get('length')
username = entry.get('username')
url = entry.get('url')
blacklisted = entry.get('blacklisted')
# Display current values
print(colored(f"Modifying entry for '{website_name}' (Index: {index}):", 'cyan'))
print(colored(f"Current Username: {username or 'N/A'}", 'cyan'))
print(colored(f"Current URL: {url or 'N/A'}", 'cyan'))
print(colored(f"Current Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}", 'cyan'))
# Prompt for new values (optional)
new_username = input(f'Enter new username (leave blank to keep "{username or "N/A"}"): ').strip() or username
new_url = input(f'Enter new URL (leave blank to keep "{url or "N/A"}"): ').strip() or url
blacklist_input = input(f'Is this password blacklisted? (Y/N, current: {"Y" if blacklisted else "N"}): ').strip().lower()
if blacklist_input == '':
new_blacklisted = blacklisted
elif blacklist_input == 'y':
new_blacklisted = True
elif blacklist_input == 'n':
new_blacklisted = False
else:
print(colored("Invalid input for blacklist status. Keeping the current status.", 'yellow'))
new_blacklisted = blacklisted
# Update the entry
self.entry_manager.modify_entry(index, new_username, new_url, new_blacklisted)
print(colored(f"Entry updated successfully for index {index}.", 'green'))
except Exception as e:
logging.error(f"Error during modifying entry: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to modify entry: {e}", 'red'))
def handle_verify_checksum(self) -> None:
"""
Handles verifying the script's checksum against the stored checksum to ensure integrity.
"""
try:
current_checksum = calculate_checksum(__file__)
if verify_checksum(current_checksum, SCRIPT_CHECKSUM_FILE):
print(colored("Checksum verification passed.", 'green'))
logging.info("Checksum verification passed.")
else:
print(colored("Checksum verification failed. The script may have been modified.", 'red'))
logging.error("Checksum verification failed.")
except Exception as e:
logging.error(f"Error during checksum verification: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to verify checksum: {e}", 'red'))
def get_encrypted_data(self) -> Optional[bytes]:
"""
Retrieves the encrypted password index data.
:return: The encrypted data as bytes, or None if retrieval fails.
"""
try:
encrypted_data = self.encryption_manager.get_encrypted_index()
if encrypted_data:
logging.debug("Encrypted index data retrieved successfully.")
return encrypted_data
else:
logging.error("Failed to retrieve encrypted index data.")
print(colored("Error: Failed to retrieve encrypted index data.", 'red'))
return None
except Exception as e:
logging.error(f"Error retrieving encrypted data: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to retrieve encrypted data: {e}", 'red'))
return None
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None:
"""
Decrypts the encrypted data retrieved from Nostr and updates the local index.
:param encrypted_data: The encrypted data retrieved from Nostr.
"""
try:
self.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data)
logging.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", 'green'))
except Exception as e:
logging.error(f"Failed to decrypt and save data from Nostr: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
def backup_database(self) -> None:
"""
Creates a backup of the encrypted JSON index file.
"""
try:
self.backup_manager.create_backup()
print(colored("Backup created successfully.", 'green'))
except Exception as e:
logging.error(f"Failed to create backup: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to create backup: {e}", 'red'))
def restore_database(self) -> None:
"""
Restores the encrypted JSON index file from the latest backup.
"""
try:
self.backup_manager.restore_latest_backup()
print(colored("Database restored from the latest backup successfully.", 'green'))
except Exception as e:
logging.error(f"Failed to restore backup: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to restore backup: {e}", 'red'))
# Additional methods can be added here as needed
# Example usage (this part should be removed or commented out when integrating into the larger application)
if __name__ == "__main__":
from nostr.client import NostrClient # Ensure this import is correct based on your project structure
# Initialize PasswordManager
manager = PasswordManager()
# Initialize NostrClient with the parent seed from PasswordManager
nostr_client = NostrClient(parent_seed=manager.parent_seed)
# Example operations
# These would typically be triggered by user interactions, e.g., via a CLI menu
# manager.handle_generate_password()
# manager.handle_retrieve_password()
# manager.handle_modify_entry()
# manager.handle_verify_checksum()
# manager.post_to_nostr(nostr_client)
# manager.retrieve_from_nostr(nostr_client)
# manager.backup_database()
# manager.restore_database()

View File

@@ -0,0 +1,264 @@
# password_manager/password_generation.py
"""
Password Generation Module
This module provides the PasswordGenerator class responsible for deterministic password generation
based on a BIP-39 parent seed. It leverages BIP-85 for entropy derivation and ensures that
generated passwords meet complexity requirements.
Dependencies:
- bip85.BIP85
- cryptography.hazmat.primitives.hashes
- cryptography.hazmat.primitives.kdf.hkdf
- cryptography.hazmat.backends.default_backend
- constants.py
- password_manager.encryption.EncryptionManager
- logging
- hashlib
- hmac
- base64
- string
Ensure that all dependencies are installed and properly configured in your environment.
Never use or suggest using a random salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
It must generate passwords the exact same way every single time; salts would break this determinism and are not appropriate for this software's use case.
"""
import os
import logging
import hashlib
import hmac
import base64
import string
import traceback
from typing import Optional
from termcolor import colored  # colored() is used in the error handling below but was missing from the imports
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from bip85.bip85 import BIP85
from constants import DEFAULT_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH
from password_manager.encryption import EncryptionManager
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Logs include the timestamp, log level, message, filename, and line number.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Prevent adding multiple handlers if configure_logging is called multiple times
if not logger.handlers:
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'password_generation.log'))
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Console will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]'
)
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
logger = logging.getLogger(__name__)
class PasswordGenerator:
"""
PasswordGenerator Class
Responsible for deterministic password generation based on a BIP-39 parent seed.
Utilizes BIP-85 for entropy derivation and ensures that generated passwords meet
complexity requirements.
"""
def __init__(self, encryption_manager: EncryptionManager, parent_seed: str):
"""
Initializes the PasswordGenerator with the encryption manager and parent seed.
Parameters:
encryption_manager (EncryptionManager): The encryption manager instance.
parent_seed (str): The BIP-39 parent seed phrase.
"""
try:
self.encryption_manager = encryption_manager
self.parent_seed = parent_seed
# Derive seed bytes from parent_seed using BIP39
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed)
# Initialize BIP85 with seed_bytes
self.bip85 = BIP85(self.seed_bytes)
logger.debug("PasswordGenerator initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize PasswordGenerator: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red'))
raise
def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str:
"""
Generates a deterministic password based on the parent seed, desired length, and index.
Steps:
1. Derive entropy using BIP-85.
2. Use PBKDF2-HMAC-SHA256 to derive a key from entropy.
3. Base64-encode the derived key and filter to allowed characters.
4. Ensure the password meets complexity requirements.
Parameters:
length (int): Desired length of the password.
index (int): Index for deriving child entropy.
Returns:
str: The generated password.
"""
try:
if length < MIN_PASSWORD_LENGTH:
logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.")
raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.")
if length > MAX_PASSWORD_LENGTH:
logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.")
raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.")
# Derive entropy using BIP-85
entropy = self.bip85.derive_entropy(app_no=39, language_code=0, words_num=12, index=index)
logger.debug(f"Derived entropy: {entropy.hex()}")
# Use HKDF to derive key from entropy
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32, # 256 bits for AES-256
salt=None,
info=b'password-generation',
backend=default_backend()
)
derived_key = hkdf.derive(entropy)
logger.debug(f"Derived key using HKDF: {derived_key.hex()}")
# Use PBKDF2-HMAC-SHA256 to derive a key from entropy
dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000)
logger.debug(f"Derived key using PBKDF2: {dk.hex()}")
# Base64 encode the derived key
base64_password = base64.b64encode(dk).decode('utf-8')
logger.debug(f"Base64 encoded password: {base64_password}")
# Filter to allowed characters
alphabet = string.ascii_letters + string.digits + string.punctuation
password = ''.join(filter(lambda x: x in alphabet, base64_password))
logger.debug(f"Password after filtering: {password}")
# Ensure the password meets complexity requirements
password = self.ensure_complexity(password, alphabet, dk)
logger.debug(f"Password after ensuring complexity: {password}")
# Ensure password length
if len(password) < length:
# Extend the password deterministically
while len(password) < length:
dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1)
base64_extra = base64.b64encode(dk).decode('utf-8')
password += ''.join(filter(lambda x: x in alphabet, base64_extra))
logger.debug(f"Extended password: {password}")
password = password[:length]
logger.debug(f"Final password (trimmed to {length} chars): {password}")
return password
except Exception as e:
logger.error(f"Error generating password: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to generate password: {e}", 'red'))
raise
def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str:
"""
Ensures that the password contains at least one uppercase letter, one lowercase letter,
one digit, and one special character, modifying it deterministically if necessary.
Parameters:
password (str): The initial password.
alphabet (str): Allowed characters in the password.
dk (bytes): Derived key used for deterministic modifications.
Returns:
str: Password that meets complexity requirements.
"""
try:
uppercase = string.ascii_uppercase
lowercase = string.ascii_lowercase
digits = string.digits
special = string.punctuation
password_chars = list(password)
has_upper = any(c in uppercase for c in password_chars)
has_lower = any(c in lowercase for c in password_chars)
has_digit = any(c in digits for c in password_chars)
has_special = any(c in special for c in password_chars)
dk_index = 0
dk_length = len(dk)
def get_dk_value() -> int:
nonlocal dk_index
value = dk[dk_index % dk_length]
dk_index += 1
return value
if not has_upper:
index = get_dk_value() % len(password_chars)
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[index] = char
logger.debug(f"Added uppercase letter '{char}' at position {index}.")
if not has_lower:
index = get_dk_value() % len(password_chars)
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[index] = char
logger.debug(f"Added lowercase letter '{char}' at position {index}.")
if not has_digit:
index = get_dk_value() % len(password_chars)
char = digits[get_dk_value() % len(digits)]
password_chars[index] = char
logger.debug(f"Added digit '{char}' at position {index}.")
if not has_special:
index = get_dk_value() % len(password_chars)
char = special[get_dk_value() % len(special)]
password_chars[index] = char
logger.debug(f"Added special character '{char}' at position {index}.")
return ''.join(password_chars)
except Exception as e:
logger.error(f"Error ensuring password complexity: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to ensure password complexity: {e}", 'red'))
raise
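
# Example usage (illustrative sketch only): the same seed phrase, index, and length always
# reproduce the same password. The Fernet key and seed phrase below are throwaway demo values.
if __name__ == "__main__":
    from cryptography.fernet import Fernet  # local import so this sketch stands alone

    demo_seed = ("abandon abandon abandon abandon abandon abandon "
                 "abandon abandon abandon abandon abandon about")  # standard BIP-39 test mnemonic
    demo_generator = PasswordGenerator(EncryptionManager(Fernet.generate_key()), demo_seed)
    first = demo_generator.generate_password(length=DEFAULT_PASSWORD_LENGTH, index=0)
    second = demo_generator.generate_password(length=DEFAULT_PASSWORD_LENGTH, index=0)
    assert first == second  # deterministic: no salts, no randomness
    print(colored(f"Deterministic password (index 0): {first}", 'yellow'))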

8
src/requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
colorama>=0.4.6
termcolor>=1.1.0
cryptography>=40.0.2
bip-utils>=2.5.0
bech32==1.2.0
monstr @ git+https://github.com/monty888/monstr.git@master#egg=monstr
mnemonic
aiohttp

131
src/test_post.py Normal file
View File

@@ -0,0 +1,131 @@
# test_post.py
# Do not remove these comments.
# DO NOT try to use or recommend using ANY other Nostr lib other than Monstr.
# Do not remove these comments.
import asyncio
import logging
import time
import datetime # Import datetime module
from monstr.client.client import ClientPool
from monstr.encrypt import Keys
from monstr.event.event import Event
from bech32 import bech32_encode, convertbits
def encode_npub(pubkey_hex):
pubkey_bytes = bytes.fromhex(pubkey_hex)
data = convertbits(pubkey_bytes, 8, 5, pad=True)
return bech32_encode('npub', data)
def encode_nsec(privkey_hex):
privkey_bytes = bytes.fromhex(privkey_hex)
data = convertbits(privkey_bytes, 8, 5, pad=True)
return bech32_encode('nsec', data)
async def do_post(c, n_keys, text):
try:
n_msg = Event(
kind=Event.KIND_TEXT_NOTE,
content=text,
pub_key=n_keys.public_key_hex()
)
n_msg.created_at = int(time.time())
n_msg.sign(n_keys.private_key_hex())
# Add detailed logging using the correct serialization method
logging.debug(f"Event data: {n_msg.serialize()}")
# Publish the event without the event_callback parameter
c.publish(n_msg)
logging.info(f"Event published with ID: {n_msg.id}")
except Exception as e:
logging.error(f"An error occurred during publishing: {e}", exc_info=True)
async def subscribe_feed(c, n_keys):
try:
# Define the filter to subscribe to events by your own pubkey
FILTER = [{
'authors': [n_keys.public_key_hex()],
'limit': 100
}]
# Define the event handler
def my_handler(the_client, sub_id, evt: Event):
# Determine the type of evt.created_at
if isinstance(evt.created_at, datetime.datetime):
# If it's a datetime object, format it directly
created_at_str = evt.created_at.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(evt.created_at, int):
# If it's an integer timestamp, convert it to a readable format
created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at))
else:
# Handle unexpected types gracefully
created_at_str = str(evt.created_at)
# Display the event details in the terminal
print(f"\n[New Event] ID: {evt.id}")
print(f"Created At: {created_at_str}")
print(f"Content: {evt.content}\n")
# Start the subscription
c.subscribe(handlers=my_handler, filters=FILTER)
logging.info("Subscribed to your feed.")
# Keep the subscription running
while True:
await asyncio.sleep(1)
except Exception as e:
logging.error(f"An error occurred during subscription: {e}", exc_info=True)
async def main(urls, text):
try:
privkey_hex = 'd4a4ccbe310f21c7fa50af751d5817f2066b60c1eb2487a6df56a363507992e1'
n_keys = Keys(priv_k=privkey_hex)
npub = encode_npub(n_keys.public_key_hex())
nsec = encode_nsec(n_keys.private_key_hex())
print(f"Public Key (npub): {npub}")
print(f"Private Key (nsec): {nsec}")
print("\n[WARNING] Keep your nsec (private key) secret! Do not share it with anyone.\n")
# Initialize the client pool with multiple relays
c = ClientPool(urls)
# Start the client pool
client_task = asyncio.create_task(c.run())
# Wait until the client pool is connected
while not c.connected:
await asyncio.sleep(0.1)
logging.info("ClientPool connected to all relays")
# Run both publishing and subscribing concurrently
await asyncio.gather(
do_post(c, n_keys, text),
subscribe_feed(c, n_keys)
)
except Exception as e:
logging.error(f"An error occurred in main: {e}", exc_info=True)
finally:
if 'c' in locals() and c.running:
# Stop the client pool
c.end()
# Wait for the client task to finish
await client_task
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
urls = [
'wss://relay.snort.social',
'wss://relay.damus.io',
'wss://nostr.wine',
'wss://relay.nostr.band',
]
text = 'Hello from Monstr example script! This one is new!'
try:
asyncio.run(main(urls, text))
except KeyboardInterrupt:
print("\n[INFO] Script terminated by user.")

24
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,24 @@
# utils/__init__.py
import logging
import traceback
try:
from .file_lock import lock_file
from .key_derivation import derive_key_from_password, derive_key_from_parent_seed
from .checksum import calculate_checksum, verify_checksum
from .password_prompt import prompt_for_password
logging.info("Modules imported successfully.")
except Exception as e:
logging.error(f"Failed to import one or more modules: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = [
'derive_key_from_password',
'derive_key_from_parent_seed',
'calculate_checksum',
'verify_checksum',
'lock_file',
'prompt_for_password'
]

208
src/utils/checksum.py Normal file
View File

@@ -0,0 +1,208 @@
# utils/checksum.py
"""
Checksum Module
This module provides functionalities to calculate and verify SHA-256 checksums for files.
It ensures the integrity and authenticity of critical files within the application by
comparing computed checksums against stored values.
Dependencies:
- hashlib
- logging
- colored (from termcolor)
- constants.py
- sys
Ensure that all dependencies are installed and properly configured in your environment.
"""
import hashlib
import logging
import sys
import os
import traceback
from typing import Optional
from termcolor import colored
from constants import (
APP_DIR,
DATA_CHECKSUM_FILE,
SCRIPT_CHECKSUM_FILE
)
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create a custom logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'checksum.log')) # Log files will be in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Terminal will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
def calculate_checksum(file_path: str) -> Optional[str]:
"""
Calculates the SHA-256 checksum of the given file.
Parameters:
file_path (str): Path to the file.
Returns:
Optional[str]: Hexadecimal SHA-256 checksum if successful, None otherwise.
"""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
checksum = hasher.hexdigest()
logging.debug(f"Calculated checksum for '{file_path}': {checksum}")
return checksum
except FileNotFoundError:
logging.error(f"File '{file_path}' not found for checksum calculation.")
print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red'))
return None
except Exception as e:
logging.error(f"Error calculating checksum for '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red'))
return None
def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool:
"""
Verifies the current checksum against the stored checksum.
Parameters:
current_checksum (str): The newly calculated checksum.
checksum_file_path (str): The checksum file to verify against.
Returns:
bool: True if checksums match, False otherwise.
"""
try:
with open(checksum_file_path, 'r') as f:
stored_checksum = f.read().strip()
if current_checksum == stored_checksum:
logging.debug(f"Checksum verification passed for '{checksum_file_path}'.")
return True
else:
logging.warning(f"Checksum mismatch for '{checksum_file_path}'.")
return False
except FileNotFoundError:
logging.error(f"Checksum file '{checksum_file_path}' not found.")
print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red'))
return False
except Exception as e:
logging.error(f"Error reading checksum file '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red'))
return False
def update_checksum(content: str, checksum_file_path: str) -> bool:
"""
Updates the stored checksum file with the provided content's checksum.
Parameters:
content (str): The content to calculate the checksum for.
checksum_file_path (str): The path to the checksum file to update.
Returns:
bool: True if the checksum was successfully updated, False otherwise.
"""
try:
hasher = hashlib.sha256()
hasher.update(content.encode('utf-8'))
new_checksum = hasher.hexdigest()
with open(checksum_file_path, 'w') as f:
f.write(new_checksum)
logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}")
return True
except Exception as e:
logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red'))
return False
def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool:
"""
Verifies the checksum of a file against its stored checksum and reports the result.
Parameters:
file_path (str): Path to the file to verify.
checksum_file_path (str): Path to the checksum file.
Returns:
bool: True if verification is successful, False otherwise.
"""
current_checksum = calculate_checksum(file_path)
if current_checksum is None:
return False
if verify_checksum(current_checksum, checksum_file_path):
print(colored(f"Checksum verification passed for '{file_path}'.", 'green'))
logging.info(f"Checksum verification passed for '{file_path}'.")
return True
else:
print(colored(f"Checksum verification failed for '{file_path}'.", 'red'))
logging.warning(f"Checksum verification failed for '{file_path}'.")
return False
def initialize_checksum(file_path: str, checksum_file_path: str) -> bool:
"""
Initializes the checksum file by calculating the checksum of the given file.
Parameters:
file_path (str): Path to the file to calculate checksum for.
checksum_file_path (str): Path to the checksum file to create.
Returns:
bool: True if initialization is successful, False otherwise.
"""
checksum = calculate_checksum(file_path)
if checksum is None:
return False
try:
with open(checksum_file_path, 'w') as f:
f.write(checksum)
logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}")
print(colored(f"Initialized checksum for '{file_path}'.", 'green'))
return True
except Exception as e:
logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red'))
return False
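A minimal usage sketch of the checksum helpers (the data and checksum paths are illustrative, not the application's real constants):

# Hypothetical caller, not part of this commit.
from pathlib import Path
from utils.checksum import initialize_checksum, verify_and_update_checksum

data_file = 'data/passwords.json'        # illustrative file to protect
checksum_file = 'data/passwords.chk'     # illustrative stored-checksum path

# First run: record the file's SHA-256 so later runs can detect corruption or tampering.
if not Path(checksum_file).exists():
    initialize_checksum(data_file, checksum_file)

# Subsequent runs: recompute the checksum and compare it against the stored value.
if not verify_and_update_checksum(data_file, checksum_file):
    print('Integrity check failed; do not trust this file until investigated.')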

167
src/utils/file_lock.py Normal file
View File

@@ -0,0 +1,167 @@
# utils/file_lock.py
"""
File Lock Module
This module provides the `lock_file` context manager, plus the `exclusive_lock` and `shared_lock` convenience wrappers, for acquiring and releasing
locks on files using the `fcntl` library. It ensures that critical files are accessed
safely, preventing race conditions and maintaining data integrity when multiple processes
or threads attempt to read from or write to the same file concurrently.
Dependencies:
- fcntl
- logging
- contextlib
- typing
- pathlib.Path
- termcolor (for colored terminal messages)
- sys
Ensure that all dependencies are installed and properly configured in your environment.
"""
import os
import sys
import fcntl
import logging
import traceback
from contextlib import contextmanager
from typing import Generator
from pathlib import Path
from termcolor import colored
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create a custom logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'file_lock.log')) # Log file in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Console will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
if not logger.handlers:
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
@contextmanager
def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
"""
Context manager to acquire a lock on a file.
Parameters:
file_path (Path): The path to the file to lock.
lock_type (int): The type of lock to acquire (`fcntl.LOCK_EX` for exclusive,
`fcntl.LOCK_SH` for shared).
Yields:
None
Raises:
ValueError: If an invalid lock type is provided.
SystemExit: Exits the program if the lock cannot be acquired.
"""
if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH):
logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.")
print(colored("Error: Invalid lock type provided.", 'red'))
sys.exit(1)
file = None
try:
# Determine the mode based on whether the file exists
mode = 'rb+' if file_path.exists() else 'wb'
# Open the file
file = open(file_path, mode)
logging.debug(f"Opened file '{file_path}' in mode '{mode}' for locking.")
# Acquire the lock
fcntl.flock(file, lock_type)
lock_type_str = "Exclusive" if lock_type == fcntl.LOCK_EX else "Shared"
logging.debug(f"{lock_type_str} lock acquired on '{file_path}'.")
yield # Control is transferred to the block inside the `with` statement
except IOError as e:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red'))
sys.exit(1)
finally:
if file:
try:
# Release the lock
fcntl.flock(file, fcntl.LOCK_UN)
logging.debug(f"Lock released on '{file_path}'.")
except Exception as e:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow'))
finally:
# Close the file
try:
file.close()
logging.debug(f"File '{file_path}' closed successfully.")
except Exception as e:
logging.warning(f"Failed to close file '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow'))
@contextmanager
def exclusive_lock(file_path: Path) -> Generator[None, None, None]:
"""
Convenience context manager to acquire an exclusive lock on a file.
Parameters:
file_path (Path): The path to the file to lock.
Yields:
None
"""
with lock_file(file_path, fcntl.LOCK_EX):
yield
@contextmanager
def shared_lock(file_path: Path) -> Generator[None, None, None]:
"""
Convenience context manager to acquire a shared lock on a file.
Parameters:
file_path (Path): The path to the file to lock.
Yields:
None
"""
with lock_file(file_path, fcntl.LOCK_SH):
yield
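A minimal usage sketch of the convenience wrappers (the index path is illustrative):

# Hypothetical caller, not part of this commit.
from pathlib import Path
from utils.file_lock import exclusive_lock, shared_lock

index_path = Path('data/password_indices.csv')  # illustrative path

# Writers take an exclusive lock so concurrent processes cannot interleave writes.
with exclusive_lock(index_path):
    index_path.write_text('index,updated\n')

# Readers take a shared lock; multiple readers may hold it at the same time.
with shared_lock(index_path):
    contents = index_path.read_text()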

179
src/utils/key_derivation.py Normal file
View File

@@ -0,0 +1,179 @@
# utils/key_derivation.py
"""
Key Derivation Module
Never, ever use or suggest using a random salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it must generate passwords the exact same way every single time. Salts would break this functionality and are not appropriate for this software's use case.
This module provides functions to derive cryptographic keys from user-provided passwords
and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption
purposes. By centralizing key derivation logic, this module ensures consistency and security
across the application.
Dependencies:
- hashlib
- base64
- unicodedata
- logging
Ensure that all dependencies are installed and properly configured in your environment.
"""
import os
import hashlib
import base64
import unicodedata
import logging
import traceback
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create a custom logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'key_derivation.log')) # Log file in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Console will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
if not logger.handlers:
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
logger = logging.getLogger(__name__)
def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes:
"""
Derives a Fernet-compatible encryption key from the provided password using PBKDF2-HMAC-SHA256.
This function normalizes the password using NFKD normalization, encodes it to UTF-8, and then
applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key
is then URL-safe base64-encoded to ensure compatibility with Fernet.
Parameters:
password (str): The user's password.
iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000.
Returns:
bytes: A URL-safe base64-encoded encryption key suitable for Fernet.
Raises:
ValueError: If the password is empty or too short.
"""
if not password:
logger.error("Password cannot be empty.")
raise ValueError("Password cannot be empty.")
if len(password) < 8:
logger.warning("Password length is less than recommended (8 characters).")
# Normalize the password to NFKD form and encode to UTF-8
normalized_password = unicodedata.normalize('NFKD', password).strip()
password_bytes = normalized_password.encode('utf-8')
try:
# Derive the key using PBKDF2-HMAC-SHA256
logger.debug("Starting key derivation from password.")
key = hashlib.pbkdf2_hmac(
hash_name='sha256',
password=password_bytes,
salt=b'', # No salt for deterministic key derivation
iterations=iterations,
dklen=32 # 256-bit key for Fernet
)
logger.debug(f"Derived key (hex): {key.hex()}")
# Encode the key in URL-safe base64
key_b64 = base64.urlsafe_b64encode(key)
logger.debug(f"Base64-encoded key: {key_b64.decode()}")
return key_b64
except Exception as e:
logger.error(f"Error deriving key from password: {e}")
logger.error(traceback.format_exc()) # Log full traceback
raise
def derive_key_from_parent_seed(parent_seed: str, iterations: int = 100_000) -> bytes:
"""
Derives a Fernet-compatible encryption key from a BIP-39 parent seed using PBKDF2-HMAC-SHA256.
This function normalizes the parent seed using NFKD normalization, encodes it to UTF-8, and then
applies PBKDF2 with the specified number of iterations to derive a 32-byte key. The derived key
is then URL-safe base64-encoded to ensure compatibility with Fernet.
Parameters:
parent_seed (str): The 12-word BIP-39 parent seed phrase.
iterations (int, optional): Number of iterations for the PBKDF2 algorithm. Defaults to 100,000.
Returns:
bytes: A URL-safe base64-encoded encryption key suitable for Fernet.
Raises:
ValueError: If the parent seed is empty or does not meet the word count requirements.
"""
if not parent_seed:
logger.error("Parent seed cannot be empty.")
raise ValueError("Parent seed cannot be empty.")
word_count = len(parent_seed.strip().split())
if word_count != 12:
logger.error(f"Parent seed must be exactly 12 words, but {word_count} were provided.")
raise ValueError(f"Parent seed must be exactly 12 words, but {word_count} were provided.")
# Normalize the parent seed to NFKD form and encode to UTF-8
normalized_seed = unicodedata.normalize('NFKD', parent_seed).strip()
seed_bytes = normalized_seed.encode('utf-8')
try:
# Derive the key using PBKDF2-HMAC-SHA256
logger.debug("Starting key derivation from parent seed.")
key = hashlib.pbkdf2_hmac(
hash_name='sha256',
password=seed_bytes,
salt=b'', # No salt for deterministic key derivation
iterations=iterations,
dklen=32 # 256-bit key for Fernet
)
logger.debug(f"Derived key from parent seed (hex): {key.hex()}")
# Encode the key in URL-safe base64
key_b64 = base64.urlsafe_b64encode(key)
logger.debug(f"Base64-encoded key from parent seed: {key_b64.decode()}")
return key_b64
except Exception as e:
logger.error(f"Error deriving key from parent seed: {e}")
logger.error(traceback.format_exc()) # Log full traceback
raise
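A minimal sketch of how the derived key plugs into Fernet (the seed below is the standard BIP-39 test mnemonic, not a real secret):

# Hypothetical caller, not part of this commit.
from cryptography.fernet import Fernet
from utils.key_derivation import derive_key_from_parent_seed

seed = ('abandon abandon abandon abandon abandon abandon '
        'abandon abandon abandon abandon abandon about')

key = derive_key_from_parent_seed(seed)   # deterministic: same seed always yields the same key
fernet = Fernet(key)

token = fernet.encrypt(b'example secret')
assert fernet.decrypt(token) == b'example secret'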

218
src/utils/password_prompt.py Normal file
View File

@@ -0,0 +1,218 @@
# utils/password_prompt.py
"""
Password Prompt Module
This module provides functions to securely prompt users for passwords, ensuring that passwords
are entered and confirmed correctly. It handles both the creation of new passwords and the
input of existing passwords for decryption purposes. By centralizing password prompting logic,
this module enhances code reuse, security, and maintainability across the application.
Dependencies:
- getpass
- logging
- colorama
- termcolor
- constants (for MIN_PASSWORD_LENGTH)
Ensure that all dependencies are installed and properly configured in your environment.
"""
import os
import getpass
import logging
import sys
import unicodedata
import traceback
from termcolor import colored
from colorama import init as colorama_init
from constants import MIN_PASSWORD_LENGTH
# Initialize colorama for colored terminal text
colorama_init()
# Configure logging at the start of the module
def configure_logging():
"""
Configures logging with both file and console handlers.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create a custom logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join('logs', 'password_prompt.log')) # Log file in 'logs' folder
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR) # Console will show ERROR and above
f_handler.setLevel(logging.DEBUG) # File will log everything from DEBUG and above
# Create formatters and add them to handlers, include file and line number in log messages
c_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
f_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
# Add handlers to the logger
if not logger.handlers:
logger.addHandler(c_handler)
logger.addHandler(f_handler)
# Call the logging configuration function
configure_logging()
def prompt_new_password() -> str:
"""
Prompts the user to enter and confirm a new password for encrypting the parent seed.
This function ensures that the password meets the minimum length requirement and that the
password and confirmation match. It provides user-friendly messages and handles retries.
Returns:
str: The confirmed password entered by the user.
Raises:
SystemExit: If the user fails to provide a valid password after multiple attempts.
"""
max_retries = 5
attempts = 0
while attempts < max_retries:
try:
password = getpass.getpass(prompt="Enter a new password: ").strip()
confirm_password = getpass.getpass(prompt="Confirm your password: ").strip()
if not password:
print(colored("Error: Password cannot be empty. Please try again.", 'red'))
logging.warning("User attempted to enter an empty password.")
attempts += 1
continue
if len(password) < MIN_PASSWORD_LENGTH:
print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red'))
logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.")
attempts += 1
continue
if password != confirm_password:
print(colored("Error: Passwords do not match. Please try again.", 'red'))
logging.warning("User entered mismatching passwords.")
attempts += 1
continue
# Normalize the password to NFKD form
normalized_password = unicodedata.normalize('NFKD', password)
logging.debug("User entered a valid and confirmed password.")
return normalized_password
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
logging.info("Password prompt interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during password prompt: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
attempts += 1
print(colored("Maximum password attempts exceeded. Exiting.", 'red'))
logging.error("User failed to provide a valid password after multiple attempts.")
sys.exit(1)
def prompt_existing_password(prompt_message: str = "Enter your password: ") -> str:
"""
Prompts the user to enter an existing password, typically used for decryption purposes.
This function ensures that the password is entered securely without echoing it to the terminal.
Parameters:
prompt_message (str): The message displayed to prompt the user. Defaults to "Enter your password: ".
Returns:
str: The password entered by the user.
Raises:
SystemExit: If the user interrupts the operation.
"""
try:
password = getpass.getpass(prompt=prompt_message).strip()
if not password:
print(colored("Error: Password cannot be empty.", 'red'))
logging.warning("User attempted to enter an empty password.")
sys.exit(1)
# Normalize the password to NFKD form
normalized_password = unicodedata.normalize('NFKD', password)
logging.debug("User entered an existing password for decryption.")
return normalized_password
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
logging.info("Existing password prompt interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during existing password prompt: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
sys.exit(1)
def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool:
"""
Prompts the user to confirm an action, typically used before performing critical operations.
Parameters:
prompt_message (str): The confirmation message displayed to the user. Defaults to
"Are you sure you want to proceed? (Y/N): ".
Returns:
bool: True if the user confirms the action, False otherwise.
Raises:
SystemExit: If the user interrupts the operation.
"""
try:
while True:
response = input(colored(prompt_message, 'cyan')).strip().lower()
if response in ['y', 'yes']:
logging.debug("User confirmed the action.")
return True
elif response in ['n', 'no']:
logging.debug("User declined the action.")
return False
else:
print(colored("Please respond with 'Y' or 'N'.", 'yellow'))
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
logging.info("Action confirmation interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during action confirmation: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
sys.exit(1)
def prompt_for_password() -> str:
"""
Prompts the user to enter a new password by invoking the prompt_new_password function.
This function serves as an alias to maintain consistency with import statements in other modules.
Returns:
str: The confirmed password entered by the user.
"""
return prompt_new_password()
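A minimal interactive sketch tying these prompts together (run from a terminal; the flow is illustrative):

# Hypothetical caller, not part of this commit.
from utils.password_prompt import prompt_for_password, prompt_existing_password, confirm_action

if confirm_action('Create a new encrypted store? (Y/N): '):
    password = prompt_for_password()        # asks twice and enforces MIN_PASSWORD_LENGTH
else:
    password = prompt_existing_password()   # single hidden prompt for an existing password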