This commit is contained in:
thePR0M3TH3AN
2024-10-26 22:56:57 -04:00
parent 6090c38b92
commit 7d4eef2110
22 changed files with 1759 additions and 1152 deletions

View File

@@ -1,5 +1,3 @@
# nostr/client.py
import os
import sys
import logging
@@ -11,6 +9,7 @@ import hashlib
import asyncio
import concurrent.futures
from typing import List, Optional, Callable
from pathlib import Path
from monstr.client.client import ClientPool
from monstr.encrypt import Keys, NIP4Encrypt
@@ -18,24 +17,30 @@ from monstr.event.event import Event
import threading
import uuid
import fcntl # Ensure fcntl is imported for file locking
import fcntl
from .logging_config import configure_logging
from .key_manager import KeyManager
from .encryption_manager import EncryptionManager
from .event_handler import EventHandler
from constants import APP_DIR, INDEX_FILE, DATA_CHECKSUM_FILE
from constants import APP_DIR
from utils.file_lock import lock_file
configure_logging()
# Get the logger for this module
logger = logging.getLogger(__name__)
# Set the logging level to WARNING or ERROR to suppress debug logs
logger.setLevel(logging.WARNING)
DEFAULT_RELAYS = [
"wss://relay.snort.social",
"wss://nostr.oxtr.dev",
"wss://nostr-relay.wlvs.space"
"wss://relay.primal.net"
]
# nostr/client.py
# src/nostr/client.py
class NostrClient:
"""
NostrClient Class
@@ -44,25 +49,38 @@ class NostrClient:
Utilizes deterministic key derivation via BIP-85 and integrates with the monstr library for protocol operations.
"""
def __init__(self, parent_seed: str, relays: Optional[List[str]] = None):
def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None):
"""
Initializes the NostrClient with a parent seed and connects to specified relays.
Initializes the NostrClient with an EncryptionManager, connects to specified relays,
and sets up the KeyManager with the given fingerprint.
:param parent_seed: The BIP39 mnemonic seed phrase.
:param encryption_manager: An instance of EncryptionManager for handling encryption/decryption.
:param fingerprint: The fingerprint to differentiate key derivations for unique identities.
:param relays: (Optional) A list of relay URLs to connect to. Defaults to predefined relays.
"""
try:
self.key_manager = KeyManager(parent_seed)
self.encryption_manager = EncryptionManager(self.key_manager)
self.event_handler = EventHandler()
# Assign the encryption manager and fingerprint
self.encryption_manager = encryption_manager
self.fingerprint = fingerprint # Track the fingerprint
self.fingerprint_dir = self.encryption_manager.fingerprint_dir # If needed to manage directories
# Initialize KeyManager with the decrypted parent seed and the provided fingerprint
self.key_manager = KeyManager(
self.encryption_manager.decrypt_parent_seed(),
self.fingerprint
)
# Initialize event handler and client pool
self.event_handler = EventHandler()
self.relays = relays if relays else DEFAULT_RELAYS
self.client_pool = ClientPool(self.relays)
self.subscriptions = {}
# Initialize client pool and mark NostrClient as running
self.initialize_client_pool()
logger.info("NostrClient initialized successfully.")
# For shutdown handling
self.is_shutting_down = False
self._shutdown_event = asyncio.Event()
@@ -110,8 +128,9 @@ class NostrClient:
logger.error(traceback.format_exc())
print(f"Error: Event loop in ClientPool thread encountered an issue: {e}", file=sys.stderr)
finally:
logger.debug("Closing the event loop.")
self.loop.close()
if not self.loop.is_closed():
logger.debug("Closing the event loop.")
self.loop.close()
def wait_for_connection(self):
"""
@@ -134,6 +153,7 @@ class NostrClient:
logger.debug(f"Publishing event: {event.serialize()}")
self.client_pool.publish(event)
logger.info(f"Event published with ID: {event.id}")
logger.debug(f"Finished publishing event: {event.id}")
except Exception as e:
logger.error(f"Failed to publish event: {e}")
logger.error(traceback.format_exc())
@@ -145,7 +165,10 @@ class NostrClient:
:param event: The signed Event object to publish.
"""
try:
asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop)
logger.debug(f"Submitting publish_event_async for event ID: {event.id}")
future = asyncio.run_coroutine_threadsafe(self.publish_event_async(event), self.loop)
# Wait for the future to complete
future.result(timeout=5) # Adjust the timeout as needed
except Exception as e:
logger.error(f"Error in publish_event: {e}")
print(f"Error: Failed to publish event: {e}", file=sys.stderr)
@@ -180,11 +203,11 @@ class NostrClient:
logger.error(f"Error in subscribe: {e}")
print(f"Error: Failed to subscribe: {e}", file=sys.stderr)
async def retrieve_json_from_nostr_async(self) -> Optional[bytes]:
async def retrieve_json_from_nostr_async(self) -> Optional[str]:
"""
Retrieves the latest encrypted JSON event from Nostr.
:return: The encrypted JSON data as bytes, or None if retrieval fails.
:return: The encrypted JSON data as a Base64-encoded string, or None if retrieval fails.
"""
try:
filters = [{
@@ -203,6 +226,7 @@ class NostrClient:
await asyncio.sleep(2) # Adjust the sleep time as needed
# Unsubscribe from all subscriptions
for sub_id in list(self.subscriptions.keys()):
self.client_pool.unsubscribe(sub_id)
del self.subscriptions[sub_id]
@@ -210,15 +234,15 @@ class NostrClient:
if events:
event = events[0]
encrypted_json_b64 = event.content
content_base64 = event.content
if event.kind == Event.KIND_ENCRYPT:
nip4_encrypt = NIP4Encrypt(self.key_manager.keys)
encrypted_json_b64 = nip4_encrypt.decrypt_message(event.content, event.pub_key)
content_base64 = nip4_encrypt.decrypt_message(event.content, event.pub_key)
encrypted_json = base64.b64decode(encrypted_json_b64.encode('utf-8'))
# Return the Base64-encoded content as a string
logger.debug("Encrypted JSON data retrieved successfully.")
return encrypted_json
return content_base64
else:
logger.warning("No events found matching the filters.")
print("No events found matching the filters.", file=sys.stderr)
@@ -238,11 +262,15 @@ class NostrClient:
"""
try:
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
return future.result()
return future.result(timeout=10)
except concurrent.futures.TimeoutError:
logger.error("Timeout occurred while retrieving JSON from Nostr.")
print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr)
return None
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr)
print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red')
return None
async def do_post_async(self, text: str):
@@ -263,7 +291,7 @@ class NostrClient:
logger.debug(f"Event data: {event.serialize()}")
await self.publish_event_async(event)
logger.debug("Finished do_post_async")
except Exception as e:
logger.error(f"An error occurred during publishing: {e}", exc_info=True)
print(f"Error: An error occurred during publishing: {e}", file=sys.stderr)
@@ -284,8 +312,7 @@ class NostrClient:
await self.subscribe_async(filters=filters, handler=handler)
logger.info("Subscribed to your feed.")
while True:
await asyncio.sleep(1)
# Removed the infinite loop to prevent blocking
except Exception as e:
logger.error(f"An error occurred during subscription: {e}", exc_info=True)
@@ -330,11 +357,11 @@ class NostrClient:
self.save_json_data(data)
self.update_checksum()
logger.info("Index file updated from Nostr successfully.")
print("Index file updated from Nostr successfully.", file=sys.stdout)
print(colored("Index file updated from Nostr successfully.", 'green'))
except Exception as e:
logger.error(f"Failed to decrypt and save data from Nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to decrypt and save data from Nostr: {e}", file=sys.stderr)
print(colored(f"Error: Failed to decrypt and save data from Nostr: {e}", 'red'))
def save_json_data(self, data: dict) -> None:
"""
@@ -343,16 +370,17 @@ class NostrClient:
:param data: The JSON data to save.
"""
try:
encrypted_data = self.encryption_manager.encrypt_data(data)
with lock_file(INDEX_FILE, fcntl.LOCK_EX):
with open(INDEX_FILE, 'wb') as f:
encrypted_data = self.encryption_manager.encrypt_data(json.dumps(data).encode('utf-8'))
index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc'
with lock_file(index_file_path, fcntl.LOCK_EX):
with open(index_file_path, 'wb') as f:
f.write(encrypted_data)
logger.debug(f"Encrypted data saved to {INDEX_FILE}.")
print(f"Encrypted data saved to {INDEX_FILE}.", file=sys.stdout)
logger.debug(f"Encrypted data saved to {index_file_path}.")
print(colored(f"Encrypted data saved to '{index_file_path}'.", 'green'))
except Exception as e:
logger.error(f"Failed to save encrypted data: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to save encrypted data: {e}", file=sys.stderr)
print(colored(f"Error: Failed to save encrypted data: {e}", 'red'))
raise
def update_checksum(self) -> None:
@@ -360,28 +388,34 @@ class NostrClient:
Updates the checksum file for the password database.
"""
try:
decrypted_data = self.decrypt_data_from_file(INDEX_FILE)
index_file_path = self.fingerprint_dir / 'seedpass_passwords_db.json.enc'
decrypted_data = self.decrypt_data_from_file(index_file_path)
content = decrypted_data.decode('utf-8')
logger.debug("Calculating checksum of the updated file content.")
checksum = hashlib.sha256(content.encode('utf-8')).hexdigest()
logger.debug(f"New checksum: {checksum}")
with open(DATA_CHECKSUM_FILE, 'w') as f:
f.write(checksum)
logger.debug(f"Updated data checksum written to '{DATA_CHECKSUM_FILE}'.")
print("[+] Checksum updated successfully.", file=sys.stdout)
checksum_file = self.fingerprint_dir / 'seedpass_passwords_db_checksum.txt'
with lock_file(checksum_file, fcntl.LOCK_EX):
with open(checksum_file, 'w') as f:
f.write(checksum)
os.chmod(checksum_file, 0o600)
logger.debug(f"Checksum for '{index_file_path}' updated and written to '{checksum_file}'.")
print(colored(f"Checksum for '{index_file_path}' updated.", 'green'))
except Exception as e:
logger.error(f"Failed to update checksum: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to update checksum: {e}", file=sys.stderr)
print(colored(f"Error: Failed to update checksum: {e}", 'red'))
def decrypt_data_from_file(self, file_path: str) -> bytes:
def decrypt_data_from_file(self, file_path: Path) -> bytes:
"""
Decrypts data directly from a file.
:param file_path: Path to the encrypted file.
:param file_path: Path to the encrypted file as a Path object.
:return: Decrypted data as bytes.
"""
try:
@@ -394,7 +428,7 @@ class NostrClient:
except Exception as e:
logger.error(f"Failed to decrypt data from file '{file_path}': {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to decrypt data from file '{file_path}': {e}", file=sys.stderr)
print(colored(f"Error: Failed to decrypt data from file '{file_path}': {e}", 'red'))
raise
def publish_json_to_nostr(self, encrypted_json: bytes, to_pubkey: str = None):
@@ -431,16 +465,31 @@ class NostrClient:
def retrieve_json_from_nostr_sync(self) -> Optional[bytes]:
"""
Public method to retrieve encrypted JSON from Nostr.
Retrieves encrypted data from Nostr and Base64-decodes it.
:return: The encrypted JSON data as bytes, or None if retrieval fails.
Returns:
Optional[bytes]: The encrypted data as bytes if successful, None otherwise.
"""
try:
return self.retrieve_json_from_nostr()
future = asyncio.run_coroutine_threadsafe(self.retrieve_json_from_nostr_async(), self.loop)
content_base64 = future.result(timeout=10)
if not content_base64:
logger.debug("No data retrieved from Nostr.")
return None
# Base64-decode the content
encrypted_data = base64.urlsafe_b64decode(content_base64.encode('utf-8'))
logger.debug("Encrypted data retrieved and Base64-decoded successfully from Nostr.")
return encrypted_data
except concurrent.futures.TimeoutError:
logger.error("Timeout occurred while retrieving JSON from Nostr.")
print("Error: Timeout occurred while retrieving JSON from Nostr.", file=sys.stderr)
return None
except Exception as e:
logger.error(f"Error in retrieve_json_from_nostr_sync: {e}")
logger.error(f"Error in retrieve_json_from_nostr: {e}")
logger.error(traceback.format_exc())
print(f"Error: Failed to retrieve JSON from Nostr: {e}", file=sys.stderr)
print(f"Error: Failed to retrieve JSON from Nostr: {e}", 'red')
return None
def decrypt_and_save_index_from_nostr_public(self, encrypted_data: bytes) -> None:
@@ -453,7 +502,7 @@ class NostrClient:
self.decrypt_and_save_index_from_nostr(encrypted_data)
except Exception as e:
logger.error(f"Failed to decrypt and save index from Nostr: {e}")
print(f"Error: Failed to decrypt and save index from Nostr: {e}", file=sys.stderr)
print(f"Error: Failed to decrypt and save index from Nostr: {e}", 'red')
async def close_client_pool_async(self):
"""
@@ -481,12 +530,8 @@ class NostrClient:
# Close all WebSocket connections
if hasattr(self.client_pool, 'clients'):
for client in self.client_pool.clients:
try:
await client.close()
logger.debug(f"Closed connection to relay: {client.url}")
except Exception as e:
logger.warning(f"Error closing connection to {client.url}: {e}")
tasks = [self.safe_close_connection(client) for client in self.client_pool.clients]
await asyncio.gather(*tasks, return_exceptions=True)
# Gather and cancel all tasks
current_task = asyncio.current_task()
@@ -526,7 +571,7 @@ class NostrClient:
# Schedule the coroutine to close the client pool
future = asyncio.run_coroutine_threadsafe(self.close_client_pool_async(), self.loop)
# Wait for the coroutine to finish with a shorter timeout
# Wait for the coroutine to finish with a timeout
try:
future.result(timeout=10)
except concurrent.futures.TimeoutError:
@@ -534,13 +579,13 @@ class NostrClient:
# Additional cleanup regardless of timeout
try:
self.loop.stop()
self.loop.call_soon_threadsafe(self.loop.stop)
# Give a short grace period for the loop to stop
time.sleep(0.5)
if self.loop.is_running():
logger.warning("Loop still running after stop, closing forcefully")
self.loop.close()
self.loop.call_soon_threadsafe(self.loop.close)
# Wait for the thread with a reasonable timeout
if self.loop_thread.is_alive():
@@ -559,3 +604,12 @@ class NostrClient:
logger.error(traceback.format_exc())
finally:
self.is_shutting_down = False
async def safe_close_connection(self, client):
try:
await client.close_connection()
logger.debug(f"Closed connection to relay: {client.url}")
except AttributeError:
logger.warning(f"Client object has no attribute 'close_connection'. Skipping closure for {client.url}.")
except Exception as e:
logger.warning(f"Error closing connection to {client.url}: {e}")