Merge pull request #38 from PR0M3TH3AN/beta

Beta
This commit is contained in:
thePR0M3TH3AN
2025-06-29 17:44:05 -04:00
committed by GitHub
36 changed files with 1143 additions and 460 deletions

View File

@@ -2,9 +2,9 @@ name: CI
on:
push:
branches: [ "main" ]
branches: [ "**" ]
pull_request:
branches: [ "main" ]
branches: [ "**" ]
jobs:
build:
@@ -14,9 +14,24 @@ jobs:
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
- name: Cache pip
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('src/requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Set up Python dependencies
id: deps
run: |
python -m pip install --upgrade pip
pip install -r src/requirements.txt
- name: Test with pytest
run: pytest -q src/tests
- name: Run tests with coverage
run: |
pytest --cov=src --cov-report=xml --cov-report=term-missing \
--cov-fail-under=20 src/tests
- name: Upload coverage report
uses: actions/upload-artifact@v4
with:
name: coverage-xml
path: coverage.xml

4
.gitignore vendored
View File

@@ -25,3 +25,7 @@ Thumbs.db
# Python env
.env
*.env
# Coverage files
.coverage
coverage.xml

View File

@@ -122,22 +122,13 @@ python src/main.py
```
Select an option:
1. Generate a New Password and Add to Index
2. Retrieve a Password from Index
1. Add Entry
2. Retrieve Entry
3. Modify an Existing Entry
4. Verify Script Checksum
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
9. Switch Seed Profile
10. Add a New Seed Profile
11. Remove an Existing Seed Profile
12. List All Seed Profiles
13. Settings
14. Exit
4. Settings
5. Exit
Enter your choice (1-14):
Enter your choice (1-5):
```
### Managing Multiple Seeds
@@ -145,18 +136,18 @@ python src/main.py
SeedPass allows you to manage multiple seed profiles (previously referred to as "fingerprints"). Each seed profile has its own parent seed and associated data, enabling you to compartmentalize your passwords.
- **Add a New Seed Profile:**
- Select option `10` from the main menu.
- From the main menu, select **Settings** then **Profiles** and choose "Add a New Seed Profile".
- Choose to enter an existing seed or generate a new one.
- If generating a new seed, you'll be provided with a 12-word BIP-85 seed phrase. **Ensure you write this down and store it securely.**
- **Switch Between Seed Profiles:**
- Select option `9` from the main menu.
- From the **Profiles** menu, select "Switch Seed Profile".
- You'll see a list of available seed profiles.
- Enter the number corresponding to the seed profile you wish to switch to.
- Enter the master password associated with that seed profile.
- **List All Seed Profiles:**
- Select option `12` from the main menu to view all existing seed profiles.
- In the **Profiles** menu, choose "List All Seed Profiles" to view all existing profiles.
**Note:** The term "seed profile" is used to represent different sets of seeds you can manage within SeedPass. This provides an intuitive way to handle multiple identities or sets of passwords.
@@ -172,13 +163,16 @@ wss://relay.primal.net
You can manage the relay list or change the PIN through the **Settings** menu:
1. From the main menu, choose option `13` (**Settings**).
2. Select `1` to view your current relays.
3. Choose `2` to add a new relay URL.
4. Select `3` to remove a relay by number.
5. Choose `4` to reset to the default relay list.
6. Select `5` to change the settings PIN.
7. Choose `6` to return to the main menu.
1. From the main menu, choose option `4` (**Settings**).
2. Select `2` (**Nostr**) to open the Nostr submenu.
3. Choose `3` to view your current relays.
4. Select `4` to add a new relay URL.
5. Choose `5` to remove a relay by number.
6. Select `6` to reset to the default relay list.
7. Choose `7` to display your Nostr public key.
8. Select `8` to return to the Settings menu.
9. From the Settings menu you can select `3` to change the settings PIN.
10. Choose `4` to verify the script checksum or `5` to back up the parent seed.
## Running Tests

View File

@@ -121,21 +121,19 @@ Enter your master password:
Fingerprint 31DD880A523B9759 selected and managers initialized.
Select an option:
1. Generate a New Password and Add to Index
2. Retrieve a Password from Index
1. Generate Password
2. Retrieve Password
3. Modify an Existing Entry
4. Verify Script Checksum
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
9. Switch Fingerprint
10. Add a New Fingerprint
11. Remove an Existing Fingerprint
12. List All Fingerprints
13. Exit
4. Backup to Nostr
5. Restore from Nostr
6. Switch Fingerprint
7. Add a New Fingerprint
8. Remove an Existing Fingerprint
9. List All Fingerprints
10. Settings
11. Exit
Enter your choice (1-13): 1
Enter your choice (1-11): 1
Enter the website name: newsitename
Enter the username (optional):
Enter the URL (optional):

View File

@@ -118,21 +118,19 @@ Enter your master password:
Fingerprint 31DD880A523B9759 selected and managers initialized.
Select an option:
1. Generate a New Password and Add to Index
2. Retrieve a Password from Index
1. Generate Password
2. Retrieve Password
3. Modify an Existing Entry
4. Verify Script Checksum
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
9. Switch Fingerprint
10. Add a New Fingerprint
11. Remove an Existing Fingerprint
12. List All Fingerprints
13. Exit
4. Backup to Nostr
5. Restore from Nostr
6. Switch Fingerprint
7. Add a New Fingerprint
8. Remove an Existing Fingerprint
9. List All Fingerprints
10. Settings
11. Exit
Enter your choice (1-13): 1
Enter your choice (1-11): 1
Enter the website name: newsitename
Enter the username (optional):
Enter the URL (optional):

View File

@@ -113,21 +113,19 @@ Enter your master password:
Fingerprint 31DD880A523B9759 selected and managers initialized.
Select an option:
1. Generate a New Password and Add to Index
2. Retrieve a Password from Index
1. Add Entry
2. Retrieve Entry
3. Modify an Existing Entry
4. Verify Script Checksum
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
9. Switch Fingerprint
10. Add a New Fingerprint
11. Remove an Existing Fingerprint
12. List All Fingerprints
13. Exit
4. Backup to Nostr
5. Restore from Nostr
6. Switch Fingerprint
7. Add a New Fingerprint
8. Remove an Existing Fingerprint
9. List All Fingerprints
10. Settings
11. Exit
Enter your choice (1-13): 1
Enter your choice (1-11): 1
Enter the website name: newsitename
Enter the username (optional):
Enter the URL (optional):

View File

@@ -12,14 +12,14 @@ logger = logging.getLogger(__name__)
# -----------------------------------
# Nostr Relay Connection Settings
# -----------------------------------
MAX_RETRIES = 3 # Maximum number of retries for relay connections
RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection
MAX_RETRIES = 3 # Maximum number of retries for relay connections
RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection
try:
# -----------------------------------
# Application Directory and Paths
# -----------------------------------
APP_DIR = Path.home() / '.seedpass'
APP_DIR = Path.home() / ".seedpass"
APP_DIR.mkdir(exist_ok=True, parents=True) # Ensure the directory exists
logging.info(f"Application directory created at {APP_DIR}")
except Exception as e:
@@ -27,7 +27,7 @@ except Exception as e:
logging.error(traceback.format_exc()) # Log full traceback
try:
PARENT_SEED_FILE = APP_DIR / 'parent_seed.enc' # Encrypted parent seed
PARENT_SEED_FILE = APP_DIR / "parent_seed.enc" # Encrypted parent seed
logging.info(f"Parent seed file path set to {PARENT_SEED_FILE}")
except Exception as e:
logging.error(f"Error setting file paths: {e}")
@@ -37,7 +37,9 @@ except Exception as e:
# Checksum Files for Integrity
# -----------------------------------
try:
SCRIPT_CHECKSUM_FILE = APP_DIR / 'seedpass_script_checksum.txt' # Checksum for main script
SCRIPT_CHECKSUM_FILE = (
APP_DIR / "seedpass_script_checksum.txt"
) # Checksum for main script
logging.info(f"Checksum file path set: Script {SCRIPT_CHECKSUM_FILE}")
except Exception as e:
logging.error(f"Error setting checksum file paths: {e}")
@@ -46,12 +48,12 @@ except Exception as e:
# -----------------------------------
# Password Generation Constants
# -----------------------------------
DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords
MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length
MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length
DEFAULT_PASSWORD_LENGTH = 16 # Default length for generated passwords
MIN_PASSWORD_LENGTH = 8 # Minimum allowed password length
MAX_PASSWORD_LENGTH = 128 # Maximum allowed password length
# -----------------------------------
# Additional Constants (if any)
# -----------------------------------
# Add any other constants here as your project expands
DEFAULT_SEED_BACKUP_FILENAME = 'parent_seed_backup.enc'
DEFAULT_SEED_BACKUP_FILENAME = "parent_seed_backup.enc"

View File

@@ -5,10 +5,10 @@ import traceback
try:
from .bip85 import BIP85
logging.info("BIP85 module imported successfully.")
except Exception as e:
logging.error(f"Failed to import BIP85 module: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = ['BIP85']
__all__ = ["BIP85"]

View File

@@ -7,8 +7,8 @@ This module implements the BIP85 functionality for deterministic entropy and mne
It provides the BIP85 class, which utilizes BIP32 and BIP39 standards to derive entropy and mnemonics
from a given seed. Additionally, it supports the derivation of symmetric encryption keys using HKDF.
Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case.
Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this softwares use case.
Ensure that all dependencies are installed and properly configured in your environment.
"""
@@ -21,11 +21,7 @@ import os
import traceback
from colorama import Fore
from bip_utils import (
Bip32Slip10Secp256k1,
Bip39MnemonicGenerator,
Bip39Languages
)
from bip_utils import Bip32Slip10Secp256k1, Bip39MnemonicGenerator, Bip39Languages
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
@@ -34,6 +30,7 @@ from cryptography.hazmat.backends import default_backend
# Instantiate the logger
logger = logging.getLogger(__name__)
class BIP85:
def __init__(self, seed_bytes: bytes):
try:
@@ -80,8 +77,12 @@ class BIP85:
entropy = hmac_result[:bytes_len]
if len(entropy) != bytes_len:
logging.error(f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.")
print(f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes.")
logging.error(
f"Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes."
)
print(
f"{Fore.RED}Error: Derived entropy length is {len(entropy)} bytes; expected {bytes_len} bytes."
)
sys.exit(1)
logging.debug(f"Derived entropy: {entropy.hex()}")
@@ -101,7 +102,9 @@ class BIP85:
entropy = self.derive_entropy(index=index, bytes_len=bytes_len, app_no=39)
try:
mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy)
mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(
entropy
)
logging.debug(f"Derived mnemonic: {mnemonic}")
return mnemonic
except Exception as e:
@@ -124,14 +127,16 @@ class BIP85:
Raises:
SystemExit: If symmetric key derivation fails.
"""
entropy = self.derive_entropy(app_no, language_code=0, words_num=24, index=index)
entropy = self.derive_entropy(
app_no, language_code=0, words_num=24, index=index
)
try:
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32, # 256 bits for AES-256
salt=None,
info=b'seedos-encryption-key',
backend=default_backend()
info=b"seedos-encryption-key",
backend=default_backend(),
)
symmetric_key = hkdf.derive(entropy)
logging.debug(f"Derived symmetric key: {symmetric_key.hex()}")

View File

@@ -79,70 +79,70 @@ def handle_switch_fingerprint(password_manager: PasswordManager):
if not fingerprints:
print(
colored(
"No fingerprints available to switch. Please add a new fingerprint first.",
"No seed profiles available to switch. Please add a new seed profile first.",
"yellow",
)
)
return
print(colored("Available Fingerprints:", "cyan"))
print(colored("Available Seed Profiles:", "cyan"))
for idx, fp in enumerate(fingerprints, start=1):
print(colored(f"{idx}. {fp}", "cyan"))
choice = input("Select a fingerprint by number to switch: ").strip()
choice = input("Select a seed profile by number to switch: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)):
print(colored("Invalid selection.", "red"))
return
selected_fingerprint = fingerprints[int(choice) - 1]
if password_manager.select_fingerprint(selected_fingerprint):
print(colored(f"Switched to fingerprint {selected_fingerprint}.", "green"))
print(colored(f"Switched to seed profile {selected_fingerprint}.", "green"))
else:
print(colored("Failed to switch fingerprint.", "red"))
print(colored("Failed to switch seed profile.", "red"))
except Exception as e:
logging.error(f"Error during fingerprint switch: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to switch fingerprint: {e}", "red"))
print(colored(f"Error: Failed to switch seed profile: {e}", "red"))
def handle_add_new_fingerprint(password_manager: PasswordManager):
"""
Handles adding a new fingerprint.
Handles adding a new seed profile.
:param password_manager: An instance of PasswordManager.
"""
try:
password_manager.add_new_fingerprint()
except Exception as e:
logging.error(f"Error adding new fingerprint: {e}")
logging.error(f"Error adding new seed profile: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to add new fingerprint: {e}", "red"))
print(colored(f"Error: Failed to add new seed profile: {e}", "red"))
def handle_remove_fingerprint(password_manager: PasswordManager):
"""
Handles removing an existing fingerprint.
Handles removing an existing seed profile.
:param password_manager: An instance of PasswordManager.
"""
try:
fingerprints = password_manager.fingerprint_manager.list_fingerprints()
if not fingerprints:
print(colored("No fingerprints available to remove.", "yellow"))
print(colored("No seed profiles available to remove.", "yellow"))
return
print(colored("Available Fingerprints:", "cyan"))
print(colored("Available Seed Profiles:", "cyan"))
for idx, fp in enumerate(fingerprints, start=1):
print(colored(f"{idx}. {fp}", "cyan"))
choice = input("Select a fingerprint by number to remove: ").strip()
choice = input("Select a seed profile by number to remove: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)):
print(colored("Invalid selection.", "red"))
return
selected_fingerprint = fingerprints[int(choice) - 1]
confirm = confirm_action(
f"Are you sure you want to remove fingerprint {selected_fingerprint}? This will delete all associated data. (Y/N): "
f"Are you sure you want to remove seed profile {selected_fingerprint}? This will delete all associated data. (Y/N): "
)
if confirm:
if password_manager.fingerprint_manager.remove_fingerprint(
@@ -150,39 +150,39 @@ def handle_remove_fingerprint(password_manager: PasswordManager):
):
print(
colored(
f"Fingerprint {selected_fingerprint} removed successfully.",
f"Seed profile {selected_fingerprint} removed successfully.",
"green",
)
)
else:
print(colored("Failed to remove fingerprint.", "red"))
print(colored("Failed to remove seed profile.", "red"))
else:
print(colored("Fingerprint removal cancelled.", "yellow"))
print(colored("Seed profile removal cancelled.", "yellow"))
except Exception as e:
logging.error(f"Error removing fingerprint: {e}")
logging.error(f"Error removing seed profile: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to remove fingerprint: {e}", "red"))
print(colored(f"Error: Failed to remove seed profile: {e}", "red"))
def handle_list_fingerprints(password_manager: PasswordManager):
"""
Handles listing all available fingerprints.
Handles listing all available seed profiles.
:param password_manager: An instance of PasswordManager.
"""
try:
fingerprints = password_manager.fingerprint_manager.list_fingerprints()
if not fingerprints:
print(colored("No fingerprints available.", "yellow"))
print(colored("No seed profiles available.", "yellow"))
return
print(colored("Available Fingerprints:", "cyan"))
print(colored("Available Seed Profiles:", "cyan"))
for fp in fingerprints:
print(colored(f"- {fp}", "cyan"))
except Exception as e:
logging.error(f"Error listing fingerprints: {e}")
logging.error(f"Error listing seed profiles: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to list fingerprints: {e}", "red"))
print(colored(f"Error: Failed to list seed profiles: {e}", "red"))
def handle_display_npub(password_manager: PasswordManager):
@@ -357,8 +357,33 @@ def handle_reset_relays(password_manager: PasswordManager) -> None:
print(colored(f"Error: {e}", "red"))
def handle_settings(password_manager: PasswordManager) -> None:
"""Interactive settings menu for relay list and password changes."""
def handle_profiles_menu(password_manager: PasswordManager) -> None:
"""Submenu for managing seed profiles."""
while True:
print("\nProfiles:")
print("1. Switch Seed Profile")
print("2. Add a New Seed Profile")
print("3. Remove an Existing Seed Profile")
print("4. List All Seed Profiles")
print("5. Back")
choice = input("Select an option: ").strip()
if choice == "1":
if not password_manager.handle_switch_fingerprint():
print(colored("Failed to switch seed profile.", "red"))
elif choice == "2":
handle_add_new_fingerprint(password_manager)
elif choice == "3":
handle_remove_fingerprint(password_manager)
elif choice == "4":
handle_list_fingerprints(password_manager)
elif choice == "5":
break
else:
print(colored("Invalid choice.", "red"))
def handle_nostr_menu(password_manager: PasswordManager) -> None:
"""Submenu for Nostr-related actions and relay configuration."""
cfg_mgr = password_manager.config_manager
if cfg_mgr is None:
print(colored("Configuration manager unavailable.", "red"))
@@ -369,25 +394,58 @@ def handle_settings(password_manager: PasswordManager) -> None:
print(colored(f"Error loading settings: {e}", "red"))
return
while True:
print("\nNostr Settings:")
print("1. Backup to Nostr")
print("2. Restore from Nostr")
print("3. View current relays")
print("4. Add a relay URL")
print("5. Remove a relay by number")
print("6. Reset to default relays")
print("7. Display Nostr Public Key")
print("8. Back")
choice = input("Select an option: ").strip()
if choice == "1":
handle_post_to_nostr(password_manager)
elif choice == "2":
handle_retrieve_from_nostr(password_manager)
elif choice == "3":
handle_view_relays(cfg_mgr)
elif choice == "4":
handle_add_relay(password_manager)
elif choice == "5":
handle_remove_relay(password_manager)
elif choice == "6":
handle_reset_relays(password_manager)
elif choice == "7":
handle_display_npub(password_manager)
elif choice == "8":
break
else:
print(colored("Invalid choice.", "red"))
def handle_settings(password_manager: PasswordManager) -> None:
"""Interactive settings menu with submenus for profiles and Nostr."""
while True:
print("\nSettings:")
print("1. View current relays")
print("2. Add a relay URL")
print("3. Remove a relay by number")
print("4. Reset to default relays")
print("5. Change password")
print("1. Profiles")
print("2. Nostr")
print("3. Change password")
print("4. Verify Script Checksum")
print("5. Backup Parent Seed")
print("6. Back")
choice = input("Select an option: ").strip()
if choice == "1":
handle_view_relays(cfg_mgr)
handle_profiles_menu(password_manager)
elif choice == "2":
handle_add_relay(password_manager)
handle_nostr_menu(password_manager)
elif choice == "3":
handle_remove_relay(password_manager)
elif choice == "4":
handle_reset_relays(password_manager)
elif choice == "5":
password_manager.change_password()
elif choice == "4":
password_manager.handle_verify_checksum()
elif choice == "5":
password_manager.handle_backup_reveal_parent_seed()
elif choice == "6":
break
else:
@@ -400,63 +458,46 @@ def display_menu(password_manager: PasswordManager):
"""
menu = """
Select an option:
1. Generate a New Password and Add to Index
2. Retrieve a Password from Index
1. Add Entry
2. Retrieve Entry
3. Modify an Existing Entry
4. Verify Script Checksum
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
9. Switch Fingerprint
10. Add a New Fingerprint
11. Remove an Existing Fingerprint
12. List All Fingerprints
13. Settings
14. Exit
4. Settings
5. Exit
"""
while True:
# Flush logging handlers
for handler in logging.getLogger().handlers:
handler.flush()
print(colored(menu, "cyan"))
choice = input("Enter your choice (1-14): ").strip()
choice = input("Enter your choice (1-5): ").strip()
if not choice:
print(
colored(
"No input detected. Please enter a number between 1 and 14.",
"No input detected. Please enter a number between 1 and 5.",
"yellow",
)
)
continue # Re-display the menu without marking as invalid
if choice == "1":
password_manager.handle_generate_password()
while True:
print("\nAdd Entry:")
print("1. Password")
print("2. Back")
sub_choice = input("Select entry type: ").strip()
if sub_choice == "1":
password_manager.handle_add_password()
break
elif sub_choice == "2":
break
else:
print(colored("Invalid choice.", "red"))
elif choice == "2":
password_manager.handle_retrieve_password()
password_manager.handle_retrieve_entry()
elif choice == "3":
password_manager.handle_modify_entry()
elif choice == "4":
password_manager.handle_verify_checksum()
elif choice == "5":
handle_post_to_nostr(password_manager)
elif choice == "6":
handle_retrieve_from_nostr(password_manager)
elif choice == "7":
handle_display_npub(password_manager)
elif choice == "8":
password_manager.handle_backup_reveal_parent_seed()
elif choice == "9":
if not password_manager.handle_switch_fingerprint():
print(colored("Failed to switch fingerprint.", "red"))
elif choice == "10":
handle_add_new_fingerprint(password_manager)
elif choice == "11":
handle_remove_fingerprint(password_manager)
elif choice == "12":
handle_list_fingerprints(password_manager)
elif choice == "13":
handle_settings(password_manager)
elif choice == "14":
elif choice == "5":
logging.info("Exiting the program.")
print(colored("Exiting the program.", "green"))
password_manager.nostr_client.close_client_pool()

View File

@@ -12,9 +12,10 @@ logger = logging.getLogger(__name__) # Correct logger initialization
try:
from .client import NostrClient
logger.info("NostrClient module imported successfully.")
except Exception as e:
logger.error(f"Failed to import NostrClient module: {e}")
logger.error(traceback.format_exc()) # Log full traceback
__all__ = ['NostrClient']
__all__ = ["NostrClient"]

View File

@@ -34,13 +34,14 @@ logger.setLevel(logging.WARNING)
DEFAULT_RELAYS = [
"wss://relay.snort.social",
"wss://nostr.oxtr.dev",
"wss://relay.primal.net"
"wss://relay.primal.net",
]
# nostr/client.py
# src/nostr/client.py
class NostrClient:
"""
NostrClient Class
@@ -49,9 +50,14 @@ class NostrClient:
Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations.
"""
def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None):
def __init__(
self,
encryption_manager: EncryptionManager,
fingerprint: str,
relays: Optional[List[str]] = None,
):
"""
Initializes the NostrClient with an EncryptionManager, connects to specified relays,
Initializes the NostrClient with an EncryptionManager, connects to specified relays,
and sets up the KeyManager with the given fingerprint.
:param encryption_manager: An instance of EncryptionManager for handling encryption/decryption.
@@ -62,12 +68,13 @@ class NostrClient:
# Assign the encryption manager and fingerprint
self.encryption_manager = encryption_manager
self.fingerprint = fingerprint # Track the fingerprint
self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories
self.fingerprint_dir = (
self.encryption_manager.fingerprint_dir
) # If needed to manage directories
# Initialize KeyManager with the decrypted parent seed and the provided fingerprint
self.key_manager = KeyManager(
self.encryption_manager.decrypt_parent_seed(),
self.fingerprint
self.encryption_manager.decrypt_parent_seed(), self.fingerprint
)
# Initialize event handler and client pool
@@ -126,7 +133,10 @@ class NostrClient:
except Exception as e:
logger.error(f"Error running event loop in thread: {e}")
logger.error(traceback.format_exc())
print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr)
print(
f"Error: Event loop in ClientPool thread encountered an issue: {e}",
file=sys.stderr,
)
finally:
if not self.loop.is_closed():
logger.debug("Closing the event loop.")
@@ -166,14 +176,18 @@ class NostrClient:
"""
try:
logger.debug(f"Submitting publish_event_async for event ID: {event.id}")
future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.publish_event_async(event), self.loop
)
# Wait for the future to complete
future.result(timeout=5) # Adjust the timeout as needed
except Exception as e:
logger.error(f"Error in publish_event: {e}")
print(f"Error: Failed to publish event: {e}", file=sys.stderr)
async def subscribe_async(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]):
async def subscribe_async(
self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]
):
"""
Subscribes to events based on the provided filters using ClientPool.
@@ -190,7 +204,9 @@ class NostrClient:
logger.error(traceback.format_exc())
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
def subscribe(self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]):
def subscribe(
self, filters: List[dict], handler: Callable[[ClientPool, str, Event], None]
):
"""
Synchronous wrapper for subscribing to events.
@@ -198,7 +214,9 @@ class NostrClient:
:param handler: A callback function to handle incoming events.
"""
try:
asyncio.run_coroutine_threadsafe(self.subscribe_async(filters, handler), self.loop)
asyncio.run_coroutine_threadsafe(
self.subscribe_async(filters, handler), self.loop
)
except Exception as e:
logger.error(f"Error in subscribe: {e}")
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
@@ -210,11 +228,13 @@ class NostrClient:
:return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails.
"""
try:
filters = [{
'authors': [self.key_manager.keys.public_key_hex()],
'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
'limit': 1
}]
filters = [
{
"authors": [self.key_manager.keys.public_key_hex()],
"kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
"limit": 1,
}
]
events = []
@@ -238,7 +258,9 @@ class NostrClient:
if event.kind == Event.KIND_ENCRYPT:
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key)
content_base64 = nip4_encrypt.decrypt_message(
event.content, event.pub_key
)
# Return the Base64-encoded content as a string
logger.debug("Encrypted JSON data retrieved successfully.")
@@ -261,16 +283,21 @@ class NostrClient:
:return: The encrypted JSON data as bytes, or None if retrieval fails.
"""
try:
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.retrieve_json_from_nostr_async(), self.loop
)
return future.result(timeout=10)
except concurrent.futures.TimeoutError:
logger.error("Timeout occurred while retrieving JSON from Nostr.")
print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr)
print(
"Error: Timeout occurred while retrieving JSON from Nostr.",
file=sys.stderr,
)
return None
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red')
print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red")
return None
async def do_post_async(self, text: str):
@@ -283,7 +310,7 @@ class NostrClient:
event = Event(
kind=Event.KIND_TEXT_NOTE,
content=text,
pub_key=self.key_manager.keys.public_key_hex()
pub_key=self.key_manager.keys.public_key_hex(),
)
event.created_at = int(time.time())
event.sign(self.key_manager.keys.private_key_hex())
@@ -296,18 +323,22 @@ class NostrClient:
logger.error(f"An error occurred during publishing: {e}", exc_info=True)
print(f"Error: An error occurred during publishing: {e}", file=sys.stderr)
async def subscribe_feed_async(self, handler: Callable[[ClientPool, str, Event], None]):
async def subscribe_feed_async(
self, handler: Callable[[ClientPool, str, Event], None]
):
"""
Subscribes to the feed of the client's own pubkey.
:param handler: A callback function to handle incoming events.
"""
try:
filters = [{
'authors': [self.key_manager.keys.public_key_hex()],
'kinds': [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
'limit': 100
}]
filters = [
{
"authors": [self.key_manager.keys.public_key_hex()],
"kinds": [Event.KIND_TEXT_NOTE, Event.KIND_ENCRYPT],
"limit": 100,
}
]
await self.subscribe_async(filters=filters, handler=handler)
logger.info("Subscribed to your feed.")
@@ -327,11 +358,16 @@ class NostrClient:
try:
await asyncio.gather(
self.do_post_async(text),
self.subscribe_feed_async(self.event_handler.handle_new_event)
self.subscribe_feed_async(self.event_handler.handle_new_event),
)
except Exception as e:
logger.error(f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True)
print(f"Error: An error occurred in publish and subscribe: {e}", file=sys.stderr)
logger.error(
f"An error occurred in publish_and_subscribe_async: {e}", exc_info=True
)
print(
f"Error: An error occurred in publish and subscribe: {e}",
file=sys.stderr,
)
def publish_and_subscribe(self, text: str):
"""
@@ -340,7 +376,9 @@ class NostrClient:
:param text: The content of the text note to publish.
"""
try:
asyncio.run_coroutine_threadsafe(self.publish_and_subscribe_async(text), self.loop)
asyncio.run_coroutine_threadsafe(
self.publish_and_subscribe_async(text), self.loop
)
except Exception as e:
logger.error(f"Error in publish_and_subscribe: {e}", exc_info=True)
print(f"Error: Failed to publish and subscribe: {e}", file=sys.stderr)
@@ -353,15 +391,19 @@ class NostrClient:
"""
try:
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
data = json.loads(decrypted_data.decode('utf-8'))
data = json.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data)
self.update_checksum()
logger.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", 'green'))
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
)
)
def save_json_data(self, data: dict) -> None:
"""
@@ -370,17 +412,19 @@ class NostrClient:
:param data: The JSON data to save.
"""
try:
encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8'))
index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc'
encrypted_data = self.encryption_manager.encrypt_data(
json.dumps(data).encode("utf-8")
)
index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
with lock_file(index_file_path, fcntl.LOCK_EX):
with open(index_file_path, 'wb') as f:
with open(index_file_path, "wb") as f:
f.write(encrypted_data)
logger.debug(f"Encrypted data saved to {index_file_path}.")
print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green'))
print(colored(f"Encrypted data saved to '{index_file_path}'.", "green"))
except Exception as e:
logger.error(f"Failed to save encrypted data: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to save encrypted data: {e}", 'red'))
print(colored(f"Error: Failed to save encrypted data: {e}", "red"))
raise
def update_checksum(self) -> None:
@@ -388,28 +432,30 @@ class NostrClient:
Updates the checksum file for the password database.
"""
try:
index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc'
index_file_path = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
decrypted_data = self.decrypt_data_from_file(index_file_path)
content = decrypted_data.decode('utf-8')
content = decrypted_data.decode("utf-8")
logger.debug("Calculating checksum of the updated file content.")
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
checksum = hashlib.sha256(content.encode("utf-8")).hexdigest()
logger.debug(f"New checksum: {checksum}")
checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt'
checksum_file = self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
with lock_file(checksum_file, fcntl.LOCK_EX):
with open(checksum_file, 'w') as f:
with open(checksum_file, "w") as f:
f.write(checksum)
os.chmod(checksum_file, 0o600)
logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.")
print(colored(f"Checksum for '{index_file_path}' updated.", 'green'))
logger.debug(
f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'."
)
print(colored(f"Checksum for '{index_file_path}' updated.", "green"))
except Exception as e:
logger.error(f"Failed to update checksum: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to update checksum: {e}", 'red'))
print(colored(f"Error: Failed to update checksum: {e}", "red"))
def decrypt_data_from_file(self, file_path: Path) -> bytes:
"""
@@ -420,7 +466,7 @@ class NostrClient:
"""
try:
with lock_file(file_path, fcntl.LOCK_SH):
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
encrypted_data = f.read()
decrypted_data = self.encryption_manager.decrypt_data(encrypted_data)
logger.debug(f"Data decrypted from file '{file_path}'.")
@@ -428,7 +474,11 @@ class NostrClient:
except Exception as e:
logger.error(f"Failed to decrypt data from file '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt data from file '{file_path}': {e}", "red"
)
)
raise
def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None):
@@ -439,10 +489,14 @@ class NostrClient:
:param to_pubkey: (Optional) The recipient's public key for encryption.
"""
try:
encrypted_json_b64 = base64.b64encode(encrypted_json).decode('utf-8')
encrypted_json_b64 = base64.b64encode(encrypted_json).decode("utf-8")
logger.debug(f"Encrypted JSON (base64): {encrypted_json_b64}")
event = Event(kind=Event.KIND_TEXT_NOTE, content=encrypted_json_b64, pub_key=self.key_manager.keys.public_key_hex())
event = Event(
kind=Event.KIND_TEXT_NOTE,
content=encrypted_json_b64,
pub_key=self.key_manager.keys.public_key_hex(),
)
event.created_at = int(time.time())
@@ -471,7 +525,9 @@ class NostrClient:
Optional[bytes]: The encrypted data as bytes if successful, None otherwise.
"""
try:
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.retrieve_json_from_nostr_async(), self.loop
)
content_base64 = future.result(timeout=10)
if not content_base64:
@@ -479,17 +535,22 @@ class NostrClient:
return None
# Base64-decode the content
encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8'))
logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.")
encrypted_data = base64.urlsafe_b64decode(content_base64.encode("utf-8"))
logger.debug(
"Encrypted data retrieved and Base64-decoded successfully from Nostr."
)
return encrypted_data
except concurrent.futures.TimeoutError:
logger.error("Timeout occurred while retrieving JSON from Nostr.")
print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr)
print(
"Error: Timeout occurred while retrieving JSON from Nostr.",
file=sys.stderr,
)
return None
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red')
print(f"Error: Failed to retrieve JSON from Nostr: {e}", "red")
return None
def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None:
@@ -502,7 +563,7 @@ class NostrClient:
self.decrypt_and_save_index_from_nostr(encrypted_data)
except Exception as e:
logger.error(f"Failed to decrypt and save index from Nostr: {e}")
print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red')
print(f"Error: Failed to decrypt and save index from Nostr: {e}", "red")
async def close_client_pool_async(self):
"""
@@ -529,14 +590,20 @@ class NostrClient:
logger.warning(f"Error unsubscribing from {sub_id}: {e}")
# Close all WebSocket connections
if hasattr(self.client_pool, 'clients'):
tasks = [self.safe_close_connection(client) for client in self.client_pool.clients]
if hasattr(self.client_pool, "clients"):
tasks = [
self.safe_close_connection(client)
for client in self.client_pool.clients
]
await asyncio.gather(*tasks, return_exceptions=True)
# Gather and cancel all tasks
current_task = asyncio.current_task()
tasks = [task for task in asyncio.all_tasks(loop=self.loop)
if task != current_task and not task.done()]
tasks = [
task
for task in asyncio.all_tasks(loop=self.loop)
if task != current_task and not task.done()
]
if tasks:
logger.debug(f"Cancelling {len(tasks)} pending tasks.")
@@ -545,7 +612,9 @@ class NostrClient:
# Wait for all tasks to be cancelled with a timeout
try:
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=5)
await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True), timeout=5
)
except asyncio.TimeoutError:
logger.warning("Timeout waiting for tasks to cancel")
@@ -569,36 +638,40 @@ class NostrClient:
try:
# Schedule the coroutine to close the client pool
future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop)
future = asyncio.run_coroutine_threadsafe(
self.close_client_pool_async(), self.loop
)
# Wait for the coroutine to finish with a timeout
try:
future.result(timeout=10)
except concurrent.futures.TimeoutError:
logger.warning("Initial shutdown attempt timed out, forcing cleanup...")
# Additional cleanup regardless of timeout
try:
self.loop.call_soon_threadsafe(self.loop.stop)
# Give a short grace period for the loop to stop
time.sleep(0.5)
if self.loop.is_running():
logger.warning("Loop still running after stop, closing forcefully")
self.loop.call_soon_threadsafe(self.loop.close)
# Wait for the thread with a reasonable timeout
if self.loop_thread.is_alive():
self.loop_thread.join(timeout=5)
if self.loop_thread.is_alive():
logger.warning("Thread still alive after join, may need to be force-killed")
logger.warning(
"Thread still alive after join, may need to be force-killed"
)
except Exception as cleanup_error:
logger.error(f"Error during final cleanup: {cleanup_error}")
logger.info("ClientPool shutdown complete")
except Exception as e:
logger.error(f"Error in close_client_pool: {e}")
logger.error(traceback.format_exc())
@@ -610,6 +683,8 @@ class NostrClient:
await client.close_connection()
logger.debug(f"Closed connection to relay: {client.url}")
except AttributeError:
logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.")
logger.warning(
f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}."
)
except Exception as e:
logger.warning(f"Error closing connection to {client.url}: {e}")

View File

@@ -10,11 +10,12 @@ from .key_manager import KeyManager
# Instantiate the logger
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
Manages encryption and decryption using Fernet symmetric encryption.
"""
def __init__(self, key_manager: KeyManager):
"""
Initializes the EncryptionManager with a Fernet instance.
@@ -25,24 +26,26 @@ class EncryptionManager:
# Derive the raw encryption key (32 bytes)
raw_key = key_manager.derive_encryption_key()
logger.debug(f"Derived raw encryption key length: {len(raw_key)} bytes")
# Ensure the raw key is exactly 32 bytes
if len(raw_key) != 32:
raise ValueError(f"Derived key length is {len(raw_key)} bytes; expected 32 bytes.")
raise ValueError(
f"Derived key length is {len(raw_key)} bytes; expected 32 bytes."
)
# Base64-encode the raw key to make it URL-safe
b64_key = base64.urlsafe_b64encode(raw_key)
logger.debug(f"Base64-encoded encryption key length: {len(b64_key)} bytes")
# Initialize Fernet with the base64-encoded key
self.fernet = Fernet(b64_key)
logger.info("Fernet encryption manager initialized successfully.")
except Exception as e:
logger.error(f"EncryptionManager initialization failed: {e}")
logger.error(traceback.format_exc())
raise
def encrypt_parent_seed(self, seed: str, file_path: str) -> None:
"""
Encrypts the parent seed and saves it to the specified file.
@@ -51,15 +54,15 @@ class EncryptionManager:
:param file_path: The file path to save the encrypted seed.
"""
try:
encrypted_seed = self.fernet.encrypt(seed.encode('utf-8'))
with open(file_path, 'wb') as f:
encrypted_seed = self.fernet.encrypt(seed.encode("utf-8"))
with open(file_path, "wb") as f:
f.write(encrypted_seed)
logger.debug(f"Parent seed encrypted and saved to '{file_path}'.")
except Exception as e:
logger.error(f"Failed to encrypt and save parent seed: {e}")
logger.error(traceback.format_exc())
raise
def decrypt_parent_seed(self, file_path: str) -> str:
"""
Decrypts the parent seed from the specified file.
@@ -68,19 +71,23 @@ class EncryptionManager:
:return: The decrypted parent seed as a string.
"""
try:
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
encrypted_seed = f.read()
decrypted_seed = self.fernet.decrypt(encrypted_seed).decode('utf-8')
decrypted_seed = self.fernet.decrypt(encrypted_seed).decode("utf-8")
logger.debug(f"Parent seed decrypted successfully from '{file_path}'.")
return decrypted_seed
except InvalidToken:
logger.error("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.")
raise ValueError("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.")
logger.error(
"Decryption failed: Invalid token. Possibly incorrect password or corrupted file."
)
raise ValueError(
"Decryption failed: Invalid token. Possibly incorrect password or corrupted file."
)
except Exception as e:
logger.error(f"Failed to decrypt parent seed: {e}")
logger.error(traceback.format_exc())
raise
def encrypt_data(self, data: dict) -> bytes:
"""
Encrypts a dictionary by serializing it to JSON and then encrypting it.
@@ -89,7 +96,7 @@ class EncryptionManager:
:return: Encrypted data as bytes.
"""
try:
json_data = json.dumps(data).encode('utf-8')
json_data = json.dumps(data).encode("utf-8")
encrypted = self.fernet.encrypt(json_data)
logger.debug("Data encrypted successfully.")
return encrypted
@@ -97,7 +104,7 @@ class EncryptionManager:
logger.error(f"Data encryption failed: {e}")
logger.error(traceback.format_exc())
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""
Decrypts encrypted data.

View File

@@ -8,6 +8,7 @@ from monstr.event.event import Event
# Instantiate the logger
logger = logging.getLogger(__name__)
class EventHandler:
"""
Handles incoming Nostr events.
@@ -25,7 +26,9 @@ class EventHandler:
try:
# Assuming evt.created_at is always an integer Unix timestamp
if isinstance(evt.created_at, int):
created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at))
created_at_str = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(evt.created_at)
)
else:
# Handle unexpected types gracefully
created_at_str = str(evt.created_at)

