161 KiB
Okay, this is a significant refactoring effort! Let's break it down and implement the changes based on your plan and the provided code.
Phase 1: Create New Files and Basic Structures
1. Create password_manager/kinds.py:
# password_manager/kinds.py
import logging
from typing import Any, Callable, Dict, List, Optional

from termcolor import colored
# Forward declaration for type hinting if handlers need PasswordManager instance later
# class PasswordManager: pass
# from .encryption import EncryptionManager
logger = logging.getLogger(__name__)
# Placeholder handlers - will be imported properly later
def handle_generated_password(entry_data: Dict[str, Any], fingerprint: str, **kwargs):
    """Stand-in handler for ``generated_password`` entries.

    Emits a warning to the log and prints the entry's ``title`` in grey.
    ``fingerprint`` and any extra keyword arguments are accepted but unused.
    """
    logger.warning("Placeholder handler called for generated_password")
    title = entry_data.get('title')
    banner = f"Processing Generated Password (Placeholder): {title}"
    print(colored(banner, "grey"))
def handle_stored_password(entry_data: Dict[str, Any], fingerprint: str, **kwargs):
    """Stand-in handler for ``stored_password`` entries.

    Emits a warning to the log and prints the entry's ``title`` in grey.
    ``fingerprint`` and any extra keyword arguments are accepted but unused.
    """
    logger.warning("Placeholder handler called for stored_password")
    title = entry_data.get('title')
    banner = f"Processing Stored Password (Placeholder): {title}"
    print(colored(banner, "grey"))
def handle_note(entry_data: Dict[str, Any], fingerprint: str, **kwargs):
    """Stand-in handler for ``note`` entries.

    Emits a warning to the log and prints the length of the entry's
    ``content`` value (0 when the key is absent) in grey.  ``fingerprint``
    and any extra keyword arguments are accepted but unused.
    """
    logger.warning("Placeholder handler called for note")
    content_length = len(entry_data.get('content', ''))
    banner = f"Processing Note (Placeholder): Content length {content_length}"
    print(colored(banner, "grey"))
# --- Actual KINDS Definition ---
# We'll import real handlers after creating them.
# Define the structure for kinds. Each kind maps to:
# - handler: The function to process/display the entry data.
# - description: User-friendly description for menus.
# - fields: List of expected keys within the 'data' part of an entry.
# - nostr_kind: The Nostr event kind used for this entry type.
# - identifier_tag: The Nostr tag ('d' tag) value prefix for this entry type.
# Registry of SeedPass entry kinds, keyed by kind name.  Every kind below uses
# the same Nostr event kind (31111); the kinds are told apart by their
# distinct "identifier_tag" prefixes.
KINDS: Dict[str, Dict[str, Any]] = {
"generated_password": {
"handler": handle_generated_password, # Placeholder handler defined above
"description": "Generated Password (using BIP-85 index)",
"fields": ["title", "username", "email", "url", "length", "bip85_index"], # No "password" field: the entry keeps length and bip85_index instead
"nostr_kind": 31111, # Example custom kind for SeedPass entries
"identifier_tag": "seedpass_gp_" # "gp" = generated password
},
"stored_password": {
"handler": handle_stored_password, # Placeholder handler defined above
"description": "Stored Password / Credential",
"fields": ["title", "username", "password", "url", "notes"], # Password stored encrypted in 'data'
"nostr_kind": 31111,
"identifier_tag": "seedpass_sp_" # "sp" = stored password
},
"note": {
"handler": handle_note, # Placeholder handler defined above
"description": "Secure Note",
"fields": ["title", "content", "tags"],
"nostr_kind": 31111,
"identifier_tag": "seedpass_note_"
},
# Add new kinds here in the future
}
def get_kind_details(kind_name: str) -> Optional[Dict[str, Any]]:
    """Look up the registry entry for a SeedPass kind.

    :param kind_name: Key into ``KINDS`` (for example ``"note"``).
    :return: The kind's definition dict, or ``None`` if the kind is unknown.
    """
    return KINDS.get(kind_name)
def get_all_kinds() -> List[str]:
    """Return the names of every kind registered in ``KINDS``.

    The result is a new list, in the registry's insertion order.
    """
    return [kind_name for kind_name in KINDS]
def get_nostr_kind(kind_name: str) -> Optional[int]:
    """Return the Nostr event kind registered for a SeedPass kind.

    :param kind_name: Key into ``KINDS``.
    :return: The ``"nostr_kind"`` value, or ``None`` when the kind is unknown
        or its definition has no such key.
    """
    details = get_kind_details(kind_name)
    if not details:
        return None
    return details.get("nostr_kind")
def get_identifier_tag_prefix(kind_name: str) -> Optional[str]:
    """Return the 'd' tag prefix registered for a SeedPass kind.

    :param kind_name: Key into ``KINDS``.
    :return: The ``"identifier_tag"`` value, or ``None`` when the kind is
        unknown or its definition has no such key.
    """
    details = get_kind_details(kind_name)
    if not details:
        return None
    return details.get("identifier_tag")
def get_required_fields(kind_name: str) -> List[str]:
    """Return the field names declared for a SeedPass kind.

    :param kind_name: Key into ``KINDS``.
    :return: A fresh list of the kind's ``"fields"``; empty when the kind is
        unknown or declares no fields.
    """
    details = get_kind_details(kind_name)
    if not details:
        return []
    # Hand back a copy: returning the list stored in KINDS would let a caller
    # that appends/removes items silently rewrite the shared registry.
    return list(details.get("fields", []))
def get_kind_handler(kind_name: str) -> Optional[Callable]:
    """Return the handler callable registered for a SeedPass kind.

    :param kind_name: Key into ``KINDS``.
    :return: The ``"handler"`` value, or ``None`` when the kind is unknown or
        its definition has no such key.
    """
    details = get_kind_details(kind_name)
    if not details:
        return None
    return details.get("handler")
2. Create the password_manager/handlers/ directory and its __init__.py:
mkdir -p password_manager/handlers
touch password_manager/handlers/__init__.py
3. Create Handler Files:
password_manager/handlers/generated_password_handler.py
# password_manager/handlers/generated_password_handler.py
import logging
from typing import Dict, Any
from termcolor import colored
# Avoid circular import - PasswordManager/EncryptionManager likely passed in kwargs
# from ..manager import PasswordManager
# from ..encryption import EncryptionManager
# from ..password_generation import PasswordGenerator

logger = logging.getLogger(__name__)


def handle_generated_password(entry_data: Dict[str, Any], fingerprint: str, **kwargs):
    """Regenerate and print a generated-password entry.

    The password itself is not read from ``entry_data``; it is recomputed by
    calling ``generate_password(length=..., index=...)`` on the generator
    supplied by the caller.

    :param entry_data: Entry fields; ``length`` and ``bip85_index`` are
        required, ``title``/``username``/``email``/``url`` are optional.
    :param fingerprint: Accepted for handler-signature parity; not used here.
    :param kwargs: Must contain ``password_generator``.
    :return: ``None``.  Problems are logged and printed, never raised.
    """
    generator = kwargs.get("password_generator")
    if not generator:
        logger.error("PasswordGenerator not provided to generated_password handler.")
        print(colored("Error: Cannot process generated password - internal setup issue.", "red"))
        return

    title = entry_data.get("title", "N/A")
    username = entry_data.get("username", "")
    email = entry_data.get("email", "")
    url = entry_data.get("url", "")
    length = entry_data.get("length")
    bip85_index = entry_data.get("bip85_index")

    # Both values feed the generator call; without either the entry is unusable.
    if length is None or bip85_index is None:
        logger.error(f"Missing length or bip85_index for generated password entry: {title}")
        print(colored(f"Error: Incomplete data for generated password '{title}'.", "red"))
        return

    try:
        # Regenerate the password on the fly
        password = generator.generate_password(length=length, index=bip85_index)

        print(colored("--- Generated Password Entry ---", "cyan"))
        print(colored(f" Title: {title}", "cyan"))
        # Optional fields are shown only when they hold a truthy value.
        for label, value in ((" Username", username), (" Email", email), (" URL", url)):
            if value:
                print(colored(f"{label}: {value}", "cyan"))
        print(colored(f" Length: {length}", "cyan"))
        print(colored(f" Index: {bip85_index}", "cyan"))
        print(colored(f" Password: {password}", "yellow"))  # Display generated password
        print(colored("--------------------------------", "cyan"))
    except Exception as e:
        logger.error(f"Failed to generate password for entry {title}: {e}", exc_info=True)
        print(colored(f"Error generating password for '{title}': {e}", "red"))
password_manager/handlers/stored_password_handler.py
# password_manager/handlers/stored_password_handler.py
import logging
from typing import Dict, Any
from termcolor import colored
# from ..encryption import EncryptionManager # Passed in kwargs

logger = logging.getLogger(__name__)


def handle_stored_password(entry_data: Dict[str, Any], fingerprint: str, **kwargs):
    """Decrypt and print a stored-password entry.

    :param entry_data: Entry fields; ``password`` is required and is
        base64-decoded before being passed to ``decrypt_data``.
        ``title``/``username``/``url``/``notes`` are optional.
    :param fingerprint: Accepted for handler-signature parity; not used here.
    :param kwargs: Must contain ``encryption_manager``.
    :return: ``None``.  Problems are logged and printed, never raised.
    """
    import base64

    cipher = kwargs.get("encryption_manager")
    if not cipher:
        logger.error("EncryptionManager not provided to stored_password handler.")
        print(colored("Error: Cannot process stored password - internal setup issue.", "red"))
        return

    title = entry_data.get("title", "N/A")
    username = entry_data.get("username", "")
    password_b64 = entry_data.get("password")  # Expecting base64 encoded encrypted bytes
    url = entry_data.get("url", "")
    notes = entry_data.get("notes", "")

    if not password_b64:
        logger.error(f"Missing encrypted password for stored password entry: {title}")
        print(colored(f"Error: Incomplete data for stored password '{title}'.", "red"))
        return

    try:
        # Decode from base64 then decrypt
        ciphertext = base64.b64decode(password_b64)
        password = cipher.decrypt_data(ciphertext).decode('utf-8')

        print(colored("--- Stored Password Entry ---", "cyan"))
        print(colored(f" Title: {title}", "cyan"))
        # Optional fields are shown only when they hold a truthy value.
        for label, value in ((" Username", username), (" URL", url), (" Notes", notes)):
            if value:
                print(colored(f"{label}: {value}", "cyan"))
        print(colored(f" Password: {password}", "yellow"))  # Display decrypted password
        print(colored("-----------------------------", "cyan"))
    except Exception as e:
        logger.error(f"Failed to decrypt stored password for entry {title}: {e}", exc_info=True)
        print(colored(f"Error decrypting password for '{title}': {e}", "red"))
password_manager/handlers/note_handler.py
# password_manager/handlers/note_handler.py
import logging
from typing import Dict, Any
from termcolor import colored
# from ..encryption import EncryptionManager # Passed in kwargs

logger = logging.getLogger(__name__)


def _format_tags(tags: Any) -> str:
    """Render the ``tags`` field as a comma-separated string.

    A bare string is shown as-is (joining it would split it into single
    characters), collection members are converted with ``str`` so non-string
    tags cannot raise, and any other value is shown via ``str``.
    """
    if isinstance(tags, str):
        return tags
    if isinstance(tags, (list, tuple, set, frozenset)):
        return ', '.join(str(tag) for tag in tags)
    return str(tags)


def handle_note(entry_data: Dict[str, Any], fingerprint: str, **kwargs):
    """Decrypt and print a secure-note entry.

    :param entry_data: Entry fields; ``content`` is required and is
        base64-decoded before being passed to ``decrypt_data``.
        ``title`` and ``tags`` are optional.
    :param fingerprint: Accepted for handler-signature parity; not used here.
    :param kwargs: Must contain ``encryption_manager``.
    :return: ``None``.  Problems are logged and printed, never raised.
    """
    import base64

    encryption_manager = kwargs.get("encryption_manager")
    if not encryption_manager:
        logger.error("EncryptionManager not provided to note handler.")
        print(colored("Error: Cannot process note - internal setup issue.", "red"))
        return

    title = entry_data.get("title", "N/A")
    encrypted_content_b64 = entry_data.get("content")  # Expecting base64 encoded encrypted bytes
    tags = entry_data.get("tags", [])

    if not encrypted_content_b64:
        logger.error(f"Missing encrypted content for note entry: {title}")
        print(colored(f"Error: Incomplete data for note '{title}'.", "red"))
        return

    try:
        # Decode from base64 then decrypt
        encrypted_content_bytes = base64.b64decode(encrypted_content_b64)
        content = encryption_manager.decrypt_data(encrypted_content_bytes).decode('utf-8')

        print(colored("--- Secure Note Entry ---", "cyan"))
        print(colored(f" Title: {title}", "cyan"))
        if tags:
            # _format_tags never raises on odd tag shapes, so a malformed
            # ``tags`` value is no longer reported as a decryption failure.
            print(colored(f" Tags: {_format_tags(tags)}", "cyan"))
        print(colored(f" Content:\n{content}", "yellow"))
        print(colored("-------------------------", "cyan"))
    except Exception as e:
        logger.error(f"Failed to decrypt note content for entry {title}: {e}", exc_info=True)
        print(colored(f"Error decrypting note '{title}': {e}", "red"))
- Update
password_manager/kinds.py
# password_manager/kinds.py (updated imports)
# ... (other imports)

# --- Import Real Handlers ---
from .handlers.generated_password_handler import handle_generated_password
from .handlers.stored_password_handler import handle_stored_password
from .handlers.note_handler import handle_note
# Future handlers can be imported here

# --- KINDS Definition --- (Use imported handlers now)
KINDS: Dict[str, Dict[str, Any]] = {
    "generated_password": {
        "handler": handle_generated_password,  # Use imported handler
        "description": "Generated Password (using BIP-85 index)",
        "fields": ["title", "username", "email", "url", "length", "bip85_index"],
        "nostr_kind": 31111,
        "identifier_tag": "seedpass_gp_",
    },
    "stored_password": {
        "handler": handle_stored_password,  # Use imported handler
        "description": "Stored Password / Credential",
        "fields": ["title", "username", "password", "url", "notes"],
        "nostr_kind": 31111,
        "identifier_tag": "seedpass_sp_",
    },
    "note": {
        "handler": handle_note,  # Use imported handler
        "description": "Secure Note",
        "fields": ["title", "content", "tags"],
        "nostr_kind": 31111,
        "identifier_tag": "seedpass_note_",
    },
    # ...
}

