Refactor password manager modules

This commit is contained in:
thePR0M3TH3AN
2025-07-17 19:21:10 -04:00
parent 87149517d8
commit c23b2e4913
115 changed files with 388 additions and 404 deletions

View File

@@ -14,8 +14,8 @@ import asyncio
import sys
from fastapi.middleware.cors import CORSMiddleware
from password_manager.manager import PasswordManager
from password_manager.entry_types import EntryType
from seedpass.core.manager import PasswordManager
from seedpass.core.entry_types import EntryType
app = FastAPI()

View File

@@ -4,8 +4,8 @@ import json
import typer
from password_manager.manager import PasswordManager
from password_manager.entry_types import EntryType
from seedpass.core.manager import PasswordManager
from seedpass.core.entry_types import EntryType
import uvicorn
from . import api as api_module

View File

@@ -0,0 +1,19 @@
# seedpass.core/__init__.py
"""Expose password manager components with lazy imports."""
from importlib import import_module
__all__ = ["PasswordManager", "ConfigManager", "Vault", "EntryType"]
def __getattr__(name: str):
if name == "PasswordManager":
return import_module(".manager", __name__).PasswordManager
if name == "ConfigManager":
return import_module(".config_manager", __name__).ConfigManager
if name == "Vault":
return import_module(".vault", __name__).Vault
if name == "EntryType":
return import_module(".entry_types", __name__).EntryType
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

202
src/seedpass/core/backup.py Normal file
View File

@@ -0,0 +1,202 @@
# seedpass.core/backup.py
"""
Backup Manager Module
This module implements the BackupManager class, responsible for creating backups,
restoring from backups, and listing available backups for the encrypted password
index file. It ensures data integrity and provides mechanisms to recover from
corrupted or lost data by maintaining timestamped backups.
Ensure that all dependencies are installed and properly configured in your environment.
"""
import logging
import os
import shutil
import time
import traceback
from pathlib import Path
from termcolor import colored
from .config_manager import ConfigManager
from utils.file_lock import exclusive_lock
from constants import APP_DIR
# Instantiate the logger
logger = logging.getLogger(__name__)
class BackupManager:
"""
BackupManager Class
Handles the creation, restoration, and listing of backups for the encrypted password
index file. Backups are stored in the application directory with
timestamped filenames to facilitate easy identification and retrieval.
"""
BACKUP_FILENAME_TEMPLATE = "entries_db_backup_{timestamp}.json.enc"
def __init__(self, fingerprint_dir: Path, config_manager: ConfigManager):
"""Initialize BackupManager for a specific profile.
Parameters
----------
fingerprint_dir : Path
Directory for this profile.
config_manager : ConfigManager
Configuration manager used for retrieving settings.
"""
self.fingerprint_dir = fingerprint_dir
self.config_manager = config_manager
self.backup_dir = self.fingerprint_dir / "backups"
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.index_file = self.fingerprint_dir / "seedpass_entries_db.json.enc"
self._last_backup_time = 0.0
logger.debug(
f"BackupManager initialized with backup directory at {self.backup_dir}"
)
def create_backup(self) -> None:
try:
index_file = self.index_file
if not index_file.exists():
logger.warning("Index file does not exist. No backup created.")
print(
colored(
"Warning: Index file does not exist. No backup created.",
"yellow",
)
)
return
now = time.time()
interval = self.config_manager.get_backup_interval()
if interval > 0 and now - self._last_backup_time < interval:
logger.info("Skipping backup due to interval throttle")
return
timestamp = int(now)
backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp)
backup_file = self.backup_dir / backup_filename
shutil.copy2(index_file, backup_file)
os.chmod(backup_file, 0o600)
logger.info(f"Backup created successfully at '{backup_file}'.")
print(colored(f"Backup created successfully at '{backup_file}'.", "green"))
self._create_additional_backup(backup_file)
self._last_backup_time = now
except Exception as e:
logger.error(f"Failed to create backup: {e}", exc_info=True)
print(colored(f"Error: Failed to create backup: {e}", "red"))
def _create_additional_backup(self, backup_file: Path) -> None:
"""Write a copy of *backup_file* to the configured secondary location."""
path = self.config_manager.get_additional_backup_path()
if not path:
return
try:
dest_dir = Path(path).expanduser()
dest_dir.mkdir(parents=True, exist_ok=True)
dest_file = dest_dir / f"{self.fingerprint_dir.name}_{backup_file.name}"
shutil.copy2(backup_file, dest_file)
os.chmod(dest_file, 0o600)
logger.info(f"Additional backup created at '{dest_file}'.")
except Exception as e: # pragma: no cover - best-effort logging
logger.error(
f"Failed to write additional backup to '{path}': {e}",
exc_info=True,
)
def restore_latest_backup(self) -> None:
try:
backup_files = sorted(
self.backup_dir.glob("entries_db_backup_*.json.enc"),
key=lambda x: x.stat().st_mtime,
reverse=True,
)
if not backup_files:
logger.error("No backup files found to restore.")
print(colored("Error: No backup files found to restore.", "red"))
return
latest_backup = backup_files[0]
index_file = self.index_file
shutil.copy2(latest_backup, index_file)
os.chmod(index_file, 0o600)
logger.info(f"Restored the index file from backup '{latest_backup}'.")
print(
colored(
f"Restored the index file from backup '{latest_backup}'.", "green"
)
)
except Exception as e:
logger.error(
f"Failed to restore from backup '{latest_backup}': {e}", exc_info=True
)
print(
colored(
f"Error: Failed to restore from backup '{latest_backup}': {e}",
"red",
)
)
def list_backups(self) -> None:
try:
backup_files = sorted(
self.backup_dir.glob("entries_db_backup_*.json.enc"),
key=lambda x: x.stat().st_mtime,
reverse=True,
)
if not backup_files:
logger.info("No backup files available.")
print(colored("No backup files available.", "yellow"))
return
print(colored("Available Backups:", "cyan"))
for backup in backup_files:
creation_time = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(backup.stat().st_mtime)
)
print(colored(f"- {backup.name} (Created on: {creation_time})", "cyan"))
except Exception as e:
logger.error(f"Failed to list backups: {e}", exc_info=True)
print(colored(f"Error: Failed to list backups: {e}", "red"))
def restore_backup_by_timestamp(self, timestamp: int) -> None:
backup_filename = self.BACKUP_FILENAME_TEMPLATE.format(timestamp=timestamp)
backup_file = self.backup_dir / backup_filename
if not backup_file.exists():
logger.error(f"No backup found with timestamp {timestamp}.")
print(colored(f"Error: No backup found with timestamp {timestamp}.", "red"))
return
try:
with exclusive_lock(backup_file) as fh_src, open(
self.index_file, "wb"
) as dst:
fh_src.seek(0)
shutil.copyfileobj(fh_src, dst)
os.chmod(self.index_file, 0o600)
logger.info(f"Restored the index file from backup '{backup_file}'.")
print(
colored(
f"Restored the index file from backup '{backup_file}'.", "green"
)
)
except Exception as e:
logger.error(
f"Failed to restore from backup '{backup_file}': {e}", exc_info=True
)
print(
colored(
f"Error: Failed to restore from backup '{backup_file}': {e}", "red"
)
)