View File

@@ -11,6 +11,7 @@ from monstr.encrypt import Keys
logger = logging.getLogger(__name__)
class KeyManager:
"""
Manages key generation, encoding, and derivation for NostrClient.
@@ -26,9 +27,13 @@ class KeyManager:
"""
try:
if not isinstance(parent_seed, str):
raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}")
raise TypeError(
f"Parent seed must be a string, got {type(parent_seed)}"
)
if not isinstance(fingerprint, str):
raise TypeError(f"Fingerprint must be a string, got {type(fingerprint)}")
raise TypeError(
f"Fingerprint must be a string, got {type(fingerprint)}"
)
self.parent_seed = parent_seed
self.fingerprint = fingerprint
@@ -72,12 +77,14 @@ class KeyManager:
"""
try:
# Convert fingerprint to an integer index (using a hash function)
index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (2**31)
index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (
2**31
)
# Derive entropy for Nostr key (32 bytes)
entropy_bytes = self.bip85.derive_entropy(
index=index,
bytes_len=32 # Adjust parameter name and value as per your method signature
bytes_len=32, # Adjust parameter name and value as per your method signature
)
# Generate Nostr key pair from entropy
@@ -107,7 +114,7 @@ class KeyManager:
str: The private key in hex.
"""
return self.keys.private_key_hex()
def get_npub(self) -> str:
"""
Returns the npub (Bech32 encoded public key).
@@ -119,7 +126,7 @@ class KeyManager:
pub_key_hex = self.get_public_key_hex()
pub_key_bytes = bytes.fromhex(pub_key_hex)
data = convertbits(pub_key_bytes, 8, 5, True)
npub = bech32_encode('npub', data)
npub = bech32_encode("npub", data)
return npub
except Exception as e:
logger.error(f"Failed to generate npub: {e}")

View File

@@ -2,6 +2,7 @@
import logging
# Example utility function (if any specific to nostr package)
def some_helper_function():
pass # Implement as needed

View File

@@ -30,12 +30,14 @@ import fcntl # For file locking
# Instantiate the logger
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
EncryptionManager Class
Manages the encryption and decryption of data and files using a Fernet encryption key.
"""
def __init__(self, encryption_key: bytes, fingerprint_dir: Path):
"""
Initializes the EncryptionManager with the provided encryption key and fingerprint directory.
@@ -45,16 +47,20 @@ class EncryptionManager:
fingerprint_dir (Path): The directory corresponding to the fingerprint.
"""
self.fingerprint_dir = fingerprint_dir
self.parent_seed_file = self.fingerprint_dir / 'parent_seed.enc'
self.parent_seed_file = self.fingerprint_dir / "parent_seed.enc"
self.key = encryption_key
try:
self.fernet = Fernet(self.key)
logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}")
except Exception as e:
logger.error(f"Failed to initialize Fernet with provided encryption key: {e}")
logger.error(
f"Failed to initialize Fernet with provided encryption key: {e}"
)
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to initialize encryption manager: {e}", 'red'))
print(
colored(f"Error: Failed to initialize encryption manager: {e}", "red")
)
raise
def encrypt_parent_seed(self, parent_seed: str) -> None:
@@ -65,25 +71,32 @@ class EncryptionManager:
"""
try:
# Convert seed to bytes
data = parent_seed.encode('utf-8')
data = parent_seed.encode("utf-8")
# Encrypt the data
encrypted_data = self.encrypt_data(data)
# Write the encrypted data to the file with locking
with lock_file(self.parent_seed_file, fcntl.LOCK_EX):
with open(self.parent_seed_file, 'wb') as f:
with open(self.parent_seed_file, "wb") as f:
f.write(encrypted_data)
# Set file permissions to read/write for the user only
os.chmod(self.parent_seed_file, 0o600)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
print(colored(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.", 'green'))
logger.info(
f"Parent seed encrypted and saved to '{self.parent_seed_file}'."
)
print(
colored(
f"Parent seed encrypted and saved to '{self.parent_seed_file}'.",
"green",
)
)
except Exception as e:
logger.error(f"Failed to encrypt and save parent seed: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and save parent seed: {e}", 'red'))
print(colored(f"Error: Failed to encrypt and save parent seed: {e}", "red"))
raise
def decrypt_parent_seed(self) -> str:
@@ -93,24 +106,28 @@ class EncryptionManager:
:return: The decrypted parent seed.
"""
try:
parent_seed_path = self.fingerprint_dir / 'parent_seed.enc'
parent_seed_path = self.fingerprint_dir / "parent_seed.enc"
with lock_file(parent_seed_path, fcntl.LOCK_SH):
with open(parent_seed_path, 'rb') as f:
with open(parent_seed_path, "rb") as f:
encrypted_data = f.read()
decrypted_data = self.decrypt_data(encrypted_data)
parent_seed = decrypted_data.decode('utf-8').strip()
parent_seed = decrypted_data.decode("utf-8").strip()
logger.debug(f"Parent seed decrypted successfully from '{parent_seed_path}'.")
logger.debug(
f"Parent seed decrypted successfully from '{parent_seed_path}'."
)
return parent_seed
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting parent seed.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting parent seed."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to decrypt parent seed: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt parent seed: {e}", 'red'))
print(colored(f"Error: Failed to decrypt parent seed: {e}", "red"))
raise
def encrypt_data(self, data: bytes) -> bytes:
@@ -127,7 +144,7 @@ class EncryptionManager:
except Exception as e:
logger.error(f"Failed to encrypt data: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt data: {e}", 'red'))
print(colored(f"Error: Failed to encrypt data: {e}", "red"))
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
@@ -142,13 +159,15 @@ class EncryptionManager:
logger.debug("Data decrypted successfully.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting data.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting data."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to decrypt data: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt data: {e}", 'red'))
print(colored(f"Error: Failed to decrypt data: {e}", "red"))
raise
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
@@ -170,18 +189,23 @@ class EncryptionManager:
# Write the encrypted data to the file with locking
with lock_file(file_path, fcntl.LOCK_EX):
with open(file_path, 'wb') as f:
with open(file_path, "wb") as f:
f.write(encrypted_data)
# Set file permissions to read/write for the user only
os.chmod(file_path, 0o600)
logger.info(f"Data encrypted and saved to '{file_path}'.")
print(colored(f"Data encrypted and saved to '{file_path}'.", 'green'))
print(colored(f"Data encrypted and saved to '{file_path}'.", "green"))
except Exception as e:
logger.error(f"Failed to encrypt and save data to '{relative_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to encrypt and save data to '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to encrypt and save data to '{relative_path}': {e}",
"red",
)
)
raise
def decrypt_file(self, relative_path: Path) -> bytes:
@@ -197,7 +221,7 @@ class EncryptionManager:
# Read the encrypted data with locking
with lock_file(file_path, fcntl.LOCK_SH):
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
encrypted_data = f.read()
# Decrypt the data
@@ -205,13 +229,19 @@ class EncryptionManager:
logger.debug(f"Data decrypted successfully from '{file_path}'.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting file.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting file."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to decrypt data from '{relative_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt data from '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt data from '{relative_path}': {e}", "red"
)
)
raise
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
@@ -223,16 +253,22 @@ class EncryptionManager:
Defaults to 'seedpass_passwords_db.json.enc'.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
try:
json_data = json.dumps(data, indent=4).encode('utf-8')
json_data = json.dumps(data, indent=4).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
print(colored(f"JSON data encrypted and saved to '{relative_path}'.", 'green'))
print(
colored(f"JSON data encrypted and saved to '{relative_path}'.", "green")
)
except Exception as e:
logger.error(f"Failed to save JSON data to '{relative_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to save JSON data to '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to save JSON data to '{relative_path}': {e}", "red"
)
)
raise
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
@@ -244,35 +280,54 @@ class EncryptionManager:
:return: The decrypted JSON data as a dictionary.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
file_path = self.fingerprint_dir / relative_path
if not file_path.exists():
logger.info(f"Index file '{file_path}' does not exist. Initializing empty data.")
print(colored(f"Info: Index file '{file_path}' not found. Initializing new password database.", 'yellow'))
return {'passwords': {}}
logger.info(
f"Index file '{file_path}' does not exist. Initializing empty data."
)
print(
colored(
f"Info: Index file '{file_path}' not found. Initializing new password database.",
"yellow",
)
)
return {"passwords": {}}
try:
decrypted_data = self.decrypt_file(relative_path)
json_content = decrypted_data.decode('utf-8').strip()
json_content = decrypted_data.decode("utf-8").strip()
data = json.loads(json_content)
logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}")
print(colored(f"JSON data loaded and decrypted from '{file_path}'.", 'green'))
print(
colored(f"JSON data loaded and decrypted from '{file_path}'.", "green")
)
return data
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON data from '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decode JSON data from '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to decode JSON data from '{file_path}': {e}", "red"
)
)
raise
except InvalidToken:
logger.error("Invalid encryption key or corrupted data while decrypting JSON data.")
print(colored("Error: Invalid encryption key or corrupted data.", 'red'))
logger.error(
"Invalid encryption key or corrupted data while decrypting JSON data."
)
print(colored("Error: Invalid encryption key or corrupted data.", "red"))
raise
except Exception as e:
logger.error(f"Failed to load JSON data from '{file_path}': {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to load JSON data from '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to load JSON data from '{file_path}': {e}", "red"
)
)
raise
def update_checksum(self, relative_path: Optional[Path] = None) -> None:
@@ -283,32 +338,39 @@ class EncryptionManager:
Defaults to 'seedpass_passwords_db.json.enc'.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
try:
file_path = self.fingerprint_dir / relative_path
decrypted_data = self.decrypt_file(relative_path)
content = decrypted_data.decode('utf-8')
content = decrypted_data.decode("utf-8")
logger.debug("Calculating checksum of the updated file content.")
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
checksum = hashlib.sha256(content.encode("utf-8")).hexdigest()
logger.debug(f"New checksum: {checksum}")
checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt"
# Write the checksum to the file with locking
with lock_file(checksum_file, fcntl.LOCK_EX):
with open(checksum_file, 'w') as f:
with open(checksum_file, "w") as f:
f.write(checksum)
# Set file permissions to read/write for the user only
os.chmod(checksum_file, 0o600)
logger.debug(f"Checksum for '{file_path}' updated and written to '{checksum_file}'.")
print(colored(f"Checksum for '{file_path}' updated.", 'green'))
logger.debug(
f"Checksum for '{file_path}' updated and written to '{checksum_file}'."
)
print(colored(f"Checksum for '{file_path}' updated.", "green"))
except Exception as e:
logger.error(f"Failed to update checksum for '{relative_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum for '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to update checksum for '{relative_path}': {e}",
"red",
)
)
raise
def get_encrypted_index(self) -> Optional[bytes]:
@@ -318,14 +380,20 @@ class EncryptionManager:
:return: Encrypted data as bytes or None if the index file does not exist.
"""
try:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
if not (self.fingerprint_dir / relative_path).exists():
logger.error(f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'.")
print(colored(f"Error: Index file '{relative_path}' does not exist.", 'red'))
logger.error(
f"Index file '{relative_path}' does not exist in '{self.fingerprint_dir}'."
)
print(
colored(
f"Error: Index file '{relative_path}' does not exist.", "red"
)
)
return None
with lock_file(self.fingerprint_dir / relative_path, fcntl.LOCK_SH):
with open(self.fingerprint_dir / relative_path, 'rb') as file:
with open(self.fingerprint_dir / relative_path, "rb") as file:
encrypted_data = file.read()
logger.debug(f"Encrypted index data read from '{relative_path}'.")
@@ -333,10 +401,17 @@ class EncryptionManager:
except Exception as e:
logger.error(f"Failed to read encrypted index file '{relative_path}': {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to read encrypted index file '{relative_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to read encrypted index file '{relative_path}': {e}",
"red",
)
)
return None
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes, relative_path: Optional[Path] = None) -> None:
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, relative_path: Optional[Path] = None
) -> None:
"""
Decrypts the encrypted data retrieved from Nostr and updates the local index file.
@@ -345,18 +420,22 @@ class EncryptionManager:
Defaults to 'seedpass_passwords_db.json.enc'.
"""
if relative_path is None:
relative_path = Path('seedpass_passwords_db.json.enc')
relative_path = Path("seedpass_passwords_db.json.enc")
try:
decrypted_data = self.decrypt_data(encrypted_data)
data = json.loads(decrypted_data.decode('utf-8'))
data = json.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
logger.info("Index file updated from Nostr successfully.")
print(colored("Index file updated from Nostr successfully.", 'green'))
print(colored("Index file updated from Nostr successfully.", "green"))
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
)
)
# Re-raise the exception to inform the calling function of the failure
raise
@@ -371,7 +450,9 @@ class EncryptionManager:
words = seed_phrase.split()
if len(words) != 12:
logger.error("Seed phrase does not contain exactly 12 words.")
print(colored("Error: Seed phrase must contain exactly 12 words.", 'red'))
print(
colored("Error: Seed phrase must contain exactly 12 words.", "red")
)
return False
# Additional validation can be added here (e.g., word list checks)
logger.debug("Seed phrase validated successfully.")
@@ -379,7 +460,7 @@ class EncryptionManager:
except Exception as e:
logging.error(f"Error validating seed phrase: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to validate seed phrase: {e}", 'red'))
print(colored(f"Error: Failed to validate seed phrase: {e}", "red"))
return False
def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes:
@@ -399,11 +480,12 @@ class EncryptionManager:
if not isinstance(mnemonic, str):
raise TypeError("Mnemonic must be a string after conversion")
from bip_utils import Bip39SeedGenerator
seed = Bip39SeedGenerator(mnemonic).Generate(passphrase)
logger.debug("Seed derived successfully from mnemonic.")
return seed
except Exception as e:
logger.error(f"Failed to derive seed from mnemonic: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to derive seed from mnemonic: {e}", 'red'))
print(colored(f"Error: Failed to derive seed from mnemonic: {e}", "red"))
raise

View File

@@ -122,36 +122,36 @@ class PasswordManager:
Prompts the user to select an existing fingerprint or add a new one.
"""
try:
print(colored("\nAvailable Fingerprints:", "cyan"))
print(colored("\nAvailable Seed Profiles:", "cyan"))
fingerprints = self.fingerprint_manager.list_fingerprints()
for idx, fp in enumerate(fingerprints, start=1):
print(colored(f"{idx}. {fp}", "cyan"))
print(colored(f"{len(fingerprints)+1}. Add a new fingerprint", "cyan"))
print(colored(f"{len(fingerprints)+1}. Add a new seed profile", "cyan"))
choice = input("Select a fingerprint by number: ").strip()
choice = input("Select a seed profile by number: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints) + 1):
print(colored("Invalid selection. Exiting.", "red"))
sys.exit(1)
choice = int(choice)
if choice == len(fingerprints) + 1:
# Add a new fingerprint
# Add a new seed profile
self.add_new_fingerprint()
else:
# Select existing fingerprint
# Select existing seed profile
selected_fingerprint = fingerprints[choice - 1]
self.select_fingerprint(selected_fingerprint)
except Exception as e:
logger.error(f"Error during fingerprint selection: {e}")
logger.error(f"Error during seed profile selection: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to select fingerprint: {e}", "red"))
print(colored(f"Error: Failed to select seed profile: {e}", "red"))
sys.exit(1)
def add_new_fingerprint(self):
"""
Adds a new fingerprint by generating it from a seed phrase.
Adds a new seed profile by generating it from a seed phrase.
"""
try:
choice = input(
@@ -169,15 +169,15 @@ class PasswordManager:
self.fingerprint_manager.current_fingerprint = fingerprint
print(
colored(
f"New fingerprint '{fingerprint}' added and set as current.",
f"New seed profile '{fingerprint}' added and set as current.",
"green",
)
)
except Exception as e:
logger.error(f"Error adding new fingerprint: {e}")
logger.error(f"Error adding new seed profile: {e}")
logger.error(traceback.format_exc())
print(colored(f"Error: Failed to add new fingerprint: {e}", "red"))
print(colored(f"Error: Failed to add new seed profile: {e}", "red"))
sys.exit(1)
def select_fingerprint(self, fingerprint: str) -> None:
@@ -189,7 +189,7 @@ class PasswordManager:
if not self.fingerprint_dir:
print(
colored(
f"Error: Fingerprint directory for {fingerprint} not found.",
f"Error: Seed profile directory for {fingerprint} not found.",
"red",
)
)
@@ -203,12 +203,12 @@ class PasswordManager:
self.sync_index_from_nostr_if_missing()
print(
colored(
f"Fingerprint {fingerprint} selected and managers initialized.",
f"Seed profile {fingerprint} selected and managers initialized.",
"green",
)
)
else:
print(colored(f"Error: Fingerprint {fingerprint} not found.", "red"))
print(colored(f"Error: Seed profile {fingerprint} not found.", "red"))
sys.exit(1)
def setup_encryption_manager(
@@ -267,18 +267,18 @@ class PasswordManager:
def handle_switch_fingerprint(self) -> bool:
"""
Handles switching to a different fingerprint.
Handles switching to a different seed profile.
Returns:
bool: True if switch was successful, False otherwise.
"""
try:
print(colored("\nAvailable Fingerprints:", "cyan"))
print(colored("\nAvailable Seed Profiles:", "cyan"))
fingerprints = self.fingerprint_manager.list_fingerprints()
for idx, fp in enumerate(fingerprints, start=1):
print(colored(f"{idx}. {fp}", "cyan"))
choice = input("Select a fingerprint by number to switch: ").strip()
choice = input("Select a seed profile by number to switch: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)):
print(colored("Invalid selection. Returning to main menu.", "red"))
return False # Return False to indicate failure
@@ -294,26 +294,26 @@ class PasswordManager:
if not self.fingerprint_dir:
print(
colored(
f"Error: Fingerprint directory for {selected_fingerprint} not found.",
f"Error: Seed profile directory for {selected_fingerprint} not found.",
"red",
)
)
return False # Return False to indicate failure
# Prompt for master password for the selected fingerprint
# Prompt for master password for the selected seed profile
password = prompt_existing_password("Enter your master password: ")
# Set up the encryption manager with the new password and fingerprint directory
# Set up the encryption manager with the new password and seed profile directory
self.setup_encryption_manager(self.fingerprint_dir, password)
# Load the parent seed for the selected fingerprint
# Load the parent seed for the selected seed profile
self.load_parent_seed(self.fingerprint_dir)
# Initialize BIP85 and other managers
self.initialize_bip85()
self.initialize_managers()
self.sync_index_from_nostr_if_missing()
print(colored(f"Switched to fingerprint {selected_fingerprint}.", "green"))
print(colored(f"Switched to seed profile {selected_fingerprint}.", "green"))
# Re-initialize NostrClient with the new fingerprint
try:
@@ -322,7 +322,7 @@ class PasswordManager:
fingerprint=self.current_fingerprint,
)
logging.info(
f"NostrClient re-initialized with fingerprint {self.current_fingerprint}."
f"NostrClient re-initialized with seed profile {self.current_fingerprint}."
)
except Exception as e:
logging.error(f"Failed to re-initialize NostrClient: {e}")
@@ -334,9 +334,9 @@ class PasswordManager:
return True # Return True to indicate success
except Exception as e:
logging.error(f"Error during fingerprint switching: {e}")
logging.error(f"Error during seed profile switching: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to switch fingerprints: {e}", "red"))
print(colored(f"Error: Failed to switch seed profiles: {e}", "red"))
return False # Return False to indicate failure
def handle_existing_seed(self) -> None:
@@ -355,22 +355,22 @@ class PasswordManager:
if not self.fingerprint_manager:
self.initialize_fingerprint_manager()
# Prompt the user to select an existing fingerprint
# Prompt the user to select an existing seed profile
fingerprints = self.fingerprint_manager.list_fingerprints()
if not fingerprints:
print(
colored(
"No fingerprints available. Please add a fingerprint first.",
"No seed profiles available. Please add a seed profile first.",
"red",
)
)
sys.exit(1)
print(colored("Available Fingerprints:", "cyan"))
print(colored("Available Seed Profiles:", "cyan"))
for idx, fp in enumerate(fingerprints, start=1):
print(colored(f"{idx}. {fp}", "cyan"))
choice = input("Select a fingerprint by number: ").strip()
choice = input("Select a seed profile by number: ").strip()
if not choice.isdigit() or not (1 <= int(choice) <= len(fingerprints)):
print(colored("Invalid selection. Exiting.", "red"))
sys.exit(1)
@@ -381,7 +381,7 @@ class PasswordManager:
selected_fingerprint
)
if not fingerprint_dir:
print(colored("Error: Fingerprint directory not found.", "red"))
print(colored("Error: Seed profile directory not found.", "red"))
sys.exit(1)
# Initialize EncryptionManager with key and fingerprint_dir
@@ -442,7 +442,7 @@ class PasswordManager:
if not fingerprint:
print(
colored(
"Error: Failed to generate fingerprint for the provided seed.",
"Error: Failed to generate seed profile for the provided seed.",
"red",
)
)
@@ -454,7 +454,7 @@ class PasswordManager:
if not fingerprint_dir:
print(
colored(
"Error: Failed to retrieve fingerprint directory.", "red"
"Error: Failed to retrieve seed profile directory.", "red"
)
)
sys.exit(1)
@@ -463,7 +463,7 @@ class PasswordManager:
self.current_fingerprint = fingerprint
self.fingerprint_manager.current_fingerprint = fingerprint
self.fingerprint_dir = fingerprint_dir
logging.info(f"Current fingerprint set to {fingerprint}")
logging.info(f"Current seed profile set to {fingerprint}")
# Initialize EncryptionManager with key and fingerprint_dir
password = prompt_for_password()
@@ -514,7 +514,8 @@ class PasswordManager:
if not fingerprint:
print(
colored(
"Error: Failed to generate fingerprint for the new seed.", "red"
"Error: Failed to generate seed profile for the new seed.",
"red",
)
)
sys.exit(1)
@@ -524,14 +525,14 @@ class PasswordManager:
)
if not fingerprint_dir:
print(
colored("Error: Failed to retrieve fingerprint directory.", "red")
colored("Error: Failed to retrieve seed profile directory.", "red")
)
sys.exit(1)
# Set the current fingerprint in both PasswordManager and FingerprintManager
self.current_fingerprint = fingerprint
self.fingerprint_manager.current_fingerprint = fingerprint
logging.info(f"Current fingerprint set to {fingerprint}")
logging.info(f"Current seed profile set to {fingerprint}")
# Now, save and encrypt the seed with the fingerprint_dir
self.save_and_encrypt_seed(new_seed, fingerprint_dir)
@@ -696,7 +697,7 @@ class PasswordManager:
except Exception as e:
logger.warning(f"Unable to sync index from Nostr: {e}")
def handle_generate_password(self) -> None:
def handle_add_password(self) -> None:
try:
website_name = input("Enter the website name: ").strip()
if not website_name:
@@ -735,17 +736,31 @@ class PasswordManager:
# Provide user feedback
print(
colored(
f"\n[+] Password generated and indexed with ID {index}.\n", "green"
f"\n[+] Password generated and indexed with ID {index}.\n",
"green",
)
)
print(colored(f"Password for {website_name}: {password}\n", "yellow"))
# Automatically push the updated encrypted index to Nostr so the
# latest changes are backed up remotely.
try:
encrypted_data = self.get_encrypted_data()
if encrypted_data:
self.nostr_client.publish_json_to_nostr(encrypted_data)
logging.info(
"Encrypted index posted to Nostr after entry addition."
)
except Exception as nostr_error:
logging.error(f"Failed to post updated index to Nostr: {nostr_error}")
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f"Error during password generation: {e}")
logging.error(traceback.format_exc())
print(colored(f"Error: Failed to generate password: {e}", "red"))
def handle_retrieve_password(self) -> None:
def handle_retrieve_entry(self) -> None:
"""
Handles retrieving a password from the index by prompting the user for the index number
and displaying the corresponding password and associated details.
@@ -1002,7 +1017,7 @@ class PasswordManager:
Handles the backup and reveal of the parent seed.
"""
try:
print(colored("\n=== Backup/Reveal Parent Seed ===", "yellow"))
print(colored("\n=== Backup Parent Seed ===", "yellow"))
print(
colored(
"Warning: Revealing your parent seed is a highly sensitive operation.",
@@ -1207,8 +1222,8 @@ if __name__ == "__main__":
# Example operations
# These would typically be triggered by user interactions, e.g., via a CLI menu
# manager.handle_generate_password()
# manager.handle_retrieve_password()
# manager.handle_add_password()
# manager.handle_retrieve_entry()
# manager.handle_modify_entry()
# manager.handle_verify_checksum()
# manager.nostr_client.publish_and_subscribe("Sample password data")

View File

@@ -35,6 +35,7 @@ from password_manager.encryption import EncryptionManager
# Instantiate the logger
logger = logging.getLogger(__name__)
class PasswordGenerator:
"""
PasswordGenerator Class
@@ -44,7 +45,9 @@ class PasswordGenerator:
complexity requirements.
"""
def __init__(self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85):
def __init__(
self, encryption_manager: EncryptionManager, parent_seed: str, bip85: BIP85
):
"""
Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance.
@@ -59,16 +62,20 @@ class PasswordGenerator:
self.bip85 = bip85
# Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(self.parent_seed)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
logger.debug("PasswordGenerator initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize PasswordGenerator: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", 'red'))
print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", "red"))
raise
def generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0) -> str:
def generate_password(
self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0
) -> str:
"""
Generates a deterministic password based on the parent seed, desired length, and index.
@@ -90,11 +97,19 @@ class PasswordGenerator:
try:
# Validate password length
if length < MIN_PASSWORD_LENGTH:
logger.error(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.")
raise ValueError(f"Password length must be at least {MIN_PASSWORD_LENGTH} characters.")
logger.error(
f"Password length must be at least {MIN_PASSWORD_LENGTH} characters."
)
raise ValueError(
f"Password length must be at least {MIN_PASSWORD_LENGTH} characters."
)
if length > MAX_PASSWORD_LENGTH:
logger.error(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.")
raise ValueError(f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters.")
logger.error(
f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters."
)
raise ValueError(
f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters."
)
# Derive entropy using BIP-85
entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32)
@@ -105,39 +120,43 @@ class PasswordGenerator:
algorithm=hashes.SHA256(),
length=32, # 256 bits for AES-256
salt=None,
info=b'password-generation',
backend=default_backend()
info=b"password-generation",
backend=default_backend(),
)
derived_key = hkdf.derive(entropy)
logger.debug(f"Derived key using HKDF: {derived_key.hex()}")
# Use PBKDF2-HMAC-SHA256 to derive a key from entropy
dk = hashlib.pbkdf2_hmac('sha256', entropy, b'', 100000)
dk = hashlib.pbkdf2_hmac("sha256", entropy, b"", 100000)
logger.debug(f"Derived key using PBKDF2: {dk.hex()}")
# Map the derived key to all allowed characters
all_allowed = string.ascii_letters + string.digits + string.punctuation
password = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk)
logger.debug(f"Password after mapping to all allowed characters: {password}")
password = "".join(all_allowed[byte % len(all_allowed)] for byte in dk)
logger.debug(
f"Password after mapping to all allowed characters: {password}"
)
# Ensure the password meets complexity requirements
password = self.ensure_complexity(password, all_allowed, dk)
logger.debug(f"Password after ensuring complexity: {password}")
# Shuffle characters deterministically based on dk
shuffle_seed = int.from_bytes(dk, 'big')
shuffle_seed = int.from_bytes(dk, "big")
rng = random.Random(shuffle_seed)
password_chars = list(password)
rng.shuffle(password_chars)
password = ''.join(password_chars)
password = "".join(password_chars)
logger.debug("Shuffled password deterministically.")
# Ensure password length by extending if necessary
if len(password) < length:
while len(password) < length:
dk = hashlib.pbkdf2_hmac('sha256', dk, b'', 1)
base64_extra = ''.join(all_allowed[byte % len(all_allowed)] for byte in dk)
password += ''.join(base64_extra)
dk = hashlib.pbkdf2_hmac("sha256", dk, b"", 1)
base64_extra = "".join(
all_allowed[byte % len(all_allowed)] for byte in dk
)
password += "".join(base64_extra)
logger.debug(f"Extended password: {password}")
# Trim the password to the desired length
@@ -149,7 +168,7 @@ class PasswordGenerator:
except Exception as e:
logger.error(f"Error generating password: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to generate password: {e}", 'red'))
print(colored(f"Error: Failed to generate password: {e}", "red"))
raise
def ensure_complexity(self, password: str, alphabet: str, dk: bytes) -> str:
@@ -180,7 +199,9 @@ class PasswordGenerator:
current_digits = sum(1 for c in password_chars if c in digits)
current_special = sum(1 for c in password_chars if c in special)
logger.debug(f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}")
logger.debug(
f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}"
)
# Set minimum counts
min_upper = 2
@@ -204,14 +225,18 @@ class PasswordGenerator:
index = get_dk_value() % len(password_chars)
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[index] = char
logger.debug(f"Added uppercase letter '{char}' at position {index}.")
logger.debug(
f"Added uppercase letter '{char}' at position {index}."
)
if current_lower < min_lower:
for _ in range(min_lower - current_lower):
index = get_dk_value() % len(password_chars)
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[index] = char
logger.debug(f"Added lowercase letter '{char}' at position {index}.")
logger.debug(
f"Added lowercase letter '{char}' at position {index}."
)
if current_digits < min_digits:
for _ in range(min_digits - current_digits):
@@ -225,7 +250,9 @@ class PasswordGenerator:
index = get_dk_value() % len(password_chars)
char = special[get_dk_value() % len(special)]
password_chars[index] = char
logger.debug(f"Added special character '{char}' at position {index}.")
logger.debug(
f"Added special character '{char}' at position {index}."
)
# Additional deterministic inclusion of symbols to increase score
symbol_target = 3 # Increase target number of symbols
@@ -253,11 +280,15 @@ class PasswordGenerator:
if i == 0 and password_chars[j] not in uppercase:
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[j] = char
logger.debug(f"Assigned uppercase letter '{char}' to position {j}.")
logger.debug(
f"Assigned uppercase letter '{char}' to position {j}."
)
elif i == 1 and password_chars[j] not in lowercase:
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[j] = char
logger.debug(f"Assigned lowercase letter '{char}' to position {j}.")
logger.debug(
f"Assigned lowercase letter '{char}' to position {j}."
)
elif i == 2 and password_chars[j] not in digits:
char = digits[get_dk_value() % len(digits)]
password_chars[j] = char
@@ -265,10 +296,14 @@ class PasswordGenerator:
elif i == 3 and password_chars[j] not in special:
char = special[get_dk_value() % len(special)]
password_chars[j] = char
logger.debug(f"Assigned special character '{char}' to position {j}.")
logger.debug(
f"Assigned special character '{char}' to position {j}."
)
# Shuffle again to distribute the characters more evenly
shuffle_seed = int.from_bytes(dk, 'big') + dk_index # Modify seed to vary shuffle
shuffle_seed = (
int.from_bytes(dk, "big") + dk_index
) # Modify seed to vary shuffle
rng = random.Random(shuffle_seed)
rng.shuffle(password_chars)
logger.debug(f"Shuffled password characters for balanced distribution.")
@@ -278,12 +313,14 @@ class PasswordGenerator:
final_lower = sum(1 for c in password_chars if c in lowercase)
final_digits = sum(1 for c in password_chars if c in digits)
final_special = sum(1 for c in password_chars if c in special)
logger.debug(f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}")
logger.debug(
f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}"
)
return ''.join(password_chars)
return "".join(password_chars)
except Exception as e:
logger.error(f"Error ensuring password complexity: {e}")
logger.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to ensure password complexity: {e}", 'red'))
print(colored(f"Error: Failed to ensure password complexity: {e}", "red"))
raise