# ... (rest of the helper functions)
4. Create password_manager/state_manager.py:
# password_manager/state_manager.py
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any
import fcntl
import os
import traceback
from utils.file_lock import lock_file # Use the existing file lock utility
logger = logging.getLogger(__name__)
class StateManager:
    """Persist a small JSON state file inside one fingerprint directory.

    The state holds two integers: the last index used for generated passwords
    and the timestamp of the last Nostr sync.  It is read once when the
    manager is constructed and written back by each setter after validation.
    """

    STATE_FILENAME = "seedpass_state.json"

    def __init__(self, fingerprint_dir: Path):
        """Remember the directory and load (or default) the state.

        :param fingerprint_dir: Directory that contains, or will contain, the
            state file.
        """
        self.fingerprint_dir = fingerprint_dir
        self.state_file_path = self.fingerprint_dir / self.STATE_FILENAME
        self._state: Dict[str, Any] = self._load_state()

    def _load_state(self) -> Dict[str, Any]:
        """Read the state file, falling back to defaults on any problem.

        :return: The parsed state with both essential keys present, or the
            default state when the file is missing, unreadable or invalid.
        """
        fallback = {"last_generated_password_index": -1, "last_nostr_sync_time": 0}
        path = self.state_file_path

        if not path.exists():
            logger.info(f"State file not found for {self.fingerprint_dir.name}. Initializing default state.")
            return fallback

        try:
            # Shared lock while reading.
            with lock_file(path, fcntl.LOCK_SH), open(path, 'r') as handle:
                loaded = json.load(handle)
                # Ensure essential keys exist
                for key, default_value in fallback.items():
                    if key not in loaded:
                        loaded[key] = default_value
            logger.debug(f"State loaded for {self.fingerprint_dir.name}")
            return loaded
        except (json.JSONDecodeError, IOError, ValueError) as e:
            logger.error(f"Failed to load or parse state file {path}: {e}. Using default state.", exc_info=True)
            return fallback
        except Exception as e:
            logger.error(f"Unexpected error loading state file {path}: {e}. Using default state.", exc_info=True)
            return fallback

    def _save_state(self) -> bool:
        """Write the in-memory state to disk.

        :return: ``True`` on success, ``False`` if writing failed (the error
            is logged, not raised).
        """
        path = self.state_file_path
        try:
            # Exclusive lock while rewriting the file.
            with lock_file(path, fcntl.LOCK_EX):
                with open(path, 'w') as handle:
                    json.dump(self._state, handle, indent=4)
                os.chmod(path, 0o600)  # Ensure permissions
        except IOError as e:
            logger.error(f"Failed to save state file {path}: {e}", exc_info=True)
            return False
        except Exception as e:
            logger.error(f"Unexpected error saving state file {path}: {e}", exc_info=True)
            return False
        logger.debug(f"State saved for {self.fingerprint_dir.name}")
        return True

    def get_last_generated_password_index(self) -> int:
        """Return the last used generated-password index.

        If the key is absent it is inserted into the in-memory state as -1.
        """
        return self._state.setdefault("last_generated_password_index", -1)

    def set_last_generated_password_index(self, index: int) -> bool:
        """Store a new last-used index and persist the state.

        :param index: Integer not smaller than -1.
        :return: ``False`` for an invalid index or a failed save, else ``True``.
        """
        acceptable = isinstance(index, int) and index >= -1
        if not acceptable:
            logger.error(f"Invalid index provided to set_last_generated_password_index: {index}")
            return False
        self._state["last_generated_password_index"] = index
        logger.info(f"Setting last generated password index to: {index}")
        return self._save_state()

    def get_next_generated_password_index(self) -> int:
        """Advance the stored index by one and return the new value.

        :raises RuntimeError: If the incremented index could not be saved.
        """
        upcoming = self.get_last_generated_password_index() + 1
        if not self.set_last_generated_password_index(upcoming):
            # Handle save failure - maybe raise an exception?
            logger.critical("Failed to save state after incrementing index! Potential index reuse risk.")
            raise RuntimeError("Failed to update state for next generated password index.")
        return upcoming

    def get_last_nostr_sync_time(self) -> int:
        """Return the timestamp of the last successful Nostr sync.

        If the key is absent it is inserted into the in-memory state as 0.
        """
        return self._state.setdefault("last_nostr_sync_time", 0)

    def set_last_nostr_sync_time(self, timestamp: int) -> bool:
        """Store a new sync timestamp and persist the state.

        :param timestamp: Non-negative integer.
        :return: ``False`` for an invalid timestamp or a failed save, else
            ``True``.
        """
        acceptable = isinstance(timestamp, int) and timestamp >= 0
        if not acceptable:
            logger.error(f"Invalid timestamp provided to set_last_nostr_sync_time: {timestamp}")
            return False
        self._state["last_nostr_sync_time"] = timestamp
        logger.info(f"Setting last Nostr sync time to: {timestamp}")
        return self._save_state()
Phase 2: Refactor EntryManager and BackupManager
-
password_manager/entry_management.py
# password_manager/entry_management.py (refactored)
import json
import logging
import hashlib
import re
import sys
import os
import shutil
import time
import traceback
import fcntl
from pathlib import Path
from typing import Optional, Dict, Any, List

from termcolor import colored

from .encryption import EncryptionManager  # Keep this
from utils.file_lock import lock_file  # Keep this

logger = logging.getLogger(__name__)


class EntryManager:
    """Manages storage and retrieval of individual encrypted entry files.

    Each entry lives in its own file, ``entries/entry_<num>.json.enc``, under
    the fingerprint directory.
    """

    ENTRY_FILENAME_TEMPLATE = "entry_{entry_num}.json.enc"
    ENTRY_CHECKSUM_FIELD = "checksum"  # Field within the decrypted JSON metadata

    # Matches the full file name produced by ENTRY_FILENAME_TEMPLATE.  The
    # number must be taken from the *name*: ``Path.stem`` only strips the last
    # suffix (".enc"), leaving "entry_<num>.json", which cannot be split into
    # a clean integer.
    _ENTRY_FILENAME_RE = re.compile(r'^entry_([0-9]+)\.json\.enc$')

    def __init__(self, encryption_manager: EncryptionManager, fingerprint_dir: Path):
        """
        Initializes the EntryManager.

        :param encryption_manager: The encryption manager instance.
        :param fingerprint_dir: The directory corresponding to the fingerprint.
        """
        self.encryption_manager = encryption_manager
        self.fingerprint_dir = fingerprint_dir
        self.entries_dir = self.fingerprint_dir / 'entries'
        # Ensure the entries directory exists
        self.entries_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"EntryManager initialized for directory {self.entries_dir}")

    def _get_entry_path(self, entry_num: int) -> Path:
        """Constructs the file path for a given entry number."""
        return self.entries_dir / self.ENTRY_FILENAME_TEMPLATE.format(entry_num=entry_num)

    @classmethod
    def _parse_entry_num(cls, entry_path: Path) -> Optional[int]:
        """Extract the entry number from a file name like 'entry_123.json.enc'.

        :return: The number, or ``None`` (after logging a warning) when the
            name does not follow the entry-file pattern.
        """
        match = cls._ENTRY_FILENAME_RE.match(entry_path.name)
        if match is None:
            logger.warning(f"Could not parse entry number from filename: {entry_path.name}")
            return None
        return int(match.group(1))

    def _scan_entry_nums(self) -> List[int]:
        """Return the (unsorted) entry numbers found in the entries directory."""
        found = []
        for entry_path in self.entries_dir.glob('entry_*.json.enc'):
            entry_num = self._parse_entry_num(entry_path)
            if entry_num is not None:
                found.append(entry_num)
        return found

    def get_next_entry_num(self) -> int:
        """Determines the next available entry number based on existing files.

        :return: One more than the highest existing entry number, or 0 when
            no entry files exist.
        :raises RuntimeError: If the entries directory could not be scanned.
        """
        try:
            entry_nums = self._scan_entry_nums()
        except Exception as e:
            logger.error(f"Error determining next entry number: {e}", exc_info=True)
            print(colored(f"Error determining next entry number: {e}", 'red'))
            # Returning 0 here could overwrite entry 0, so fail loudly instead.
            raise RuntimeError("Could not determine the next entry number.") from e
        return max(entry_nums, default=-1) + 1

    def calculate_checksum(self, data_dict: Dict[str, Any]) -> str:
        """Calculates SHA-256 checksum of the provided data dictionary.

        :raises ValueError: If the dictionary cannot be serialized or hashed.
        """
        try:
            # Ensure consistent ordering for checksum calculation
            data_string = json.dumps(data_dict, sort_keys=True).encode('utf-8')
            return hashlib.sha256(data_string).hexdigest()
        except Exception as e:
            logger.error(f"Error calculating checksum: {e}", exc_info=True)
            raise ValueError("Could not calculate checksum for data.") from e

    def save_entry(self, entry_num: int, encrypted_entry_data: bytes) -> bool:
        """Saves the encrypted data for a specific entry number.

        :return: ``True`` on success, ``False`` if the write failed (the error
            is logged, and printed for I/O errors).
        """
        entry_path = self._get_entry_path(entry_num)
        try:
            with lock_file(entry_path, fcntl.LOCK_EX):
                with open(entry_path, 'wb') as f:
                    f.write(encrypted_entry_data)
                os.chmod(entry_path, 0o600)  # Ensure permissions
            logger.info(f"Entry {entry_num} saved successfully to {entry_path}.")
            return True
        except IOError as e:
            logger.error(f"Failed to save entry {entry_num} to {entry_path}: {e}", exc_info=True)
            print(colored(f"Error: Failed to save entry {entry_num}: {e}", 'red'))
            return False
        except Exception as e:
            logger.error(f"Unexpected error saving entry {entry_num}: {e}", exc_info=True)
            return False

    def load_entry(self, entry_num: int) -> Optional[Dict[str, Any]]:
        """Loads, decrypts, and returns the entry data for a specific entry number.

        :return: The decoded entry dict, or ``None`` when the file is missing,
            cannot be decrypted, or does not contain valid JSON.
        """
        entry_path = self._get_entry_path(entry_num)
        if not entry_path.exists():
            logger.warning(f"Entry file not found: {entry_path}")
            return None
        try:
            # Use EncryptionManager's decrypt_file which handles locking
            decrypted_data_bytes = self.encryption_manager.decrypt_file(
                entry_path.relative_to(self.fingerprint_dir)
            )
            entry_dict = json.loads(decrypted_data_bytes.decode('utf-8'))
            logger.debug(f"Entry {entry_num} loaded successfully.")
            return entry_dict
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode JSON for entry {entry_num} from {entry_path}: {e}", exc_info=True)
            print(colored(f"Error: Corrupted data found for entry {entry_num}.", 'red'))
            return None
        except Exception as e:  # Includes InvalidToken from decrypt_file
            logger.error(f"Failed to load or decrypt entry {entry_num} from {entry_path}: {e}", exc_info=True)
            # Don't show raw error to user unless needed
            # print(colored(f"Error: Failed to load entry {entry_num}: {e}", 'red'))
            return None

    def get_entry_checksum(self, entry_num: int) -> Optional[str]:
        """Retrieves the stored checksum from within an entry's metadata.

        :return: The checksum, or ``None`` when the entry cannot be loaded or
            carries no checksum.
        """
        entry_data = self.load_entry(entry_num)
        if entry_data:
            checksum = entry_data.get("metadata", {}).get(self.ENTRY_CHECKSUM_FIELD)
            if checksum:
                return checksum
            logger.warning(f"Checksum not found in metadata for entry {entry_num}")
        return None

    def delete_entry_file(self, entry_num: int) -> bool:
        """Deletes the file associated with an entry number.

        :return: ``True`` if the file was removed; ``False`` if it did not
            exist or could not be deleted.
        """
        entry_path = self._get_entry_path(entry_num)
        if not entry_path.exists():
            logger.warning(f"Attempted to delete non-existent entry file: {entry_path}")
            return False  # Or True, as the state is achieved? Decide consistency.
        try:
            with lock_file(entry_path, fcntl.LOCK_EX):  # Lock before deleting
                entry_path.unlink()
            logger.info(f"Entry file {entry_path} deleted successfully.")
            return True
        except OSError as e:
            logger.error(f"Failed to delete entry file {entry_path}: {e}", exc_info=True)
            print(colored(f"Error: Failed to delete entry file {entry_num}: {e}", 'red'))
            return False
        except Exception as e:
            logger.error(f"Unexpected error deleting entry file {entry_num}: {e}", exc_info=True)
            return False

    def list_all_entry_nums(self) -> List[int]:
        """Lists all available entry numbers by scanning the directory.

        :return: Entry numbers in ascending order; an empty list if the scan
            fails (the error is logged).
        """
        try:
            return sorted(self._scan_entry_nums())
        except Exception as e:
            logger.error(f"Error listing entry numbers: {e}", exc_info=True)
            return []

    # --- Methods related to the old single index are removed ---
    # remove _load_index, _save_index, add_entry (old), retrieve_entry (old) etc.
    # remove update_checksum (old)
    # remove get_encrypted_index (old)
-
password_manager/backup.py
# password_manager/backup.py (refactored)
import logging
import os
import re
import shutil
import time
import traceback
from pathlib import Path
import fcntl  # Keep fcntl import if used in lock_file
from typing import List, Optional

from termcolor import colored

from utils.file_lock import lock_file

logger = logging.getLogger(__name__)


class BackupManager:
    """Handles backups for individual entry files.

    Backups are stored in ``backups/`` under the fingerprint directory and are
    named ``entry_<num>_backup_<timestamp>.json.enc``.
    """

    BACKUP_FILENAME_TEMPLATE = 'entry_{entry_num}_backup_{timestamp}.json.enc'

    # Captures the timestamp embedded by BACKUP_FILENAME_TEMPLATE.
    _BACKUP_FILENAME_RE = re.compile(r'^entry_[0-9]+_backup_([0-9]+)\.json\.enc$')

    def __init__(self, fingerprint_dir: Path):
        """
        Initializes the BackupManager.

        :param fingerprint_dir: The directory corresponding to the fingerprint.
        """
        self.fingerprint_dir = fingerprint_dir
        self.entries_dir = self.fingerprint_dir / 'entries'
        self.backups_dir = self.fingerprint_dir / 'backups'
        self.backups_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"BackupManager initialized for backup directory {self.backups_dir}")

    def _get_entry_path(self, entry_num: int) -> Path:
        """Constructs the original entry file path."""
        return self.entries_dir / f'entry_{entry_num}.json.enc'

    @classmethod
    def _backup_time(cls, backup_path: Path) -> float:
        """Return the creation time of a backup as seconds since the epoch.

        Backups are written with ``shutil.copy2``, which copies the source
        file's modification time onto the backup, so ``st_mtime`` reflects
        when the *entry* last changed rather than when the backup was taken.
        The timestamp embedded in the file name is therefore preferred;
        ``st_mtime`` is only a fallback for names that do not match.
        """
        match = cls._BACKUP_FILENAME_RE.match(backup_path.name)
        if match:
            return float(match.group(1))
        return backup_path.stat().st_mtime

    def create_backup_for_entry(self, entry_num: int) -> Optional[Path]:
        """Creates a timestamped backup for a specific entry file.

        :return: Path of the new backup, or ``None`` when the entry file does
            not exist or the copy failed.
        """
        entry_file = self._get_entry_path(entry_num)
        if not entry_file.exists():
            logger.warning(f"Entry {entry_num} file does not exist at {entry_file}. No backup created.")
            print(colored(f"Warning: Entry file {entry_num} does not exist. No backup created.", 'yellow'))
            return None
        try:
            timestamp = int(time.time())
            backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(entry_num=entry_num, timestamp=timestamp)
            backup_file_path = self.backups_dir / backup_filename
            # Lock the source file for reading during copy
            with lock_file(entry_file, fcntl.LOCK_SH):
                shutil.copy2(entry_file, backup_file_path)  # copy2 preserves metadata
            logger.info(f"Backup created for entry {entry_num} at '{backup_file_path}'.")
            print(colored(f"Backup created successfully for entry {entry_num}.", 'green'))
            return backup_file_path
        except Exception as e:
            logger.error(f"Failed to create backup for entry {entry_num}: {e}", exc_info=True)
            print(colored(f"Error: Failed to create backup for entry {entry_num}: {e}", 'red'))
            return None

    def list_backups_for_entry(self, entry_num: int) -> List[Path]:
        """Lists available backup files for a specific entry, sorted by time (newest first).

        :return: Matching backup paths, or an empty list on error.
        """
        try:
            backup_pattern = f'entry_{entry_num}_backup_*.json.enc'
            return sorted(
                self.backups_dir.glob(backup_pattern),
                key=self._backup_time,
                reverse=True,
            )
        except Exception as e:
            logger.error(f"Failed to list backups for entry {entry_num}: {e}", exc_info=True)
            return []

    def list_all_backups(self) -> List[Path]:
        """Lists all backup files, sorted by time (newest first).

        :return: All backup paths, or an empty list on error.
        """
        try:
            return sorted(
                self.backups_dir.glob('entry_*_backup_*.json.enc'),
                key=self._backup_time,
                reverse=True,
            )
        except Exception as e:
            logger.error(f"Failed to list all backups: {e}", exc_info=True)
            return []

    def display_backups(self, entry_num: Optional[int] = None):
        """Prints available backups to the console.

        :param entry_num: Restrict the listing to one entry; ``None`` lists
            the backups of every entry.
        """
        if entry_num is not None:
            backup_files = self.list_backups_for_entry(entry_num)
            print(colored(f"Available Backups for Entry {entry_num}:", 'cyan'))
        else:
            backup_files = self.list_all_backups()
            print(colored("Available Backups (All Entries):", 'cyan'))

        if not backup_files:
            logger.info("No backup files available.")
            print(colored("No backup files available.", 'yellow'))
            return

        for backup in backup_files:
            try:
                creation_time = time.strftime(
                    '%Y-%m-%d %H:%M:%S', time.localtime(self._backup_time(backup))
                )
                print(colored(f"- {backup.name} (Created on: {creation_time})", 'cyan'))
            except Exception as e:
                logger.warning(f"Could not get stat for backup file {backup.name}: {e}")
                print(colored(f"- {backup.name} (Error reading time)", "red"))

    def restore_entry_from_backup(self, entry_num: int, backup_filename: str) -> bool:
        """Restores an entry file from a specific backup file.

        :param entry_num: Entry to overwrite.
        :param backup_filename: Bare file name of a backup in the backups
            directory; it must belong to ``entry_num``.
        :return: ``True`` on success, ``False`` when the name is rejected, the
            backup is missing, or the copy failed.
        """
        # Basic check to ensure the backup filename matches the entry number
        # pattern.  Names with path components are rejected as well, so the
        # restore source can never resolve outside the backups directory.
        name_matches = backup_filename.startswith(f'entry_{entry_num}_backup_')
        is_bare_name = Path(backup_filename).name == backup_filename
        if not (name_matches and is_bare_name):
            logger.error(f"Backup filename '{backup_filename}' does not match entry number {entry_num}.")
            print(colored("Error: Backup file name does not match the entry number.", 'red'))
            return False

        entry_file = self._get_entry_path(entry_num)
        backup_file = self.backups_dir / backup_filename
        if not backup_file.exists():
            logger.error(f"Backup file '{backup_file}' not found.")
            print(colored(f"Error: Backup file '{backup_filename}' not found.", 'red'))
            return False

        try:
            # Lock the destination file exclusively during restore
            with lock_file(entry_file, fcntl.LOCK_EX):
                shutil.copy2(backup_file, entry_file)  # copy2 preserves metadata
            logger.info(f"Entry {entry_num} restored successfully from backup '{backup_filename}'.")
            print(colored(f"Restored entry {entry_num} from backup '{backup_filename}'.", 'green'))
            return True
        except Exception as e:
            logger.error(f"Failed to restore entry {entry_num} from backup '{backup_filename}': {e}", exc_info=True)
            print(colored(f"Error: Failed to restore entry {entry_num} from backup: {e}", 'red'))
            return False

    # --- Methods related to the old single index are removed ---
    # Remove restore_latest_backup (old), restore_backup_by_timestamp (old) etc.
