This commit is contained in:
thePR0M3TH3AN
2024-10-23 23:00:22 -04:00
parent d8aff057b7
commit c60ae6b442
11 changed files with 416 additions and 255 deletions

View File

@@ -3,12 +3,19 @@
import logging
import traceback
from .logging_config import configure_logging
# Configure logging at the start of the module
configure_logging()
# Initialize the logger for this module
logger = logging.getLogger(__name__) # Correct logger initialization
try:
from .client import NostrClient
logging.info("NostrClient module imported successfully.")
logger.info("NostrClient module imported successfully.")
except Exception as e:
logging.error(f"Failed to import NostrClient module: {e}")
logging.error(traceback.format_exc()) # Log full traceback
logger.error(f"Failed to import NostrClient module: {e}")
logger.error(traceback.format_exc()) # Log full traceback
__all__ = ['NostrClient']

View File

@@ -27,7 +27,8 @@ from .event_handler import EventHandler
from constants import APP_DIR, INDEX_FILE, DATA_CHECKSUM_FILE
from utils.file_lock import lock_file
logger = configure_logging()
configure_logging()
logger = logging.getLogger(__name__)
DEFAULT_RELAYS = [
"wss://relay.snort.social",

View File

@@ -1,58 +1,123 @@
# nostr/encryption_manager.py
import base64
import json
import logging
import traceback
from cryptography.fernet import Fernet, InvalidToken
from .logging_config import configure_logging
from .key_manager import KeyManager
from monstr.encrypt import NIP4Encrypt # Add if used
logger = configure_logging()
# Configure logging at the start of the module
configure_logging()
# Initialize the logger for this module
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
Handles encryption and decryption of data using Fernet symmetric encryption.
Manages encryption and decryption using Fernet symmetric encryption.
"""
def __init__(self, key_manager: KeyManager):
self.key_manager = key_manager
self.fernet = Fernet(self.key_manager.derive_encryption_key())
"""
Initializes the EncryptionManager with a Fernet instance.
:param key_manager: An instance of KeyManager to derive the encryption key.
"""
try:
# Derive the raw encryption key (32 bytes)
raw_key = key_manager.derive_encryption_key()
logger.debug(f"Derived raw encryption key length: {len(raw_key)} bytes")
# Ensure the raw key is exactly 32 bytes
if len(raw_key) != 32:
raise ValueError(f"Derived key length is {len(raw_key)} bytes; expected 32 bytes.")
# Base64-encode the raw key to make it URL-safe
b64_key = base64.urlsafe_b64encode(raw_key)
logger.debug(f"Base64-encoded encryption key length: {len(b64_key)} bytes")
# Initialize Fernet with the base64-encoded key
self.fernet = Fernet(b64_key)
logger.info("Fernet encryption manager initialized successfully.")
except Exception as e:
logger.error(f"EncryptionManager initialization failed: {e}")
logger.error(traceback.format_exc())
raise
def encrypt_parent_seed(self, seed: str, file_path: str) -> None:
"""
Encrypts the parent seed and saves it to the specified file.
:param seed: The BIP-39 seed phrase as a string.
:param file_path: The file path to save the encrypted seed.
"""
try:
encrypted_seed = self.fernet.encrypt(seed.encode('utf-8'))
with open(file_path, 'wb') as f:
f.write(encrypted_seed)
logger.debug(f"Parent seed encrypted and saved to '{file_path}'.")
except Exception as e:
logger.error(f"Failed to encrypt and save parent seed: {e}")
logger.error(traceback.format_exc())
raise
def decrypt_parent_seed(self, file_path: str) -> str:
"""
Decrypts the parent seed from the specified file.
:param file_path: The file path to read the encrypted seed.
:return: The decrypted parent seed as a string.
"""
try:
with open(file_path, 'rb') as f:
encrypted_seed = f.read()
decrypted_seed = self.fernet.decrypt(encrypted_seed).decode('utf-8')
logger.debug(f"Parent seed decrypted successfully from '{file_path}'.")
return decrypted_seed
except InvalidToken:
logger.error("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.")
raise ValueError("Decryption failed: Invalid token. Possibly incorrect password or corrupted file.")
except Exception as e:
logger.error(f"Failed to decrypt parent seed: {e}")
logger.error(traceback.format_exc())
raise
def encrypt_data(self, data: dict) -> bytes:
"""
Encrypts a dictionary and returns encrypted bytes.
Encrypts a dictionary by serializing it to JSON and then encrypting it.
:param data: The data to encrypt.
:param data: The dictionary to encrypt.
:return: Encrypted data as bytes.
"""
try:
json_data = json.dumps(data, indent=4).encode('utf-8')
encrypted_data = self.fernet.encrypt(json_data)
json_data = json.dumps(data).encode('utf-8')
encrypted = self.fernet.encrypt(json_data)
logger.debug("Data encrypted successfully.")
return encrypted_data
return encrypted
except Exception as e:
logger.error(f"Failed to encrypt data: {e}")
logger.error(f"Data encryption failed: {e}")
logger.error(traceback.format_exc())
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""
Decrypts encrypted bytes and returns the original data.
Decrypts encrypted data.
:param encrypted_data: The encrypted data to decrypt.
:param encrypted_data: The encrypted data as bytes.
:return: Decrypted data as bytes.
"""
try:
decrypted_data = self.fernet.decrypt(encrypted_data)
decrypted = self.fernet.decrypt(encrypted_data)
logger.debug("Data decrypted successfully.")
return decrypted_data
except InvalidToken:
logger.error("Invalid encryption key or corrupted data.")
raise
except Exception as e:
logger.error(f"Error decrypting data: {e}")
return decrypted
except InvalidToken as e:
logger.error(f"Decryption failed: Invalid token. {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"Data decryption failed: {e}")
logger.error(traceback.format_exc())
raise

View File

@@ -1,30 +1,23 @@
# nostr/key_manager.py
import base64
import logging
import traceback
from typing import Optional
from bip_utils import Bip39SeedGenerator
from bip85.bip85 import BIP85
from cryptography.fernet import Fernet, InvalidToken
from bech32 import bech32_encode, convertbits
from .logging_config import configure_logging
from utils.key_derivation import derive_key_from_parent_seed
# Add the missing import for Keys and NIP4Encrypt
from monstr.encrypt import Keys, NIP4Encrypt # Ensure monstr.encrypt is installed and accessible
logger = configure_logging()
# Configure logging at the start of the module
configure_logging()
# Initialize the logger for this module
logger = logging.getLogger(__name__)
def encode_bech32(prefix: str, key_hex: str) -> str:
"""
Encodes a hex key into Bech32 format with the given prefix.
:param prefix: The Bech32 prefix (e.g., 'nsec', 'npub').
:param key_hex: The key in hexadecimal format.
:return: The Bech32-encoded string.
"""
try:
key_bytes = bytes.fromhex(key_hex)
data = convertbits(key_bytes, 8, 5, pad=True)
@@ -40,31 +33,30 @@ class KeyManager:
"""
def __init__(self, parent_seed: str):
self.parent_seed = parent_seed
self.keys = None
self.nsec = None
self.npub = None
self.initialize_keys()
def initialize_keys(self):
"""
Derives Nostr keys using BIP85 and initializes Keys.
Initializes the KeyManager with the provided parent_seed.
Parameters:
parent_seed (str): The parent seed used for key derivation.
"""
try:
logger.debug("Starting key initialization")
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
bip85 = BIP85(seed_bytes)
entropy = bip85.derive_entropy(app_no=1237, language_code=0, words_num=24, index=0)
if len(entropy) != 32:
logger.error(f"Derived entropy length is {len(entropy)} bytes; expected 32 bytes.")
raise ValueError("Invalid entropy length.")
privkey_hex = entropy.hex()
self.keys = Keys(priv_k=privkey_hex) # Now Keys is defined via the import
if not isinstance(parent_seed, str):
raise TypeError(f"Parent seed must be a string, got {type(parent_seed)}")
self.parent_seed = parent_seed
logger.debug(f"KeyManager initialized with parent_seed: {self.parent_seed} (type: {type(self.parent_seed)})")
# Derive the encryption key from parent_seed
derived_key = self.derive_encryption_key()
derived_key_hex = derived_key.hex()
logger.debug(f"Derived encryption key (hex): {derived_key_hex}")
# Initialize Keys with the derived hexadecimal key
self.keys = Keys(priv_k=derived_key_hex) # Pass hex string
logger.debug("Nostr Keys initialized successfully.")
self.nsec = encode_bech32('nsec', privkey_hex)
# Generate bech32-encoded keys
self.nsec = encode_bech32('nsec', self.keys.private_key_hex())
logger.debug(f"Nostr Private Key (nsec): {self.nsec}")
public_key_hex = self.keys.public_key_hex()
@@ -76,11 +68,34 @@ class KeyManager:
logger.error(traceback.format_exc())
raise
def derive_encryption_key(self) -> bytes:
"""
Derives the encryption key using the parent seed.
Returns:
bytes: The derived encryption key.
Raises:
Exception: If key derivation fails.
"""
try:
key = derive_key_from_parent_seed(self.parent_seed)
logger.debug("Encryption key derived successfully.")
return key # Now returns raw bytes
except Exception as e:
logger.error(f"Failed to derive encryption key: {e}")
logger.error(traceback.format_exc())
raise
def get_npub(self) -> str:
"""
Returns the Nostr public key (npub).
:return: The npub as a string.
Returns:
str: The npub as a string.
Raises:
ValueError: If npub is not available.
"""
if self.npub:
logger.debug(f"Returning npub: {self.npub}")
@@ -88,18 +103,3 @@ class KeyManager:
else:
logger.error("Nostr public key (npub) is not available.")
raise ValueError("Nostr public key (npub) is not available.")
def derive_encryption_key(self) -> bytes:
"""
Derives the encryption key using the parent seed.
:return: The derived encryption key.
"""
try:
key = derive_key_from_parent_seed(self.parent_seed)
logger.debug("Encryption key derived successfully.")
return key
except Exception as e:
logger.error(f"Failed to derive encryption key: {e}")
logger.error(traceback.format_exc())
raise

View File

@@ -1,39 +1,40 @@
# nostr/logging_config.py
import os
import sys
import logging
import os
def configure_logging(log_file='nostr.log'):
def configure_logging():
"""
Configures logging with both file and console handlers.
Logs include the timestamp, log level, message, filename, and line number.
Only ERROR and higher-level messages are shown in the terminal, while all messages
are logged in the log file.
"""
# Create the 'logs' folder if it doesn't exist
if not os.path.exists('logs'):
os.makedirs('logs')
# Create the custom logger
logger = logging.getLogger('nostr')
logger.setLevel(logging.DEBUG) # Set to DEBUG for detailed output
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set root logger to DEBUG
# Prevent adding multiple handlers if configure_logging is called multiple times
if not logger.handlers:
# Create handlers
c_handler = logging.StreamHandler(sys.stdout)
f_handler = logging.FileHandler(os.path.join('logs', log_file))
# Create the 'logs' folder if it doesn't exist
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Set levels
# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(os.path.join(log_directory, 'app.log'))
# Set levels: only errors and critical messages will be shown in the console
c_handler.setLevel(logging.ERROR)
f_handler.setLevel(logging.DEBUG)
# Create formatters
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]')
# Create formatters and add them to handlers, include file and line number in log messages
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]'
)
c_handler.setFormatter(formatter)
f_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)
return logger