View File

@@ -0,0 +1,328 @@
"""Config management for SeedPass profiles."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import List, Optional
from utils.seed_prompt import masked_input
import bcrypt
from .vault import Vault
from nostr.client import DEFAULT_RELAYS as DEFAULT_NOSTR_RELAYS
from constants import INACTIVITY_TIMEOUT
logger = logging.getLogger(__name__)
class ConfigManager:
"""Manage per-profile configuration encrypted on disk."""
CONFIG_FILENAME = "seedpass_config.json.enc"
def __init__(self, vault: Vault, fingerprint_dir: Path):
self.vault = vault
self.fingerprint_dir = fingerprint_dir
self.config_path = self.fingerprint_dir / self.CONFIG_FILENAME
def load_config(self, require_pin: bool = True) -> dict:
"""Load the configuration file and optionally verify a stored PIN.
Parameters
----------
require_pin: bool, default True
If True and a PIN is configured, prompt the user to enter it and
verify against the stored hash.
"""
if not self.config_path.exists():
logger.info("Config file not found; returning defaults")
return {
"relays": list(DEFAULT_NOSTR_RELAYS),
"offline_mode": False,
"pin_hash": "",
"password_hash": "",
"inactivity_timeout": INACTIVITY_TIMEOUT,
"kdf_iterations": 50_000,
"kdf_mode": "pbkdf2",
"additional_backup_path": "",
"backup_interval": 0,
"secret_mode_enabled": False,
"clipboard_clear_delay": 45,
"quick_unlock": False,
"nostr_max_retries": 2,
"nostr_retry_delay": 1.0,
"min_uppercase": 2,
"min_lowercase": 2,
"min_digits": 2,
"min_special": 2,
"verbose_timing": False,
}
try:
data = self.vault.load_config()
if not isinstance(data, dict):
raise ValueError("Config data must be a dictionary")
# Ensure defaults for missing keys
data.setdefault("relays", list(DEFAULT_NOSTR_RELAYS))
data.setdefault("offline_mode", False)
data.setdefault("pin_hash", "")
data.setdefault("password_hash", "")
data.setdefault("inactivity_timeout", INACTIVITY_TIMEOUT)
data.setdefault("kdf_iterations", 50_000)
data.setdefault("kdf_mode", "pbkdf2")
data.setdefault("additional_backup_path", "")
data.setdefault("backup_interval", 0)
data.setdefault("secret_mode_enabled", False)
data.setdefault("clipboard_clear_delay", 45)
data.setdefault("quick_unlock", False)
data.setdefault("nostr_max_retries", 2)
data.setdefault("nostr_retry_delay", 1.0)
data.setdefault("min_uppercase", 2)
data.setdefault("min_lowercase", 2)
data.setdefault("min_digits", 2)
data.setdefault("min_special", 2)
data.setdefault("verbose_timing", False)
# Migrate legacy hashed_password.enc if present and password_hash is missing
legacy_file = self.fingerprint_dir / "hashed_password.enc"
if not data.get("password_hash") and legacy_file.exists():
with open(legacy_file, "rb") as f:
data["password_hash"] = f.read().decode()
self.save_config(data)
if require_pin and data.get("pin_hash"):
for _ in range(3):
pin = masked_input("Enter settings PIN: ").strip()
if bcrypt.checkpw(pin.encode(), data["pin_hash"].encode()):
break
print("Invalid PIN")
else:
raise ValueError("PIN verification failed")
return data
except Exception as exc:
logger.error(f"Failed to load config: {exc}")
raise
def save_config(self, config: dict) -> None:
"""Encrypt and save configuration."""
try:
config.setdefault("backup_interval", 0)
self.vault.save_config(config)
except Exception as exc:
logger.error(f"Failed to save config: {exc}")
raise
def set_relays(self, relays: List[str], require_pin: bool = True) -> None:
"""Update relay list and save."""
if not relays:
raise ValueError("At least one Nostr relay must be configured")
config = self.load_config(require_pin=require_pin)
config["relays"] = relays
self.save_config(config)
def set_pin(self, pin: str) -> None:
"""Hash and store the provided PIN."""
pin_hash = bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode()
config = self.load_config(require_pin=False)
config["pin_hash"] = pin_hash
self.save_config(config)
def verify_pin(self, pin: str) -> bool:
"""Check a provided PIN against the stored hash without prompting."""
config = self.load_config(require_pin=False)
stored = config.get("pin_hash", "").encode()
if not stored:
return False
return bcrypt.checkpw(pin.encode(), stored)
def change_pin(self, old_pin: str, new_pin: str) -> bool:
"""Update the stored PIN if the old PIN is correct."""
if self.verify_pin(old_pin):
self.set_pin(new_pin)
return True
return False
def set_password_hash(self, password_hash: str) -> None:
"""Persist the bcrypt password hash in the config."""
config = self.load_config(require_pin=False)
config["password_hash"] = password_hash
self.save_config(config)
def set_inactivity_timeout(self, timeout_seconds: float) -> None:
"""Persist the inactivity timeout in seconds."""
if timeout_seconds <= 0:
raise ValueError("Timeout must be positive")
config = self.load_config(require_pin=False)
config["inactivity_timeout"] = timeout_seconds
self.save_config(config)
def get_inactivity_timeout(self) -> float:
"""Retrieve the inactivity timeout setting in seconds."""
config = self.load_config(require_pin=False)
return float(config.get("inactivity_timeout", INACTIVITY_TIMEOUT))
def set_kdf_iterations(self, iterations: int) -> None:
"""Persist the PBKDF2 iteration count in the config."""
if iterations <= 0:
raise ValueError("Iterations must be positive")
config = self.load_config(require_pin=False)
config["kdf_iterations"] = int(iterations)
self.save_config(config)
def get_kdf_iterations(self) -> int:
"""Retrieve the PBKDF2 iteration count."""
config = self.load_config(require_pin=False)
return int(config.get("kdf_iterations", 50_000))
def set_kdf_mode(self, mode: str) -> None:
"""Persist the key derivation function mode."""
if mode not in ("pbkdf2", "argon2"):
raise ValueError("kdf_mode must be 'pbkdf2' or 'argon2'")
config = self.load_config(require_pin=False)
config["kdf_mode"] = mode
self.save_config(config)
def get_kdf_mode(self) -> str:
"""Retrieve the configured key derivation function."""
config = self.load_config(require_pin=False)
return config.get("kdf_mode", "pbkdf2")
def set_additional_backup_path(self, path: Optional[str]) -> None:
"""Persist an optional additional backup path in the config."""
config = self.load_config(require_pin=False)
config["additional_backup_path"] = path or ""
self.save_config(config)
def get_additional_backup_path(self) -> Optional[str]:
"""Retrieve the additional backup path if configured."""
config = self.load_config(require_pin=False)
value = config.get("additional_backup_path", "")
return value or None
def set_secret_mode_enabled(self, enabled: bool) -> None:
"""Persist the secret mode toggle."""
config = self.load_config(require_pin=False)
config["secret_mode_enabled"] = bool(enabled)
self.save_config(config)
def set_offline_mode(self, enabled: bool) -> None:
"""Persist the offline mode toggle."""
config = self.load_config(require_pin=False)
config["offline_mode"] = bool(enabled)
self.save_config(config)
def get_secret_mode_enabled(self) -> bool:
"""Retrieve whether secret mode is enabled."""
config = self.load_config(require_pin=False)
return bool(config.get("secret_mode_enabled", False))
def get_offline_mode(self) -> bool:
"""Retrieve the offline mode setting."""
config = self.load_config(require_pin=False)
return bool(config.get("offline_mode", False))
def set_clipboard_clear_delay(self, delay: int) -> None:
"""Persist clipboard clear timeout in seconds."""
if delay <= 0:
raise ValueError("Delay must be positive")
config = self.load_config(require_pin=False)
config["clipboard_clear_delay"] = int(delay)
self.save_config(config)
def get_clipboard_clear_delay(self) -> int:
"""Retrieve clipboard clear delay in seconds."""
config = self.load_config(require_pin=False)
return int(config.get("clipboard_clear_delay", 45))
def set_backup_interval(self, interval: int | float) -> None:
"""Persist the minimum interval in seconds between automatic backups."""
if interval < 0:
raise ValueError("Interval cannot be negative")
config = self.load_config(require_pin=False)
config["backup_interval"] = interval
self.save_config(config)
def get_backup_interval(self) -> float:
"""Retrieve the backup interval in seconds."""
config = self.load_config(require_pin=False)
return float(config.get("backup_interval", 0))
# Password policy settings
def get_password_policy(self) -> "PasswordPolicy":
"""Return the password complexity policy."""
from .password_generation import PasswordPolicy
cfg = self.load_config(require_pin=False)
return PasswordPolicy(
min_uppercase=int(cfg.get("min_uppercase", 2)),
min_lowercase=int(cfg.get("min_lowercase", 2)),
min_digits=int(cfg.get("min_digits", 2)),
min_special=int(cfg.get("min_special", 2)),
)
def set_min_uppercase(self, count: int) -> None:
cfg = self.load_config(require_pin=False)
cfg["min_uppercase"] = int(count)
self.save_config(cfg)
def set_min_lowercase(self, count: int) -> None:
cfg = self.load_config(require_pin=False)
cfg["min_lowercase"] = int(count)
self.save_config(cfg)
def set_min_digits(self, count: int) -> None:
cfg = self.load_config(require_pin=False)
cfg["min_digits"] = int(count)
self.save_config(cfg)
def set_min_special(self, count: int) -> None:
cfg = self.load_config(require_pin=False)
cfg["min_special"] = int(count)
self.save_config(cfg)
def set_quick_unlock(self, enabled: bool) -> None:
"""Persist the quick unlock toggle."""
cfg = self.load_config(require_pin=False)
cfg["quick_unlock"] = bool(enabled)
self.save_config(cfg)
def get_quick_unlock(self) -> bool:
"""Retrieve whether quick unlock is enabled."""
cfg = self.load_config(require_pin=False)
return bool(cfg.get("quick_unlock", False))
def set_nostr_max_retries(self, retries: int) -> None:
"""Persist the maximum number of Nostr retry attempts."""
if retries < 0:
raise ValueError("retries cannot be negative")
cfg = self.load_config(require_pin=False)
cfg["nostr_max_retries"] = int(retries)
self.save_config(cfg)
def get_nostr_max_retries(self) -> int:
"""Retrieve the configured Nostr retry count."""
cfg = self.load_config(require_pin=False)
return int(cfg.get("nostr_max_retries", 2))
def set_nostr_retry_delay(self, delay: float) -> None:
"""Persist the delay between Nostr retry attempts."""
if delay < 0:
raise ValueError("delay cannot be negative")
cfg = self.load_config(require_pin=False)
cfg["nostr_retry_delay"] = float(delay)
self.save_config(cfg)
def get_nostr_retry_delay(self) -> float:
"""Retrieve the delay in seconds between Nostr retries."""
cfg = self.load_config(require_pin=False)
return float(cfg.get("nostr_retry_delay", 1.0))
def set_verbose_timing(self, enabled: bool) -> None:
cfg = self.load_config(require_pin=False)
cfg["verbose_timing"] = bool(enabled)
self.save_config(cfg)
def get_verbose_timing(self) -> bool:
cfg = self.load_config(require_pin=False)
return bool(cfg.get("verbose_timing", False))

View File

@@ -0,0 +1,339 @@
# /src/seedpass.core/encryption.py
import logging
import traceback
try:
import orjson as json_lib # type: ignore
JSONDecodeError = orjson.JSONDecodeError
USE_ORJSON = True
except Exception: # pragma: no cover - fallback for environments without orjson
import json as json_lib
from json import JSONDecodeError
USE_ORJSON = False
import hashlib
import os
import base64
from pathlib import Path
from typing import Optional
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
from cryptography.fernet import Fernet, InvalidToken
from termcolor import colored
from utils.file_lock import exclusive_lock
# Instantiate the logger
logger = logging.getLogger(__name__)
class EncryptionManager:
"""
Manages encryption and decryption, handling migration from legacy Fernet
to modern AES-GCM.
"""
def __init__(self, encryption_key: bytes, fingerprint_dir: Path):
"""
Initializes the EncryptionManager with keys for both new (AES-GCM)
and legacy (Fernet) encryption formats.
Parameters:
encryption_key (bytes): A base64-encoded key.
fingerprint_dir (Path): The directory corresponding to the fingerprint.
"""
self.fingerprint_dir = fingerprint_dir
self.parent_seed_file = self.fingerprint_dir / "parent_seed.enc"
try:
if isinstance(encryption_key, str):
encryption_key = encryption_key.encode()
# (1) Keep both the legacy Fernet instance and the new AES-GCM cipher ready.
self.key_b64 = encryption_key
self.fernet = Fernet(self.key_b64)
self.key = base64.urlsafe_b64decode(self.key_b64)
self.cipher = AESGCM(self.key)
logger.debug(f"EncryptionManager initialized for {self.fingerprint_dir}")
except Exception as e:
logger.error(
f"Failed to initialize ciphers with provided encryption key: {e}",
exc_info=True,
)
raise
def encrypt_data(self, data: bytes) -> bytes:
"""
(2) Encrypts data using the NEW AES-GCM format, prepending a version
header and the nonce. All new data will be in this format.
"""
try:
nonce = os.urandom(12) # 96-bit nonce is recommended for AES-GCM
ciphertext = self.cipher.encrypt(nonce, data, None)
return b"V2:" + nonce + ciphertext
except Exception as e:
logger.error(f"Failed to encrypt data: {e}", exc_info=True)
raise
def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""
(3) The core migration logic. Tries the new format first, then falls back
to the old one. This is the ONLY place decryption logic should live.
"""
# Try the new V2 format first
if encrypted_data.startswith(b"V2:"):
try:
nonce = encrypted_data[3:15]
ciphertext = encrypted_data[15:]
if len(ciphertext) < 16:
logger.error("AES-GCM payload too short")
raise InvalidToken("AES-GCM payload too short")
return self.cipher.decrypt(nonce, ciphertext, None)
except InvalidTag as e:
logger.error("AES-GCM decryption failed: Invalid authentication tag.")
try:
result = self.fernet.decrypt(encrypted_data[3:])
logger.warning(
"Legacy-format file had incorrect 'V2:' header; decrypted with Fernet"
)
return result
except InvalidToken:
raise InvalidToken("AES-GCM decryption failed.") from e
# If it's not V2, it must be the legacy Fernet format
else:
logger.warning("Data is in legacy Fernet format. Attempting migration.")
try:
return self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
)
raise InvalidToken(
"Could not decrypt data with any available method."
) from e
# --- All functions below this point now use the smart `decrypt_data` method ---
def encrypt_parent_seed(self, parent_seed: str) -> None:
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
data = parent_seed.encode("utf-8")
encrypted_data = self.encrypt_data(data) # This now creates V2 format
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
os.chmod(self.parent_seed_file, 0o600)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
def decrypt_parent_seed(self) -> str:
"""Decrypts and returns the parent seed, handling migration."""
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
encrypted_data = fh.read()
is_legacy = not encrypted_data.startswith(b"V2:")
decrypted_data = self.decrypt_data(encrypted_data)
if is_legacy:
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip())
return decrypted_data.decode("utf-8").strip()
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
file_path = self.fingerprint_dir / relative_path
file_path.parent.mkdir(parents=True, exist_ok=True)
encrypted_data = self.encrypt_data(data)
with exclusive_lock(file_path) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
fh.flush()
os.fsync(fh.fileno())
os.chmod(file_path, 0o600)
def decrypt_file(self, relative_path: Path) -> bytes:
file_path = self.fingerprint_dir / relative_path
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
return self.decrypt_data(encrypted_data)
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
if USE_ORJSON:
json_data = json_lib.dumps(data)
else:
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
"""
Loads and decrypts JSON data, automatically migrating and re-saving
if it's in the legacy format.
"""
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
file_path = self.fingerprint_dir / relative_path
if not file_path.exists():
return {"entries": {}}
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
is_legacy = not encrypted_data.startswith(b"V2:")
try:
decrypted_data = self.decrypt_data(encrypted_data)
if USE_ORJSON:
data = json_lib.loads(decrypted_data)
else:
data = json_lib.loads(decrypted_data.decode("utf-8"))
# If it was a legacy file, re-save it in the new format now
if is_legacy:
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
return data
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
logger.error(
f"FATAL: Could not decrypt or parse data from {file_path}: {e}",
exc_info=True,
)
raise
def get_encrypted_index(self) -> Optional[bytes]:
relative_path = Path("seedpass_entries_db.json.enc")
file_path = self.fingerprint_dir / relative_path
if not file_path.exists():
return None
with exclusive_lock(file_path) as fh:
fh.seek(0)
return fh.read()
def decrypt_and_save_index_from_nostr(
self,
encrypted_data: bytes,
relative_path: Optional[Path] = None,
*,
strict: bool = True,
) -> bool:
"""Decrypts data from Nostr and saves it.
Parameters
----------
encrypted_data:
The payload downloaded from Nostr.
relative_path:
Destination filename under the profile directory.
strict:
When ``True`` (default) re-raise any decryption error. When ``False``
return ``False`` if decryption fails.
"""
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
try:
decrypted_data = self.decrypt_data(encrypted_data)
if USE_ORJSON:
data = json_lib.loads(decrypted_data)
else:
data = json_lib.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data, relative_path) # This always saves in V2 format
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")
print(colored("Index file updated from Nostr successfully.", "green"))
return True
except Exception as e: # pragma: no cover - error handling
if strict:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}",
exc_info=True,
)
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}",
"red",
)
)
raise
logger.warning(f"Failed to decrypt index from Nostr: {e}")
return False
def update_checksum(self, relative_path: Optional[Path] = None) -> None:
"""Updates the checksum file for the specified file."""
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
file_path = self.fingerprint_dir / relative_path
if not file_path.exists():
return
try:
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_bytes = fh.read()
checksum = hashlib.sha256(encrypted_bytes).hexdigest()
checksum_file = file_path.parent / f"{file_path.stem}_checksum.txt"
with exclusive_lock(checksum_file) as fh:
fh.seek(0)
fh.truncate()
fh.write(checksum.encode("utf-8"))
fh.flush()
os.fsync(fh.fileno())
os.chmod(checksum_file, 0o600)
except Exception as e:
logger.error(
f"Failed to update checksum for '{relative_path}': {e}",
exc_info=True,
)
raise
# ... validate_seed and derive_seed_from_mnemonic can remain the same ...
def validate_seed(self, seed_phrase: str) -> bool:
try:
words = seed_phrase.split()
if len(words) != 12:
logger.error("Seed phrase does not contain exactly 12 words.")
print(
colored(
"Error: Seed phrase must contain exactly 12 words.",
"red",
)
)
return False
logger.debug("Seed phrase validated successfully.")
return True
except Exception as e:
logging.error(f"Error validating seed phrase: {e}", exc_info=True)
print(colored(f"Error: Failed to validate seed phrase: {e}", "red"))
return False
def derive_seed_from_mnemonic(self, mnemonic: str, passphrase: str = "") -> bytes:
try:
if not isinstance(mnemonic, str):
if isinstance(mnemonic, list):
mnemonic = " ".join(mnemonic)
else:
mnemonic = str(mnemonic)
if not isinstance(mnemonic, str):
raise TypeError("Mnemonic must be a string after conversion")
from bip_utils import Bip39SeedGenerator
seed = Bip39SeedGenerator(mnemonic).Generate(passphrase)
logger.debug("Seed derived successfully from mnemonic.")
return seed
except Exception as e:
logger.error(f"Failed to derive seed from mnemonic: {e}", exc_info=True)
print(colored(f"Error: Failed to derive seed from mnemonic: {e}", "red"))
raise

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,17 @@
# seedpass.core/entry_types.py
"""Enumerations for entry types used by SeedPass."""
from enum import Enum
class EntryType(str, Enum):
"""Enumeration of different entry types supported by the manager."""
PASSWORD = "password"
TOTP = "totp"
SSH = "ssh"
SEED = "seed"
PGP = "pgp"
NOSTR = "nostr"
KEY_VALUE = "key_value"
MANAGED_ACCOUNT = "managed_account"