Phase 3: Refactor PasswordManager
password_manager/manager.py
(Major Refactoring):# password_manager/manager.py import sys import json import logging import getpass import os import base64 # Added import uuid # Added from datetime import datetime # Added from typing import Optional, Dict, Any, List import shutil from colorama import Fore, Style # Style Added from termcolor import colored from .encryption import EncryptionManager from .entry_management import EntryManager # Modified Import Path from .password_generation import PasswordGenerator from .backup import BackupManager # Modified Import Path from .state_manager import StateManager # Added from .kinds import KINDS, get_kind_details, get_all_kinds, get_required_fields, get_kind_handler # Added from utils.key_derivation import derive_key_from_password from utils.checksum import calculate_checksum as calculate_script_checksum, verify_checksum as verify_script_checksum # Renamed for clarity from utils.password_prompt import prompt_for_password, prompt_existing_password, confirm_action from constants import ( APP_DIR, PARENT_SEED_FILE as OLD_PARENT_SEED_FILENAME, # Rename old constant if needed SCRIPT_CHECKSUM_FILE, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH, DEFAULT_PASSWORD_LENGTH, DEFAULT_SEED_BACKUP_FILENAME ) import traceback import bcrypt from pathlib import Path from local_bip85.bip85 import BIP85 from bip_utils import Bip39SeedGenerator from utils.fingerprint_manager import FingerprintManager from nostr.client import NostrClient logger = logging.getLogger(__name__) # --- Define constants for new structure --- ENTRIES_DIR_NAME = "entries" BACKUPS_DIR_NAME = "backups" PARENT_SEED_FILENAME = "parent_seed.enc" HASHED_PASSWORD_FILENAME = "hashed_password.enc" OLD_INDEX_FILENAME = 'seedpass_passwords_db.json.enc' # For migration check class PasswordManager: """ Manages password entries, encryption, Nostr sync, and user interaction using individual entry files and 'kinds'. 
""" def __init__(self): self.encryption_manager: Optional[EncryptionManager] = None self.entry_manager: Optional[EntryManager] = None self.password_generator: Optional[PasswordGenerator] = None self.backup_manager: Optional[BackupManager] = None self.fingerprint_manager: Optional[FingerprintManager] = None self.state_manager: Optional[StateManager] = None # Added self.parent_seed: Optional[str] = None self.bip85: Optional[BIP85] = None self.nostr_client: Optional[NostrClient] = None self.current_fingerprint: Optional[str] = None # Added for clarity self.fingerprint_dir: Optional[Path] = None # Added for clarity self.entries_dir: Optional[Path] = None # Added self.backups_dir: Optional[Path] = None # Added try: self.initialize_fingerprint_manager() self.setup_parent_seed() # This now includes selecting/adding fingerprint and initializing managers # Perform data migration check *after* managers are initialized for the selected fingerprint if self.fingerprint_dir: # Ensure fingerprint_dir is set self.migrate_data_if_needed() # Initial synchronization with Nostr after setup/migration if self.nostr_client: self.synchronize_with_nostr() # Optional: run sync on startup except Exception as e: logger.critical(f"Critical error during PasswordManager initialization: {e}", exc_info=True) print(colored(f"FATAL ERROR during startup: {e}. 
Check logs.", "red", attrs=["bold"])) sys.exit(1) def initialize_fingerprint_manager(self): """Initializes the FingerprintManager.""" try: self.fingerprint_manager = FingerprintManager(APP_DIR) logger.debug("FingerprintManager initialized successfully.") except Exception as e: logger.error(f"Failed to initialize FingerprintManager: {e}", exc_info=True) print(colored(f"Error: Failed to initialize FingerprintManager: {e}", 'red')) sys.exit(1) def setup_parent_seed(self) -> None: """Guides user through selecting or adding a fingerprint and initializes components.""" fingerprints = self.fingerprint_manager.list_fingerprints() if fingerprints: self.select_or_add_fingerprint() else: print(colored("No existing SeedPass profiles (fingerprints) found.", 'yellow')) self.handle_new_seed_setup() # Ensure initialization happened after selection/creation if not self.current_fingerprint or not self.fingerprint_dir or not self.encryption_manager: logger.critical("Fingerprint selection or initialization failed.") print(colored("Error: Could not set up a valid SeedPass profile.", 'red')) sys.exit(1) def select_or_add_fingerprint(self): """Prompts user to select existing fingerprint or add a new one.""" try: print(colored("\nAvailable SeedPass Profiles (Fingerprints):", 'cyan')) fingerprints = self.fingerprint_manager.list_fingerprints() for idx, fp in enumerate(fingerprints, start=1): print(colored(f"{idx}. {fp}", 'cyan')) print(colored(f"{len(fingerprints) + 1}. Add a new profile (generate or import seed)", 'cyan')) print(colored(f"{len(fingerprints) + 2}. Exit", 'cyan')) while True: choice_str = input("Select a profile by number or choose an action: ").strip() if not choice_str.isdigit(): print(colored("Invalid input. 
Please enter a number.", 'red')) continue choice = int(choice_str) if 1 <= choice <= len(fingerprints): selected_fingerprint = fingerprints[choice - 1] self.select_fingerprint(selected_fingerprint) break # Exit loop on valid selection elif choice == len(fingerprints) + 1: # Add a new fingerprint new_fingerprint = self.add_new_fingerprint() if new_fingerprint: self.select_fingerprint(new_fingerprint) # Select the newly added one else: print(colored("Failed to add new profile. Exiting.", "red")) sys.exit(1) break # Exit loop elif choice == len(fingerprints) + 2: print(colored("Exiting.", "yellow")) sys.exit(0) else: print(colored("Invalid selection.", 'red')) except Exception as e: logger.error(f"Error during fingerprint selection: {e}", exc_info=True) print(colored(f"Error: Failed to select profile: {e}", 'red')) sys.exit(1) def add_new_fingerprint(self) -> Optional[str]: """Guides user to add a new fingerprint/profile. Returns the new fingerprint or None.""" try: print(colored("\n--- Add New SeedPass Profile ---", "yellow")) choice = input("Do you want to (1) Enter an existing 12-word seed or (2) Generate a new 12-word seed? 
(1/2): ").strip() new_fingerprint = None if choice == '1': new_fingerprint = self.setup_existing_seed() elif choice == '2': new_fingerprint = self.generate_new_seed() else: print(colored("Invalid choice.", 'red')) return None # Indicate failure if new_fingerprint: # Don't automatically select here, let select_or_add_fingerprint handle it print(colored(f"New profile with fingerprint '{new_fingerprint}' created.", 'green')) return new_fingerprint else: return None # Indicate failure except Exception as e: logger.error(f"Error adding new fingerprint: {e}", exc_info=True) print(colored(f"Error: Failed to add new profile: {e}", 'red')) return None def select_fingerprint(self, fingerprint: str) -> bool: """Sets the selected fingerprint as active and initializes all managers.""" if self.fingerprint_manager.select_fingerprint(fingerprint): self.current_fingerprint = fingerprint self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir() if not self.fingerprint_dir: print(colored(f"Error: Fingerprint directory for {fingerprint} not found.", 'red')) return False # Indicate failure # Setup encryption requires password for the selected fingerprint password = prompt_existing_password(f"Enter master password for profile '{fingerprint}': ") if not self.setup_encryption_manager(self.fingerprint_dir, password): # setup_encryption_manager now handles verify_password internally print(colored("Password verification failed. 
Cannot switch profile.", "red")) # Reset state if needed self.current_fingerprint = None self.fingerprint_dir = None self.encryption_manager = None return False # Indicate failure # Define entry/backup dirs based on selected fingerprint self.entries_dir = self.fingerprint_dir / ENTRIES_DIR_NAME self.backups_dir = self.fingerprint_dir / BACKUPS_DIR_NAME self.entries_dir.mkdir(parents=True, exist_ok=True) # Ensure they exist self.backups_dir.mkdir(parents=True, exist_ok=True) # Load parent seed (requires encryption manager) if not self.load_parent_seed(self.fingerprint_dir): # Reset state self.current_fingerprint = None self.fingerprint_dir = None self.encryption_manager = None return False # Indicate failure # Initialize BIP85 (requires parent seed) if not self.initialize_bip85(): return False # Indicate failure # Initialize other managers (requires encryption_manager, dirs, bip85 etc.) if not self.initialize_managers(): return False # Indicate failure print(colored(f"Profile '{fingerprint}' selected and ready.", 'green')) return True else: print(colored(f"Error: Profile (fingerprint) '{fingerprint}' not found.", 'red')) return False def setup_encryption_manager(self, fingerprint_dir: Path, password: str) -> bool: """Sets up EncryptionManager and verifies password. Returns True on success.""" try: key = derive_key_from_password(password) self.encryption_manager = EncryptionManager(key, fingerprint_dir) logger.debug(f"EncryptionManager set up for {fingerprint_dir.name}.") # Verify password against stored hash if not self.verify_password(password): self.encryption_manager = None # Clear invalid manager return False # Indicate failure return True # Success except Exception as e: logger.error(f"Failed to set up EncryptionManager: {e}", exc_info=True) print(colored(f"Error: Failed to set up encryption: {e}", 'red')) self.encryption_manager = None return False def load_parent_seed(self, fingerprint_dir: Path) -> bool: """Loads and decrypts parent seed. 
Returns True on success.""" if not self.encryption_manager: logger.error("Cannot load parent seed: EncryptionManager not initialized.") return False try: self.parent_seed = self.encryption_manager.decrypt_parent_seed() logger.debug(f"Parent seed loaded for profile {self.current_fingerprint}.") return True except Exception as e: # Decrypt_parent_seed already logs and prints errors logger.error(f"Failed to load parent seed for {self.current_fingerprint}: {e}", exc_info=False) # Avoid redundant stack trace print(colored(f"Error: Could not load the parent seed for this profile.", 'red')) self.parent_seed = None return False def initialize_bip85(self) -> bool: """Initializes BIP85 generator. Returns True on success.""" if not self.parent_seed: logger.error("Cannot initialize BIP85: Parent seed not loaded.") return False try: seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate() self.bip85 = BIP85(seed_bytes) logger.debug("BIP-85 initialized successfully.") return True except Exception as e: logger.error(f"Failed to initialize BIP-85: {e}", exc_info=True) print(colored(f"Error: Failed to initialize BIP-85: {e}", 'red')) self.bip85 = None return False def initialize_managers(self) -> bool: """Initializes EntryManager, PasswordGenerator, BackupManager, StateManager, NostrClient.""" # Check prerequisites if not all([self.encryption_manager, self.fingerprint_dir, self.entries_dir, self.backups_dir, self.parent_seed, self.bip85, self.current_fingerprint]): logger.error("Cannot initialize managers: Prerequisites missing.") return False try: # Initialize State Manager first self.state_manager = StateManager(self.fingerprint_dir) self.entry_manager = EntryManager( encryption_manager=self.encryption_manager, fingerprint_dir=self.fingerprint_dir # entries_dir passed via fingerprint_dir in its init ) self.password_generator = PasswordGenerator( encryption_manager=self.encryption_manager, # Needed for derive_seed_from_mnemonic parent_seed=self.parent_seed, bip85=self.bip85 ) 
self.backup_manager = BackupManager( fingerprint_dir=self.fingerprint_dir # backup_dir passed via fingerprint_dir in its init ) # Initialize NostrClient (ensure NostrClient init is updated) self.nostr_client = NostrClient( encryption_manager=self.encryption_manager, fingerprint=self.current_fingerprint, # Pass PasswordManager instance for callbacks if needed by EventHandler # password_manager_ref=self ) logger.debug(f"All managers initialized for profile {self.current_fingerprint}.") return True except Exception as e: logger.error(f"Failed to initialize managers: {e}", exc_info=True) print(colored(f"Error: Failed to initialize managers: {e}", 'red')) # Clean up partially initialized managers? self.state_manager = None self.entry_manager = None self.password_generator = None self.backup_manager = None self.nostr_client = None return False # --- Seed Setup Handlers (Modified) --- def handle_new_seed_setup(self) -> None: """Handles setup when no profiles exist.""" print(colored("Welcome to SeedPass! Let's create your first profile.", 'yellow')) new_fingerprint = self.add_new_fingerprint() # This handles generate/import choice if new_fingerprint: self.select_fingerprint(new_fingerprint) # Select and initialize else: print(colored("Failed to create initial profile. 
Exiting.", "red")) sys.exit(1) def setup_existing_seed(self) -> Optional[str]: """Handles importing an existing seed phrase.""" try: parent_seed = getpass.getpass(prompt='Enter your 12-word BIP-39 seed phrase: ').strip() if not self.validate_bip85_seed(parent_seed): print(colored("Error: Invalid 12-word seed phrase format.", 'red')) return None fingerprint = self.fingerprint_manager.add_fingerprint(parent_seed) if not fingerprint: print(colored("Error: Failed to add profile for the provided seed (maybe it already exists?).", 'red')) # FingerprintManager logs specific error return None # Could be duplicate or generation failure fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(fingerprint) if not fingerprint_dir: print(colored("Error: Failed to create profile directory.", 'red')) # Attempt cleanup? self.fingerprint_manager.remove_fingerprint(fingerprint) return None print(colored(f"Profile '{fingerprint}' created. Now set its master password.", 'green')) # Need to save the seed and password hash *for this new fingerprint* # Temporarily set context to save correctly temp_fp_dir = self.fingerprint_dir # Save old context if any self.fingerprint_dir = fingerprint_dir if not self.save_seed_and_password(parent_seed, fingerprint_dir): print(colored("Error saving seed or password. 
Rolling back profile creation.", "red")) self.fingerprint_manager.remove_fingerprint(fingerprint) # Cleanup self.fingerprint_dir = temp_fp_dir # Restore context return None self.fingerprint_dir = temp_fp_dir # Restore context return fingerprint except KeyboardInterrupt: print(colored("\nOperation cancelled by user.", 'yellow')) return None except Exception as e: logger.error(f"Error setting up existing seed: {e}", exc_info=True) print(colored(f"Error importing seed: {e}", 'red')) return None def generate_new_seed(self) -> Optional[str]: """Handles generating a new seed phrase.""" try: new_seed = self.generate_bip85_seed() print(colored("\n=== Your New 12-Word Master Seed Phrase ===", 'yellow', attrs=['bold'])) print(colored(new_seed, 'cyan')) print(colored("=============================================", 'yellow', attrs=['bold'])) print(colored("WRITE THIS DOWN NOW!", 'red', attrs=['blink'])) print(colored("Store it securely offline. Losing this means losing all derived passwords.", 'red')) print(colored("Do not store it digitally unless you understand the risks.", 'red')) if not confirm_action("\nHave you securely written down this seed phrase? (Y/N): "): print(colored("Seed generation cancelled. Please run again when ready.", 'yellow')) return None fingerprint = self.fingerprint_manager.add_fingerprint(new_seed) if not fingerprint: print(colored("Error: Failed to add profile for the new seed.", 'red')) return None fingerprint_dir = self.fingerprint_manager.get_fingerprint_directory(fingerprint) if not fingerprint_dir: print(colored("Error: Failed to create profile directory.", 'red')) self.fingerprint_manager.remove_fingerprint(fingerprint) return None print(colored(f"\nProfile '{fingerprint}' created. 
Now set its master password.", 'green')) # Temporarily set context to save correctly temp_fp_dir = self.fingerprint_dir # Save old context if any self.fingerprint_dir = fingerprint_dir if not self.save_seed_and_password(new_seed, fingerprint_dir): print(colored("Error saving seed or password. Rolling back profile creation.", "red")) self.fingerprint_manager.remove_fingerprint(fingerprint) # Cleanup self.fingerprint_dir = temp_fp_dir # Restore context return None self.fingerprint_dir = temp_fp_dir # Restore context return fingerprint except KeyboardInterrupt: print(colored("\nOperation cancelled by user.", 'yellow')) return None except Exception as e: logger.error(f"Error generating new seed: {e}", exc_info=True) print(colored(f"Error generating seed: {e}", 'red')) return None def save_seed_and_password(self, seed: str, fingerprint_dir: Path) -> bool: """Internal helper to prompt for password, save hash, and save encrypted seed.""" try: password = prompt_for_password() # Prompts for new + confirm # Derive key and setup temporary encryption manager for saving key = derive_key_from_password(password) temp_enc_mgr = EncryptionManager(key, fingerprint_dir) # Store hashed password within the target fingerprint dir if not self._store_hashed_password(password, fingerprint_dir): raise RuntimeError("Failed to store hashed password.") # Encrypt and save parent seed within the target fingerprint dir temp_enc_mgr.encrypt_parent_seed(seed) # encrypt_parent_seed handles saving to file logger.info(f"Seed and password hash saved successfully for profile {fingerprint_dir.name}.") return True except Exception as e: logger.error(f"Failed to encrypt/save seed or password hash for {fingerprint_dir.name}: {e}", exc_info=True) # Cleanup potentially created hash file? Difficult to do atomically here. 
return False # --- Core Entry Operations (NEW) --- def add_entry(self, kind: str, entry_data: Dict[str, Any]) -> Optional[int]: """ Adds a new entry of the specified kind, saves locally, and posts to Nostr. :param kind: The type of entry (must exist in KINDS). :param entry_data: The data payload for the entry. :return: The assigned entry number if successful, None otherwise. """ if not all([self.entry_manager, self.encryption_manager, self.state_manager, self.nostr_client, self.current_fingerprint]): logger.error("Cannot add entry: PasswordManager not fully initialized.") print(colored("Error: System not ready. Please restart.", "red")) return None kind_details = get_kind_details(kind) if not kind_details: logger.error(f"Attempted to add entry with unknown kind: {kind}") print(colored(f"Error: Unknown entry type '{kind}'.", "red")) return None # Add necessary metadata entry_num = self.entry_manager.get_next_entry_num() timestamp = datetime.utcnow().isoformat() + 'Z' checksum = self.entry_manager.calculate_checksum(entry_data) # Checksum of the *data* part # Handle bip85 index for generated passwords bip85_index = None if kind == "generated_password": # Check if bip85_index was passed in entry_data (e.g. 
during migration) if "bip85_index" not in entry_data: bip85_index = self.state_manager.get_next_generated_password_index() entry_data["bip85_index"] = bip85_index # Add it to the data part # Recalculate checksum if index was added checksum = self.entry_manager.calculate_checksum(entry_data) else: bip85_index = entry_data["bip85_index"] # Ensure state manager is updated if migrating an index higher than current max last_known_index = self.state_manager.get_last_generated_password_index() if bip85_index > last_known_index: self.state_manager.set_last_generated_password_index(bip85_index) # Encrypt sensitive fields within entry_data before creating the full entry JSON # Example: encrypt 'password' for stored_password, 'content' for note if kind == "stored_password" and "password" in entry_data: try: pwd_bytes = entry_data["password"].encode('utf-8') encrypted_pwd_bytes = self.encryption_manager.encrypt_data(pwd_bytes) entry_data["password"] = base64.b64encode(encrypted_pwd_bytes).decode('utf-8') # Store as base64 string checksum = self.entry_manager.calculate_checksum(entry_data) # Recalculate checksum except Exception as enc_err: logger.error(f"Failed to encrypt password for stored_password entry {entry_num}: {enc_err}", exc_info=True) print(colored("Error encrypting password data.", "red")) return None elif kind == "note" and "content" in entry_data: try: content_bytes = entry_data["content"].encode('utf-8') encrypted_content_bytes = self.encryption_manager.encrypt_data(content_bytes) entry_data["content"] = base64.b64encode(encrypted_content_bytes).decode('utf-8') # Store as base64 string checksum = self.entry_manager.calculate_checksum(entry_data) # Recalculate checksum except Exception as enc_err: logger.error(f"Failed to encrypt content for note entry {entry_num}: {enc_err}", exc_info=True) print(colored("Error encrypting note data.", "red")) return None # Construct the full entry structure (to be encrypted) full_entry = { "entry_num": entry_num, "fingerprint": 
self.current_fingerprint, "kind": kind, "data": entry_data, # Contains potentially pre-encrypted fields "timestamp": timestamp, # UTC timestamp of creation/last update "metadata": { "created_at": timestamp, # Keep original creation time separate? maybe not needed. "updated_at": timestamp, "checksum": checksum # Checksum of the 'data' part } } # Add bip85_index to top level for generated_password for easier access if needed # This is somewhat redundant but might be useful for retrieval/display logic. if kind == "generated_password": full_entry["bip85_index"] = bip85_index try: # Encrypt the entire entry structure entry_json = json.dumps(full_entry).encode('utf-8') encrypted_entry_data = self.encryption_manager.encrypt_data(entry_json) # Save the encrypted entry locally if not self.entry_manager.save_entry(entry_num, encrypted_entry_data): # EntryManager logs the error print(colored(f"Error: Failed to save entry {entry_num} locally.", 'red')) # Potential rollback needed? Difficult state. return None # Create a backup of the newly saved entry self.backup_manager.create_backup_for_entry(entry_num) # Post the encrypted entry to Nostr # Use a unique identifier ('d' tag) for replaceable events identifier = f"{kind_details['identifier_tag']}{entry_num}" nostr_kind_int = kind_details['nostr_kind'] self.nostr_client.publish_entry( encrypted_entry_data=encrypted_entry_data, # Already encrypted full entry nostr_kind=nostr_kind_int, d_tag=identifier ) logger.info(f"Entry {entry_num} (Kind: {kind}, ID: {identifier}) added locally and posted to Nostr.") print(colored(f"Entry {entry_num} added successfully.", 'green')) return entry_num except Exception as e: logger.error(f"Failed during final steps of adding entry {entry_num}: {e}", exc_info=True) print(colored(f"Error: Failed to complete adding entry {entry_num}: {e}", 'red')) # Attempt to clean up the saved file if posting failed? 
# self.entry_manager.delete_entry_file(entry_num) # Risky if Nostr post *did* succeed partially return None def modify_entry(self, entry_num: int, updated_data_fields: Dict[str, Any]) -> bool: """ Modifies an existing entry, saves locally, and posts update to Nostr. :param entry_num: The number of the entry to modify. :param updated_data_fields: Dictionary containing only the fields to update within the 'data' part. :return: True if successful, False otherwise. """ if not all([self.entry_manager, self.encryption_manager, self.nostr_client]): logger.error("Cannot modify entry: PasswordManager not fully initialized.") return False # Load existing entry existing_entry = self.entry_manager.load_entry(entry_num) if not existing_entry: print(colored(f"Error: Entry {entry_num} not found.", 'red')) return False kind = existing_entry.get("kind") kind_details = get_kind_details(kind) if not kind_details: logger.error(f"Cannot modify entry {entry_num}: Unknown kind '{kind}' found in loaded data.") print(colored(f"Error: Cannot modify entry {entry_num} due to corrupted kind.", 'red')) return False # Create backup before modifying self.backup_manager.create_backup_for_entry(entry_num) # Update the 'data' part original_data = existing_entry.get("data", {}) # Decrypt sensitive fields *before* updating if necessary # Example: Decrypt 'password' for stored_password, 'content' for note if kind == "stored_password" and "password" in original_data: try: pwd_b64 = original_data["password"] pwd_bytes = self.encryption_manager.decrypt_data(base64.b64decode(pwd_b64)) original_data["password"] = pwd_bytes.decode('utf-8') # Temporarily store decrypted for update logic except Exception as dec_err: logger.error(f"Failed to decrypt password for modification in entry {entry_num}: {dec_err}", exc_info=True) print(colored("Error preparing password field for modification.", "red")) return False elif kind == "note" and "content" in original_data: try: content_b64 = original_data["content"] 
content_bytes = self.encryption_manager.decrypt_data(base64.b64decode(content_b64)) original_data["content"] = content_bytes.decode('utf-8') # Temporarily store decrypted except Exception as dec_err: logger.error(f"Failed to decrypt content for modification in entry {entry_num}: {dec_err}", exc_info=True) print(colored("Error preparing note content for modification.", "red")) return False # Apply the updates from updated_data_fields original_data.update(updated_data_fields) # Re-encrypt sensitive fields *after* updating if kind == "stored_password" and "password" in original_data: try: pwd_bytes = original_data["password"].encode('utf-8') encrypted_pwd_bytes = self.encryption_manager.encrypt_data(pwd_bytes) original_data["password"] = base64.b64encode(encrypted_pwd_bytes).decode('utf-8') # Store as base64 string again except Exception as enc_err: logger.error(f"Failed to re-encrypt password for stored_password entry {entry_num}: {enc_err}", exc_info=True) print(colored("Error encrypting updated password data.", "red")) return False elif kind == "note" and "content" in original_data: try: content_bytes = original_data["content"].encode('utf-8') encrypted_content_bytes = self.encryption_manager.encrypt_data(content_bytes) original_data["content"] = base64.b64encode(encrypted_content_bytes).decode('utf-8') # Store as base64 string again except Exception as enc_err: logger.error(f"Failed to re-encrypt content for note entry {entry_num}: {enc_err}", exc_info=True) print(colored("Error encrypting updated note data.", "red")) return False # Update timestamp and recalculate checksum new_timestamp = datetime.utcnow().isoformat() + 'Z' new_checksum = self.entry_manager.calculate_checksum(original_data) # Update the full entry structure existing_entry["data"] = original_data # Put potentially re-encrypted data back existing_entry["timestamp"] = new_timestamp if "metadata" not in existing_entry: existing_entry["metadata"] = {} existing_entry["metadata"]["updated_at"] = 
new_timestamp existing_entry["metadata"]["checksum"] = new_checksum try: # Encrypt the updated full entry entry_json = json.dumps(existing_entry).encode('utf-8') encrypted_entry_data = self.encryption_manager.encrypt_data(entry_json) # Save locally if not self.entry_manager.save_entry(entry_num, encrypted_entry_data): print(colored(f"Error: Failed to save updated entry {entry_num} locally.", 'red')) return False # Post update to Nostr (as a replaceable event) identifier = f"{kind_details['identifier_tag']}{entry_num}" nostr_kind_int = kind_details['nostr_kind'] self.nostr_client.publish_entry( encrypted_entry_data=encrypted_entry_data, nostr_kind=nostr_kind_int, d_tag=identifier ) logger.info(f"Entry {entry_num} modified locally and update posted to Nostr.") print(colored(f"Entry {entry_num} updated successfully.", 'green')) return True except Exception as e: logger.error(f"Failed during final steps of modifying entry {entry_num}: {e}", exc_info=True) print(colored(f"Error: Failed to complete modifying entry {entry_num}: {e}", 'red')) # Consider attempting to restore the backup? return False def delete_entry(self, entry_num: int) -> bool: """Deletes an entry locally and posts a deletion marker to Nostr.""" if not all([self.entry_manager, self.nostr_client]): logger.error("Cannot delete entry: PasswordManager not fully initialized.") return False # Load entry to get kind details for Nostr deletion marker entry_data = self.entry_manager.load_entry(entry_num) if not entry_data: print(colored(f"Warning: Entry {entry_num} not found locally. Cannot delete.", 'yellow')) # Maybe still try to post deletion to Nostr? # For now, assume local file must exist. return False kind = entry_data.get("kind") kind_details = get_kind_details(kind) if not kind_details: logger.warning(f"Cannot determine kind for entry {entry_num} during deletion.") # Proceed with file deletion, but maybe skip Nostr? 
else: # Create backup before deleting self.backup_manager.create_backup_for_entry(entry_num) # Delete local file first if not self.entry_manager.delete_entry_file(entry_num): print(colored(f"Error: Failed to delete local file for entry {entry_num}.", 'red')) # Don't post deletion to Nostr if local delete failed return False # Post deletion marker to Nostr (e.g., Kind 5 event referencing the replaceable event) if kind_details: identifier = f"{kind_details['identifier_tag']}{entry_num}" nostr_kind_to_delete = kind_details['nostr_kind'] # We need the event ID of the event we want to delete if using Kind 5 # Fetching the event ID first might be complex/slow. # Alternative: Publish an empty content replaceable event? Easier. # Let's publish an empty content update for the replaceable event. # Note: Relays might prune empty events faster. Kind 5 is more explicit. # Decision: Publish empty content replaceable event for simplicity now. try: # Create a dummy entry structure with empty data for checksum empty_data_checksum = self.entry_manager.calculate_checksum({}) tombstone_entry = { "entry_num": entry_num, "fingerprint": self.current_fingerprint, "kind": kind, "data": {}, # Empty data "timestamp": datetime.utcnow().isoformat() + 'Z', "metadata": { "deleted": True, # Add deletion flag "updated_at": datetime.utcnow().isoformat() + 'Z', "checksum": empty_data_checksum } } entry_json = json.dumps(tombstone_entry).encode('utf-8') encrypted_tombstone_data = self.encryption_manager.encrypt_data(entry_json) self.nostr_client.publish_entry( encrypted_entry_data=encrypted_tombstone_data, nostr_kind=nostr_kind_to_delete, d_tag=identifier, is_deletion=True # Add flag for logging/handling in client ) logger.info(f"Deletion marker for entry {entry_num} (ID: {identifier}) posted to Nostr.") except Exception as e: logger.error(f"Failed to post deletion marker to Nostr for entry {entry_num}: {e}", exc_info=True) # Local file is already deleted. Log inconsistency. 
print(colored(f"Warning: Local entry {entry_num} deleted, but failed to post deletion to Nostr.", 'yellow')) # Still return True as local deletion succeeded? Or False due to incomplete operation? # Let's return True as the primary goal (local deletion) was met. print(colored(f"Entry {entry_num} deleted successfully.", 'green')) return True def list_all_entries(self) -> List[Dict[str, Any]]: """Loads all local entries and returns them as a list of dictionaries.""" if not self.entry_manager: return [] all_entries = [] entry_nums = self.entry_manager.list_all_entry_nums() for num in entry_nums: entry = self.entry_manager.load_entry(num) if entry: all_entries.append(entry) return all_entries def process_entry(self, entry: Dict[str, Any]): """ Processes an individual entry based on its kind using the registered handler. :param entry: The entry data dictionary (decrypted). """ if not self.encryption_manager or not self.password_generator: logger.error("Cannot process entry: Required managers not initialized.") return try: kind = entry.get('kind') data = entry.get('data', {}) fingerprint = entry.get('fingerprint') entry_num = entry.get('entry_num', 'N/A') handler = get_kind_handler(kind) if handler: # Pass necessary components to the handler via kwargs handler_kwargs = { "encryption_manager": self.encryption_manager, "password_generator": self.password_generator, # Add other managers if handlers need them } handler(data, fingerprint, **handler_kwargs) logger.debug(f"Processed entry {entry_num} of kind '{kind}'.") else: logger.warning(f"No handler found for kind '{kind}'. 
Skipping processing for entry {entry_num}.") print(colored(f"Warning: Cannot process entry {entry_num} - unknown type '{kind}'.", "yellow")) except Exception as e: logger.error(f"Failed to process entry {entry.get('entry_num', 'N/A')}: {e}", exc_info=True) print(colored(f"Error processing entry {entry.get('entry_num', 'N/A')}: {e}", 'red')) def synchronize_with_nostr(self): """Fetches entries from Nostr and updates local storage.""" if not self.nostr_client or not self.entry_manager or not self.encryption_manager or not self.state_manager: logger.error("Cannot synchronize: Required managers not initialized.") print(colored("Error: Cannot synchronize with Nostr - system not ready.", "red")) return print(colored("Synchronizing with Nostr... Please wait.", "yellow")) try: last_sync_time = self.state_manager.get_last_nostr_sync_time() # Fetch events since last sync # Modify fetch_all_entries_async in NostrClient to accept a 'since' timestamp # Use a reasonable limit initially, might need pagination for huge histories nostr_events = self.nostr_client.fetch_all_entries_sync(since=last_sync_time, limit=500) # Sync version if nostr_events is None: # Indicates an error during fetch print(colored("Synchronization failed: Could not retrieve data from Nostr.", "red")) return if not nostr_events: print(colored("No new entries found on Nostr since last sync.", "green")) # Still update sync time? Yes, confirms we checked. 
self.state_manager.set_last_nostr_sync_time(int(time.time())) return newest_event_time = last_sync_time processed_count = 0 updated_count = 0 new_count = 0 deleted_count = 0 error_count = 0 # Process newest events first for event in sorted(nostr_events, key=lambda e: e.created_at, reverse=True): if event.created_at > newest_event_time: newest_event_time = event.created_at try: encrypted_content_b64 = event.content encrypted_content_bytes = base64.b64decode(encrypted_content_b64) decrypted_content_bytes = self.encryption_manager.decrypt_data(encrypted_content_bytes) entry = json.loads(decrypted_content_bytes.decode('utf-8')) entry_num = entry.get('entry_num') remote_checksum = entry.get('metadata', {}).get('checksum') is_deleted = entry.get('metadata', {}).get('deleted', False) # Check deletion flag if entry_num is None or remote_checksum is None: logger.warning(f"Skipping invalid Nostr event (ID: {event.id}): Missing entry_num or checksum.") error_count += 1 continue local_entry_path = self.entry_manager._get_entry_path(entry_num) # Use internal helper if is_deleted: # Handle deletion marker if local_entry_path.exists(): print(colored(f"Processing deletion for entry {entry_num}...", "magenta")) # Optional: backup before deleting based on sync? Risky. # self.backup_manager.create_backup_for_entry(entry_num) if self.entry_manager.delete_entry_file(entry_num): deleted_count += 1 else: error_count += 1 # Failed local delete else: logger.debug(f"Received deletion marker for already deleted/non-existent entry {entry_num}.") continue # Don't process further if deleted # Compare with local version if local_entry_path.exists(): local_checksum = self.entry_manager.get_entry_checksum(entry_num) if local_checksum is None: # Error reading local checksum logger.warning(f"Could not read local checksum for entry {entry_num}. 
Skipping update check.") error_count += 1 continue if local_checksum != remote_checksum: # Remote is newer or different, update local print(colored(f"Updating entry {entry_num} from Nostr...", "yellow")) if self.entry_manager.save_entry(entry_num, encrypted_content_bytes): updated_count += 1 # Optional: process updated entry immediately? # self.process_entry(entry) else: error_count += 1 # Failed local save else: # Checksums match, no update needed logger.debug(f"Entry {entry_num} is already up-to-date.") else: # Entry exists on Nostr but not locally, save it print(colored(f"Downloading new entry {entry_num} from Nostr...", "green")) if self.entry_manager.save_entry(entry_num, encrypted_content_bytes): new_count += 1 # Optional: process new entry immediately? # self.process_entry(entry) else: error_count += 1 # Failed local save processed_count +=1 except (base64.binascii.Error, json.JSONDecodeError) as decode_err: logger.error(f"Failed to decode/decrypt Nostr event content (ID: {event.id}): {decode_err}") error_count += 1 except InvalidToken: # From decryption logger.error(f"Decryption failed for Nostr event content (ID: {event.id}). Invalid key or corrupt data?") error_count += 1 except Exception as proc_err: logger.error(f"Unexpected error processing Nostr event (ID: {event.id}): {proc_err}", exc_info=True) error_count += 1 # Update last sync time to the timestamp of the newest processed event # Add a small buffer (1 sec) to avoid missing events published exactly at sync time? if newest_event_time > last_sync_time: self.state_manager.set_last_nostr_sync_time(newest_event_time + 1) print(colored(f"Synchronization complete. 
New: {new_count}, Updated: {updated_count}, Deleted: {deleted_count}, Errors: {error_count}", "blue")) except Exception as e: logger.error(f"Failed to synchronize with Nostr: {e}", exc_info=True) print(colored(f"Error: Failed to synchronize with Nostr: {e}", 'red')) def migrate_data_if_needed(self): """Checks for the old index file and performs migration if found.""" if not self.fingerprint_dir: return # Should not happen if called correctly old_index_path = self.fingerprint_dir / OLD_INDEX_FILENAME if not old_index_path.exists(): logger.info("Old index file not found. Migration not required.") return print(colored(f"Old index file found for profile {self.current_fingerprint}. Migrating to new format...", "yellow")) # Backup the old index file before migration try: timestamp = int(time.time()) backup_old_index_path = self.backups_dir / f"{OLD_INDEX_FILENAME}.backup_{timestamp}" shutil.copy2(old_index_path, backup_old_index_path) logger.info(f"Backed up old index file to {backup_old_index_path}") except Exception as backup_err: logger.error(f"Failed to backup old index file before migration: {backup_err}", exc_info=True) print(colored("Error: Could not back up old data file. Migration aborted.", "red")) return try: # Load old data (uses EncryptionManager correctly) old_data = self.encryption_manager.load_json_data(old_index_path.relative_to(self.fingerprint_dir)) old_passwords = old_data.get('passwords', {}) if not old_passwords: print(colored("Old index file is empty or invalid. No entries to migrate.", "yellow")) # Optionally delete the empty/invalid old file? # old_index_path.unlink() return migrated_count = 0 error_count = 0 print(colored(f"Found {len(old_passwords)} entries in old format. 
Starting migration...", "cyan")) # Iterate through old entries and use add_entry logic # Note: old index was string, new entry_num is int for old_idx_str, old_entry_data in old_passwords.items(): try: old_idx = int(old_idx_str) # Map old fields to new 'generated_password' kind structure new_entry_data = { "title": old_entry_data.get('website', f"Migrated Entry {old_idx}"), "username": old_entry_data.get('username', ''), "email": "", # Old format didn't have email "url": old_entry_data.get('url', ''), "length": old_entry_data.get('length'), "bip85_index": old_idx # Use the old index as the bip85_index # Blacklisted status? Decide how to handle. Maybe add to notes? } # Validate required fields for generated_password if new_entry_data["length"] is None: logger.warning(f"Skipping migration for old index {old_idx}: Missing 'length'. Data: {old_entry_data}") error_count += 1 continue # Use the add_entry method which handles saving and posting to nostr result_entry_num = self.add_entry(kind="generated_password", entry_data=new_entry_data) if result_entry_num is not None: migrated_count += 1 print(f" Migrated old index {old_idx} -> new entry {result_entry_num}") else: error_count += 1 print(colored(f" Failed to migrate old index {old_idx}", "red")) # Should we stop migration on first error? Or continue? Let's continue. except ValueError: logger.warning(f"Skipping migration for invalid old index key: {old_idx_str}") error_count += 1 continue except Exception as migrate_entry_err: logger.error(f"Error migrating old index {old_idx_str}: {migrate_entry_err}", exc_info=True) error_count += 1 print(colored(f" Error migrating old index {old_idx_str}", "red")) print(colored(f"Migration finished. Migrated: {migrated_count}, Errors: {error_count}", "blue")) if error_count == 0: # Optionally delete the old index file after successful migration if confirm_action("Migration successful. Delete the old index file? 
(Y/N): "): try: with lock_file(old_index_path, fcntl.LOCK_EX): old_index_path.unlink() print(colored("Old index file deleted.", "green")) except Exception as del_err: logger.error(f"Failed to delete old index file {old_index_path}: {del_err}", exc_info=True) print(colored("Error: Failed to delete old index file.", "red")) else: print(colored("Migration completed with errors. Please review logs.", "yellow")) print(colored("The old index file has NOT been deleted.", "yellow")) except Exception as e: logger.error(f"Critical error during data migration: {e}", exc_info=True) print(colored(f"Error: Failed to migrate data: {e}. Old data remains.", 'red')) # --- Utility Methods (Password Hashing, Seed Validation, etc.) --- def validate_bip85_seed(self, seed: str) -> bool: """Validates the provided BIP-39 seed phrase (12 words).""" try: words = seed.split() if len(words) == 12: # Basic check # Add bip_utils validation? Bip39MnemonicValidator(seed).IsValid() - needs wordlist return True return False except Exception: return False def generate_bip85_seed(self) -> str: """Generates a new 12-word BIP-39 seed phrase.""" try: # Generate entropy suitable for a 12-word mnemonic (128 bits / 16 bytes) entropy = os.urandom(16) mnemonic = Bip39MnemonicGenerator(Bip39Languages.ENGLISH).FromEntropy(entropy) return mnemonic.ToStr() except Exception as e: logger.error(f"Failed to generate BIP-39 seed: {e}", exc_info=True) print(colored(f"Error: Failed to generate seed: {e}", 'red')) sys.exit(1) def verify_password(self, password: str) -> bool: """Verifies provided password against the stored hash for the current fingerprint.""" if not self.fingerprint_dir: logger.error("Cannot verify password, fingerprint directory not set.") return False hashed_password_file = self.fingerprint_dir / HASHED_PASSWORD_FILENAME if not hashed_password_file.exists(): logger.error(f"Hashed password file not found: {hashed_password_file}") print(colored("Error: Password hash file missing for this profile.", 
'red')) return False try: with lock_file(hashed_password_file, fcntl.LOCK_SH): with open(hashed_password_file, 'rb') as f: stored_hash = f.read() # Normalize entered password before checking normalized_password = unicodedata.normalize('NFKD', password).strip() is_correct = bcrypt.checkpw(normalized_password.encode('utf-8'), stored_hash) if is_correct: logger.debug("Password verification successful.") else: logger.warning("Password verification failed.") return is_correct except ValueError as e: # Handle potential bcrypt errors like "invalid salt" logger.error(f"Error during password check (likely invalid hash file): {e}") print(colored("Error: Problem verifying password - hash file might be corrupt.", 'red')) return False except Exception as e: logger.error(f"Error verifying password: {e}", exc_info=True) print(colored(f"Error: Failed to verify password: {e}", 'red')) return False def _store_hashed_password(self, password: str, fingerprint_dir: Path) -> bool: """Hashes and stores password for a specific fingerprint directory.""" hashed_password_file = fingerprint_dir / HASHED_PASSWORD_FILENAME try: # Normalize password before hashing normalized_password = unicodedata.normalize('NFKD', password).strip() hashed = bcrypt.hashpw(normalized_password.encode('utf-8'), bcrypt.gensalt()) with lock_file(hashed_password_file, fcntl.LOCK_EX): with open(hashed_password_file, 'wb') as f: f.write(hashed) os.chmod(hashed_password_file, 0o600) logger.info(f"Password hash stored for profile {fingerprint_dir.name}.") return True except Exception as e: logger.error(f"Failed to store hashed password for {fingerprint_dir.name}: {e}", exc_info=True) print(colored(f"Error: Failed to store password hash: {e}", 'red')) return False # --- CLI Handler Methods (Adapting old ones) --- def handle_add_entry_cli(self) -> None: """Handles the CLI interaction for adding a new entry.""" print(colored("\n--- Add New Entry ---", "yellow")) available_kinds = get_all_kinds() print("Available entry 
types:") for i, kind_name in enumerate(available_kinds): details = get_kind_details(kind_name) print(f" {i+1}. {kind_name} ({details['description']})") while True: try: choice_str = input("Select entry type number: ").strip() choice = int(choice_str) - 1 if 0 <= choice < len(available_kinds): selected_kind = available_kinds[choice] break else: print(colored("Invalid selection.", "red")) except ValueError: print(colored("Invalid input. Please enter a number.", "red")) print(colored(f"\nAdding new '{selected_kind}' entry...", "cyan")) entry_data = {} required_fields = get_required_fields(selected_kind) # Special handling for generated_password length/index (not prompted here) if selected_kind == "generated_password": try: entry_data["title"] = input("Enter Title/Website Name: ").strip() if not entry_data["title"]: print(colored("Title cannot be empty.", "red")) return entry_data["username"] = input("Enter Username (optional): ").strip() entry_data["email"] = input("Enter Email (optional): ").strip() entry_data["url"] = input("Enter URL (optional): ").strip() length_input = input(f'Enter desired password length (default {DEFAULT_PASSWORD_LENGTH}): ').strip() length = DEFAULT_PASSWORD_LENGTH if length_input: length = int(length_input) # Add validation if not (MIN_PASSWORD_LENGTH <= length <= MAX_PASSWORD_LENGTH): print(colored(f"Error: Password length must be between {MIN_PASSWORD_LENGTH} and {MAX_PASSWORD_LENGTH}.", 'red')) return entry_data["length"] = length # bip85_index is added automatically by add_entry method except ValueError: print(colored("Invalid length input.", "red")) return # Generic prompt for other kinds else: for field in required_fields: # Skip password field for stored_password - handle specially if selected_kind == "stored_password" and field == "password": entry_data[field] = getpass.getpass(f"Enter Password for '{entry_data.get('title', 'entry')}': ").strip() # Add confirmation? 
if not entry_data[field]: print(colored("Password cannot be empty.", "red")) return continue # Skip content field for note - handle specially? Maybe allow multiline? if selected_kind == "note" and field == "content": print(f"Enter {field.capitalize()} (end with 'EOF' on a new line):") lines = [] while True: line = input() if line == "EOF": break lines.append(line) entry_data[field] = "\n".join(lines) continue # Standard prompt prompt_text = f"Enter {field.replace('_', ' ').capitalize()}" if field == "tags" and selected_kind == "note": prompt_text += " (comma-separated)" user_input = input(f"{prompt_text}: ").strip() if field == "tags" and selected_kind == "note": entry_data[field] = [tag.strip() for tag in user_input.split(',') if tag.strip()] else: # Add validation based on field type if needed later entry_data[field] = user_input # Add the entry using the main logic self.add_entry(selected_kind, entry_data) def handle_retrieve_entry_cli(self) -> None: """Handles the CLI interaction for retrieving/displaying an entry.""" print(colored("\n--- Retrieve Entry ---", "yellow")) all_entries = self.list_all_entries() if not all_entries: print(colored("No entries found locally.", "yellow")) return print("Available Entries:") # Sort by entry_num for consistent display for entry in sorted(all_entries, key=lambda x: x.get("entry_num", -1)): num = entry.get("entry_num", "N/A") kind = entry.get("kind", "Unknown") title = entry.get("data", {}).get("title", "No Title") timestamp = entry.get("timestamp", "No Date") print(f" {Style.BRIGHT}{num}{Style.RESET_ALL}. 
{title} ({kind}) - Last Updated: {timestamp}") while True: try: choice_str = input("Enter entry number to display: ").strip() entry_num_to_display = int(choice_str) # Find the selected entry selected_entry = next((e for e in all_entries if e.get("entry_num") == entry_num_to_display), None) if selected_entry: print(colored(f"\nDisplaying Entry {entry_num_to_display}:", "blue")) self.process_entry(selected_entry) # Use the handler logic break else: print(colored("Invalid entry number.", "red")) except ValueError: print(colored("Invalid input. Please enter a number.", "red")) except KeyboardInterrupt: print(colored("\nCancelled.", "yellow")) break def handle_modify_entry_cli(self) -> None: """Handles the CLI interaction for modifying an entry.""" print(colored("\n--- Modify Entry ---", "yellow")) all_entries = self.list_all_entries() if not all_entries: print(colored("No entries found locally to modify.", "yellow")) return print("Available Entries:") for entry in sorted(all_entries, key=lambda x: x.get("entry_num", -1)): num = entry.get("entry_num", "N/A") kind = entry.get("kind", "Unknown") title = entry.get("data", {}).get("title", "No Title") print(f" {Style.BRIGHT}{num}{Style.RESET_ALL}. {title} ({kind})") while True: try: choice_str = input("Enter entry number to modify: ").strip() entry_num_to_modify = int(choice_str) existing_entry = next((e for e in all_entries if e.get("entry_num") == entry_num_to_modify), None) if existing_entry: break else: print(colored("Invalid entry number.", "red")) except ValueError: print(colored("Invalid input. 
Please enter a number.", "red")) except KeyboardInterrupt: print(colored("\nCancelled.", "yellow")) return # Exit modify handler kind = existing_entry.get("kind") current_data = existing_entry.get("data", {}) print(colored(f"\nModifying Entry {entry_num_to_modify} (Kind: {kind}, Title: {current_data.get('title', 'N/A')})", "cyan")) # Decrypt sensitive fields for display/editing if needed display_data = current_data.copy() # Work on a copy for display/prompting if kind == "stored_password" and "password" in display_data: try: pwd_b64 = display_data["password"] pwd_bytes = self.encryption_manager.decrypt_data(base64.b64decode(pwd_b64)) display_data["password"] = pwd_bytes.decode('utf-8') except Exception: display_data["password"] = "*** Error Decrypting ***" elif kind == "note" and "content" in display_data: try: content_b64 = display_data["content"] content_bytes = self.encryption_manager.decrypt_data(base64.b64decode(content_b64)) display_data["content"] = content_bytes.decode('utf-8') except Exception: display_data["content"] = "*** Error Decrypting ***" updated_data_fields = {} fields_to_modify = get_required_fields(kind) # Cannot modify bip85_index or length for generated_password if kind == "generated_password": fields_to_modify = [f for f in fields_to_modify if f not in ["length", "bip85_index"]] for field in fields_to_modify: current_value = display_data.get(field, "") # Handle special display/prompt for password/content if field == "password" and kind == "stored_password": print(f"Current Password: {'*' * len(current_value) if current_value else 'Not Set'}") new_value = getpass.getpass(f"Enter new Password (leave blank to keep current): ").strip() elif field == "content" and kind == "note": print(f"Current Content:\n---\n{current_value}\n---") print(f"Enter new {field.capitalize()} (leave blank to keep, end with 'EOF' on a new line):") lines = [] while True: line = input() if line == "EOF": break lines.append(line) new_value = "\n".join(lines) if lines else 
"" # Empty string if no input elif field == "tags" and kind == "note": print(f"Current Tags: {', '.join(current_value) if current_value else 'None'}") new_value = input(f"Enter new Tags (comma-separated, leave blank to keep): ").strip() else: print(f"Current {field.replace('_',' ').capitalize()}: {current_value}") new_value = input(f"Enter new {field.replace('_',' ').capitalize()} (leave blank to keep): ").strip() if new_value: # Only add to update dict if user provided input if field == "tags" and kind == "note": updated_data_fields[field] = [tag.strip() for tag in new_value.split(',') if tag.strip()] else: updated_data_fields[field] = new_value if not updated_data_fields: print(colored("No changes entered.", "yellow")) return # Confirm changes before applying print("\nChanges to be applied:") for field, value in updated_data_fields.items(): print(f" {field}: {value[:50] + '...' if len(value)>50 else value}") # Truncate long values if confirm_action("Proceed with these modifications? (Y/N): "): self.modify_entry(entry_num_to_modify, updated_data_fields) else: print(colored("Modification cancelled.", "yellow")) def handle_delete_entry_cli(self) -> None: """Handles the CLI interaction for deleting an entry.""" print(colored("\n--- Delete Entry ---", "yellow")) all_entries = self.list_all_entries() if not all_entries: print(colored("No entries found locally to delete.", "yellow")) return print("Available Entries:") for entry in sorted(all_entries, key=lambda x: x.get("entry_num", -1)): num = entry.get("entry_num", "N/A") kind = entry.get("kind", "Unknown") title = entry.get("data", {}).get("title", "No Title") print(f" {Style.BRIGHT}{num}{Style.RESET_ALL}. 
{title} ({kind})") while True: try: choice_str = input("Enter entry number to DELETE: ").strip() entry_num_to_delete = int(choice_str) # Verify entry exists before confirming existing_entry = next((e for e in all_entries if e.get("entry_num") == entry_num_to_delete), None) if existing_entry: title_to_delete = existing_entry.get("data", {}).get("title", "No Title") break else: print(colored("Invalid entry number.", "red")) except ValueError: print(colored("Invalid input. Please enter a number.", "red")) except KeyboardInterrupt: print(colored("\nCancelled.", "yellow")) return if confirm_action(colored(f"Are you SURE you want to delete entry {entry_num_to_delete} ('{title_to_delete}')?\nThis is IRREVERSIBLE locally and will post a deletion marker to Nostr. (Y/N): ", "red", attrs=["bold"])): self.delete_entry(entry_num_to_delete) else: print(colored("Deletion cancelled.", "yellow")) def handle_backup_entry_cli(self) -> None: """Handles CLI for backing up a specific entry.""" print(colored("\n--- Backup Entry ---", "yellow")) all_entries = self.list_all_entries() if not all_entries: print(colored("No entries found locally to back up.", "yellow")) return print("Available Entries:") for entry in sorted(all_entries, key=lambda x: x.get("entry_num", -1)): num = entry.get("entry_num", "N/A") kind = entry.get("kind", "Unknown") title = entry.get("data", {}).get("title", "No Title") print(f" {Style.BRIGHT}{num}{Style.RESET_ALL}. {title} ({kind})") while True: try: choice_str = input("Enter entry number to backup: ").strip() entry_num_to_backup = int(choice_str) if any(e.get("entry_num") == entry_num_to_backup for e in all_entries): self.backup_manager.create_backup_for_entry(entry_num_to_backup) break else: print(colored("Invalid entry number.", "red")) except ValueError: print(colored("Invalid input. 
Please enter a number.", "red")) except KeyboardInterrupt: print(colored("\nCancelled.", "yellow")) break def handle_restore_entry_cli(self) -> None: """Handles CLI for restoring an entry from backup.""" print(colored("\n--- Restore Entry from Backup ---", "yellow")) all_entries = self.list_all_entries() if not all_entries: print(colored("No entries exist. Cannot restore.", "yellow")) # Or maybe allow restoring to create? For now, require existing entry number. # If allowing restore-to-create, need to list all backups first. return print("Select entry number to restore:") for entry in sorted(all_entries, key=lambda x: x.get("entry_num", -1)): num = entry.get("entry_num", "N/A") kind = entry.get("kind", "Unknown") title = entry.get("data", {}).get("title", "No Title") print(f" {Style.BRIGHT}{num}{Style.RESET_ALL}. {title} ({kind})") entry_num_to_restore = None while entry_num_to_restore is None: try: choice_str = input("Enter entry number: ").strip() num = int(choice_str) if any(e.get("entry_num") == num for e in all_entries): entry_num_to_restore = num else: print(colored("Invalid entry number.", "red")) except ValueError: print(colored("Invalid input. Please enter a number.", "red")) except KeyboardInterrupt: print(colored("\nCancelled.", "yellow")) return # List backups for the selected entry backups = self.backup_manager.list_backups_for_entry(entry_num_to_restore) if not backups: print(colored(f"No backups found for entry {entry_num_to_restore}.", "yellow")) return print(colored(f"\nAvailable Backups for Entry {entry_num_to_restore}:", "cyan")) for i, backup_path in enumerate(backups): try: creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(backup_path.stat().st_mtime)) print(colored(f" {i+1}. {backup_path.name} ({creation_time})", "cyan")) except Exception: print(colored(f" {i+1}. 
{backup_path.name} (Error reading time)", "red")) while True: try: choice_str = input("Select backup number to restore: ").strip() choice = int(choice_str) - 1 if 0 <= choice < len(backups): selected_backup_path = backups[choice] if confirm_action(f"Restore entry {entry_num_to_restore} from {selected_backup_path.name}? This will overwrite the current entry. (Y/N): "): self.backup_manager.restore_entry_from_backup(entry_num_to_restore, selected_backup_path.name) # Ask user if they want to post the restored version to Nostr? if confirm_action("Do you want to post this restored version to Nostr (overwriting any newer version there)? (Y/N):"): restored_entry = self.entry_manager.load_entry(entry_num_to_restore) if restored_entry: kind_details = get_kind_details(restored_entry.get("kind")) if kind_details: entry_json = json.dumps(restored_entry).encode('utf-8') encrypted_entry_data = self.encryption_manager.encrypt_data(entry_json) identifier = f"{kind_details['identifier_tag']}{entry_num_to_restore}" nostr_kind_int = kind_details['nostr_kind'] self.nostr_client.publish_entry( encrypted_entry_data=encrypted_entry_data, nostr_kind=nostr_kind_int, d_tag=identifier ) print(colored("Restored entry posted to Nostr.", "green")) else: print(colored("Could not post to Nostr: Unknown kind.", "red")) else: print(colored("Could not post to Nostr: Failed to reload restored entry.", "red")) else: print(colored("Restore cancelled.", "yellow")) break # Exit loop else: print(colored("Invalid selection.", "red")) except ValueError: print(colored("Invalid input. 
Please enter a number.", "red")) except KeyboardInterrupt: print(colored("\nCancelled.", "yellow")) break # Exit loop def handle_verify_checksum(self) -> None: """Verifies main script checksum.""" # This remains unchanged as it checks the script file itself try: # Assuming __main__.__file__ gives the path to main.py when run script_path = os.path.abspath(sys.modules['__main__'].__file__) current_checksum = calculate_script_checksum(script_path) if verify_script_checksum(current_checksum, str(SCRIPT_CHECKSUM_FILE)): # Convert Path to str print(colored("Script checksum verification passed.", 'green')) logging.info("Script checksum verification passed.") else: print(colored("Checksum verification failed. The main script may have been modified.", 'red')) logging.error("Script checksum verification failed.") except Exception as e: logging.error(f"Error during script checksum verification: {e}", exc_info=True) print(colored(f"Error: Failed to verify script checksum: {e}", 'red')) def handle_backup_reveal_parent_seed(self) -> None: """Handles backup/reveal of the parent seed (remains largely unchanged).""" if not self.parent_seed or not self.fingerprint_dir or not self.encryption_manager: print(colored("Error: Profile not fully loaded.", "red")) return try: print(colored("\n=== Backup/Reveal Parent Seed ===", 'yellow')) print(colored("Warning: Revealing your parent seed is a highly sensitive operation.", 'red')) print(colored("Ensure you're in a secure, private environment.", 'red')) password = prompt_existing_password("Enter your master password to continue: ") if not self.verify_password(password): print(colored("Incorrect password. Operation aborted.", 'red')) return if not confirm_action("Are you absolutely SURE you want to reveal your parent seed? 
(Y/N): "): print(colored("Operation cancelled by user.", 'yellow')) return print(colored("\n=== Your 12-Word BIP-39 Parent Seed ===", 'green', attrs=['bold'])) print(colored(self.parent_seed, 'yellow')) print(colored("\nWRITE THIS DOWN if you haven't. Store it securely offline.", 'red')) if confirm_action("Do you want to save this seed to a separate encrypted backup file? (Y/N): "): default_name = f"seedpass_seed_{self.current_fingerprint}_backup.enc" filename = input(f"Enter filename (default: {default_name}): ").strip() or default_name # Basic filename validation (avoids path traversal) if '/' in filename or '\\' in filename or '..' in filename: print(colored("Invalid filename.", "red")) return backup_path = self.fingerprint_dir / filename # Use encrypt_and_save_file which handles locking etc. self.encryption_manager.encrypt_and_save_file(self.parent_seed.encode('utf-8'), backup_path.relative_to(self.fingerprint_dir)) print(colored(f"Encrypted seed backup saved to '{backup_path}'. Keep this file safe!", 'green')) except Exception as e: logger.error(f"Error during parent seed backup/reveal: {e}", exc_info=True) print(colored(f"Error: Failed during seed backup/reveal: {e}", 'red')) # --- Fingerprint Management Handlers (No change needed here) --- def handle_switch_fingerprint(self) -> bool: """Handles switching active profile.""" print(colored("\n--- Switch SeedPass Profile ---", "yellow")) # Get current selection before listing current_fp = self.current_fingerprint fingerprints = self.fingerprint_manager.list_fingerprints() if not fingerprints or len(fingerprints) <= 1: print(colored("No other profiles available to switch to.", "yellow")) return False print("Available Profiles:") available_to_switch = [] display_idx = 1 for fp in fingerprints: if fp != current_fp: print(colored(f"{display_idx}. 
{fp}", 'cyan')) available_to_switch.append(fp) display_idx += 1 else: print(colored(f" {fp} (Current)", "grey")) if not available_to_switch: print(colored("No other profiles available to switch to.", "yellow")) return False while True: choice_str = input("Select profile number to switch to (or 'c' to cancel): ").strip().lower() if choice_str == 'c': print(colored("Switch cancelled.", "yellow")) return False if not choice_str.isdigit(): print(colored("Invalid input.", "red")) continue choice = int(choice_str) if 1 <= choice <= len(available_to_switch): selected_fingerprint = available_to_switch[choice - 1] # select_fingerprint handles password prompt and manager re-init return self.select_fingerprint(selected_fingerprint) else: print(colored("Invalid selection.", 'red')) # Other fingerprint handlers (add_new_fingerprint_cli, remove_fingerprint_cli, list_fingerprints_cli) # would call the underlying FingerprintManager methods, similar to the existing structure in main.py, # but should be methods within PasswordManager for better encapsulation. def handle_add_new_fingerprint_cli(self): return self.add_new_fingerprint() # Calls the internal method def handle_remove_fingerprint_cli(self): print(colored("\n--- Remove SeedPass Profile ---", "yellow", attrs=['bold'])) print(colored("WARNING: This will delete the profile's fingerprint, encrypted seed,", attrs=['bold']), colored("all associated entries, and backups locally.", "red", attrs=['bold'])) print(colored("This action is IRREVERSIBLE.", "red", attrs=['bold'])) fingerprints = self.fingerprint_manager.list_fingerprints() if not fingerprints: print(colored("No profiles available to remove.", 'yellow')) return print("Available Profiles:") current_fp = self.current_fingerprint removable_fps = [] display_idx = 1 for fp in fingerprints: is_current = "(Current)" if fp == current_fp else "" print(colored(f"{display_idx}. 
{fp} {is_current}", 'cyan' if fp != current_fp else 'grey')) removable_fps.append(fp) display_idx += 1 while True: choice_str = input("Enter profile number to remove (or 'c' to cancel): ").strip().lower() if choice_str == 'c': print(colored("Removal cancelled.", "yellow")) return if not choice_str.isdigit(): print(colored("Invalid input.", "red")) continue choice = int(choice_str) if 1 <= choice <= len(removable_fps): selected_fingerprint = removable_fps[choice - 1] if selected_fingerprint == self.current_fingerprint: print(colored("Cannot remove the currently active profile. Switch profiles first.", "red")) return if confirm_action(colored(f"REALLY remove profile '{selected_fingerprint}' and all its data? (Y/N): ", "red")): if self.fingerprint_manager.remove_fingerprint(selected_fingerprint): print(colored(f"Profile {selected_fingerprint} removed successfully.", 'green')) else: print(colored("Failed to remove profile.", 'red')) else: print(colored("Removal cancelled.", 'yellow')) return # Exit after attempt or cancel else: print(colored("Invalid selection.", 'red')) def handle_list_fingerprints_cli(self): print(colored("\n--- SeedPass Profiles (Fingerprints) ---", "yellow")) fingerprints = self.fingerprint_manager.list_fingerprints() if not fingerprints: print(colored("No profiles configured.", 'yellow')) return current_fp = self.current_fingerprint for fp in fingerprints: is_current = colored("(Current)", "green") if fp == current_fp else "" print(colored(f"- {fp} {is_current}", 'cyan')) # --- Old/Removed Methods --- # Remove handle_generate_password, handle_retrieve_password, handle_modify_entry (old index versions) # Remove get_encrypted_data, decrypt_and_save_index_from_nostr (old index versions) # Remove backup_database, restore_database (old index versions)
Phase 4: Refactor NostrClient
nostr/client.py
(Refactored):# nostr/client.py import os import sys import logging import traceback import json import time import base64 import hashlib import asyncio import concurrent.futures from typing import List, Optional, Callable, Dict, Any from pathlib import Path from monstr.client.client import ClientPool, Client from monstr.encrypt import Keys # Keep Keys # Remove NIP4Encrypt unless needed for direct DMs (not needed for current backup plan) # from monstr.encrypt import NIP4Encrypt from monstr.event.event import Event from monstr.event.event_handlers import StoreEventHandler # Useful for collecting events from monstr.util import util_funcs # For relay set conversion import threading import uuid import fcntl # Import necessary components from SeedPass structure from password_manager.encryption import EncryptionManager # Used in init from .key_manager import KeyManager # EventHandler is now different - handles processing entries # from .event_handler import EventHandler # Remove old event handler import from constants import APP_DIR # Keep if needed, but paths managed by PasswordManager now from utils.file_lock import lock_file # Keep if needed logger = logging.getLogger(__name__) # Set the logging level specific to this module if desired # logger.setLevel(logging.DEBUG) # Example: More verbose Nostr logs DEFAULT_RELAYS = [ "wss://relay.snort.social", "wss://nostr.oxtr.dev", "wss://relay.primal.net", "wss://relay.damus.io", "wss://nostr.wine" ] # Define the Nostr Kind for SeedPass entries SEEDPASS_NOSTR_KIND = 31111 # Replaceable event kind for entries class NostrClient: """ Handles interactions with the Nostr network for SeedPass entries. Uses replaceable events (Kind 31111) with 'd' tags for synchronization. """ def __init__(self, encryption_manager: EncryptionManager, fingerprint: str, relays: Optional[List[str]] = None): """ Initializes the NostrClient. :param encryption_manager: Instance for decrypting the parent seed. 
:param fingerprint: The active fingerprint for deriving Nostr keys. :param relays: Optional list of relay URLs. """ self.encryption_manager = encryption_manager self.fingerprint = fingerprint # Derive keys *immediately* upon init try: self.key_manager = KeyManager( self.encryption_manager.decrypt_parent_seed(), # Decrypt seed here self.fingerprint ) except Exception as key_err: logger.critical(f"Failed to derive Nostr keys for fingerprint {fingerprint}: {key_err}", exc_info=True) print(colored(f"Error: Could not initialize Nostr identity for profile {fingerprint}.", "red")) raise RuntimeError("Nostr key generation failed") from key_err # Use default or provided relays self.relays = relays if relays else DEFAULT_RELAYS # Convert relay list to set for ClientPool if needed by monstr version relay_set = util_funcs.str_filter_to_set(self.relays) if not relay_set: logger.warning("No valid relays configured for NostrClient.") relay_set = {"wss://relay.damus.io"} # Fallback? Or raise error? self.client_pool = ClientPool(list(relay_set)) # ClientPool might expect list self.subscriptions: Dict[str, Any] = {} # Track subscriptions # For async operations from sync methods self.loop = asyncio.new_event_loop() self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self.loop_thread.start() # Wait for initial connection self.wait_for_connection() logger.info(f"NostrClient initialized for fingerprint {fingerprint} (PubKey: {self.key_manager.get_public_key_hex()[:10]}...).") # Shutdown flag self.is_shutting_down = False def _run_event_loop(self): """Runs the asyncio event loop in a separate thread.""" asyncio.set_event_loop(self.loop) try: self.loop.run_forever() finally: # Clean up loop resources before thread exits tasks = asyncio.all_tasks(loop=self.loop) for task in tasks: task.cancel() # Run loop briefly to allow tasks to finish cancelling self.loop.run_until_complete(asyncio.sleep(0.1)) self.loop.close() logger.info("NostrClient event loop closed.") def 
wait_for_connection(self, timeout=10): """Waits for the client pool to connect to at least one relay.""" start_time = time.time() while not self.client_pool.connected: if time.time() - start_time > timeout: logger.warning(f"NostrClient connection timeout after {timeout}s.") print(colored("Warning: Could not connect to Nostr relays within timeout.", "yellow")) # Decide if this is fatal or not. Maybe allow offline operation? # For now, let it proceed but log warning. break time.sleep(0.2) if self.client_pool.connected: logger.debug("NostrClient connected to relays.") async def publish_entry_async(self, encrypted_entry_data: bytes, nostr_kind: int, d_tag: str, is_deletion: bool = False): """ Asynchronously publishes an entry as a replaceable event. :param encrypted_entry_data: The fully encrypted entry JSON as bytes. :param nostr_kind: The Nostr event kind (e.g., 31111). :param d_tag: The unique identifier for the 'd' tag (e.g., "seedpass_gp_123"). :param is_deletion: If True, content might be empty/special marker (though we encrypt empty dict currently). """ try: content_b64 = base64.b64encode(encrypted_entry_data).decode('utf-8') # Create replaceable event event = Event( kind=nostr_kind, content=content_b64, pub_key=self.key_manager.get_public_key_hex(), tags=[ ["d", d_tag], ["t", "seedpass"] # General tag for SeedPass entries # Add ["k", str(nostr_kind)] ? Maybe redundant. ] ) # created_at will be set automatically by monstr on sign if not present event.sign(self.key_manager.get_private_key_hex()) logger.debug(f"Prepared Nostr Event (Kind: {nostr_kind}, d: {d_tag}, ID: {event.id})") # Publish using the client pool self.client_pool.publish(event) logger.info(f"Published entry {'(Deletion Marker)' if is_deletion else ''} to Nostr (Kind: {nostr_kind}, d: {d_tag}, EventID: {event.id})") except Exception as e: logger.error(f"Failed to publish Nostr event (Kind: {nostr_kind}, d: {d_tag}): {e}", exc_info=True) # Should this raise or just log? Logging for now. 
print(colored(f"Error: Failed to post entry update to Nostr: {e}", "red")) def publish_entry(self, encrypted_entry_data: bytes, nostr_kind: int, d_tag: str, is_deletion: bool = False): """Synchronous wrapper to publish an entry.""" if not self.loop.is_running(): logger.error("Cannot publish entry: Event loop is not running.") return future = asyncio.run_coroutine_threadsafe( self.publish_entry_async(encrypted_entry_data, nostr_kind, d_tag, is_deletion), self.loop ) try: future.result(timeout=10) # Wait for publish to be sent except concurrent.futures.TimeoutError: logger.warning(f"Timeout waiting for Nostr publish confirmation (Kind: {nostr_kind}, d: {d_tag}). Event might still be sent.") print(colored("Warning: Timeout posting to Nostr. Update might be delayed.", "yellow")) except Exception as e: logger.error(f"Error submitting publish task to event loop: {e}", exc_info=True) async def fetch_all_entries_async(self, since: Optional[int] = None, limit: int = 500) -> Optional[List[Event]]: """ Asynchronously fetches all SeedPass entries (Kind 31111) from Nostr. :param since: Optional Unix timestamp to fetch events newer than this. :param limit: Max number of events per relay query (relays might override). :return: A list of Event objects, or None if a critical error occurs. 
""" if not self.client_pool.connected: logger.warning("Cannot fetch entries: Nostr client not connected.") # Return empty list instead of None to indicate no *new* entries found due to connection issue return [] results = [] err_flag = asyncio.Event() # To signal errors from handler # Using StoreEventHandler to collect events store = StoreEventHandler() def on_error_handler(the_client: Client, sub_id: str, data: Any): logger.error(f"Error received on subscription {sub_id} from {the_client.url}: {data}") err_flag.set() # Signal that an error occurred # Filter for the specific replaceable kind authored by the user filters = [{ "authors": [self.key_manager.get_public_key_hex()], "kinds": [SEEDPASS_NOSTR_KIND], "#t": ["seedpass"], # Filter by general tag "limit": limit }] if since is not None and isinstance(since, int) and since >= 0: filters[0]["since"] = since # Add time filter if provided sub_id = None try: sub_id = f"seedpass_fetch_{uuid.uuid4()}" logger.debug(f"Subscribing to fetch entries with filter: {filters}, sub_id: {sub_id}") # Subscribe using the store handler and error handler self.client_pool.subscribe( handlers=[store, on_error_handler], # Pass list of handlers filters=filters, sub_id=sub_id, eose_func=lambda client, sub_id, events: logger.debug(f"Received EOSE for {sub_id} from {client.url}") ) self.subscriptions[sub_id] = filters # Store subscription info # Wait for EOSE from relays or a timeout/error # Timeout needs to be long enough for relays to respond fetch_timeout = 15.0 try: await asyncio.wait_for( self.client_pool.eose_matching(sub_id=sub_id), # Wait for EOSE events timeout=fetch_timeout ) logger.info(f"Received EOSE from relays for fetch subscription {sub_id}.") except asyncio.TimeoutError: logger.warning(f"Timeout waiting for EOSE on fetch subscription {sub_id} after {fetch_timeout}s.") # Continue with whatever events were received # Check if any error occurred during subscription if err_flag.is_set(): logger.error(f"Error occurred during 
Nostr fetch subscription {sub_id}.") # Depending on severity, maybe return None or partial results? # For now, return None to indicate failure. return None # Unsubscribe after fetching self.client_pool.unsubscribe(sub_id) if sub_id in self.subscriptions: del self.subscriptions[sub_id] logger.debug(f"Unsubscribed from fetch subscription {sub_id}.") # Get collected events from the store # Need to filter results by sub_id if store is reused, or use a fresh store each time. # Assuming store collects globally, filter results by pubkey/kind again for safety. # Actually, StoreEventHandler stores by event ID. Need a way to get all events received for the sub. # Let's refine this - maybe collect in a simple list within this function? # --- Alternative Collection --- collected_events = [] eose_received = asyncio.Event() def event_collector(the_client: Client, r_sub_id: str, evt: Event): if r_sub_id == sub_id: # Basic validation if evt.pub_key == self.key_manager.get_public_key_hex() and evt.kind == SEEDPASS_NOSTR_KIND: collected_events.append(evt) else: logger.warning(f"Received unexpected event during fetch: {evt.id} from {the_client.url}") def eose_marker(the_client: Client, r_sub_id: str, evts: List): if r_sub_id == sub_id: logger.debug(f"Received EOSE for {sub_id} from {the_client.url}") # We need to know when *enough* relays have sent EOSE. # This simple approach just sets a flag. `client_pool.eose_matching` is better. # For simplicity here, let's just use a timeout after subscribing. # --- Revert to simpler timeout-based fetch --- # This is less reliable than waiting for EOSE but simpler to implement without deeper monstr changes. 
collected_events_dict: Dict[str, Event] = {} # Use dict to store latest per d_tag def event_collector_simple(the_client: Client, r_sub_id: str, evt: Event): if r_sub_id == sub_id: if evt.pub_key == self.key_manager.get_public_key_hex() and evt.kind == SEEDPASS_NOSTR_KIND: d_tag_val = evt.get_tags("d") if d_tag_val: # Ensure 'd' tag exists d_tag = d_tag_val[0] # Get first 'd' tag # Store only the latest event for each 'd' tag if d_tag not in collected_events_dict or evt.created_at > collected_events_dict[d_tag].created_at: collected_events_dict[d_tag] = evt else: logger.warning(f"Received unexpected event during fetch: {evt.id} from {the_client.url}") sub_id = f"seedpass_fetch_{uuid.uuid4()}" self.client_pool.subscribe( handlers=event_collector_simple, filters=filters, sub_id=sub_id ) self.subscriptions[sub_id] = filters logger.debug(f"Subscribed to fetch entries with filter: {filters}, sub_id: {sub_id}") # Wait for a fixed time to allow events to arrive await asyncio.sleep(5.0) # Adjust this wait time as needed self.client_pool.unsubscribe(sub_id) if sub_id in self.subscriptions: del self.subscriptions[sub_id] logger.debug(f"Unsubscribed from fetch subscription {sub_id}. 
Collected {len(collected_events_dict)} unique entries.") return list(collected_events_dict.values()) # Return the latest event for each d_tag except Exception as e: logger.error(f"Failed during Nostr fetch: {e}", exc_info=True) # Clean up subscription if needed if sub_id and sub_id in self.subscriptions: try: self.client_pool.unsubscribe(sub_id) del self.subscriptions[sub_id] except Exception as unsub_err: logger.error(f"Error unsubscribing during fetch error handling: {unsub_err}") return None # Indicate failure def fetch_all_entries_sync(self, since: Optional[int] = None, limit: int = 500) -> Optional[List[Event]]: """Synchronous wrapper to fetch all entries.""" if not self.loop.is_running(): logger.error("Cannot fetch entries: Event loop is not running.") return None future = asyncio.run_coroutine_threadsafe( self.fetch_all_entries_async(since=since, limit=limit), self.loop ) try: return future.result(timeout=20) # Longer timeout for fetching except concurrent.futures.TimeoutError: logger.error("Timeout occurred while fetching entries from Nostr.") print(colored("Error: Timeout occurred while fetching entries from Nostr.", "red")) return None except Exception as e: logger.error(f"Error submitting fetch task to event loop: {e}", exc_info=True) return None def close_client_pool(self): """Gracefully shuts down the Nostr client pool and event loop.""" if self.is_shutting_down: logger.debug("Shutdown already in progress.") return self.is_shutting_down = True logger.info("Initiating NostrClient shutdown...") # Schedule the async close in the running loop if self.loop.is_running(): future = asyncio.run_coroutine_threadsafe(self._close_pool_async(), self.loop) try: future.result(timeout=10) # Wait for async close to finish except (concurrent.futures.TimeoutError, Exception) as e: logger.warning(f"Error or timeout during async pool close: {e}. 
Proceeding with loop stop.") # Stop the loop from the thread that owns it if self.loop.is_running(): self.loop.call_soon_threadsafe(self.loop.stop) else: logger.warning("NostrClient event loop was not running during shutdown.") # Wait for the thread to finish if self.loop_thread.is_alive(): self.loop_thread.join(timeout=5) if self.loop_thread.is_alive(): logger.warning("NostrClient event loop thread did not exit cleanly.") logger.info("NostrClient shutdown complete.") self.is_shutting_down = False async def _close_pool_async(self): """Async part of the shutdown sequence.""" try: logger.debug("Closing Nostr subscriptions...") sub_ids = list(self.subscriptions.keys()) for sub_id in sub_ids: try: self.client_pool.unsubscribe(sub_id) if sub_id in self.subscriptions: del self.subscriptions[sub_id] logger.debug(f"Unsubscribed from {sub_id}") except Exception as e: logger.warning(f"Error unsubscribing from {sub_id}: {e}") logger.debug("Closing Nostr client connections...") # Use await self.client_pool.disconnect() if available and preferred by monstr version # Otherwise, manually close underlying clients if accessible if hasattr(self.client_pool, '_clients'): # Accessing protected member, check monstr docs tasks = [self._safe_close_connection(c) for c in self.client_pool._clients.values()] await asyncio.gather(*tasks, return_exceptions=True) elif hasattr(self.client_pool, 'clients'): # Public attribute? 
tasks = [self._safe_close_connection(c) for c in self.client_pool.clients] await asyncio.gather(*tasks, return_exceptions=True) else: logger.warning("Cannot access client pool clients for explicit closure.") logger.debug("Async pool closure steps finished.") except Exception as e: logger.error(f"Error during async Nostr pool closure: {e}", exc_info=True) async def _safe_close_connection(self, client: Client): """Safely attempts to close a single client connection.""" # Older monstr versions might not have close_connection or disconnect close_method = getattr(client, 'disconnect', getattr(client, 'close_connection', None)) if close_method and asyncio.iscoroutinefunction(close_method): try: await asyncio.wait_for(close_method(), timeout=3) logger.debug(f"Closed connection to {client.url}") except asyncio.TimeoutError: logger.warning(f"Timeout closing connection to {client.url}") except Exception as e: logger.warning(f"Error closing connection to {client.url}: {e}") elif close_method: # Non-async close? Less likely for websockets. try: close_method() logger.debug(f"Closed connection to {client.url} (sync)") except Exception as e: logger.warning(f"Error closing connection to {client.url} (sync): {e}") else: logger.warning(f"No suitable close method found for client connected to {client.url}") # --- Remove Old Methods --- # remove publish_event, subscribe, retrieve_json_from_nostr_async, retrieve_json_from_nostr # remove do_post_async, subscribe_feed_async, publish_and_subscribe_async, publish_and_subscribe # remove decrypt_and_save_index_from_nostr, save_json_data, update_checksum, decrypt_data_from_file # remove publish_json_to_nostr, retrieve_json_from_nostr_sync, decrypt_and_save_index_from_nostr_public
Phase 5: Refactor EventHandler
- nostr/event_handler.py (Simplified/Removed): The event handling logic is now tightly coupled with synchronization in PasswordManager. A separate EventHandler class primarily for logging received events (as it was before) might still be useful for debugging, but it won't be directly involved in processing SeedPass entries anymore. The event_collector_simple function inside NostrClient.fetch_all_entries_async now handles the basic reception. The actual processing happens in PasswordManager.synchronize_with_nostr. Decision: We can remove the old EventHandler class or keep it purely for debug logging if needed, but it's not essential for the new flow. Let's comment it out for now.
# nostr/event_handler.py
# import time
# import logging
# import traceback
# from monstr.event.event import Event
# logger = logging.getLogger(__name__)
# class EventHandler:
#     """
#     Handles incoming Nostr events (Primarily for Debug Logging now).
#     Actual entry processing is done within PasswordManager.synchronize_with_nostr.
#     """
#     def __init__(self):
#         pass # No password manager reference needed now
#     def handle_new_event(self, the_client, sub_id, evt: Event):
#         """Processes incoming events by logging their details."""
#         # This might be attached to a general subscription for debugging
#         try:
#             created_at_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(evt.created_at))
#             logger.debug(
#                 f"[Debug Event Handler] Received Event:"
#                 f" SubID: {sub_id}"
#                 f" | Relay: {the_client.url}"
#                 f" | Kind: {evt.kind}"
#                 f" | ID: {evt.id}"
#                 f" | Created: {created_at_str}"
#                 f" | Content Preview: {evt.content[:50]}..."
#             )
#         except Exception as e:
#             logger.error(f"Error in debug event handler: {e}", exc_info=True)
Phase 6: Refactor main.py
main.py
# (Major Changes to Menu and Handlers):
# main.py
import os
import sys
import logging
import signal
import traceback
from functools import partial
from typing import Optional