View File

@@ -350,7 +350,7 @@ def display_menu(password_manager: PasswordManager):
5. Post Encrypted Index to Nostr
6. Retrieve Encrypted Index from Nostr
7. Display Nostr Public Key (npub)
8. Backup/Reveal Parent Seed
8. Backup Parent Seed
9. Switch Fingerprint
10. Add a New Fingerprint
11. Remove an Existing Fingerprint
@@ -1602,7 +1602,7 @@ class PasswordManager:
Handles the backup and reveal of the parent seed.
"""
try:
print(colored("\n=== Backup/Reveal Parent Seed ===", 'yellow'))
print(colored("\n=== Backup Parent Seed ===", 'yellow'))
print(colored("Warning: Revealing your parent seed is a highly sensitive operation.", 'red'))
print(colored("Ensure you're in a secure, private environment and no one is watching your screen.", 'red'))

View File

@@ -9,4 +9,5 @@ aiohttp
bcrypt
bip85
pytest>=7.0
pytest-cov

View File

@@ -0,0 +1,41 @@
import json
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
def test_json_save_and_load_round_trip():
with TemporaryDirectory() as tmpdir:
key = Fernet.generate_key()
manager = EncryptionManager(key, Path(tmpdir))
data = {"hello": "world", "nums": [1, 2, 3]}
manager.save_json_data(data)
loaded = manager.load_json_data()
assert loaded == data
file_path = Path(tmpdir) / "seedpass_passwords_db.json.enc"
raw = file_path.read_bytes()
assert raw != json.dumps(data, indent=4).encode("utf-8")
def test_encrypt_and_decrypt_file_binary_round_trip():
with TemporaryDirectory() as tmpdir:
key = Fernet.generate_key()
manager = EncryptionManager(key, Path(tmpdir))
payload = b"binary secret"
rel = Path("payload.bin.enc")
manager.encrypt_and_save_file(payload, rel)
decrypted = manager.decrypt_file(rel)
assert decrypted == payload
file_path = Path(tmpdir) / rel
raw = file_path.read_bytes()
assert raw != payload

View File

@@ -2,6 +2,7 @@
try:
from bip_utils import Bip39SeedGenerator
print("Bip39SeedGenerator imported successfully.")
except ImportError as e:
print(f"ImportError: {e}")

View File

@@ -0,0 +1,41 @@
import sys
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
sys.path.append(str(Path(__file__).resolve().parents[1]))
def setup_password_manager():
"""Instantiate PasswordManager using a temporary APP_DIR without running __init__."""
import constants
import password_manager.manager as manager_module
# Reload modules so constants use the mocked home directory
importlib.reload(constants)
importlib.reload(manager_module)
pm = manager_module.PasswordManager.__new__(manager_module.PasswordManager)
pm.fingerprint_manager = manager_module.FingerprintManager(constants.APP_DIR)
pm.current_fingerprint = None
pm.save_and_encrypt_seed = lambda seed, fingerprint_dir: None
return pm, constants
def test_generate_bip85_and_new_seed(monkeypatch):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
pm, const = setup_password_manager()
mnemonic = pm.generate_bip85_seed()
assert len(mnemonic.split()) == 12
with patch("password_manager.manager.confirm_action", return_value=True):
fingerprint = pm.generate_new_seed()
expected_dir = const.APP_DIR / fingerprint
assert expected_dir.exists()
assert expected_dir.is_dir()

View File

@@ -0,0 +1,88 @@
import sys
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from unittest.mock import patch
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
import main
from nostr.client import DEFAULT_RELAYS
from password_manager.encryption import EncryptionManager
from password_manager.config_manager import ConfigManager
from utils.fingerprint_manager import FingerprintManager
def setup_pm(tmp_path, monkeypatch):
monkeypatch.setattr(Path, "home", lambda: tmp_path)
import constants
importlib.reload(constants)
importlib.reload(main)
fp_dir = constants.APP_DIR / "fp"
fp_dir.mkdir(parents=True)
enc_mgr = EncryptionManager(Fernet.generate_key(), fp_dir)
cfg_mgr = ConfigManager(enc_mgr, fp_dir)
fp_mgr = FingerprintManager(constants.APP_DIR)
nostr_stub = SimpleNamespace(
relays=list(DEFAULT_RELAYS),
close_client_pool=lambda: None,
initialize_client_pool=lambda: None,
publish_json_to_nostr=lambda data: None,
key_manager=SimpleNamespace(get_npub=lambda: "npub"),
)
pm = SimpleNamespace(
config_manager=cfg_mgr,
fingerprint_manager=fp_mgr,
nostr_client=nostr_stub,
)
return pm, cfg_mgr, fp_mgr
def test_relay_and_profile_actions(monkeypatch, capsys):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
pm, cfg_mgr, fp_mgr = setup_pm(tmp_path, monkeypatch)
# Add two fingerprints for listing
fp1 = fp_mgr.add_fingerprint(
"abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
)
fp2 = fp_mgr.add_fingerprint(
"legal winner thank year wave sausage worth useful legal winner thank yellow"
)
# Add a relay
with patch("builtins.input", return_value="wss://new"), patch(
"main.handle_post_to_nostr"
), patch("main._reload_relays"):
main.handle_add_relay(pm)
cfg = cfg_mgr.load_config(require_pin=False)
assert "wss://new" in cfg["relays"]
# Remove the relay
idx = cfg["relays"].index("wss://new") + 1
with patch("builtins.input", return_value=str(idx)), patch(
"main._reload_relays"
):
main.handle_remove_relay(pm)
cfg = cfg_mgr.load_config(require_pin=False)
assert "wss://new" not in cfg["relays"]
# Reset to defaults
with patch("main._reload_relays"):
main.handle_reset_relays(pm)
cfg = cfg_mgr.load_config(require_pin=False)
assert cfg["relays"] == list(DEFAULT_RELAYS)
# List profiles
main.handle_list_fingerprints(pm)
out = capsys.readouterr().out
assert fp1 in out
assert fp2 in out

View File

@@ -8,17 +8,17 @@ try:
from .key_derivation import derive_key_from_password, derive_key_from_parent_seed
from .checksum import calculate_checksum, verify_checksum
from .password_prompt import prompt_for_password
logging.info("Modules imported successfully.")
except Exception as e:
logging.error(f"Failed to import one or more modules: {e}")
logging.error(traceback.format_exc()) # Log full traceback
__all__ = [
'derive_key_from_password',
'derive_key_from_parent_seed',
'calculate_checksum',
'verify_checksum',
'lock_file',
'prompt_for_password'
"derive_key_from_password",
"derive_key_from_parent_seed",
"calculate_checksum",
"verify_checksum",
"lock_file",
"prompt_for_password",
]

View File

@@ -19,14 +19,12 @@ from typing import Optional
from termcolor import colored
from constants import (
APP_DIR,
SCRIPT_CHECKSUM_FILE
)
from constants import APP_DIR, SCRIPT_CHECKSUM_FILE
# Instantiate the logger
logger = logging.getLogger(__name__)
def calculate_checksum(file_path: str) -> Optional[str]:
"""
Calculates the SHA-256 checksum of the given file.
@@ -39,7 +37,7 @@ def calculate_checksum(file_path: str) -> Optional[str]:
"""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
checksum = hasher.hexdigest()
@@ -47,12 +45,20 @@ def calculate_checksum(file_path: str) -> Optional[str]:
return checksum
except FileNotFoundError:
logging.error(f"File '{file_path}' not found for checksum calculation.")
print(colored(f"Error: File '{file_path}' not found for checksum calculation.", 'red'))
print(
colored(
f"Error: File '{file_path}' not found for checksum calculation.", "red"
)
)
return None
except Exception as e:
logging.error(f"Error calculating checksum for '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to calculate checksum for '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to calculate checksum for '{file_path}': {e}", "red"
)
)
return None
@@ -68,7 +74,7 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool:
bool: True if checksums match, False otherwise.
"""
try:
with open(checksum_file_path, 'r') as f:
with open(checksum_file_path, "r") as f:
stored_checksum = f.read().strip()
if current_checksum == stored_checksum:
logging.debug(f"Checksum verification passed for '{checksum_file_path}'.")
@@ -78,12 +84,17 @@ def verify_checksum(current_checksum: str, checksum_file_path: str) -> bool:
return False
except FileNotFoundError:
logging.error(f"Checksum file '{checksum_file_path}' not found.")
print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", 'red'))
print(colored(f"Error: Checksum file '{checksum_file_path}' not found.", "red"))
return False
except Exception as e:
logging.error(f"Error reading checksum file '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to read checksum file '{checksum_file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to read checksum file '{checksum_file_path}': {e}",
"red",
)
)
return False
@@ -100,16 +111,21 @@ def update_checksum(content: str, checksum_file_path: str) -> bool:
"""
try:
hasher = hashlib.sha256()
hasher.update(content.encode('utf-8'))
hasher.update(content.encode("utf-8"))
new_checksum = hasher.hexdigest()
with open(checksum_file_path, 'w') as f:
with open(checksum_file_path, "w") as f:
f.write(new_checksum)
logging.debug(f"Updated checksum for '{checksum_file_path}' to: {new_checksum}")
return True
except Exception as e:
logging.error(f"Failed to update checksum for '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to update checksum for '{checksum_file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to update checksum for '{checksum_file_path}': {e}",
"red",
)
)
return False
@@ -129,11 +145,11 @@ def verify_and_update_checksum(file_path: str, checksum_file_path: str) -> bool:
return False
if verify_checksum(current_checksum, checksum_file_path):
print(colored(f"Checksum verification passed for '{file_path}'.", 'green'))
print(colored(f"Checksum verification passed for '{file_path}'.", "green"))
logging.info(f"Checksum verification passed for '{file_path}'.")
return True
else:
print(colored(f"Checksum verification failed for '{file_path}'.", 'red'))
print(colored(f"Checksum verification failed for '{file_path}'.", "red"))
logging.warning(f"Checksum verification failed for '{file_path}'.")
return False
@@ -154,13 +170,20 @@ def initialize_checksum(file_path: str, checksum_file_path: str) -> bool:
return False
try:
with open(checksum_file_path, 'w') as f:
with open(checksum_file_path, "w") as f:
f.write(checksum)
logging.debug(f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}")
print(colored(f"Initialized checksum for '{file_path}'.", 'green'))
logging.debug(
f"Initialized checksum file '{checksum_file_path}' with checksum: {checksum}"
)
print(colored(f"Initialized checksum for '{file_path}'.", "green"))
return True
except Exception as e:
logging.error(f"Failed to initialize checksum file '{checksum_file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to initialize checksum file '{checksum_file_path}': {e}",
"red",
)
)
return False

View File

@@ -26,6 +26,7 @@ import traceback
# Instantiate the logger
logger = logging.getLogger(__name__)
@contextmanager
def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
"""
@@ -44,14 +45,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
SystemExit: Exits the program if the lock cannot be acquired.
"""
if lock_type not in (fcntl.LOCK_EX, fcntl.LOCK_SH):
logging.error(f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH.")
print(colored("Error: Invalid lock type provided.", 'red'))
logging.error(
f"Invalid lock type: {lock_type}. Use fcntl.LOCK_EX or fcntl.LOCK_SH."
)
print(colored("Error: Invalid lock type provided.", "red"))
sys.exit(1)
file = None
try:
# Determine the mode based on whether the file exists
mode = 'rb+' if file_path.exists() else 'wb'
mode = "rb+" if file_path.exists() else "wb"
# Open the file
file = open(file_path, mode)
@@ -67,7 +70,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.error(f"Failed to acquire {lock_type_str} lock on '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}", 'red'))
print(
colored(
f"Error: Failed to acquire {lock_type_str} lock on '{file_path}': {e}",
"red",
)
)
sys.exit(1)
finally:
@@ -78,9 +86,16 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
logging.debug(f"Lock released on '{file_path}'.")
except Exception as e:
lock_type_str = "exclusive" if lock_type == fcntl.LOCK_EX else "shared"
logging.warning(f"Failed to release {lock_type_str} lock on '{file_path}': {e}")
logging.warning(
f"Failed to release {lock_type_str} lock on '{file_path}': {e}"
)
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}", 'yellow'))
print(
colored(
f"Warning: Failed to release {lock_type_str} lock on '{file_path}': {e}",
"yellow",
)
)
finally:
# Close the file
try:
@@ -89,7 +104,12 @@ def lock_file(file_path: Path, lock_type: int) -> Generator[None, None, None]:
except Exception as e:
logging.warning(f"Failed to close file '{file_path}': {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Warning: Failed to close file '{file_path}': {e}", 'yellow'))
print(
colored(
f"Warning: Failed to close file '{file_path}': {e}",
"yellow",
)
)
@contextmanager

View File

@@ -16,6 +16,7 @@ from typing import Optional
# Instantiate the logger
logger = logging.getLogger(__name__)
def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]:
"""
Generates a unique fingerprint from the provided seed phrase using SHA-256.
@@ -33,7 +34,7 @@ def generate_fingerprint(seed_phrase: str, length: int = 16) -> Optional[str]:
logger.debug(f"Normalized seed: {normalized_seed}")
# Compute SHA-256 hash
sha256_hash = hashlib.sha256(normalized_seed.encode('utf-8')).hexdigest()
sha256_hash = hashlib.sha256(normalized_seed.encode("utf-8")).hexdigest()
logger.debug(f"SHA-256 Hash: {sha256_hash}")
# Truncate to desired length