4061
src/seedpass/core/manager.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,87 @@
"""Schema migration helpers for password index files."""
from __future__ import annotations
from typing import Callable, Dict
MIGRATIONS: Dict[int, Callable[[dict], dict]] = {}
def migration(
from_ver: int,
) -> Callable[[Callable[[dict], dict]], Callable[[dict], dict]]:
"""Register a migration function from *from_ver* to *from_ver* + 1."""
def decorator(func: Callable[[dict], dict]) -> Callable[[dict], dict]:
MIGRATIONS[from_ver] = func
return func
return decorator
@migration(0)
def _v0_to_v1(data: dict) -> dict:
"""Inject schema_version field for initial upgrade."""
data["schema_version"] = 1
return data
@migration(1)
def _v1_to_v2(data: dict) -> dict:
passwords = data.pop("passwords", {})
entries = {}
for k, v in passwords.items():
v.setdefault("type", "password")
v.setdefault("notes", "")
if "label" not in v and "website" in v:
v["label"] = v["website"]
if v.get("type") == "password" and "website" in v:
v.pop("website", None)
entries[k] = v
data["entries"] = entries
data["schema_version"] = 2
return data
@migration(2)
def _v2_to_v3(data: dict) -> dict:
"""Add custom_fields and origin defaults to each entry."""
entries = data.get("entries", {})
for entry in entries.values():
entry.setdefault("custom_fields", [])
entry.setdefault("origin", "")
if entry.get("type", "password") == "password":
if "label" not in entry and "website" in entry:
entry["label"] = entry["website"]
entry.pop("website", None)
data["schema_version"] = 3
return data
@migration(3)
def _v3_to_v4(data: dict) -> dict:
"""Add tags defaults to each entry."""
entries = data.get("entries", {})
for entry in entries.values():
entry.setdefault("tags", [])
data["schema_version"] = 4
return data
LATEST_VERSION = 4
def apply_migrations(data: dict) -> dict:
"""Upgrade *data* in-place to the latest schema version."""
current = data.get("schema_version", 0)
if current > LATEST_VERSION:
raise ValueError(f"Unsupported schema version {current}")
while current < LATEST_VERSION:
migrate = MIGRATIONS.get(current)
if migrate is None:
raise ValueError(f"No migration available from version {current}")
data = migrate(data)
current = data.get("schema_version", current + 1)
return data

