Format key_manager

This commit is contained in:
thePR0M3TH3AN
2025-06-29 14:51:17 -04:00
parent 3b4ba54cbf
commit 4698384b5d
25 changed files with 606 additions and 322 deletions

View File

@@ -127,15 +127,14 @@ python src/main.py
3. Modify an Existing Entry
4. Backup to Nostr
5. Restore from Nostr
6. Backup/Reveal Parent Seed
7. Switch Seed Profile
8. Add a New Seed Profile
9. Remove an Existing Seed Profile
10. List All Seed Profiles
11. Settings
12. Exit
6. Switch Seed Profile
7. Add a New Seed Profile
8. Remove an Existing Seed Profile
9. List All Seed Profiles
10. Settings
11. Exit
Enter your choice (1-12):
Enter your choice (1-11):
```
### Managing Multiple Seeds
@@ -170,7 +169,7 @@ wss://relay.primal.net
You can manage the relay list or change the PIN through the **Settings** menu:
1. From the main menu, choose option `11` (**Settings**).
1. From the main menu, choose option `10` (**Settings**).
2. Select `1` to view your current relays.
3. Choose `2` to add a new relay URL.
4. Select `3` to remove a relay by number.
@@ -178,7 +177,8 @@ You can manage the relay list or change the PIN through the **Settings** menu:
6. Select `5` to change the settings PIN.
7. Choose `6` to display your Nostr public key.
8. Select `7` to verify the script checksum.
9. Choose `8` to return to the main menu.
9. Choose `8` to back up the parent seed.
10. Select `9` to return to the main menu.
## Running Tests

View File

@@ -126,14 +126,14 @@ Fingerprint 31DD880A523B9759 selected and managers initialized.
3. Modify an Existing Entry
4. Backup to Nostr
5. Restore from Nostr
6. Backup/Reveal Parent Seed
7. Switch Fingerprint
8. Add a New Fingerprint
9. Remove an Existing Fingerprint
10. List All Fingerprints
6. Switch Fingerprint
7. Add a New Fingerprint
8. Remove an Existing Fingerprint
9. List All Fingerprints
10. Settings
11. Exit
Enter your choice (1-13): 1
Enter your choice (1-11): 1
Enter the website name: newsitename
Enter the username (optional):
Enter the URL (optional):

View File

@@ -123,11 +123,11 @@ Fingerprint 31DD880A523B9759 selected and managers initialized.
3. Modify an Existing Entry
4. Backup to Nostr
5. Restore from Nostr
6. Backup/Reveal Parent Seed
7. Switch Fingerprint
8. Add a New Fingerprint
9. Remove an Existing Fingerprint
10. List All Fingerprints
6. Switch Fingerprint
7. Add a New Fingerprint
8. Remove an Existing Fingerprint
9. List All Fingerprints
10. Settings
11. Exit
Enter your choice (1-11): 1

View File

@@ -118,11 +118,11 @@ Fingerprint 31DD880A523B9759 selected and managers initialized.
3. Modify an Existing Entry
4. Backup to Nostr
5. Restore from Nostr
6. Backup/Reveal Parent Seed
7. Switch Fingerprint
8. Add a New Fingerprint
9. Remove an Existing Fingerprint
10. List All Fingerprints
6. Switch Fingerprint
7. Add a New Fingerprint
8. Remove an Existing Fingerprint
9. List All Fingerprints
10. Settings
11. Exit
Enter your choice (1-11): 1

View File

@@ -19,7 +19,7 @@ try:
# -----------------------------------
# Application Directory and Paths
# -----------------------------------
APP_DIR = Path.home() / '.seedpass'
APP_DIR = Path.home() / ".seedpass"
APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists
logging.info(f"Application directory created at {APP_DIR}")
except Exception as e:
@@ -27,7 +27,7 @@ except Exception as e:
logging.error(traceback.format_exc()) # Log full traceback
try:
PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed
PARENT_SEED_FILE = APP_DIR / "parent_seed.enc" # Encrypted parent seed
logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}")
except Exception as e:
logging.error(f"Error setting file paths: {e}")
@@ -37,7 +37,9 @@ except Exception as e:
# Checksum Files for Integrity
# -----------------------------------
try:
SCRIPT_CHECKSUM_FILE = APP_DIR / 'seedpass_script_checksum.txt' # Checksum for main script
SCRIPT_CHECKSUM_FILE = (
APP_DIR / "seedpass_script_checksum.txt"
) # Checksum for main script
logging.info(f"Checksum file path set: Script {SCRIPT_CHECKSUM_FILE}")
except Exception as e:
logging.error(f"Error setting checksum file paths: {e}")
@@ -54,4 +56,4 @@ MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length
# Additional Constants (if any)
# -----------------------------------
# Add any other constants here as your project expands
DEFAULT_SEED_BACKUP_FILENAME = 'parent_seed_backup.enc'
DEFAULT_SEED_BACKUP_FILENAME = "parent_seed_backup.enc"

View File

@@ -5,10 +5,10 @@ import traceback
try:
from .bip85 import BIP85
logging.info("BIP85 module imported successfully.")
except Exception as e:
logging.error(f"Failed to import BIP85 module: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = ['BIP85']
__all__ = ["BIP85"]

View File

@@ -21,11 +21,7 @@ import os
import traceback
from colorama import Fore
from bip_utils import (
Bip32Slip10Secp256k1,
Bip39MnemonicGenerator,
Bip39Languages
)
from bip_utils import Bip32Slip10Secp256k1, Bip39MnemonicGenerator, Bip39Languages
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
@@ -34,6 +30,7 @@ from cryptography.hazmat.backends import default_backend
# Instantiate the logger
logger = logging.getLogger(__name__)
class BIP85:
def __init__(self, seed_bytes: bytes):
try:
@@ -80,8 +77,12 @@ class BIP85:
entropy = hmac_result[:bytes_len]
if len(entropy) != bytes_len:
logging.error(f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.")
print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.")
logging.error(
f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes."
)
print(
f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes."
)
sys.exit(1)
logging.debug(f"Derived entropy: {entropy.hex()}")
@@ -101,7 +102,9 @@ class BIP85:
entropy = self.derive_entropy(index=index, bytes_len=bytes_len, app_no=39)
try:
mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy)
mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(
entropy
)
logging.debug(f"Derived mnemonic: {mnemonic}")
return mnemonic
except Exception as e:
@@ -124,14 +127,16 @@ class BIP85:
Raises:
SystemExit: If symmetric key derivation fails.
"""
entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index)
entropy = self.derive_entropy(
app_no, language_code=0, words_num=24, index=index
)
try:
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32, # 256 bits for AES-256
salt=None,
info=b'seedos-encryption-key',
backend=default_backend()
info=b"seedos-encryption-key",
backend=default_backend(),
)
symmetric_key = hkdf.derive(entropy)
logging.debug(f"Derived symmetric key: {symmetric_key.hex()}")

View File

@@ -378,7 +378,8 @@ def handle_settings(password_manager: PasswordManager) -> None:
print("5. Change password")
print("6. Display Nostr Public Key")
print("7. Verify Script Checksum")
print("8. Back")
print("8. Backup Parent Seed")
print("9. Back")
choice = input("Select an option: ").strip()
if choice == "1":
handle_view_relays(cfg_mgr)
@@ -395,6 +396,8 @@ def handle_settings(password_manager: PasswordManager) -> None:
elif choice == "7":
password_manager.handle_verify_checksum()
elif choice == "8":
password_manager.handle_backup_reveal_parent_seed()
elif choice == "9":
break
else:
print(colored("Invalid choice.", "red"))
@@ -411,24 +414,23 @@ def display_menu(password_manager: PasswordManager):
3. Modify an Existing Entry
4. Backup to Nostr
5. Restore from Nostr
6. Backup/Reveal Parent Seed
7. Switch Seed Profile
8. Add a New Seed Profile
9. Remove an Existing Seed Profile
10. List All Seed Profiles
11. Settings
12. Exit
6. Switch Seed Profile
7. Add a New Seed Profile
8. Remove an Existing Seed Profile
9. List All Seed Profiles
10. Settings
11. Exit
"""
while True:
# Flush logging handlers
for handler in logging.getLogger().handlers:
handler.flush()
print(colored(menu, "cyan"))
choice = input("Enter your choice (1-12): ").strip()
choice = input("Enter your choice (1-11): ").strip()
if not choice:
print(
colored(
"No input detected. Please enter a number between 1 and 12.",
"No input detected. Please enter a number between 1 and 11.",
"yellow",
)
)
@@ -444,19 +446,17 @@ def display_menu(password_manager: PasswordManager):
elif choice == "5":
handle_retrieve_from_nostr(password_manager)
elif choice == "6":
password_manager.handle_backup_reveal_parent_seed()
elif choice == "7":
if not password_manager.handle_switch_fingerprint():
print(colored("Failed to switch seed profile.", "red"))
elif choice == "8":
elif choice == "7":
handle_add_new_fingerprint(password_manager)
elif choice == "9":
elif choice == "8":
handle_remove_fingerprint(password_manager)
elif choice == "10":
elif choice == "9":
handle_list_fingerprints(password_manager)
elif choice == "11":
elif choice == "10":
handle_settings(password_manager)
elif choice == "12":
elif choice == "11":
logging.info("Exiting the program.")
print(colored("Exiting the program.", "green"))
password_manager.nostr_client.close_client_pool()

View File

@@ -12,9 +12,10 @@ logger = logging.getLogger(__name__) # Correct logger initialization
try:
from .client import NostrClient
logger.info("NostrClient module imported successfully.")
except Exception as e:
logger.error(f"Failed to import NostrClient module: {e}")
logger.error(traceback.format_exc()) # Log full traceback
__all__ = ['NostrClient']
__all__ = ["NostrClient"]

View File

@@ -34,13 +34,14 @@ logger.setLevel(logging.WARNING)
DEFAULT_RELAYS = [
"wss://relay.snort.social",
"wss://nostr.oxtr.dev",
"wss://relay.primal.net"
"wss://relay.primal.net",
]
# nostr/client.py
# src/nostr/client.py
class NostrClient:
"""
NostrClient Class
@@ -49,7 +50,12 @@ class NostrClient:
Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations.
"""
def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None):
def __init__(
self,
encryption_manager: EncryptionManager,
fingerprint: str,
relays: Optional[List[str]] = None,
):
"""
Initializes the NostrClient with an EncryptionManager, connects to specified relays,
and sets up the KeyManager with the given fingerprint.
@@ -62,12 +68,13 @@ class NostrClient:
# Assign the encryption manager and fingerprint
self.encryption_manager = encryption_manager
self.fingerprint = fingerprint # Track the fingerprint
self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories
self.fingerprint_dir = (
self.encryption_manager.fingerprint_dir
) # If needed to manage directories
# Initialize KeyManager with the decrypted parent seed and the provided fingerprint
self.key_manager = KeyManager(
self.encryption_manager.decrypt_parent_seed(),
self.fingerprint
self.encryption_manager.decrypt_parent_seed(), self.fingerprint
)
# Initialize event handler and client pool
@@ -126,7 +133,10 @@ class NostrClient:
except Exception as e:
logger.error(f"Error running event loop in thread: {e}")
logger.error(traceback.format_exc())
print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr)
print(
f"Error: Event loop in ClientPool thread encountered an issue: {e}",
file=sys.stderr,
)
finally:
if not self.loop.is_closed():
logger.debug("Closing the event loop.")
@@ -166,14 +176,18 @@ class NostrClient:
"""
try:
logger.debug(f"Submitting publish_event_async for event ID: {event.id}")
future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.publish_event_async(event), self.loop
)
# Wait for the future to complete
future.result(timeout=5) # Adjust the timeout as needed
except Exception as e:
logger.error(f"Error in publish_event: {e}")
print(f"Error: Failed to publish event: {e}", file=sys.stderr)
async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]):
async def subscribe_async(
self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]
):
"""
Subscribes to events based on the provided filters using ClientPool.
@@ -190,7 +204,9 @@ class NostrClient:
logger.error(traceback.format_exc())
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]):
def subscribe(
self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]
):
"""
Synchronous wrapper for subscribing to events.
@@ -198,7 +214,9 @@ class NostrClient:
:param handler: A callback function to handle incoming events.
"""
try:
asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop)
asyncio.run_coroutine_threadsafe(
self.subscribe_async(filters, handler), self.loop
)
except Exception as e:
logger.error(f"Error in subscribe: {e}")
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
@@ -210,11 +228,13 @@ class NostrClient:
:return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails.
"""
try:
filters = [{
'authors': [self.key_manager.keys.public_key_hex()],
'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
'limit': 1
}]
filters = [
{
"authors": [self.key_manager.keys.public_key_hex()],
"kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
"limit": 1,
}
]
events = []
@@ -238,7 +258,9 @@ class NostrClient:
if event.kind == Event.KIND_ENCRYPT:
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key)
content_base64 = nip4_encrypt.decrypt_message(
event.content, event.pub_key
)
# Return the Base64-encoded content as a string
logger.debug("Encrypted JSON data retrieved successfully.")
@@ -261,16 +283,21 @@ class NostrClient:
:return: The encrypted JSON data as bytes, or None if retrieval fails.
"""
try:
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.retrieve_json_from_nostr_async(), self.loop
)
return future.result(timeout=10)
except concurrent.futures.TimeoutError:
logger.error("Timeout occurred while retrieving JSON from Nostr.")
print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr)
print(
"Error: Timeout occurred while retrieving JSON from Nostr.",
file=sys.stderr,
)
return None
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red')
print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red")
return None
async def do_post_async(self, text: str):
@@ -283,7 +310,7 @@ class NostrClient:
event = Event(
kind=Event.KIND_TEXT_NOTE,
content=text,
pub_key=self.key_manager.keys.public_key_hex()
pub_key=self.key_manager.keys.public_key_hex(),
)
event.created_at = int(time.time())
event.sign(self.key_manager.keys.private_key_hex())
@@ -296,18 +323,22 @@ class NostrClient:
logger.error(f"An error occurred during publishing: {e}", exc_info=True)
print(f"Error: An error occurred during publishing: {e}", file=sys.stderr)
async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]):
async def subscribe_feed_async(
self, handler: Callable[[ClientPool, str, Event], None]
):
"""
Subscribes to the feed of the client's own pubkey.
:param handler: A callback function to handle incoming events.
"""
try:
filters = [{
'authors': [self.key_manager.keys.public_key_hex()],
'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
'limit': 100
}]
filters = [
{
"authors": [self.key_manager.keys.public_key_hex()],
"kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
"limit": 100,
}
]
await self.subscribe_async(filters=filters, handler=handler)
logger.info("Subscribed to your feed.")
@@ -327,11 +358,16 @@ class NostrClient:
try:
await asyncio.gather(
self.do_post_async(text),
self.subscribe_feed_async(self.event_handler.handle_new_event)
self.subscribe_feed_async(self.event_handler.handle_new_event),
)
except Exception as e:
logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True)
print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr)
logger.error(
f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True
)
print(
f"Error: An error occurred in publish and subscribe: {e}",
file=sys.stderr,
)
def publish_and_subscribe(self, text: str):
"""
@@ -340,7 +376,9 @@ class NostrClient:
:param text: The content of the text note to publish.
"""
try:
asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop)
asyncio.run_coroutine_threadsafe(
self.publish_and_subscribe_async(text), self.loop
)
except Exception as e:
logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True)
print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr)
@@ -353,15 +391,19 @@ class NostrClient:
"""
try:
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
data = json.loads(decrypted_data.decode('utf-8'))
data = json.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data)
self.update_checksum()
logger.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", 'green'))
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
)
)
def save_json_data(self, data: dict) -> None:
"""
@@ -370,17 +412,19 @@ class NostrClient:
:param data: The JSON data to save.
"""
try:
encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8'))
index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc'
encrypted_data = self.encryption_manager.encrypt_data(
json.dumps(data).encode("utf-8")
)
index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
with lock_file(index_file_path, fcntl.LOCK_EX):
with open(index_file_path, 'wb') as f:
with open(index_file_path, "wb") as f:
f.write(encrypted_data)
logger.debug(f"Encrypted data saved to {index_file_path}.")
print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green'))
print(colored(f"Encrypted data saved to '{index_file_path}'.", "green"))
except Exception as e:
logger.error(f"Failed to save encrypted data: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to save encrypted data: {e}", 'red'))
print(colored(f"Error: Failed to save encrypted data: {e}", "red"))
raise
def update_checksum(self) -> None:
@@ -388,28 +432,30 @@ class NostrClient:
Updates the checksum file for the password database.
"""
try:
index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc'
index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
decrypted_data = self.decrypt_data_from_file(index_file_path)
content = decrypted_data.decode('utf-8')
content = decrypted_data.decode("utf-8")
logger.debug("Calculating checksum of the updated file content.")
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
checksum = hashlib.sha256(content.encode("utf-8")).hexdigest()
logger.debug(f"New checksum: {checksum}")
checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt'
checksum_file = self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
with lock_file(checksum_file, fcntl.LOCK_EX):
with open(checksum_file, 'w') as f:
with open(checksum_file, "w") as f:
f.write(checksum)
os.chmod(checksum_file, 0o600)
logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.")
print(colored(f"Checksum for '{index_file_path}' updated.", 'green'))
logger.debug(
f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'."
)
print(colored(f"Checksum for '{index_file_path}' updated.", "green"))
except Exception as e:
logger.error(f"Failed to update checksum: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to update checksum: {e}", 'red'))
print(colored(f"Error: Failed to update checksum: {e}", "red"))
def decrypt_data_from_file(self, file_path: Path) -> bytes:
"""
@@ -420,7 +466,7 @@ class NostrClient:
"""
try:
with lock_file(file_path, fcntl.LOCK_SH):
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
encrypted_data = f.read()
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
logger.debug(f"Data decrypted from file '{file_path}'.")
@@ -428,7 +474,11 @@ class NostrClient:
except Exception as e:
logger.error(f"Failed to decrypt data from file '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt data from file '{file_path}': {e}", "red"
)
)
raise
def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None):
@@ -439,10 +489,14 @@ class NostrClient:
:param to_pubkey: (Optional) The recipient's public key for encryption.
"""
try:
encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8')
encrypted_json_b64 = base64.b64encode(encrypted_json).decode("utf-8")
logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}")
event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex())
event = Event(
kind=Event.KIND_TEXT_NOTE,
content=encrypted_json_b64,
pub_key=self.key_manager.keys.public_key_hex(),
)
event.created_at = int(time.time())
@@ -471,7 +525,9 @@ class NostrClient:
Optional[bytes]: The encrypted data as bytes if successful, None otherwise.
"""
try:
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.retrieve_json_from_nostr_async(), self.loop
)
content_base64 = future.result(timeout=10)
if not content_base64:
@@ -479,17 +535,22 @@ class NostrClient:
return None
# Base64-decode the content
encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8'))
logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.")
encrypted_data = base64.urlsafe_b64decode(content_base64.encode("utf-8"))
logger.debug(
"Encrypted data retrieved and Base64-decoded successfully from Nostr."
)
return encrypted_data
except concurrent.futures.TimeoutError:
logger.error("Timeout occurred while retrieving JSON from Nostr.")
print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr)
print(
"Error: Timeout occurred while retrieving JSON from Nostr.",
file=sys.stderr,
)
return None
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red')
print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red")
return None
def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None:
@@ -502,7 +563,7 @@ class NostrClient:
self.decrypt_and_save_index_from_nostr(encrypted_data)
except Exception as e:
logger.error(f"Failed to decrypt and save index from Nostr: {e}")
print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red')
print(f"Error: Failed to decrypt and save index from Nostr: {e}", "red")
async def close_client_pool_async(self):
"""
@@ -529,14 +590,20 @@ class NostrClient:
logger.warning(f"Error unsubscribing from {sub_id}: {e}")
# Close all WebSocket connections
if hasattr(self.client_pool, 'clients'):
tasks = [self.safe_close_connection(client) for client in self.client_pool.clients]
if hasattr(self.client_pool, "clients"):
tasks = [
self.safe_close_connection(client)
for client in self.client_pool.clients
]
await asyncio.gather(*tasks, return_exceptions=True)
# Gather and cancel all tasks
current_task = asyncio.current_task()
tasks = [task for task in asyncio.all_tasks(loop=self.loop)
if task != current_task and not task.done()]
tasks = [
task
for task in asyncio.all_tasks(loop=self.loop)
if task != current_task and not task.done()
]
if tasks:
logger.debug(f"Cancelling {len(tasks)} pending tasks.")
@@ -545,7 +612,9 @@ class NostrClient:
# Wait for all tasks to be cancelled with a timeout
try:
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5)
await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True), timeout=5
)
except asyncio.TimeoutError:
logger.warning("Timeout waiting for tasks to cancel")
@@ -569,7 +638,9 @@ class NostrClient:
try:
# Schedule the coroutine to close the client pool
future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.close_client_pool_async(), self.loop
)
# Wait for the coroutine to finish with a timeout
try:
@@ -592,7 +663,9 @@ class NostrClient:
self.loop_thread.join(timeout=5)
if self.loop_thread.is_alive():
logger.warning("Thread still alive after join, may need to be force-killed")
logger.warning(
"Thread still alive after join, may need to be force-killed"
)
except Exception as cleanup_error:
logger.error(f"Error during final cleanup: {cleanup_error}")
@@ -610,6 +683,8 @@ class NostrClient:
await client.close_connection()
logger.debug(f"Closed connection to relay: {client.url}")
except AttributeError:
logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.")
logger.warning(
f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}."
)
except Exception as e:
logger.warning(f"Error closing connection to {client.url}: {e}")

View File

@@ -10,6 +10,7 @@ from .key_manager import KeyManager
# Instantiate the logger
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
Manages encryption and decryption using Fernet symmetric encryption.
@@ -28,7 +29,9 @@ class EncryptionManager:
# Ensure the raw key is exactly 32 bytes
if len(raw_key) != 32:
raise ValueError(f"Derived key length is {len(raw_key)} bytes; expected 32 bytes.")
raise ValueError(
f"Derived key length is {len(raw_key)} bytes; expected 32 bytes."
)
# Base64-encode the raw key to make it URL-safe
b64_key = base64.urlsafe_b64encode(raw_key)
@@ -51,8 +54,8 @@ class EncryptionManager:
:param file_path: The file path to save the encrypted seed.
"""
try:
encrypted_seed = self.fernet.encrypt(seed.encode('utf-8'))
with open(file_path, 'wb') as f:
encrypted_seed = self.fernet.encrypt(seed.encode("utf-8"))
with open(file_path, "wb") as f:
f.write(encrypted_seed)
logger.debug(f"Parent seed encrypted and saved to '{file_path}'.")
except Exception as e:
@@ -68,14 +71,18 @@ class EncryptionManager:
:return: The decrypted parent seed as a string.
"""
try:
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
encrypted_seed = f.read()
decrypted_seed = self.fernet.decrypt(encrypted_seed).decode('utf-8')
decrypted_seed = self.fernet.decrypt(encrypted_seed).decode("utf-8")
logger.debug(f"Parent seed decrypted successfully from '{file_path}'.")
return decrypted_seed
except InvalidToken:
logger.error("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.")
raise ValueError("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.")
logger.error(
"Decryption failed: Invalid token. Possibly incorrect password or corrupted file."
)
raise ValueError(
"Decryption failed: Invalid token. Possibly incorrect password or corrupted file."
)
except Exception as e:
logger.error(f"Failed to decrypt parent seed: {e}")
logger.error(traceback.format_exc())
@@ -89,7 +96,7 @@ class EncryptionManager:
:return: Encrypted data as bytes.
"""
try:
json_data = json.dumps(data).encode('utf-8')
json_data = json.dumps(data).encode("utf-8")
encrypted = self.fernet.encrypt(json_data)
logger.debug("Data encrypted successfully.")
return encrypted

View File

@@ -8,6 +8,7 @@ from monstr.event.event import Event
# Instantiate the logger
logger = logging.getLogger(__name__)
class EventHandler:
"""
Handles incoming Nostr events.
@@ -25,7 +26,9 @@ class EventHandler:
try:
# Assuming evt.created_at is always an integer Unix timestamp
if isinstance(evt.created_at, int):
created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at))
created_at_str = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(evt.created_at)
)
else:
# Handle unexpected types gracefully
created_at_str = str(evt.created_at)

View File

@@ -11,6 +11,7 @@ from monstr.encrypt import Keys
logger = logging.getLogger(__name__)
class KeyManager:
"""
Manages key generation, encoding, and derivation for NostrClient.
@@ -26,9 +27,13 @@ class KeyManager:
"""
try:
if not isinstance(parent_seed, str):
raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}")
raise TypeError(
f"Parent seed must be a string, got {type(parent_seed)}"
)
if not isinstance(fingerprint, str):
raise TypeError(f"Fingerprint must be a string, got {type(fingerprint)}")
raise TypeError(
f"Fingerprint must be a string, got {type(fingerprint)}"
)
self.parent_seed = parent_seed
self.fingerprint = fingerprint
@@ -72,12 +77,14 @@ class KeyManager:
"""
try:
# Convert fingerprint to an integer index (using a hash function)
index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31)
index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (
2**31
)
# Derive entropy for Nostr key (32 bytes)
entropy_bytes = self.bip85.derive_entropy(
index=index,
bytes_len=32 # Adjust parameter name and value as per your method signature
bytes_len=32, # Adjust parameter name and value as per your method signature
)
# Generate Nostr key pair from entropy
@@ -119,7 +126,7 @@ class KeyManager:
pub_key_hex = self.get_public_key_hex()
pub_key_bytes = bytes.fromhex(pub_key_hex)
data = convertbits(pub_key_bytes, 8, 5, True)
npub = bech32_encode('npub', data)
npub = bech32_encode("npub", data)
return npub
except Exception as e:
logger.error(f"Failed to generate npub: {e}")

View File

@@ -2,6 +2,7 @@
import logging
# Example utility function (if any specific to nostr package)
def some_helper_function():
pass # Implement as needed

View File

@@ -30,12 +30,14 @@ import fcntl # For file locking
# Instantiate the logger
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
EncryptionManager Class
Manages the encryption and decryption of data and files using a Fernet encryption key.
"""
def __init__(self, encryption_key: bytes, fingerprint_dir: Path):
"""
Initializes the EncryptionManager with the provided encryption key and fingerprint directory.
@@ -45,16 +47,20 @@ class EncryptionManager:
fingerprint_dir (Path): The directory corresponding to the fingerprint.
"""
self.fingerprint_dir = fingerprint_dir
self.parent_seed_file = self.fingerprint_dir / 'parent_seed.enc'
self.parent_seed_file = self.fingerprint_dir / "parent_seed.enc"
self.key = encryption_key
try:
self.fernet = Fernet(self.key)
logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}")
except Exception as e:
logger.error(f"Failed to initialize Fernet with provided encryption key: {e}")
logger.error(
f"Failed to initialize Fernet with provided encryption key: {e}"
)
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red'))
print(
colored(f"Error: Failed to initialize encryption manager: {e}", "red")
)
raise
def encrypt_parent_seed(self, parent_seed: str) -> None:
@@ -65,25 +71,32 @@ class EncryptionManager:
"""
try:
# Convert seed to bytes
data = parent_seed.encode('utf-8')
data = parent_seed.encode("utf-8")
# Encrypt the data
encrypted_data = self.encrypt_data(data)
# Write the encrypted data to the file with locking
with lock_file(self.parent_seed_file, fcntl.LOCK_EX):
with open(self.parent_seed_file, 'wb') as f:
with open(self.parent_seed_file, "wb") as f:
f.write(encrypted_data)
# Set file permissions to read/write for the user only
os.chmod(self.parent_seed_file, 0o600)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
print(colored(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", 'green'))
logger.info(
f"Parent seed encrypted and saved to '{self.parent_seed_file}'."
)
print(
colored(
f"Parent seed encrypted and saved to '{self.parent_seed_file}'.",
"green",
)
)
except Exception as e:
logger.error(f"Failed to encrypt and save parent seed: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red'))
print(colored(f"Error: Failed to encrypt and save parent seed: {e}", "red"))
raise
def decrypt_parent_seed(self) -> str:
@@ -93,24 +106,28 @@ class EncryptionManager:
:return: The decrypted parent seed.
"""
try:
parent_seed_path = self.fingerprint_dir / 'parent_seed.enc'
parent_seed_path = self.fingerprint_dir / "parent_seed.enc"
with lock_file(parent_seed_path, fcntl.LOCK_SH):
with open(parent_seed_path, 'rb') as f:
with open(parent_seed_path, "rb") as f:
encrypted_data = f.read()
decrypted_data = self.decrypt_data(encrypted_data)
parent_seed = decrypted_data.decode('utf-8').strip()
parent_seed = decrypted_data.decode("utf-8").strip()
logger.debug(f"Parent seed decrypted successfully from '{parent_seed_path}'.")
logger.debug(
f"Parent seed decrypted successfully from '{parent_seed_path}'."
)
return parent_seed
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting parent seed.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting parent seed."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to decrypt parent seed: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red'))
print(colored(f"Error: Failed to decrypt parent seed: {e}", "red"))
raise
def encrypt_data(self, data: bytes) -> bytes:
@@ -127,7 +144,7 @@ class EncryptionManager:
except Exception as e:
logger.error(f"Failed to encrypt data: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt data: {e}", 'red'))
print(colored(f"Error: Failed to encrypt data: {e}", "red"))
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
@@ -142,13 +159,15 @@ class EncryptionManager:
logger.debug("Data decrypted successfully.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting data.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting data."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to decrypt data: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt data: {e}", 'red'))
print(colored(f"Error: Failed to decrypt data: {e}", "red"))
raise
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
@@ -170,18 +189,23 @@ class EncryptionManager:
# Write the encrypted data to the file with locking
with lock_file(file_path, fcntl.LOCK_EX):
with open(file_path, 'wb') as f:
with open(file_path, "wb") as f:
f.write(encrypted_data)
# Set file permissions to read/write for the user only
os.chmod(file_path, 0o600)
logger.info(f"Data encrypted and saved to '{file_path}'.")
print(colored(f"Data encrypted and saved to '{file_path}'.", 'green'))
print(colored(f"Data encrypted and saved to '{file_path}'.", "green"))
except Exception as e:
logger.error(f"Failed to encrypt and save data to '{relative_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and save data to '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to encrypt and save data to '{relative_path}': {e}",
"red",
)
)
raise
def decrypt_file(self, relative_path: Path) -> bytes:
@@ -197,7 +221,7 @@ class EncryptionManager:
# Read the encrypted data with locking
with lock_file(file_path, fcntl.LOCK_SH):
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
encrypted_data = f.read()
# Decrypt the data
@@ -205,13 +229,19 @@ class EncryptionManager:
logger.debug(f"Data decrypted successfully from '{file_path}'.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting file.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting file."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to decrypt data from '{relative_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt data from '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt data from '{relative_path}': {e}", "red"
)
)
raise
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
@@ -223,16 +253,22 @@ class EncryptionManager:
Defaults to 'seedpass_passwords_db.json.enc'.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
try:
json_data = json.dumps(data, indent=4).encode('utf-8')
json_data = json.dumps(data, indent=4).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
print(colored(f"JSON data encrypted and saved to '{relative_path}'.", 'green'))
print(
colored(f"JSON data encrypted and saved to '{relative_path}'.", "green")
)
except Exception as e:
logger.error(f"Failed to save JSON data to '{relative_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to save JSON data to '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to save JSON data to '{relative_path}': {e}", "red"
)
)
raise
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
@@ -244,35 +280,54 @@ class EncryptionManager:
:return: The decrypted JSON data as a dictionary.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
file_path = self.fingerprint_dir / relative_path
if not file_path.exists():
logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.")
print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow'))
return {'passwords': {}}
logger.info(
f"Index file '{file_path}' does not exist. Initializing empty data."
)
print(
colored(
f"Info: Index file '{file_path}' not found. Initializing new password database.",
"yellow",
)
)
return {"passwords": {}}
try:
decrypted_data = self.decrypt_file(relative_path)
json_content = decrypted_data.decode('utf-8').strip()
json_content = decrypted_data.decode("utf-8").strip()
data = json.loads(json_content)
logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}")
print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green'))
print(
colored(f"JSON data loaded and decrypted from '{file_path}'.", "green")
)
return data
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON data from '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to decode JSON data from '{file_path}': {e}", "red"
)
)
raise
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting JSON data.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting JSON data."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to load JSON data from '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to load JSON data from '{file_path}': {e}", "red"
)
)
raise
def update_checksum(self, relative_path: Optional[Path] = None) -> None:
@@ -283,32 +338,39 @@ class EncryptionManager:
Defaults to 'seedpass_passwords_db.json.enc'.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
try:
file_path = self.fingerprint_dir / relative_path
decrypted_data = self.decrypt_file(relative_path)
content = decrypted_data.decode('utf-8')
content = decrypted_data.decode("utf-8")
logger.debug("Calculating checksum of the updated file content.")
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
checksum = hashlib.sha256(content.encode("utf-8")).hexdigest()
logger.debug(f"New checksum: {checksum}")
checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt"
# Write the checksum to the file with locking
with lock_file(checksum_file, fcntl.LOCK_EX):
with open(checksum_file, 'w') as f:
with open(checksum_file, "w") as f:
f.write(checksum)
# Set file permissions to read/write for the user only
os.chmod(checksum_file, 0o600)
logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.")
print(colored(f"Checksum for '{file_path}' updated.", 'green'))
logger.debug(
f"Checksum for '{file_path}' updated and written to '{checksum_file}'."
)
print(colored(f"Checksum for '{file_path}' updated.", "green"))
except Exception as e:
logger.error(f"Failed to update checksum for '{relative_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum for '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to update checksum for '{relative_path}': {e}",
"red",
)
)
raise
def get_encrypted_index(self) -> Optional[bytes]:
@@ -318,14 +380,20 @@ class EncryptionManager:
:return: Encrypted data as bytes or None if the index file does not exist.
"""
try:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
if not (self.fingerprint_dir / relative_path).exists():
logger.error(f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'.")
print(colored(f"Error: Index file '{relative_path}' does not exist.", 'red'))
logger.error(
f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'."
)
print(
colored(
f"Error: Index file '{relative_path}' does not exist.", "red"
)
)
return None
with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH):
with open(self.fingerprint_dir / relative_path, 'rb') as file:
with open(self.fingerprint_dir / relative_path, "rb") as file:
encrypted_data = file.read()
logger.debug(f"Encrypted index data read from '{relative_path}'.")
@@ -333,10 +401,17 @@ class EncryptionManager:
except Exception as e:
logger.error(f"Failed to read encrypted index file '{relative_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to read encrypted index file '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to read encrypted index file '{relative_path}': {e}",
"red",
)
)
return None
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes, relative_path: Optional[Path] = None) -> None:
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, relative_path: Optional[Path] = None
) -> None:
"""
Decrypts the encrypted data retrieved from Nostr and updates the local index file.
@@ -345,18 +420,22 @@ class EncryptionManager:
Defaults to 'seedpass_passwords_db.json.enc'.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
try:
decrypted_data = self.decrypt_data(encrypted_data)
data = json.loads(decrypted_data.decode('utf-8'))
data = json.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
logger.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", 'green'))
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
)
)
# Re-raise the exception to inform the calling function of the failure
raise
@@ -371,7 +450,9 @@ class EncryptionManager:
words = seed_phrase.split()
if len(words) != 12:
logger.error("Seed phrase does not contain exactly 12 words.")
print(colored("Error: Seed phrase must contain exactly 12 words.", 'red'))
print(
colored("Error: Seed phrase must contain exactly 12 words.", "red")
)
return False
# Additional validation can be added here (e.g., word list checks)
logger.debug("Seed phrase validated successfully.")
@@ -379,7 +460,7 @@ class EncryptionManager:
except Exception as e:
logging.error(f"Error validating seed phrase: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to validate seed phrase: {e}", 'red'))
print(colored(f"Error: Failed to validate seed phrase: {e}", "red"))
return False
def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes:
@@ -399,11 +480,12 @@ class EncryptionManager:
if not isinstance(mnemonic, str):
raise TypeError("Mnemonic must be a string after conversion")
from bip_utils import Bip39SeedGenerator
seed = Bip39SeedGenerator(mnemonic).Generate(passphrase)
logger.debug("Seed derived successfully from mnemonic.")
return seed
except Exception as e:
logger.error(f"Failed to derive seed from mnemonic: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red'))
print(colored(f"Error: Failed to derive seed from mnemonic: {e}", "red"))
raise

View File

@@ -1003,7 +1003,7 @@ class PasswordManager:
Handles the backup and reveal of the parent seed.
"""
try:
print(colored("\n=== Backup/Reveal Parent Seed ===", "yellow"))
print(colored("\n=== Backup Parent Seed ===", "yellow"))
print(
colored(
"Warning: Revealing your parent seed is a highly sensitive operation.",

View File

@@ -35,6 +35,7 @@ from password_manager.encryption import EncryptionManager
# Instantiate the logger
logger = logging.getLogger(__name__)
class PasswordGenerator:
"""
PasswordGenerator Class
@@ -44,7 +45,9 @@ class PasswordGenerator:
complexity requirements.
"""
def __init__(self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85):
def __init__(
self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85
):
"""
Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance.
@@ -59,16 +62,20 @@ class PasswordGenerator:
self.bip85 = bip85
# Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
logger.debug("PasswordGenerator initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize PasswordGenerator: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red'))
print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", "red"))
raise
def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str:
def generate_password(
self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0
) -> str:
"""
Generates a deterministic password based on the parent seed, desired length, and index.
@@ -90,11 +97,19 @@ class PasswordGenerator:
try:
# Validate password length
if length < MIN_PASSWORD_LENGTH:
logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.")
raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.")
logger.error(
f"Password length must be at least {MIN_PASSWORD_LENGTH} characters."
)
raise ValueError(
f"Password length must be at least {MIN_PASSWORD_LENGTH} characters."
)
if length > MAX_PASSWORD_LENGTH:
logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.")
raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.")
logger.error(
f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters."
)
raise ValueError(
f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters."
)
# Derive entropy using BIP-85
entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32)
@@ -105,39 +120,43 @@ class PasswordGenerator:
algorithm=hashes.SHA256(),
length=32, # 256 bits for AES-256
salt=None,
info=b'password-generation',
backend=default_backend()
info=b"password-generation",
backend=default_backend(),
)
derived_key = hkdf.derive(entropy)
logger.debug(f"Derived key using HKDF: {derived_key.hex()}")
# Use PBKDF2-HMAC-SHA256 to derive a key from entropy
dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000)
dk = hashlib.pbkdf2_hmac("sha256", entropy, b"", 100000)
logger.debug(f"Derived key using PBKDF2: {dk.hex()}")
# Map the derived key to all allowed characters
all_allowed = string.ascii_letters + string.digits + string.punctuation
password = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk)
logger.debug(f"Password after mapping to all allowed characters: {password}")
password = "".join(all_allowed[byte % len(all_allowed)] for byte in dk)
logger.debug(
f"Password after mapping to all allowed characters: {password}"
)
# Ensure the password meets complexity requirements
password = self.ensure_complexity(password, all_allowed, dk)
logger.debug(f"Password after ensuring complexity: {password}")
# Shuffle characters deterministically based on dk
shuffle_seed = int.from_bytes(dk, 'big')
shuffle_seed = int.from_bytes(dk, "big")
rng = random.Random(shuffle_seed)
password_chars = list(password)
rng.shuffle(password_chars)
password = ''.join(password_chars)
password = "".join(password_chars)
logger.debug("Shuffled password deterministically.")
# Ensure password length by extending if necessary
if len(password) < length:
while len(password) < length:
dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1)
base64_extra = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk)
password += ''.join(base64_extra)
dk = hashlib.pbkdf2_hmac("sha256", dk, b"", 1)
base64_extra = "".join(
all_allowed[byte % len(all_allowed)] for byte in dk
)
password += "".join(base64_extra)
logger.debug(f"Extended password: {password}")
# Trim the password to the desired length
@@ -149,7 +168,7 @@ class PasswordGenerator:
except Exception as e:
logger.error(f"Error generating password: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to generate password: {e}", 'red'))
print(colored(f"Error: Failed to generate password: {e}", "red"))
raise
def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str:
@@ -180,7 +199,9 @@ class PasswordGenerator:
current_digits = sum(1 for c in password_chars if c in digits)
current_special = sum(1 for c in password_chars if c in special)
logger.debug(f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}")
logger.debug(
f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}"
)
# Set minimum counts
min_upper = 2
@@ -204,14 +225,18 @@ class PasswordGenerator:
index = get_dk_value() % len(password_chars)
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[index] = char
logger.debug(f"Added uppercase letter '{char}' at position {index}.")
logger.debug(
f"Added uppercase letter '{char}' at position {index}."
)
if current_lower < min_lower:
for _ in range(min_lower - current_lower):
index = get_dk_value() % len(password_chars)
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[index] = char
logger.debug(f"Added lowercase letter '{char}' at position {index}.")
logger.debug(
f"Added lowercase letter '{char}' at position {index}."
)
if current_digits < min_digits:
for _ in range(min_digits - current_digits):
@@ -225,7 +250,9 @@ class PasswordGenerator:
index = get_dk_value() % len(password_chars)
char = special[get_dk_value() % len(special)]
password_chars[index] = char
logger.debug(f"Added special character '{char}' at position {index}.")
logger.debug(
f"Added special character '{char}' at position {index}."
)
# Additional deterministic inclusion of symbols to increase score
symbol_target = 3 # Increase target number of symbols
@@ -253,11 +280,15 @@ class PasswordGenerator:
if i == 0 and password_chars[j] not in uppercase:
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[j] = char
logger.debug(f"Assigned uppercase letter '{char}' to position {j}.")
logger.debug(
f"Assigned uppercase letter '{char}' to position {j}."
)
elif i == 1 and password_chars[j] not in lowercase:
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[j] = char
logger.debug(f"Assigned lowercase letter '{char}' to position {j}.")
logger.debug(
f"Assigned lowercase letter '{char}' to position {j}."
)
elif i == 2 and password_chars[j] not in digits:
char = digits[get_dk_value() % len(digits)]
password_chars[j] = char
@@ -265,10 +296,14 @@ class PasswordGenerator:
elif i == 3 and password_chars[j] not in special:
char = special[get_dk_value() % len(special)]
password_chars[j] = char
logger.debug(f"Assigned special character '{char}' to position {j}.")
logger.debug(
f"Assigned special character '{char}' to position {j}."
)
# Shuffle again to distribute the characters more evenly
shuffle_seed = int.from_bytes(dk, 'big') + dk_index # Modify seed to vary shuffle
shuffle_seed = (
int.from_bytes(dk, "big") + dk_index
) # Modify seed to vary shuffle
rng = random.Random(shuffle_seed)
rng.shuffle(password_chars)
logger.debug(f"Shuffled password characters for balanced distribution.")
@@ -278,12 +313,14 @@ class PasswordGenerator:
final_lower = sum(1 for c in password_chars if c in lowercase)
final_digits = sum(1 for c in password_chars if c in digits)
final_special = sum(1 for c in password_chars if c in special)
logger.debug(f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}")
logger.debug(
f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}"
)
return ''.join(password_chars)
return "".join(password_chars)
except Exception as e:
logger.error(f"Error ensuring password complexity: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to ensure password complexity: {e}", 'red'))
print(colored(f"Error: Failed to ensure password complexity: {e}", "red"))
raise

View File

@@ -350,7 +350,7 @@ def display_menu(password_manager: PasswordManager):
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
8. Backup Parent Seed
9. Switch Fingerprint
10. Add a New Fingerprint
11. Remove an Existing Fingerprint
@@ -1602,7 +1602,7 @@ class PasswordManager:
Handles the backup and reveal of the parent seed.
"""
try:
print(colored("\n=== Backup/Reveal Parent Seed ===", 'yellow'))
print(colored("\n=== Backup Parent Seed ===", 'yellow'))
print(colored("Warning: Revealing your parent seed is a highly sensitive operation.", 'red'))
print(colored("Ensure you're in a secure, private environment and no one is watching your screen.", 'red'))

View File

@@ -2,6 +2,7 @@
try:
from bip_utils import Bip39SeedGenerator
print("Bip39SeedGenerator imported successfully.")
except ImportError as e:
print(f"ImportError: {e}")

View File

@@ -15,10 +15,10 @@ except Exception as e:
logging.error(traceback.format_exc()) # Log full traceback
__all__ = [
'derive_key_from_password',
'derive_key_from_parent_seed',
'calculate_checksum',
'verify_checksum',
'lock_file',
'prompt_for_password'
"derive_key_from_password",
"derive_key_from_parent_seed",
"calculate_checksum",
"verify_checksum",
"lock_file",
"prompt_for_password",
]

View File

@@ -19,14 +19,12 @@ from typing import Optional
from termcolor import colored
from constants import (
APP_DIR,
SCRIPT_CHECKSUM_FILE
)
from constants import APP_DIR, SCRIPT_CHECKSUM_FILE
# Instantiate the logger
logger = logging.getLogger(__name__)
def calculate_checksum(file_path: str) -> Optional[str]:
"""
Calculates the SHA-256 checksum of the given file.
@@ -39,7 +37,7 @@ def calculate_checksum(file_path: str) -> Optional[str]:
"""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
checksum = hasher.hexdigest()
@@ -47,12 +45,20 @@ def calculate_checksum(file_path: str) -> Optional[str]:
return checksum
except FileNotFoundError:
logging.error(f"File '{file_path}' not found for checksum calculation.")
print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red'))
print(
colored(
f"Error: File '{file_path}' not found for checksum calculation.", "red"
)
)
return None
except Exception as e:
logging.error(f"Error calculating checksum for '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to calculate checksum for '{file_path}': {e}", "red"
)
)
return None
@@ -68,7 +74,7 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool:
bool: True if checksums match, False otherwise.
"""
try:
with open(checksum_file_path, 'r') as f:
with open(checksum_file_path, "r") as f:
stored_checksum = f.read().strip()
if current_checksum == stored_checksum:
logging.debug(f"Checksum verification passed for '{checksum_file_path}'.")
@@ -78,12 +84,17 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool:
return False
except FileNotFoundError:
logging.error(f"Checksum file '{checksum_file_path}' not found.")
print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red'))
print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", "red"))
return False
except Exception as e:
logging.error(f"Error reading checksum file '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to read checksum file '{checksum_file_path}': {e}",
"red",
)
)
return False
@@ -100,16 +111,21 @@ def update_checksum(content: str, checksum_file_path: str) -> bool:
"""
try:
hasher = hashlib.sha256()
hasher.update(content.encode('utf-8'))
hasher.update(content.encode("utf-8"))
new_checksum = hasher.hexdigest()
with open(checksum_file_path, 'w') as f:
with open(checksum_file_path, "w") as f:
f.write(new_checksum)
logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}")
return True
except Exception as e:
logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to update checksum for '{checksum_file_path}': {e}",
"red",
)
)
return False
@@ -129,11 +145,11 @@ def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool:
return False
if verify_checksum(current_checksum, checksum_file_path):
print(colored(f"Checksum verification passed for '{file_path}'.", 'green'))
print(colored(f"Checksum verification passed for '{file_path}'.", "green"))
logging.info(f"Checksum verification passed for '{file_path}'.")
return True
else:
print(colored(f"Checksum verification failed for '{file_path}'.", 'red'))
print(colored(f"Checksum verification failed for '{file_path}'.", "red"))
logging.warning(f"Checksum verification failed for '{file_path}'.")
return False
@@ -154,13 +170,20 @@ def initialize_checksum(file_path: str, checksum_file_path: str) -> bool:
return False
try:
with open(checksum_file_path, 'w') as f:
with open(checksum_file_path, "w") as f:
f.write(checksum)
logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}")
print(colored(f"Initialized checksum for '{file_path}'.", 'green'))
logging.debug(
f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}"
)
print(colored(f"Initialized checksum for '{file_path}'.", "green"))
return True
except Exception as e:
logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}",
"red",
)
)
return False

View File

@@ -26,6 +26,7 @@ import traceback
# Instantiate the logger
logger = logging.getLogger(__name__)
@contextmanager
def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
"""
@@ -44,14 +45,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
SystemExit: Exits the program if the lock cannot be acquired.
"""
if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH):
logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.")
print(colored("Error: Invalid lock type provided.", 'red'))
logging.error(
f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH."
)
print(colored("Error: Invalid lock type provided.", "red"))
sys.exit(1)
file = None
try:
# Determine the mode based on whether the file exists
mode = 'rb+' if file_path.exists() else 'wb'
mode = "rb+" if file_path.exists() else "wb"
# Open the file
file = open(file_path, mode)
@@ -67,7 +70,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}",
"red",
)
)
sys.exit(1)
finally:
@@ -78,9 +86,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
logging.debug(f"Lock released on '{file_path}'.")
except Exception as e:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}")
logging.warning(
f"Failed to release {lock_type_str} lock on '{file_path}': {e}"
)
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow'))
print(
colored(
f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}",
"yellow",
)
)
finally:
# Close the file
try:
@@ -89,7 +104,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
except Exception as e:
logging.warning(f"Failed to close file '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow'))
print(
colored(
f"Warning: Failed to close file '{file_path}': {e}",
"yellow",
)
)
@contextmanager

View File

@@ -16,6 +16,7 @@ from typing import Optional
# Instantiate the logger
logger = logging.getLogger(__name__)
def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]:
"""
Generates a unique fingerprint from the provided seed phrase using SHA-256.
@@ -33,7 +34,7 @@ def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]:
logger.debug(f"Normalized seed: {normalized_seed}")
# Compute SHA-256 hash
sha256_hash = hashlib.sha256(normalized_seed.encode('utf-8')).hexdigest()
sha256_hash = hashlib.sha256(normalized_seed.encode("utf-8")).hexdigest()
logger.debug(f"SHA-256 Hash: {sha256_hash}")
# Truncate to desired length

View File

@@ -14,6 +14,7 @@ from utils.fingerprint import generate_fingerprint
# Instantiate the logger
logger = logging.getLogger(__name__)
class FingerprintManager:
"""
FingerprintManager Class
@@ -31,7 +32,7 @@ class FingerprintManager:
app_dir (Path): The root application directory (e.g., ~/.seedpass).
"""
self.app_dir = app_dir
self.fingerprints_file = self.app_dir / 'fingerprints.json'
self.fingerprints_file = self.app_dir / "fingerprints.json"
self._ensure_app_directory()
self.fingerprints = self._load_fingerprints()
self.current_fingerprint: Optional[str] = None
@@ -43,7 +44,7 @@ class FingerprintManager:
Returns:
Optional[Path]: The Path object of the current fingerprint directory or None.
"""
if hasattr(self, 'current_fingerprint') and self.current_fingerprint:
if hasattr(self, "current_fingerprint") and self.current_fingerprint:
return self.get_fingerprint_directory(self.current_fingerprint)
else:
logger.error("No current fingerprint is set.")
@@ -57,7 +58,9 @@ class FingerprintManager:
self.app_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"Application directory ensured at {self.app_dir}")
except Exception as e:
logger.error(f"Failed to create application directory at {self.app_dir}: {e}")
logger.error(
f"Failed to create application directory at {self.app_dir}: {e}"
)
logger.error(traceback.format_exc())
raise
@@ -70,13 +73,15 @@ class FingerprintManager:
"""
try:
if self.fingerprints_file.exists():
with open(self.fingerprints_file, 'r') as f:
with open(self.fingerprints_file, "r") as f:
data = json.load(f)
fingerprints = data.get('fingerprints', [])
fingerprints = data.get("fingerprints", [])
logger.debug(f"Loaded fingerprints: {fingerprints}")
return fingerprints
else:
logger.debug("fingerprints.json not found. Initializing empty fingerprint list.")
logger.debug(
"fingerprints.json not found. Initializing empty fingerprint list."
)
return []
except Exception as e:
logger.error(f"Failed to load fingerprints: {e}")
@@ -88,8 +93,8 @@ class FingerprintManager:
Saves the current list of fingerprints to the fingerprints.json file.
"""
try:
with open(self.fingerprints_file, 'w') as f:
json.dump({'fingerprints': self.fingerprints}, f, indent=4)
with open(self.fingerprints_file, "w") as f:
json.dump({"fingerprints": self.fingerprints}, f, indent=4)
logger.debug(f"Fingerprints saved: {self.fingerprints}")
except Exception as e:
logger.error(f"Failed to save fingerprints: {e}")
@@ -140,7 +145,7 @@ class FingerprintManager:
# Remove fingerprint directory
fingerprint_dir = self.app_dir / fingerprint
if fingerprint_dir.exists() and fingerprint_dir.is_dir():
for child in fingerprint_dir.glob('*'):
for child in fingerprint_dir.glob("*"):
if child.is_file():
child.unlink()
elif child.is_dir():

View File

@@ -29,6 +29,7 @@ colorama_init()
# Instantiate the logger
logger = logging.getLogger(__name__)
def prompt_new_password() -> str:
"""
Prompts the user to enter and confirm a new password for encrypting the parent seed.
@@ -51,39 +52,50 @@ def prompt_new_password() -> str:
confirm_password = getpass.getpass(prompt="Confirm your password: ").strip()
if not password:
print(colored("Error: Password cannot be empty. Please try again.", 'red'))
print(
colored("Error: Password cannot be empty. Please try again.", "red")
)
logging.warning("User attempted to enter an empty password.")
attempts += 1
continue
if len(password) < MIN_PASSWORD_LENGTH:
print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red'))
logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.")
print(
colored(
f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.",
"red",
)
)
logging.warning(
f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters."
)
attempts += 1
continue
if password != confirm_password:
print(colored("Error: Passwords do not match. Please try again.", 'red'))
print(
colored("Error: Passwords do not match. Please try again.", "red")
)
logging.warning("User entered mismatching passwords.")
attempts += 1
continue
# Normalize the password to NFKD form
normalized_password = unicodedata.normalize('NFKD', password)
normalized_password = unicodedata.normalize("NFKD", password)
logging.debug("User entered a valid and confirmed password.")
return normalized_password
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
print(colored("\nOperation cancelled by user.", "yellow"))
logging.info("Password prompt interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during password prompt: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
print(colored(f"Error: {e}", "red"))
attempts += 1
print(colored("Maximum password attempts exceeded. Exiting.", 'red'))
print(colored("Maximum password attempts exceeded. Exiting.", "red"))
logging.error("User failed to provide a valid password after multiple attempts.")
sys.exit(1)
@@ -107,27 +119,29 @@ def prompt_existing_password(prompt_message: str = "Enter your password: ") -> s
password = getpass.getpass(prompt=prompt_message).strip()
if not password:
print(colored("Error: Password cannot be empty.", 'red'))
print(colored("Error: Password cannot be empty.", "red"))
logging.warning("User attempted to enter an empty password.")
sys.exit(1)
# Normalize the password to NFKD form
normalized_password = unicodedata.normalize('NFKD', password)
normalized_password = unicodedata.normalize("NFKD", password)
logging.debug("User entered an existing password for decryption.")
return normalized_password
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
print(colored("\nOperation cancelled by user.", "yellow"))
logging.info("Existing password prompt interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during existing password prompt: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
print(colored(f"Error: {e}", "red"))
sys.exit(1)
def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool:
def confirm_action(
prompt_message: str = "Are you sure you want to proceed? (Y/N): ",
) -> bool:
"""
Prompts the user to confirm an action, typically used before performing critical operations.
@@ -143,24 +157,24 @@ def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N
"""
try:
while True:
response = input(colored(prompt_message, 'cyan')).strip().lower()
if response in ['y', 'yes']:
response = input(colored(prompt_message, "cyan")).strip().lower()
if response in ["y", "yes"]:
logging.debug("User confirmed the action.")
return True
elif response in ['n', 'no']:
elif response in ["n", "no"]:
logging.debug("User declined the action.")
return False
else:
print(colored("Please respond with 'Y' or 'N'.", 'yellow'))
print(colored("Please respond with 'Y' or 'N'.", "yellow"))
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
print(colored("\nOperation cancelled by user.", "yellow"))
logging.info("Action confirmation interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during action confirmation: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
print(colored(f"Error: {e}", "red"))
sys.exit(1)