View File

@@ -14,6 +14,7 @@ from utils.fingerprint import generate_fingerprint
# Instantiate the logger
logger = logging.getLogger(__name__)
class FingerprintManager:
"""
FingerprintManager Class
@@ -31,7 +32,7 @@ class FingerprintManager:
app_dir (Path): The root application directory (e.g., ~/.seedpass).
"""
self.app_dir = app_dir
self.fingerprints_file = self.app_dir / 'fingerprints.json'
self.fingerprints_file = self.app_dir / "fingerprints.json"
self._ensure_app_directory()
self.fingerprints = self._load_fingerprints()
self.current_fingerprint: Optional[str] = None
@@ -43,7 +44,7 @@ class FingerprintManager:
Returns:
Optional[Path]: The Path object of the current fingerprint directory or None.
"""
if hasattr(self, 'current_fingerprint') and self.current_fingerprint:
if hasattr(self, "current_fingerprint") and self.current_fingerprint:
return self.get_fingerprint_directory(self.current_fingerprint)
else:
logger.error("No current fingerprint is set.")
@@ -57,7 +58,9 @@ class FingerprintManager:
self.app_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"Application directory ensured at {self.app_dir}")
except Exception as e:
logger.error(f"Failed to create application directory at {self.app_dir}: {e}")
logger.error(
f"Failed to create application directory at {self.app_dir}: {e}"
)
logger.error(traceback.format_exc())
raise
@@ -70,13 +73,15 @@ class FingerprintManager:
"""
try:
if self.fingerprints_file.exists():
with open(self.fingerprints_file, 'r') as f:
with open(self.fingerprints_file, "r") as f:
data = json.load(f)
fingerprints = data.get('fingerprints', [])
fingerprints = data.get("fingerprints", [])
logger.debug(f"Loaded fingerprints: {fingerprints}")
return fingerprints
else:
logger.debug("fingerprints.json not found. Initializing empty fingerprint list.")
logger.debug(
"fingerprints.json not found. Initializing empty fingerprint list."
)
return []
except Exception as e:
logger.error(f"Failed to load fingerprints: {e}")
@@ -88,8 +93,8 @@ class FingerprintManager:
Saves the current list of fingerprints to the fingerprints.json file.
"""
try:
with open(self.fingerprints_file, 'w') as f:
json.dump({'fingerprints': self.fingerprints}, f, indent=4)
with open(self.fingerprints_file, "w") as f:
json.dump({"fingerprints": self.fingerprints}, f, indent=4)
logger.debug(f"Fingerprints saved: {self.fingerprints}")
except Exception as e:
logger.error(f"Failed to save fingerprints: {e}")
@@ -140,7 +145,7 @@ class FingerprintManager:
# Remove fingerprint directory
fingerprint_dir = self.app_dir / fingerprint
if fingerprint_dir.exists() and fingerprint_dir.is_dir():
for child in fingerprint_dir.glob('*'):
for child in fingerprint_dir.glob("*"):
if child.is_file():
child.unlink()
elif child.is_dir():

View File

@@ -29,6 +29,7 @@ colorama_init()
# Instantiate the logger
logger = logging.getLogger(__name__)
def prompt_new_password() -> str:
"""
Prompts the user to enter and confirm a new password for encrypting the parent seed.
@@ -51,39 +52,50 @@ def prompt_new_password() -> str:
confirm_password = getpass.getpass(prompt="Confirm your password: ").strip()
if not password:
print(colored("Error: Password cannot be empty. Please try again.", 'red'))
print(
colored("Error: Password cannot be empty. Please try again.", "red")
)
logging.warning("User attempted to enter an empty password.")
attempts += 1
continue
if len(password) < MIN_PASSWORD_LENGTH:
print(colored(f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.", 'red'))
logging.warning(f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters.")
print(
colored(
f"Error: Password must be at least {MIN_PASSWORD_LENGTH} characters long.",
"red",
)
)
logging.warning(
f"User entered a password shorter than {MIN_PASSWORD_LENGTH} characters."
)
attempts += 1
continue
if password != confirm_password:
print(colored("Error: Passwords do not match. Please try again.", 'red'))
print(
colored("Error: Passwords do not match. Please try again.", "red")
)
logging.warning("User entered mismatching passwords.")
attempts += 1
continue
# Normalize the password to NFKD form
normalized_password = unicodedata.normalize('NFKD', password)
normalized_password = unicodedata.normalize("NFKD", password)
logging.debug("User entered a valid and confirmed password.")
return normalized_password
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
print(colored("\nOperation cancelled by user.", "yellow"))
logging.info("Password prompt interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during password prompt: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
print(colored(f"Error: {e}", "red"))
attempts += 1
print(colored("Maximum password attempts exceeded. Exiting.", 'red'))
print(colored("Maximum password attempts exceeded. Exiting.", "red"))
logging.error("User failed to provide a valid password after multiple attempts.")
sys.exit(1)
@@ -107,27 +119,29 @@ def prompt_existing_password(prompt_message: str = "Enter your password: ") -> s
password = getpass.getpass(prompt=prompt_message).strip()
if not password:
print(colored("Error: Password cannot be empty.", 'red'))
print(colored("Error: Password cannot be empty.", "red"))
logging.warning("User attempted to enter an empty password.")
sys.exit(1)
# Normalize the password to NFKD form
normalized_password = unicodedata.normalize('NFKD', password)
normalized_password = unicodedata.normalize("NFKD", password)
logging.debug("User entered an existing password for decryption.")
return normalized_password
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
print(colored("\nOperation cancelled by user.", "yellow"))
logging.info("Existing password prompt interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during existing password prompt: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
print(colored(f"Error: {e}", "red"))
sys.exit(1)
def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N): ") -> bool:
def confirm_action(
prompt_message: str = "Are you sure you want to proceed? (Y/N): ",
) -> bool:
"""
Prompts the user to confirm an action, typically used before performing critical operations.
@@ -143,24 +157,24 @@ def confirm_action(prompt_message: str = "Are you sure you want to proceed? (Y/N
"""
try:
while True:
response = input(colored(prompt_message, 'cyan')).strip().lower()
if response in ['y', 'yes']:
response = input(colored(prompt_message, "cyan")).strip().lower()
if response in ["y", "yes"]:
logging.debug("User confirmed the action.")
return True
elif response in ['n', 'no']:
elif response in ["n", "no"]:
logging.debug("User declined the action.")
return False
else:
print(colored("Please respond with 'Y' or 'N'.", 'yellow'))
print(colored("Please respond with 'Y' or 'N'.", "yellow"))
except KeyboardInterrupt:
print(colored("\nOperation cancelled by user.", 'yellow'))
print(colored("\nOperation cancelled by user.", "yellow"))
logging.info("Action confirmation interrupted by user.")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error during action confirmation: {e}")
logging.error(traceback.format_exc()) # Log full traceback
print(colored(f"Error: {e}", 'red'))
print(colored(f"Error: {e}", "red"))
sys.exit(1)

View File

@@ -0,0 +1,19 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.entry_management import EntryManager
def test_list_entries_empty():
with TemporaryDirectory() as tmpdir:
key = Fernet.generate_key()
enc_mgr = EncryptionManager(key, Path(tmpdir))
entry_mgr = EntryManager(enc_mgr, Path(tmpdir))
entries = entry_mgr.list_entries()
assert entries == []

31
tests/test_entry_add.py Normal file
View File

@@ -0,0 +1,31 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.entry_management import EntryManager
def test_add_and_retrieve_entry():
with TemporaryDirectory() as tmpdir:
key = Fernet.generate_key()
enc_mgr = EncryptionManager(key, Path(tmpdir))
entry_mgr = EntryManager(enc_mgr, Path(tmpdir))
index = entry_mgr.add_entry("example.com", 12, "user")
entry = entry_mgr.retrieve_entry(index)
assert entry == {
"website": "example.com",
"length": 12,
"username": "user",
"url": "",
"blacklisted": False,
}
data = enc_mgr.load_json_data(entry_mgr.index_file)
assert str(index) in data.get("passwords", {})
assert data["passwords"][str(index)] == entry

View File

@@ -0,0 +1,39 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
from cryptography.fernet import Fernet
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.entry_management import EntryManager
from nostr.client import NostrClient
def test_backup_and_publish_to_nostr():
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
key = Fernet.generate_key()
enc_mgr = EncryptionManager(key, tmp_path)
entry_mgr = EntryManager(enc_mgr, tmp_path)
# create an index by adding an entry
entry_mgr.add_entry("example.com", 12)
encrypted_index = entry_mgr.get_encrypted_index()
assert encrypted_index is not None
with patch(
"nostr.client.NostrClient.publish_json_to_nostr"
) as mock_publish, patch("nostr.client.ClientPool"), patch(
"nostr.client.KeyManager"
), patch.object(
NostrClient, "initialize_client_pool"
), patch.object(
enc_mgr, "decrypt_parent_seed", return_value="seed"
):
nostr_client = NostrClient(enc_mgr, "fp")
entry_mgr.backup_index_file()
nostr_client.publish_json_to_nostr(encrypted_index)
mock_publish.assert_called_with(encrypted_index)

52
tests/test_profiles.py Normal file
View File

@@ -0,0 +1,52 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
sys.path.append(str(Path(__file__).resolve().parents[1]))
from utils.fingerprint_manager import FingerprintManager
from password_manager.manager import PasswordManager
VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
def test_add_and_switch_fingerprint(monkeypatch):
with TemporaryDirectory() as tmpdir:
app_dir = Path(tmpdir)
fm = FingerprintManager(app_dir)
fingerprint = fm.add_fingerprint(VALID_SEED)
assert fingerprint in fm.list_fingerprints()
expected_dir = app_dir / fingerprint
assert expected_dir.exists()
pm = PasswordManager.__new__(PasswordManager)
pm.fingerprint_manager = fm
pm.encryption_manager = object()
pm.current_fingerprint = None
monkeypatch.setattr("builtins.input", lambda *_args, **_kwargs: "1")
monkeypatch.setattr(
"password_manager.manager.prompt_existing_password",
lambda *_a, **_k: "pass",
)
monkeypatch.setattr(
PasswordManager,
"setup_encryption_manager",
lambda self, d, password=None: None,
)
monkeypatch.setattr(PasswordManager, "load_parent_seed", lambda self, d: None)
monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None)
monkeypatch.setattr(PasswordManager, "initialize_managers", lambda self: None)
monkeypatch.setattr(
PasswordManager, "sync_index_from_nostr_if_missing", lambda self: None
)
monkeypatch.setattr(
"password_manager.manager.NostrClient", lambda *a, **kw: object()
)
assert pm.handle_switch_fingerprint()
assert pm.current_fingerprint == fingerprint
assert fm.current_fingerprint == fingerprint
assert pm.fingerprint_dir == expected_dir

24
tests/test_seed_import.py Normal file
View File

@@ -0,0 +1,24 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from cryptography.fernet import Fernet
from mnemonic import Mnemonic
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.manager import PasswordManager
def test_seed_encryption_round_trip():
with TemporaryDirectory() as tmpdir:
key = Fernet.generate_key()
enc_mgr = EncryptionManager(key, Path(tmpdir))
seed = Mnemonic("english").generate(strength=128)
enc_mgr.encrypt_parent_seed(seed)
decrypted = enc_mgr.decrypt_parent_seed()
assert decrypted == seed
pm = PasswordManager.__new__(PasswordManager)
assert pm.validate_bip85_seed(seed)