View File

@@ -0,0 +1,490 @@
# seedpass.core/password_generation.py
"""
Password Generation Module
This module provides the PasswordGenerator class responsible for deterministic password generation
based on a BIP-39 parent seed. It leverages BIP-85 for entropy derivation and ensures that
generated passwords meet complexity requirements.
Ensure that all dependencies are installed and properly configured in your environment.
Never ever ever use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case.
"""
import os
import logging
import hashlib
import string
import random
import traceback
import base64
from typing import Optional
from dataclasses import dataclass
from termcolor import colored
from pathlib import Path
import shutil
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.backends import default_backend
from bip_utils import Bip39SeedGenerator
# Ensure the ``imghdr`` module is available for ``pgpy`` on Python 3.13+
try: # pragma: no cover - only executed on Python >= 3.13
import imghdr # type: ignore
except ModuleNotFoundError: # pragma: no cover - fallback for removed module
from utils import imghdr_stub as imghdr # type: ignore
import sys
sys.modules.setdefault("imghdr", imghdr)
from local_bip85.bip85 import BIP85
from constants import DEFAULT_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH, MAX_PASSWORD_LENGTH
from .encryption import EncryptionManager
# Instantiate the logger
logger = logging.getLogger(__name__)
@dataclass
class PasswordPolicy:
"""Minimum complexity requirements for generated passwords."""
min_uppercase: int = 2
min_lowercase: int = 2
min_digits: int = 2
min_special: int = 2
class PasswordGenerator:
"""
PasswordGenerator Class
Responsible for deterministic password generation based on a BIP-39 parent seed.
Utilizes BIP-85 for entropy derivation and ensures that generated passwords meet
complexity requirements.
"""
def __init__(
self,
encryption_manager: EncryptionManager,
parent_seed: str,
bip85: BIP85,
policy: PasswordPolicy | None = None,
):
"""
Initializes the PasswordGenerator with the encryption manager, parent seed, and BIP85 instance.
Parameters:
encryption_manager (EncryptionManager): The encryption manager instance.
parent_seed (str): The BIP-39 parent seed phrase.
bip85 (BIP85): The BIP85 instance for generating deterministic entropy.
"""
try:
self.encryption_manager = encryption_manager
self.parent_seed = parent_seed
self.bip85 = bip85
self.policy = policy or PasswordPolicy()
# Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
logger.debug("PasswordGenerator initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize PasswordGenerator: {e}", exc_info=True)
print(colored(f"Error: Failed to initialize PasswordGenerator: {e}", "red"))
raise
def _derive_password_entropy(self, index: int) -> bytes:
"""Derive deterministic entropy for password generation."""
entropy = self.bip85.derive_entropy(index=index, bytes_len=64, app_no=32)
logger.debug(f"Derived entropy: {entropy.hex()}")
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"password-generation",
backend=default_backend(),
)
hkdf_derived = hkdf.derive(entropy)
logger.debug(f"Derived key using HKDF: {hkdf_derived.hex()}")
dk = hashlib.pbkdf2_hmac("sha256", entropy, b"", 100000)
logger.debug(f"Derived key using PBKDF2: {dk.hex()}")
return dk
def _map_entropy_to_chars(self, dk: bytes, alphabet: str) -> str:
"""Map derived bytes to characters from the provided alphabet."""
password = "".join(alphabet[byte % len(alphabet)] for byte in dk)
logger.debug(f"Password after mapping to all allowed characters: {password}")
return password
def _shuffle_deterministically(self, password: str, dk: bytes) -> str:
"""Deterministically shuffle characters using derived bytes."""
shuffle_seed = int.from_bytes(dk, "big")
rng = random.Random(shuffle_seed)
password_chars = list(password)
rng.shuffle(password_chars)
shuffled = "".join(password_chars)
logger.debug("Shuffled password deterministically.")
return shuffled
def generate_password(
self, length: int = DEFAULT_PASSWORD_LENGTH, index: int = 0
) -> str:
"""
Generates a deterministic password based on the parent seed, desired length, and index.
Steps:
1. Derive entropy using BIP-85.
2. Use HKDF-HMAC-SHA256 to derive a key from entropy.
3. Map the derived key to all allowed characters.
4. Ensure the password meets complexity requirements.
5. Shuffle the password deterministically based on the derived key.
6. Trim or extend the password to the desired length.
Parameters:
length (int): Desired length of the password.
index (int): Index for deriving child entropy.
Returns:
str: The generated password.
"""
try:
# Validate password length
if length < MIN_PASSWORD_LENGTH:
logger.error(
f"Password length must be at least {MIN_PASSWORD_LENGTH} characters."
)
raise ValueError(
f"Password length must be at least {MIN_PASSWORD_LENGTH} characters."
)
if length > MAX_PASSWORD_LENGTH:
logger.error(
f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters."
)
raise ValueError(
f"Password length must not exceed {MAX_PASSWORD_LENGTH} characters."
)
dk = self._derive_password_entropy(index=index)
all_allowed = string.ascii_letters + string.digits + string.punctuation
password = self._map_entropy_to_chars(dk, all_allowed)
password = self._enforce_complexity(password, all_allowed, dk)
password = self._shuffle_deterministically(password, dk)
# Ensure password length by extending if necessary
if len(password) < length:
while len(password) < length:
dk = hashlib.pbkdf2_hmac("sha256", dk, b"", 1)
extra = self._map_entropy_to_chars(dk, all_allowed)
password += extra
password = self._shuffle_deterministically(password, dk)
logger.debug(f"Extended password: {password}")
# Trim the password to the desired length and enforce complexity on
# the final result. Complexity enforcement is repeated here because
# trimming may remove required character classes from the password
# produced above when the requested length is shorter than the
# initial entropy size.
password = password[:length]
password = self._enforce_complexity(password, all_allowed, dk)
password = self._shuffle_deterministically(password, dk)
logger.debug(
f"Final password (trimmed to {length} chars with complexity enforced): {password}"
)
return password
except Exception as e:
logger.error(f"Error generating password: {e}", exc_info=True)
print(colored(f"Error: Failed to generate password: {e}", "red"))
raise
def _enforce_complexity(self, password: str, alphabet: str, dk: bytes) -> str:
"""
Ensures that the password contains at least two uppercase letters, two lowercase letters,
two digits, and two special characters, modifying it deterministically if necessary.
Also balances the distribution of character types.
Parameters:
password (str): The initial password.
alphabet (str): Allowed characters in the password.
dk (bytes): Derived key used for deterministic modifications.
Returns:
str: Password that meets complexity requirements.
"""
try:
uppercase = string.ascii_uppercase
lowercase = string.ascii_lowercase
digits = string.digits
special = string.punctuation
password_chars = list(password)
# Current counts
current_upper = sum(1 for c in password_chars if c in uppercase)
current_lower = sum(1 for c in password_chars if c in lowercase)
current_digits = sum(1 for c in password_chars if c in digits)
current_special = sum(1 for c in password_chars if c in special)
logger.debug(
f"Current character counts - Upper: {current_upper}, Lower: {current_lower}, Digits: {current_digits}, Special: {current_special}"
)
# Set minimum counts from policy
min_upper = self.policy.min_uppercase
min_lower = self.policy.min_lowercase
min_digits = self.policy.min_digits
min_special = self.policy.min_special
# Initialize derived key index
dk_index = 0
dk_length = len(dk)
def get_dk_value() -> int:
nonlocal dk_index
value = dk[dk_index % dk_length]
dk_index += 1
return value
# Replace characters to meet minimum counts
if current_upper < min_upper:
for _ in range(min_upper - current_upper):
index = get_dk_value() % len(password_chars)
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[index] = char
logger.debug(
f"Added uppercase letter '{char}' at position {index}."
)
if current_lower < min_lower:
for _ in range(min_lower - current_lower):
index = get_dk_value() % len(password_chars)
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[index] = char
logger.debug(
f"Added lowercase letter '{char}' at position {index}."
)
if current_digits < min_digits:
for _ in range(min_digits - current_digits):
index = get_dk_value() % len(password_chars)
char = digits[get_dk_value() % len(digits)]
password_chars[index] = char
logger.debug(f"Added digit '{char}' at position {index}.")
if current_special < min_special:
for _ in range(min_special - current_special):
index = get_dk_value() % len(password_chars)
char = special[get_dk_value() % len(special)]
password_chars[index] = char
logger.debug(
f"Added special character '{char}' at position {index}."
)
# Additional deterministic inclusion of symbols to increase score
symbol_target = 3 # Increase target number of symbols
current_symbols = sum(1 for c in password_chars if c in special)
additional_symbols_needed = max(symbol_target - current_symbols, 0)
for _ in range(additional_symbols_needed):
if dk_index >= dk_length:
break # Avoid exceeding the derived key length
index = get_dk_value() % len(password_chars)
char = special[get_dk_value() % len(special)]
password_chars[index] = char
logger.debug(f"Added additional symbol '{char}' at position {index}.")
# Ensure balanced distribution by assigning different character types to specific segments
# Example: Divide password into segments and assign different types
segment_length = len(password_chars) // 4
if segment_length > 0:
for i, char_type in enumerate([uppercase, lowercase, digits, special]):
segment_start = i * segment_length
segment_end = segment_start + segment_length
if segment_end > len(password_chars):
segment_end = len(password_chars)
for j in range(segment_start, segment_end):
if i == 0 and password_chars[j] not in uppercase:
char = uppercase[get_dk_value() % len(uppercase)]
password_chars[j] = char
logger.debug(
f"Assigned uppercase letter '{char}' to position {j}."
)
elif i == 1 and password_chars[j] not in lowercase:
char = lowercase[get_dk_value() % len(lowercase)]
password_chars[j] = char
logger.debug(
f"Assigned lowercase letter '{char}' to position {j}."
)
elif i == 2 and password_chars[j] not in digits:
char = digits[get_dk_value() % len(digits)]
password_chars[j] = char
logger.debug(f"Assigned digit '{char}' to position {j}.")
elif i == 3 and password_chars[j] not in special:
char = special[get_dk_value() % len(special)]
password_chars[j] = char
logger.debug(
f"Assigned special character '{char}' to position {j}."
)
# Shuffle again to distribute the characters more evenly
shuffle_seed = (
int.from_bytes(dk, "big") + dk_index
) # Modify seed to vary shuffle
rng = random.Random(shuffle_seed)
rng.shuffle(password_chars)
logger.debug(f"Shuffled password characters for balanced distribution.")
# Final counts after modifications
final_upper = sum(1 for c in password_chars if c in uppercase)
final_lower = sum(1 for c in password_chars if c in lowercase)
final_digits = sum(1 for c in password_chars if c in digits)
final_special = sum(1 for c in password_chars if c in special)
logger.debug(
f"Final character counts - Upper: {final_upper}, Lower: {final_lower}, Digits: {final_digits}, Special: {final_special}"
)
return "".join(password_chars)
except Exception as e:
logger.error(f"Error ensuring password complexity: {e}", exc_info=True)
print(colored(f"Error: Failed to ensure password complexity: {e}", "red"))
raise
def derive_ssh_key(bip85: BIP85, idx: int) -> bytes:
"""Derive 32 bytes of entropy suitable for an SSH key."""
return bip85.derive_entropy(index=idx, bytes_len=32, app_no=32)
def derive_ssh_key_pair(parent_seed: str, index: int) -> tuple[str, str]:
"""Derive an Ed25519 SSH key pair from the seed phrase and index."""
seed_bytes = Bip39SeedGenerator(parent_seed).Generate()
bip85 = BIP85(seed_bytes)
entropy = derive_ssh_key(bip85, index)
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(entropy)
priv_pem = private_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
).decode()
public_key = private_key.public_key()
pub_pem = public_key.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return priv_pem, pub_pem
def derive_seed_phrase(bip85: BIP85, idx: int, words: int = 24) -> str:
"""Derive a new BIP39 seed phrase using BIP85."""
return bip85.derive_mnemonic(index=idx, words_num=words)
def derive_pgp_key(
bip85: BIP85, idx: int, key_type: str = "ed25519", user_id: str = ""
) -> tuple[str, str]:
"""Derive a deterministic PGP private key and return it with its fingerprint."""
from pgpy import PGPKey, PGPUID
from pgpy.packet.packets import PrivKeyV4
from pgpy.packet.fields import (
EdDSAPriv,
RSAPriv,
ECPoint,
ECPointFormat,
EllipticCurveOID,
MPI,
)
from pgpy.constants import (
PubKeyAlgorithm,
KeyFlags,
HashAlgorithm,
SymmetricKeyAlgorithm,
CompressionAlgorithm,
)
from Crypto.PublicKey import RSA
from Crypto.Util.number import inverse
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
import hashlib
import datetime
entropy = bip85.derive_entropy(index=idx, bytes_len=32, app_no=32)
created = datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
if key_type.lower() == "rsa":
class DRNG:
def __init__(self, seed: bytes) -> None:
self.seed = seed
def __call__(self, n: int) -> bytes: # pragma: no cover - deterministic
out = b""
while len(out) < n:
self.seed = hashlib.sha256(self.seed).digest()
out += self.seed
return out[:n]
rsa_key = RSA.generate(2048, randfunc=DRNG(entropy))
keymat = RSAPriv()
keymat.n = MPI(rsa_key.n)
keymat.e = MPI(rsa_key.e)
keymat.d = MPI(rsa_key.d)
keymat.p = MPI(rsa_key.p)
keymat.q = MPI(rsa_key.q)
keymat.u = MPI(inverse(keymat.p, keymat.q))
keymat._compute_chksum()
pkt = PrivKeyV4()
pkt.pkalg = PubKeyAlgorithm.RSAEncryptOrSign
pkt.keymaterial = keymat
else:
priv = ed25519.Ed25519PrivateKey.from_private_bytes(entropy)
public = priv.public_key().public_bytes(
serialization.Encoding.Raw, serialization.PublicFormat.Raw
)
keymat = EdDSAPriv()
keymat.oid = EllipticCurveOID.Ed25519
keymat.s = MPI(int.from_bytes(entropy, "big"))
keymat.p = ECPoint.from_values(
keymat.oid.key_size, ECPointFormat.Native, public
)
keymat._compute_chksum()
pkt = PrivKeyV4()
pkt.pkalg = PubKeyAlgorithm.EdDSA
pkt.keymaterial = keymat
pkt.created = created
pkt.update_hlen()
key = PGPKey()
key._key = pkt
uid = PGPUID.new(user_id)
key.add_uid(
uid,
usage=[
KeyFlags.Sign,
KeyFlags.EncryptCommunications,
KeyFlags.EncryptStorage,
],
hashes=[HashAlgorithm.SHA256],
ciphers=[SymmetricKeyAlgorithm.AES256],
compression=[CompressionAlgorithm.ZLIB],
created=created,
)
return str(key), key.fingerprint