from colorama import init as colorama_init, Style
from termcolor import colored

# Import PasswordManager - NostrClient is now initialized within it
from password_manager.manager import PasswordManager
# from nostr.client import NostrClient # No longer needed here

colorama_init(autoreset=True)  # Autoreset colors

# Module-level logger: the menu handlers below log through it, so it must
# exist even when this module is imported rather than run as a script.
logger = logging.getLogger(__name__)


# --- Logging Configuration (Keep as is) ---
def configure_logging():
    # ... (keep existing logging setup) ...
    pass  # Keep existing code


# --- Confirmation Helper (Keep as is) ---
def confirm_action(prompt: str) -> bool:
    # ... (keep existing confirmation logic) ...
    pass  # Keep existing code


# --- New CLI Interaction Logic ---
def display_main_menu(password_manager: PasswordManager):
    """Print the main menu and return its option table.

    :param password_manager: The active PasswordManager instance.
    :return: Dict mapping the choice string to ``(label, handler)``. Every
        handler is a zero-argument callable (stand-alone functions are
        pre-bound to *password_manager*); the Exit entry has ``None``.
    """
    print(colored("\n--- SeedPass Main Menu ---", "blue", attrs=["bold"]))
    print(f"Active Profile: {colored(password_manager.current_fingerprint, 'green')}\n")

    menu_options = {
        "1": ("Add New Entry", password_manager.handle_add_entry_cli),
        "2": ("List / Retrieve Entries", password_manager.handle_retrieve_entry_cli),
        "3": ("Modify Entry", password_manager.handle_modify_entry_cli),
        "4": ("Delete Entry", password_manager.handle_delete_entry_cli),
        "5": ("Synchronize with Nostr", password_manager.synchronize_with_nostr),  # Changed
        "6": ("Display Nostr Public Key (npub)", partial(handle_display_npub, password_manager)),
        "7": ("Manage Backups", partial(handle_backup_menu, password_manager)),  # New Sub-menu
        "8": ("Manage Profiles (Seeds)", partial(handle_profile_menu, password_manager)),  # New Sub-menu
        "9": ("Verify Script Checksum", password_manager.handle_verify_checksum),
        "10": ("Exit", None),  # Handled in loop
    }

    for key, (text, _) in menu_options.items():
        print(f" {Style.BRIGHT}{key}{Style.RESET_ALL}. {text}")

    return menu_options