View File

@@ -0,0 +1,140 @@
# portable_backup.py
"""Export and import encrypted profile backups."""
from __future__ import annotations
import base64
import json
import logging
import os
import time
import asyncio
from enum import Enum
from pathlib import Path
from .vault import Vault
from .backup import BackupManager
from nostr.client import NostrClient
from utils.key_derivation import (
derive_index_key,
EncryptionMode,
)
from .encryption import EncryptionManager
from utils.checksum import json_checksum, canonical_json_dumps
logger = logging.getLogger(__name__)
FORMAT_VERSION = 1
EXPORT_NAME_TEMPLATE = "seedpass_export_{ts}.json"
class PortableMode(Enum):
"""Encryption mode for portable exports."""
SEED_ONLY = EncryptionMode.SEED_ONLY.value
def _derive_export_key(seed: str) -> bytes:
"""Derive the Fernet key for the export payload."""
return derive_index_key(seed)
def export_backup(
vault: Vault,
backup_manager: BackupManager,
dest_path: Path | None = None,
*,
publish: bool = False,
parent_seed: str | None = None,
) -> Path:
"""Export the current vault state to a portable encrypted file."""
if dest_path is None:
ts = int(time.time())
dest_dir = vault.fingerprint_dir / "exports"
dest_dir.mkdir(parents=True, exist_ok=True)
dest_path = dest_dir / EXPORT_NAME_TEMPLATE.format(ts=ts)
index_data = vault.load_index()
seed = (
parent_seed
if parent_seed is not None
else vault.encryption_manager.decrypt_parent_seed()
)
key = _derive_export_key(seed)
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
canonical = canonical_json_dumps(index_data)
payload_bytes = enc_mgr.encrypt_data(canonical.encode("utf-8"))
checksum = json_checksum(index_data)
wrapper = {
"format_version": FORMAT_VERSION,
"created_at": int(time.time()),
"fingerprint": vault.fingerprint_dir.name,
"encryption_mode": PortableMode.SEED_ONLY.value,
"cipher": "aes-gcm",
"checksum": checksum,
"payload": base64.b64encode(payload_bytes).decode("utf-8"),
}
json_bytes = json.dumps(wrapper, indent=2).encode("utf-8")
dest_path.write_bytes(json_bytes)
os.chmod(dest_path, 0o600)
backup_manager._create_additional_backup(dest_path)
if publish:
encrypted = vault.encryption_manager.encrypt_data(json_bytes)
enc_file = dest_path.with_suffix(dest_path.suffix + ".enc")
enc_file.write_bytes(encrypted)
os.chmod(enc_file, 0o600)
try:
client = NostrClient(
vault.encryption_manager,
vault.fingerprint_dir.name,
config_manager=backup_manager.config_manager,
)
asyncio.run(client.publish_snapshot(encrypted))
except Exception:
logger.error("Failed to publish backup via Nostr", exc_info=True)
return dest_path
def import_backup(
vault: Vault,
backup_manager: BackupManager,
path: Path,
parent_seed: str | None = None,
) -> None:
"""Import a portable backup file and replace the current index."""
raw = Path(path).read_bytes()
if path.suffix.endswith(".enc"):
raw = vault.encryption_manager.decrypt_data(raw)
wrapper = json.loads(raw.decode("utf-8"))
if wrapper.get("format_version") != FORMAT_VERSION:
raise ValueError("Unsupported backup format")
if wrapper.get("encryption_mode") != PortableMode.SEED_ONLY.value:
raise ValueError("Unsupported encryption mode")
payload = base64.b64decode(wrapper["payload"])
seed = (
parent_seed
if parent_seed is not None
else vault.encryption_manager.decrypt_parent_seed()
)
key = _derive_export_key(seed)
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
index_bytes = enc_mgr.decrypt_data(payload)
index = json.loads(index_bytes.decode("utf-8"))
checksum = json_checksum(index)
if checksum != wrapper.get("checksum"):
raise ValueError("Checksum mismatch")
backup_manager.create_backup()
vault.save_index(index)