def handle_display_npub(password_manager: PasswordManager):
    """Displays the Nostr public key (npub)."""
    nostr_client = password_manager.nostr_client
    if not (nostr_client and nostr_client.key_manager):
        print(colored("Nostr client not initialized for this profile.", "red"))
        return

    try:
        npub = nostr_client.key_manager.get_npub()
        print(colored(f"\nYour Nostr Public Key (npub) for profile '{password_manager.current_fingerprint}':", 'cyan'))
        print(colored(npub, 'yellow'))
        print(colored("Share this key for others to send you encrypted messages (if supported).", 'cyan'))
    except Exception as e:
        logger.error(f"Failed to get npub: {e}", exc_info=True)
        print(colored(f"Error displaying npub: {e}", "red"))


def handle_backup_menu(password_manager: PasswordManager):
    """Handles the backup management sub-menu.

    Loops until the user picks "Return to Main Menu". Non-numeric input for
    an entry number and any other handler error are reported and the menu is
    shown again.
    """
    if not password_manager.backup_manager:
        print(colored("Backup manager not initialized.", "red"))
        return

    backup_menu_options = {
        "1": ("Backup Specific Entry", password_manager.handle_backup_entry_cli),
        "2": ("Restore Specific Entry", password_manager.handle_restore_entry_cli),
        "3": ("List Backups for Entry", lambda: password_manager.backup_manager.display_backups(
            # int() raises ValueError on bad input; handled in the loop below.
            entry_num=int(input("Enter entry number to list backups for: "))
        )),
        "4": ("List All Backups", lambda: password_manager.backup_manager.display_backups()),
        "5": ("Return to Main Menu", None),
    }

    while True:
        print(colored("\n--- Backup Management ---", "blue"))
        for key, (text, _) in backup_menu_options.items():
            print(f" {key}. {text}")

        choice = input("Enter your choice: ").strip()
        if choice == '5':
            break  # Return to main menu

        selected_option = backup_menu_options.get(choice)
        if selected_option is None:
            print(colored("Invalid choice.", "red"))
            continue

        handler = selected_option[1]
        if handler is None:
            continue  # Option exists but no function (like return)

        try:
            handler()
        except ValueError:
            print(colored("Invalid numeric input.", "red"))
        except Exception as e:
            logger.error(f"Error in backup menu option {choice}: {e}", exc_info=True)
            print(colored(f"An error occurred: {e}", "red"))


def handle_profile_menu(password_manager: PasswordManager):
    """Handles the profile (fingerprint) management sub-menu.

    Returns to the main menu when the user chooses to, or right after a
    profile switch whose handler returned a truthy result.
    """
    if not password_manager.fingerprint_manager:
        print(colored("Profile manager not initialized.", "red"))
        return

    profile_menu_options = {
        "1": ("Switch Active Profile", password_manager.handle_switch_fingerprint),  # Assumes this returns bool
        "2": ("Add New Profile", password_manager.handle_add_new_fingerprint_cli),
        "3": ("Remove Profile", password_manager.handle_remove_fingerprint_cli),
        "4": ("List All Profiles", password_manager.handle_list_fingerprints_cli),
        "5": ("Backup/Reveal Current Profile Seed", password_manager.handle_backup_reveal_parent_seed),  # Moved here
        "6": ("Return to Main Menu", None),
    }

    while True:
        print(colored("\n--- Profile Management ---", "blue"))
        for key, (text, _) in profile_menu_options.items():
            print(f" {key}. {text}")

        choice = input("Enter your choice: ").strip()
        if choice == '6':
            break  # Return to main menu

        selected_option = profile_menu_options.get(choice)
        if selected_option is None:
            print(colored("Invalid choice.", "red"))
            continue

        label, handler = selected_option
        if handler is None:
            continue

        try:
            result = handler()
            # Handle specific results if needed (e.g., switch profile might fail)
            if label == "Switch Active Profile" and result:
                print(colored("Profile switched successfully. Returning to main menu.", "green"))
                break  # Exit sub-menu after successful switch
        except Exception as e:
            logger.error(f"Error in profile menu option {choice}: {e}", exc_info=True)
            print(colored(f"An error occurred: {e}", "red"))