View File

@@ -0,0 +1,14 @@
"""SeedQR encoding utilities."""
from __future__ import annotations
from bip_utils.bip.bip39.bip39_mnemonic import Bip39Languages
from bip_utils.bip.bip39.bip39_mnemonic_utils import Bip39WordsListGetter
def encode_seedqr(mnemonic: str) -> str:
"""Return SeedQR digit stream for a BIP-39 mnemonic."""
wordlist = Bip39WordsListGetter().GetByLanguage(Bip39Languages.ENGLISH)
words = mnemonic.strip().split()
indices = [wordlist.GetWordIdx(word.lower()) for word in words]
return "".join(f"{idx:04d}" for idx in indices)

93
src/seedpass/core/totp.py Normal file
View File

@@ -0,0 +1,93 @@
"""TOTP management utilities for SeedPass."""
from __future__ import annotations
import sys
import time
from urllib.parse import quote
from urllib.parse import urlparse, parse_qs, unquote
import qrcode
import pyotp
from utils import key_derivation
class TotpManager:
"""Helper methods for TOTP secrets and codes."""
@staticmethod
def derive_secret(seed: str, index: int) -> str:
"""Derive a TOTP secret from a BIP39 seed and index."""
return key_derivation.derive_totp_secret(seed, index)
@classmethod
def current_code(cls, seed: str, index: int, timestamp: int | None = None) -> str:
"""Return the TOTP code for the given seed and index."""
secret = cls.derive_secret(seed, index)
totp = pyotp.TOTP(secret)
if timestamp is None:
return totp.now()
return totp.at(timestamp)
@staticmethod
def current_code_from_secret(secret: str, timestamp: int | None = None) -> str:
"""Return the TOTP code for a raw secret."""
totp = pyotp.TOTP(secret)
return totp.now() if timestamp is None else totp.at(timestamp)
@staticmethod
def parse_otpauth(uri: str) -> tuple[str, str, int, int]:
"""Parse an otpauth URI and return (label, secret, period, digits)."""
if not uri.startswith("otpauth://"):
raise ValueError("Not an otpauth URI")
parsed = urlparse(uri)
label = unquote(parsed.path.lstrip("/"))
qs = parse_qs(parsed.query)
secret = qs.get("secret", [""])[0].upper()
period = int(qs.get("period", ["30"])[0])
digits = int(qs.get("digits", ["6"])[0])
if not secret:
raise ValueError("Missing secret in URI")
return label, secret, period, digits
@staticmethod
def make_otpauth_uri(
label: str, secret: str, period: int = 30, digits: int = 6
) -> str:
"""Construct an otpauth:// URI for use with authenticator apps."""
label_enc = quote(label)
return f"otpauth://totp/{label_enc}?secret={secret}&period={period}&digits={digits}"
@staticmethod
def time_remaining(period: int = 30, timestamp: int | None = None) -> int:
"""Return seconds remaining until the current TOTP period resets."""
if timestamp is None:
timestamp = int(time.time())
return period - (timestamp % period)
@classmethod
def print_progress_bar(cls, period: int = 30) -> None:
"""Print a simple progress bar for the current TOTP period."""
remaining = cls.time_remaining(period)
total = period
bar_len = 20
while remaining > 0:
progress = total - remaining
filled = int(bar_len * progress / total)
bar = "[" + "#" * filled + "-" * (bar_len - filled) + "]"
sys.stdout.write(f"\r{bar} {remaining:2d}s")
sys.stdout.flush()
time.sleep(1)
remaining -= 1
sys.stdout.write("\n")
sys.stdout.flush()
@staticmethod
def print_qr_code(uri: str) -> None:
"""Display a QR code representing the provided URI in the terminal."""
qr = qrcode.QRCode(border=1)
qr.add_data(uri)
qr.make()
qr.print_ascii(invert=True)