def _close_nostr_client(password_manager: Optional[PasswordManager]) -> None:
    """Best-effort shutdown of the Nostr client, if one was created.

    Errors are logged and printed rather than raised so that cleanup can
    never replace the exit status chosen by the caller.
    """
    if not (password_manager and password_manager.nostr_client):
        return
    try:
        password_manager.nostr_client.close_client_pool()
        logger.info("NostrClient closed successfully.")
    except Exception as e:
        logger.error(f"Error closing NostrClient during shutdown: {e}", exc_info=True)
        print(colored(f"Error during Nostr shutdown: {e}", 'red'))


# --- Main Execution Logic ---
if __name__ == '__main__':
    configure_logging()
    logger.info("--- Starting SeedPass ---")

    password_manager: Optional[PasswordManager] = None  # Define before try block
    try:
        # Initialization is now more complex, handled inside PasswordManager __init__
        password_manager = PasswordManager()
        logger.info("PasswordManager initialization complete.")
    except SystemExit:
        logger.warning("SystemExit during initialization.")
        # Don't print error message again if sys.exit was called intentionally
        sys.exit(1)  # Ensure exit code reflects failure
    except Exception as e:  # Catch any other unexpected init errors
        logger.critical(f"Unhandled exception during PasswordManager initialization: {e}", exc_info=True)
        print(colored(f"FATAL ERROR during startup: {e}. Check logs.", "red", attrs=["bold"]))
        # password_manager is still None here (the constructor raised before
        # the assignment), so there is nothing to clean up from this scope.
        sys.exit(1)

    def signal_handler(sig, frame):
        """Turn SIGINT/SIGTERM into SystemExit(0).

        Cleanup is left to the ``finally`` block of the main loop, which the
        SystemExit unwinds through, so it runs exactly once.
        """
        print(colored("\nReceived shutdown signal. Exiting gracefully...", 'yellow'))
        logger.info(f"Received shutdown signal: {sig}. Initiating graceful shutdown.")
        sys.exit(0)

    # --- Main Application Loop ---
    exit_code = 0
    try:
        # Registered inside the try so a signal can never skip the cleanup.
        signal.signal(signal.SIGINT, signal_handler)   # Handle Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # Handle termination signals

        while True:
            menu = display_main_menu(password_manager)
            choice = input(colored('Enter your choice: ', "magenta")).strip()

            if choice == '10':  # Exit option
                break  # Exit loop

            selected_option = menu.get(choice)
            if selected_option is None:
                print(colored("Invalid choice. Please select a valid option.", 'red'))
                continue

            label, handler = selected_option
            if handler is None:
                continue  # Option exists but no function (should not happen with this menu structure)

            try:
                handler()
            except Exception as menu_err:
                logger.error(f"Error during menu action '{label}': {menu_err}", exc_info=True)
                print(colored(f"An error occurred: {menu_err}", "red"))
    except (KeyboardInterrupt, EOFError):
        # EOFError: input() hit end-of-input (e.g. Ctrl+D or a closed stdin).
        logger.info("Program terminated by user (Ctrl+C or end of input in main loop).")
        print(colored("\nExiting...", 'yellow'))
    except Exception as main_loop_err:
        logger.critical(f"An unexpected error occurred in the main loop: {main_loop_err}", exc_info=True)
        print(colored(f"FATAL ERROR: An unexpected error occurred: {main_loop_err}", 'red', attrs=["bold"]))
        exit_code = 1
    finally:
        # Single cleanup point for normal exit, errors and signals. No
        # sys.exit() here: raising inside ``finally`` would overwrite the
        # failure status selected above.
        logger.info("Exiting main loop.")
        _close_nostr_client(password_manager)
        logger.info("--- SeedPass Finished ---")
        print(colored("Exiting SeedPass.", 'green'))

    sys.exit(exit_code)
Phase 7: Remove Obsolete Files
- Delete nostr/encryption_manager.py.
Summary of Key Changes:
- kinds.py: Central definition for entry types.
- handlers/: Specific logic for processing each kind.
- state_manager.py: Tracks last generated password index and sync time.
- EntryManager: Now manages individual entry files (saving, loading, deleting, checksumming data within the entry). No longer holds the index logic.
- BackupManager: Adapted to back up/restore individual entry files.
- PasswordManager: Orchestrates the new flow. Contains methods for add_entry, modify_entry, delete_entry, synchronize_with_nostr, process_entry, data migration, and CLI handlers. Reads/writes state via StateManager.
- NostrClient: Publishes/fetches individual replaceable events (Kind 31111 suggested) using d tags. Fetching retrieves latest versions. Uses StoreEventHandler (or similar) for collecting results. publish_json_to_nostr and retrieve_json_from_nostr_sync removed/replaced.
- main.py: Updated menu structure, calling new PasswordManager methods and sub-menu handlers.
Next Steps:
- Review Thoroughly: Carefully compare the refactored code against the original and the plan.
- Implement Tests: Write unit tests for StateManager, new EntryManager methods, kinds.py helpers, and the handlers. Write integration tests for the add -> sync -> retrieve flow.
- Refine NostrClient Fetching: The fetch_all_entries_async implementation using a simple timeout is basic. Improving it to reliably use EOSE detection across multiple relays (client_pool.eose_matching) would be more robust.
- Error Handling: Add more specific error handling, especially around file I/O, network issues, and migration edge cases.
- User Experience: Test the CLI flow extensively. Ensure prompts are clear and error messages are helpful. Consider how users will select entries for modification/deletion (the current implementation lists all and asks for a number). Search functionality might be needed later.