View File

@@ -0,0 +1,78 @@
"""Vault utilities for reading and writing encrypted files."""
from pathlib import Path
from typing import Optional, Union
from os import PathLike
from .encryption import EncryptionManager
class Vault:
"""Simple wrapper around :class:`EncryptionManager` for vault storage."""
INDEX_FILENAME = "seedpass_entries_db.json.enc"
CONFIG_FILENAME = "seedpass_config.json.enc"
def __init__(
self,
encryption_manager: EncryptionManager,
fingerprint_dir: Union[str, PathLike[str], Path],
):
self.encryption_manager = encryption_manager
self.fingerprint_dir = Path(fingerprint_dir)
self.index_file = self.fingerprint_dir / self.INDEX_FILENAME
self.config_file = self.fingerprint_dir / self.CONFIG_FILENAME
def set_encryption_manager(self, manager: EncryptionManager) -> None:
"""Replace the internal encryption manager."""
self.encryption_manager = manager
# ----- Password index helpers -----
def load_index(self) -> dict:
"""Return decrypted password index data as a dict, applying migrations."""
legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
if legacy_file.exists() and not self.index_file.exists():
legacy_checksum = (
self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
)
legacy_file.rename(self.index_file)
if legacy_checksum.exists():
legacy_checksum.rename(
self.fingerprint_dir / "seedpass_entries_db_checksum.txt"
)
data = self.encryption_manager.load_json_data(self.index_file)
from .migrations import apply_migrations, LATEST_VERSION
version = data.get("schema_version", 0)
if version > LATEST_VERSION:
raise ValueError(
f"File schema version {version} is newer than supported {LATEST_VERSION}"
)
data = apply_migrations(data)
return data
def save_index(self, data: dict) -> None:
"""Encrypt and write password index."""
self.encryption_manager.save_json_data(data, self.index_file)
def get_encrypted_index(self) -> Optional[bytes]:
"""Return the encrypted index bytes if present."""
return self.encryption_manager.get_encrypted_index()
def decrypt_and_save_index_from_nostr(
self, encrypted_data: bytes, *, strict: bool = True
) -> bool:
"""Decrypt Nostr payload and overwrite the local index."""
return self.encryption_manager.decrypt_and_save_index_from_nostr(
encrypted_data, strict=strict
)
# ----- Config helpers -----
def load_config(self) -> dict:
"""Load decrypted configuration."""
return self.encryption_manager.load_json_data(self.config_file)
def save_config(self, config: dict) -> None:
"""Encrypt and persist configuration."""
self.encryption_manager.save_json_data(config, self.config_file)