1 Commits

Author SHA1 Message Date
thePR0M3TH3AN
f0e7df54d4 ci: add uv lockfile verification 2025-08-19 09:14:41 -04:00
70 changed files with 480 additions and 1281 deletions

View File

@@ -23,7 +23,22 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITLEAKS_CONFIG: .gitleaks.toml
lock-check:
name: Lock Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
run: |
curl -LsSf https://astral.sh/uv/install.sh | sh
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
- name: Compile lockfile
run: uv pip compile --python-version 3.11 --emit-index-url src/requirements.txt -o requirements.lock
- name: Verify lockfile
run: git diff --exit-code requirements.lock
build:
needs: lock-check
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
@@ -69,20 +84,19 @@ jobs:
if: runner.os == 'Windows'
shell: bash
run: echo "${{ steps.msys.outputs.msys2-location }}/mingw64/bin" >> $GITHUB_PATH
- name: Cache pip
- name: Cache dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.lock') }}
path: ~/.cache/uv
key: ${{ runner.os }}-uv-${{ hashFiles('requirements.lock') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Verify lockfile and install dependencies
${{ runner.os }}-uv-
- name: Install uv
run: |
python -m pip install --upgrade pip
pip install pip-tools
pip-compile --generate-hashes --output-file=requirements.lock src/requirements.txt
git diff --exit-code requirements.lock
pip install --require-hashes -r requirements.lock
curl -LsSf https://astral.sh/uv/install.sh | sh
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
- name: Sync dependencies
run: uv pip sync --frozen requirements.lock
- name: Run dependency scan
run: scripts/dependency_scan.sh --ignore-vuln GHSA-wj6h-64fc-37mp
- name: Determine stress args

View File

@@ -2,60 +2,6 @@
This project is written in **Python**. Follow these instructions when working with the code base.
## Installation Quickstart for AI Agents
### Prerequisites
Ensure the system has the required build tools and Python headers. Examples:
```bash
# Ubuntu/Debian
sudo apt update && sudo apt install -y \
build-essential \
libffi-dev \
pkg-config \
python3.11-dev \
curl \
git
# CentOS/RHEL
sudo yum install -y gcc gcc-c++ libffi-devel pkgconfig python3-devel curl git
# macOS
brew install python@3.11 libffi pkg-config git
```
### Installation
Run the installer script to fetch the latest release:
```bash
# Stable release
bash -c "$(curl -sSL https://raw.githubusercontent.com/PR0M3TH3AN/SeedPass/main/scripts/install.sh)"
# Beta branch
bash -c "$(curl -sSL https://raw.githubusercontent.com/PR0M3TH3AN/SeedPass/main/scripts/install.sh)" _ -b beta
```
### Environment Layout
- Virtual environment: `~/.seedpass/app/venv/`
- Entry point: `~/.seedpass/app/src/main.py`
### Verification
```bash
cd ~/.seedpass/app && source venv/bin/activate
cd src && python main.py --version # Expected: SeedPass v[version]
```
### Running SeedPass
```bash
cd ~/.seedpass/app && source venv/bin/activate
cd src && python main.py
```
## Running Tests
1. Set up a virtual environment and install dependencies:
@@ -93,11 +39,6 @@ cd src && python main.py
Following these practices helps keep the code base consistent and secure.
## Deterministic Artifact Generation
- All generated artifacts (passwords, keys, TOTP secrets, etc.) must be fully deterministic across runs and platforms.
- Randomness is only permitted for security primitives (e.g., encryption nonces, in-memory keys) and must never influence derived artifacts.
## Legacy Index Migration
- Always provide a migration path for index archives and import/export routines.

View File

@@ -16,10 +16,6 @@ This software was not developed by an experienced security expert and should be
Recent releases derive passwords and other artifacts using a fully deterministic algorithm that behaves consistently across Python versions. This improvement means artifacts generated with earlier versions of SeedPass will not match those produced now. Regenerate any previously derived data or retain the old version if you need to reproduce older passwords or keys.
**⚠️ First Run Warning**
Use a dedicated BIP-39 seed phrase exclusively for SeedPass. Offline Mode is **ON by default**, keeping all Nostr syncing disabled until you explicitly opt in.
---
### Supported OS
@@ -123,9 +119,9 @@ See `docs/ARCHITECTURE.md` and [Nostr Setup](docs/nostr_setup.md) for details.
### Quick Installer
Use the automated installer to download SeedPass and its dependencies in one step.
The scripts can also install the BeeWare backend for your platform when requested (`--mode gui` or `--mode both` on Linux/macOS, `-IncludeGui` on Windows).
The scripts can also install the BeeWare backend for your platform when requested (use `-IncludeGui` on Windows).
If the GTK `gi` bindings are missing, the installer attempts to install the
necessary system packages using `apt`, `yum`, `pacman`, or Homebrew. When no display server is detected, GUI components are skipped automatically.
necessary system packages using `apt`, `yum`, `pacman`, or Homebrew.
**Linux and macOS:**
```bash
@@ -136,10 +132,6 @@ bash -c "$(curl -sSL https://raw.githubusercontent.com/PR0M3TH3AN/SeedPass/main/
bash -c "$(curl -sSL https://raw.githubusercontent.com/PR0M3TH3AN/SeedPass/main/scripts/install.sh)" _ -b beta
```
Make sure the command ends right after `-b beta` with **no trailing parenthesis**.
*Install with GUI support:*
```bash
bash -c "$(curl -sSL https://raw.githubusercontent.com/PR0M3TH3AN/SeedPass/main/scripts/install.sh)" _ --mode gui
```
**Windows (PowerShell):**
```powershell
@@ -156,7 +148,7 @@ The Windows installer will attempt to install Git automatically if it is not alr
#### Installer Dependency Checks
The installer verifies that core build tooling—C/C++ build tools, Rust, CMake, and the imaging/GTK libraries—are available before completing. Use `--mode gui` to install only the graphical interface or `--mode both` to install both interfaces (default: `tui`). On Linux, ensure `xclip` or `wl-clipboard` is installed for clipboard support.
The installer verifies that core build tooling—C/C++ build tools, Rust, CMake, and the imaging/GTK libraries—are available before completing. Pass `--no-gui` to skip installing GUI packages. On Linux, ensure `xclip` or `wl-clipboard` is installed for clipboard support.
#### Windows Nostr Sync Troubleshooting
@@ -793,7 +785,6 @@ You can also launch the GUI directly with `seedpass gui` or `seedpass-gui`.
- **No PBKDF2 Salt Needed:** SeedPass deliberately omits an explicit PBKDF2 salt. Every password is derived from a unique 512-bit BIP-85 child seed, which already provides stronger per-password uniqueness than a conventional 128-bit salt.
- **Checksum Verification:** Always verify the script's checksum to ensure its integrity and protect against unauthorized modifications.
- **Potential Bugs and Limitations:** Be aware that the software may contain bugs and lacks certain features. Snapshot chunks are capped at 50 KB and the client rotates snapshots after enough delta events accumulate. The security of memory management and logs has not been thoroughly evaluated and may pose risks of leaking sensitive information.
- **Best-Effort Memory Zeroization:** Sensitive data is wiped from memory when possible, but Python may retain copies of decrypted values.
- **Multiple Seeds Management:** While managing multiple seeds adds flexibility, it also increases the responsibility to secure each seed and its associated password.
- **No PBKDF2 Salt Required:** SeedPass deliberately omits an explicit PBKDF2 salt. Every password is derived from a unique 512-bit BIP-85 child seed, which already provides stronger per-password uniqueness than a conventional 128-bit salt.
- **Default KDF Iterations:** New profiles start with 50,000 PBKDF2 iterations. Adjust this with `seedpass config set kdf_iterations`.

View File

@@ -1,44 +0,0 @@
# SeedPass Specification
## Key Hierarchy
SeedPass derives a hierarchy of keys from a single BIP-39 parent seed using HKDF:
- **Master Key** `HKDF(seed, "seedpass:v1:master")`
- **KEY_STORAGE** used to encrypt vault data.
- **KEY_INDEX** protects the metadata index.
- **KEY_PW_DERIVE** deterministic password generation.
- **KEY_TOTP_DET** deterministic TOTP secrets.
Each context string keeps derived keys domain separated.
## KDF Parameters
Passwords are protected with **PBKDF2-HMAC-SHA256**. The default work factor is
**50,000 iterations** but may be adjusted via the settings slider. The config
stores a `KdfConfig` structure with the chosen iteration count, algorithm name,
and the current spec version (`CURRENT_KDF_VERSION = 1`). Argon2 is available
with a default `time_cost` of 2 when selected.
## Message Formats
SeedPass synchronizes profiles over Nostr using three event kinds:
- **Manifest (`30070`)** high level snapshot description and current version.
- **Snapshot Chunk (`30071`)** compressed, encrypted portions of the vault.
- **Delta (`30072`)** incremental changes since the last snapshot.
Events encode JSON and include tags for checksums, fingerprints, and timestamps.
## Versioning
Configuration and KDF schemas are versioned so clients can migrate older
profiles. Nostr events carry a version field in the manifest, and the software
follows semantic versioning for releases.
## Memory Protection
SeedPass encrypts sensitive values in memory and attempts to wipe them when no
longer needed. This zeroization is best-effort only; Python's memory management
may retain copies of decrypted data. Critical cryptographic operations may move
to a Rust/WASM module in the future to provide stronger guarantees.

View File

@@ -127,7 +127,7 @@ Run or stop the local HTTP API.
| Action | Command | Examples |
| :--- | :--- | :--- |
| Start the API | `api start` | `seedpass api start --host 0.0.0.0 --port 8000` |
| Stop the API | `api stop --token TOKEN` | `seedpass api stop --token <token>` |
| Stop the API | `api stop` | `seedpass api stop` |
---
@@ -214,7 +214,7 @@ Set the `SEEDPASS_CORS_ORIGINS` environment variable to a commaseparated list
SEEDPASS_CORS_ORIGINS=http://localhost:3000 seedpass api start
```
Shut down the server with `seedpass api stop --token <token>`.
Shut down the server with `seedpass api stop`.
---

View File

@@ -43,7 +43,6 @@ from seedpass.core.vault import Vault
from seedpass.core.config_manager import ConfigManager
from seedpass.core.backup import BackupManager
from seedpass.core.entry_management import EntryManager
from seedpass.core.state_manager import StateManager
from nostr.client import NostrClient
from utils.fingerprint import generate_fingerprint
from utils.fingerprint_manager import FingerprintManager
@@ -196,13 +195,11 @@ def main() -> None:
encrypted = entry_mgr.vault.get_encrypted_index()
if encrypted:
idx = StateManager(dir_path).state.get("nostr_account_idx", 0)
client = NostrClient(
entry_mgr.vault.encryption_manager,
fingerprint or dir_path.name,
parent_seed=seed,
config_manager=cfg_mgr,
account_index=idx,
)
asyncio.run(client.publish_snapshot(encrypted))
print("[+] Data synchronized to Nostr.")

View File

@@ -17,8 +17,7 @@ VENV_DIR="$INSTALL_DIR/venv"
LAUNCHER_DIR="$HOME/.local/bin"
LAUNCHER_PATH="$LAUNCHER_DIR/seedpass"
BRANCH="main" # Default branch
MODE="tui"
INSTALL_GUI=false
INSTALL_GUI=true
# --- Helper Functions ---
print_info() { echo -e "\033[1;34m[INFO]\033[0m" "$1"; }
@@ -60,9 +59,9 @@ install_dependencies() {
fi
}
usage() {
echo "Usage: $0 [-b | --branch <branch_name>] [-m | --mode <tui|gui|both>] [-h | --help]"
echo "Usage: $0 [-b | --branch <branch_name>] [--no-gui] [-h | --help]"
echo " -b, --branch Specify the git branch to install (default: main)"
echo " -m, --mode Installation mode: tui, gui, both (default: tui)"
echo " --no-gui Skip graphical interface dependencies (default: include GUI)"
echo " -h, --help Display this help message"
exit 0
}
@@ -83,13 +82,9 @@ main() {
-h|--help)
usage
;;
-m|--mode)
if [ -n "$2" ]; then
MODE="$2"
shift 2
else
print_error "Error: --mode requires an argument (tui|gui|both)."
fi
--no-gui)
INSTALL_GUI=false
shift
;;
*)
print_error "Unknown parameter passed: $1"; usage
@@ -97,26 +92,6 @@ main() {
esac
done
case "$MODE" in
tui|gui|both) ;;
*)
print_error "Invalid mode: $MODE. Use 'tui', 'gui', or 'both'."
;;
esac
DISPLAY_DETECTED=false
if [ -n "${DISPLAY:-}" ] || [ -n "${WAYLAND_DISPLAY:-}" ]; then
DISPLAY_DETECTED=true
fi
if [[ "$MODE" == "gui" || "$MODE" == "both" ]]; then
if [ "$DISPLAY_DETECTED" = true ]; then
INSTALL_GUI=true
else
print_warning "No display detected. Skipping GUI installation."
fi
fi
# 1. Detect OS
OS_NAME=$(uname -s)
print_info "Installing SeedPass from branch: '$BRANCH'"

View File

@@ -670,49 +670,33 @@ def handle_set_inactivity_timeout(password_manager: PasswordManager) -> None:
def handle_set_kdf_iterations(password_manager: PasswordManager) -> None:
"""Interactive slider for PBKDF2 iteration strength with benchmarking."""
import hashlib
import time
"""Change the PBKDF2 iteration count."""
cfg_mgr = password_manager.config_manager
if cfg_mgr is None:
print(colored("Configuration manager unavailable.", "red"))
return
levels = [
("1", "Very Fast", 10_000),
("2", "Fast", 50_000),
("3", "Balanced", 100_000),
("4", "Slow", 200_000),
("5", "Paranoid", 500_000),
]
try:
current = cfg_mgr.get_kdf_iterations()
print(colored(f"Current iterations: {current}", "cyan"))
except Exception as e:
logging.error(f"Error loading iterations: {e}")
print(colored(f"Error: {e}", "red"))
return
print(colored(f"Current iterations: {current}", "cyan"))
for key, label, iters in levels:
marker = "*" if iters == current else " "
print(colored(f"{key}. {label} ({iters}) {marker}", "menu"))
print(colored("b. Benchmark current setting", "menu"))
choice = input("Select strength or 'b' to benchmark: ").strip().lower()
if not choice:
print(colored("No change made.", "yellow"))
return
if choice == "b":
start = time.perf_counter()
hashlib.pbkdf2_hmac("sha256", b"bench", b"salt", current)
elapsed = time.perf_counter() - start
print(colored(f"{current} iterations took {elapsed:.2f}s", "green"))
return
selected = {k: v for k, _, v in levels}.get(choice)
if not selected:
print(colored("Invalid choice.", "red"))
value = input("Enter new iteration count: ").strip()
if not value:
print(colored("No iteration count entered.", "yellow"))
return
try:
cfg_mgr.set_kdf_iterations(selected)
print(colored(f"KDF iteration count set to {selected}.", "green"))
iterations = int(value)
if iterations <= 0:
print(colored("Iterations must be positive.", "red"))
return
except ValueError:
print(colored("Invalid number.", "red"))
return
try:
cfg_mgr.set_kdf_iterations(iterations)
print(colored("KDF iteration count updated.", "green"))
except Exception as e:
logging.error(f"Error saving iterations: {e}")
print(colored(f"Error: {e}", "red"))
@@ -1030,12 +1014,12 @@ def handle_settings(password_manager: PasswordManager) -> None:
print(color_text("8. Import database", "menu"))
print(color_text("9. Export 2FA codes", "menu"))
print(color_text("10. Set additional backup location", "menu"))
print(color_text("11. KDF strength & benchmark", "menu"))
print(color_text("11. Set KDF iterations", "menu"))
print(color_text("12. Set inactivity timeout", "menu"))
print(color_text("13. Lock Vault", "menu"))
print(color_text("14. Stats", "menu"))
print(color_text("15. Toggle Secret Mode", "menu"))
print(color_text("16. Toggle Offline Mode (default ON)", "menu"))
print(color_text("16. Toggle Offline Mode", "menu"))
print(color_text("17. Toggle Quick Unlock", "menu"))
choice = input("Select an option or press Enter to go back: ").strip()
if choice == "1":
@@ -1310,11 +1294,6 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
action="store_true",
help="Disable clipboard support and print secrets",
)
parser.add_argument(
"--deterministic-totp",
action="store_true",
help="Derive TOTP secrets deterministically",
)
parser.add_argument(
"--max-prompt-attempts",
type=int,
@@ -1325,11 +1304,6 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
exp = sub.add_parser("export")
exp.add_argument("--file")
exp.add_argument(
"--unencrypted",
action="store_true",
help="Export without encryption",
)
imp = sub.add_parser("import")
imp.add_argument("--file")
@@ -1397,13 +1371,9 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
if args.no_clipboard:
password_manager.secret_mode_enabled = False
if args.deterministic_totp:
password_manager.deterministic_totp = True
if args.command == "export":
password_manager.handle_export_database(
Path(args.file), encrypt=not args.unencrypted
)
password_manager.handle_export_database(Path(args.file))
return 0
if args.command == "import":
password_manager.handle_import_database(Path(args.file))
@@ -1445,10 +1415,9 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
if entry.get("type") != EntryType.TOTP.value:
print(colored("Entry is not a TOTP entry.", "red"))
return 1
key = getattr(password_manager, "KEY_TOTP_DET", None) or getattr(
password_manager, "parent_seed", None
code = password_manager.entry_manager.get_totp_code(
idx, password_manager.parent_seed
)
code = password_manager.entry_manager.get_totp_code(idx, key)
print(code)
try:
if copy_to_clipboard(code, password_manager.clipboard_clear_delay):

View File

@@ -25,4 +25,3 @@ class Manifest:
algo: str
chunks: List[ChunkMeta]
delta_since: Optional[int] = None
nonce: Optional[str] = None

View File

@@ -33,7 +33,7 @@ from .backup_models import (
)
from .connection import ConnectionHandler, DEFAULT_RELAYS
from .key_manager import KeyManager as SeedPassKeyManager
from .snapshot import SnapshotHandler, prepare_snapshot
from .snapshot import MANIFEST_ID_PREFIX, SnapshotHandler, prepare_snapshot
if TYPE_CHECKING: # pragma: no cover - imported for type hints
from seedpass.core.config_manager import ConfigManager
@@ -57,8 +57,6 @@ class NostrClient(ConnectionHandler, SnapshotHandler):
parent_seed: Optional[str] = None,
offline_mode: bool = False,
config_manager: Optional["ConfigManager"] = None,
key_index: bytes | None = None,
account_index: int | None = None,
) -> None:
self.encryption_manager = encryption_manager
self.fingerprint = fingerprint
@@ -70,7 +68,7 @@ class NostrClient(ConnectionHandler, SnapshotHandler):
parent_seed = self.encryption_manager.decrypt_parent_seed()
# Use our project's KeyManager to derive the private key
self.key_manager = KeyManager(parent_seed, fingerprint, account_index)
self.key_manager = KeyManager(parent_seed, fingerprint)
# Create a nostr-sdk Keys object from our derived private key
private_key_hex = self.key_manager.keys.private_key_hex()
@@ -101,7 +99,6 @@ class NostrClient(ConnectionHandler, SnapshotHandler):
self.current_manifest: Manifest | None = None
self.current_manifest_id: str | None = None
self._delta_events: list[str] = []
self.key_index = key_index or b""
# Configure and initialize the nostr-sdk Client
signer = NostrSigner.keys(self.keys)
@@ -114,4 +111,5 @@ __all__ = [
"NostrClient",
"prepare_snapshot",
"DEFAULT_RELAYS",
"MANIFEST_ID_PREFIX",
]

View File

@@ -16,22 +16,17 @@ logger = logging.getLogger(__name__)
class KeyManager:
"""Manages key generation, encoding, and derivation for ``NostrClient``."""
"""
Manages key generation, encoding, and derivation for NostrClient.
"""
def __init__(
self, parent_seed: str, fingerprint: str, account_index: int | None = None
):
"""Initialize the key manager.
def __init__(self, parent_seed: str, fingerprint: str):
"""
Initializes the KeyManager with the provided parent_seed and fingerprint.
Parameters
----------
parent_seed:
The BIP-39 seed used as the root for derivations.
fingerprint:
Seed profile fingerprint used for legacy derivations and logging.
account_index:
Optional explicit index for BIP-85 Nostr key derivation. When ``None``
the index defaults to ``0``.
Parameters:
parent_seed (str): The parent seed used for key derivation.
fingerprint (str): The fingerprint to differentiate key derivations.
"""
try:
if not isinstance(parent_seed, str):
@@ -45,15 +40,12 @@ class KeyManager:
self.parent_seed = parent_seed
self.fingerprint = fingerprint
self.account_index = account_index
logger.debug(
"KeyManager initialized with parent_seed, fingerprint and account index."
)
logger.debug(f"KeyManager initialized with parent_seed and fingerprint.")
# Initialize BIP85
self.bip85 = self.initialize_bip85()
# Generate Nostr keys using the provided account index
# Generate Nostr keys using the fingerprint
self.keys = self.generate_nostr_keys()
logger.debug("Nostr Keys initialized successfully.")
@@ -78,34 +70,32 @@ class KeyManager:
raise
def generate_nostr_keys(self) -> Keys:
"""Derive a Nostr key pair using the configured ``account_index``."""
try:
index = self.account_index if self.account_index is not None else 0
entropy_bytes = self.bip85.derive_entropy(
index=index, entropy_bytes=32, app_no=NOSTR_KEY_APP_ID
)
private_key_hex = entropy_bytes.hex()
keys = Keys(priv_k=private_key_hex)
logger.debug("Nostr keys generated for account index %s", index)
return keys
except Exception as e:
logger.error(f"Failed to generate Nostr keys: {e}", exc_info=True)
raise
def generate_v1_nostr_keys(self) -> Keys:
"""Derive keys using the legacy fingerprint-hash method."""
"""
Derives a unique Nostr key pair for the given fingerprint using BIP-85.
Returns:
Keys: An instance of Keys containing the Nostr key pair.
"""
try:
# Convert fingerprint to an integer index (using a hash function)
index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (
2**31
)
# Derive entropy for Nostr key (32 bytes)
entropy_bytes = self.bip85.derive_entropy(
index=index, entropy_bytes=32, app_no=NOSTR_KEY_APP_ID
index=index,
entropy_bytes=32,
app_no=NOSTR_KEY_APP_ID,
)
return Keys(priv_k=entropy_bytes.hex())
# Generate Nostr key pair from entropy
private_key_hex = entropy_bytes.hex()
keys = Keys(priv_k=private_key_hex)
logger.debug(f"Nostr keys generated for fingerprint {self.fingerprint}.")
return keys
except Exception as e:
logger.error(f"Failed to generate v1 Nostr keys: {e}", exc_info=True)
logger.error(f"Failed to generate Nostr keys: {e}", exc_info=True)
raise
def generate_legacy_nostr_keys(self) -> Keys:

View File

@@ -2,10 +2,8 @@ import asyncio
import base64
import gzip
import hashlib
import hmac
import json
import logging
import os
import time
from datetime import timedelta
from typing import Tuple
@@ -25,6 +23,9 @@ from .backup_models import (
logger = logging.getLogger("nostr.client")
logger.setLevel(logging.WARNING)
# Identifier prefix for replaceable manifest events
MANIFEST_ID_PREFIX = "seedpass-manifest-"
def prepare_snapshot(
encrypted_bytes: bytes, limit: int
@@ -46,19 +47,6 @@ def prepare_snapshot(
return manifest, chunks
def new_manifest_id(key_index: bytes) -> tuple[str, bytes]:
"""Return a new manifest identifier and nonce.
The identifier is computed as HMAC-SHA256 of ``b"manifest|" + nonce``
using ``key_index`` as the HMAC key. The nonce is returned so it can be
embedded inside the manifest itself.
"""
nonce = os.urandom(16)
digest = hmac.new(key_index, b"manifest|" + nonce, hashlib.sha256).hexdigest()
return digest, nonce
class SnapshotHandler:
"""Mixin providing chunk and manifest handling."""
@@ -96,43 +84,34 @@ class SnapshotHandler:
except Exception:
meta.event_id = None
if (
self.current_manifest_id
and self.current_manifest
and getattr(self.current_manifest, "nonce", None)
):
manifest_id = self.current_manifest_id
manifest.nonce = self.current_manifest.nonce
else:
manifest_id, nonce = new_manifest_id(self.key_index)
manifest.nonce = base64.b64encode(nonce).decode("utf-8")
manifest_json = json.dumps(
{
"ver": manifest.ver,
"algo": manifest.algo,
"chunks": [meta.__dict__ for meta in manifest.chunks],
"delta_since": manifest.delta_since,
"nonce": manifest.nonce,
}
)
manifest_identifier = (
self.current_manifest_id or f"{MANIFEST_ID_PREFIX}{self.fingerprint}"
)
manifest_event = (
nostr_client.EventBuilder(nostr_client.Kind(KIND_MANIFEST), manifest_json)
.tags([nostr_client.Tag.identifier(manifest_id)])
.tags([nostr_client.Tag.identifier(manifest_identifier)])
.build(self.keys.public_key())
.sign_with_keys(self.keys)
)
await self.client.send_event(manifest_event)
with self._state_lock:
self.current_manifest = manifest
self.current_manifest_id = manifest_id
self.current_manifest_id = manifest_identifier
self.current_manifest.delta_since = int(time.time())
self._delta_events = []
if getattr(self, "verbose_timing", False):
duration = time.perf_counter() - start
logger.info("publish_snapshot completed in %.2f seconds", duration)
return manifest, manifest_id
return manifest, manifest_identifier
async def _fetch_chunks_with_retry(
self, manifest_event
@@ -150,7 +129,6 @@ class SnapshotHandler:
if data.get("delta_since") is not None
else None
),
nonce=data.get("nonce"),
)
except Exception:
return None
@@ -226,11 +204,14 @@ class SnapshotHandler:
pubkey = self.keys.public_key()
timeout = timedelta(seconds=10)
ident = self.current_manifest_id
f = nostr_client.Filter().author(pubkey).kind(nostr_client.Kind(KIND_MANIFEST))
if ident:
f = f.identifier(ident)
f = f.limit(1)
ident = f"{MANIFEST_ID_PREFIX}{self.fingerprint}"
f = (
nostr_client.Filter()
.author(pubkey)
.kind(nostr_client.Kind(KIND_MANIFEST))
.identifier(ident)
.limit(1)
)
try:
events = (await self.client.fetch_events(f, timeout)).to_vec()
except Exception as e: # pragma: no cover - network errors
@@ -242,11 +223,13 @@ class SnapshotHandler:
)
return None
if not events and ident:
if not events:
ident = MANIFEST_ID_PREFIX.rstrip("-")
f = (
nostr_client.Filter()
.author(pubkey)
.kind(nostr_client.Kind(KIND_MANIFEST))
.identifier(ident)
.limit(1)
)
try:
@@ -262,6 +245,8 @@ class SnapshotHandler:
if not events:
return None
logger.info("Fetched manifest using identifier %s", ident)
for manifest_event in events:
try:
result = await self._fetch_chunks_with_retry(manifest_event)
@@ -315,9 +300,7 @@ class SnapshotHandler:
return
await self._connect_async()
pubkey = self.keys.public_key()
ident = self.current_manifest_id
if ident is None:
return
ident = self.current_manifest_id or f"{MANIFEST_ID_PREFIX}{self.fingerprint}"
f = (
nostr_client.Filter()
.author(pubkey)
@@ -375,7 +358,6 @@ class SnapshotHandler:
meta.__dict__ for meta in self.current_manifest.chunks
],
"delta_since": self.current_manifest.delta_since,
"nonce": self.current_manifest.nonce,
}
)
manifest_event = (

View File

@@ -9,6 +9,8 @@ import secrets
import queue
from typing import Any, List, Optional
from datetime import datetime, timedelta, timezone
import jwt
import logging
from fastapi import FastAPI, Header, HTTPException, Request, Response
@@ -16,8 +18,8 @@ from fastapi.concurrency import run_in_threadpool
import asyncio
import sys
from fastapi.middleware.cors import CORSMiddleware
import bcrypt
import hashlib
import hmac
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
@@ -48,9 +50,16 @@ def _get_pm(request: Request) -> PasswordManager:
def _check_token(request: Request, auth: str | None) -> None:
if auth is None or not auth.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Unauthorized")
token = auth.split(" ", 1)[1].encode()
token_hash = getattr(request.app.state, "token_hash", b"")
if not token_hash or not bcrypt.checkpw(token, token_hash):
token = auth.split(" ", 1)[1]
jwt_secret = getattr(request.app.state, "jwt_secret", "")
token_hash = getattr(request.app.state, "token_hash", "")
try:
jwt.decode(token, jwt_secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Unauthorized")
if not hmac.compare_digest(hashlib.sha256(token.encode()).hexdigest(), token_hash):
raise HTTPException(status_code=401, detail="Unauthorized")
@@ -69,7 +78,7 @@ def _reload_relays(request: Request, relays: list[str]) -> None:
def start_server(fingerprint: str | None = None) -> str:
"""Initialize global state and return a random API token.
"""Initialize global state and return a short-lived JWT token.
Parameters
----------
@@ -81,8 +90,10 @@ def start_server(fingerprint: str | None = None) -> str:
else:
pm = PasswordManager(fingerprint=fingerprint)
app.state.pm = pm
raw_token = secrets.token_urlsafe(32)
app.state.token_hash = bcrypt.hashpw(raw_token.encode(), bcrypt.gensalt())
app.state.jwt_secret = secrets.token_urlsafe(32)
payload = {"exp": datetime.now(timezone.utc) + timedelta(minutes=5)}
raw_token = jwt.encode(payload, app.state.jwt_secret, algorithm="HS256")
app.state.token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
if not getattr(app.state, "limiter", None):
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@@ -203,14 +214,13 @@ async def create_entry(
uri = await run_in_threadpool(
pm.entry_manager.add_totp,
entry.get("label"),
pm.KEY_TOTP_DET if entry.get("deterministic", False) else None,
pm.parent_seed,
secret=entry.get("secret"),
index=entry.get("index"),
period=int(entry.get("period", 30)),
digits=int(entry.get("digits", 6)),
notes=entry.get("notes", ""),
archived=entry.get("archived", False),
deterministic=entry.get("deterministic", False),
)
return {"id": index, "uri": uri}
@@ -454,8 +464,7 @@ def export_totp(
_check_token(request, authorization)
_require_password(request, password)
pm = _get_pm(request)
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(pm, "parent_seed", None)
return pm.entry_manager.export_totp_entries(key)
return pm.entry_manager.export_totp_entries(pm.parent_seed)
@app.get("/api/v1/totp")
@@ -473,8 +482,7 @@ def get_totp_codes(
)
codes = []
for idx, label, _u, _url, _arch in entries:
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(pm, "parent_seed", None)
code = pm.entry_manager.get_totp_code(idx, key)
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
rem = pm.entry_manager.get_totp_time_remaining(idx)

View File

@@ -30,13 +30,6 @@ no_clipboard_option = typer.Option(
is_flag=True,
)
deterministic_totp_option = typer.Option(
False,
"--deterministic-totp",
help="Derive TOTP secrets deterministically",
is_flag=True,
)
# Sub command groups
from . import entry, vault, nostr, config, fingerprint, util, api
@@ -62,17 +55,12 @@ def main(
ctx: typer.Context,
fingerprint: Optional[str] = fingerprint_option,
no_clipboard: bool = no_clipboard_option,
deterministic_totp: bool = deterministic_totp_option,
) -> None:
"""SeedPass CLI entry point.
When called without a subcommand this launches the interactive TUI.
"""
ctx.obj = {
"fingerprint": fingerprint,
"no_clipboard": no_clipboard,
"deterministic_totp": deterministic_totp,
}
ctx.obj = {"fingerprint": fingerprint, "no_clipboard": no_clipboard}
if ctx.invoked_subcommand is None:
tui = importlib.import_module("main")
raise typer.Exit(tui.main(fingerprint=fingerprint))

View File

@@ -13,25 +13,19 @@ app = typer.Typer(help="Run the API server")
def api_start(ctx: typer.Context, host: str = "127.0.0.1", port: int = 8000) -> None:
"""Start the SeedPass API server."""
token = api_module.start_server(ctx.obj.get("fingerprint"))
typer.echo(
f"API token: {token}\nWARNING: Store this token securely; it cannot be recovered."
)
typer.echo(f"API token: {token}")
uvicorn.run(api_module.app, host=host, port=port)
@app.command("stop")
def api_stop(
token: str = typer.Option(..., help="API token"),
host: str = "127.0.0.1",
port: int = 8000,
) -> None:
def api_stop(ctx: typer.Context, host: str = "127.0.0.1", port: int = 8000) -> None:
"""Stop the SeedPass API server."""
import requests
try:
requests.post(
f"http://{host}:{port}/api/v1/shutdown",
headers={"Authorization": f"Bearer {token}"},
headers={"Authorization": f"Bearer {api_module.app.state.token_hash}"},
timeout=2,
)
except Exception as exc: # pragma: no cover - best effort

View File

@@ -29,8 +29,6 @@ def _get_pm(ctx: typer.Context) -> PasswordManager:
pm = PasswordManager(fingerprint=fp)
if ctx.obj.get("no_clipboard"):
pm.secret_mode_enabled = False
if ctx.obj.get("deterministic_totp"):
pm.deterministic_totp = True
return pm

View File

@@ -177,9 +177,6 @@ def entry_add_totp(
secret: Optional[str] = typer.Option(None, "--secret", help="Import secret"),
period: int = typer.Option(30, "--period", help="TOTP period in seconds"),
digits: int = typer.Option(6, "--digits", help="Number of TOTP digits"),
deterministic_totp: bool = typer.Option(
False, "--deterministic-totp", help="Derive secret deterministically"
),
) -> None:
"""Add a TOTP entry and output the otpauth URI."""
service = _get_entry_service(ctx)
@@ -189,7 +186,6 @@ def entry_add_totp(
secret=secret,
period=period,
digits=digits,
deterministic=deterministic_totp,
)
typer.echo(uri)

View File

@@ -305,10 +305,9 @@ class EntryService:
def get_totp_code(self, entry_id: int) -> str:
with self._lock:
key = getattr(self._manager, "KEY_TOTP_DET", None) or getattr(
self._manager, "parent_seed", None
return self._manager.entry_manager.get_totp_code(
entry_id, self._manager.parent_seed
)
return self._manager.entry_manager.get_totp_code(entry_id, key)
def add_entry(
self,
@@ -363,18 +362,15 @@ class EntryService:
secret: str | None = None,
period: int = 30,
digits: int = 6,
deterministic: bool = False,
) -> str:
with self._lock:
key = self._manager.KEY_TOTP_DET if deterministic else None
uri = self._manager.entry_manager.add_totp(
label,
key,
self._manager.parent_seed,
index=index,
secret=secret,
period=period,
digits=digits,
deterministic=deterministic,
)
self._manager.start_background_vault_sync()
return uri
@@ -519,10 +515,9 @@ class EntryService:
def export_totp_entries(self) -> dict:
with self._lock:
key = getattr(self._manager, "KEY_TOTP_DET", None) or getattr(
self._manager, "parent_seed", None
return self._manager.entry_manager.export_totp_entries(
self._manager.parent_seed
)
return self._manager.entry_manager.export_totp_entries(key)
def display_totp_codes(self) -> None:
with self._lock:

View File

@@ -41,7 +41,7 @@ class ConfigManager:
logger.info("Config file not found; returning defaults")
return {
"relays": list(DEFAULT_NOSTR_RELAYS),
"offline_mode": True,
"offline_mode": False,
"pin_hash": "",
"password_hash": "",
"inactivity_timeout": INACTIVITY_TIMEOUT,
@@ -71,7 +71,7 @@ class ConfigManager:
raise ValueError("Config data must be a dictionary")
# Ensure defaults for missing keys
data.setdefault("relays", list(DEFAULT_NOSTR_RELAYS))
data.setdefault("offline_mode", True)
data.setdefault("offline_mode", False)
data.setdefault("pin_hash", "")
data.setdefault("password_hash", "")
data.setdefault("inactivity_timeout", INACTIVITY_TIMEOUT)

View File

@@ -16,10 +16,8 @@ except Exception: # pragma: no cover - fallback for environments without orjson
import hashlib
import os
import base64
import zlib
from dataclasses import asdict
from pathlib import Path
from typing import Optional, Tuple
from typing import Optional
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
@@ -28,7 +26,6 @@ from termcolor import colored
from utils.file_lock import exclusive_lock
from mnemonic import Mnemonic
from utils.password_prompt import prompt_existing_password
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
# Instantiate the logger
logger = logging.getLogger(__name__)
@@ -92,23 +89,16 @@ class EncryptionManager:
# Track user preference for handling legacy indexes
self._legacy_migrate_flag = True
self.last_migration_performed = False
# Track nonces to detect accidental reuse
self.nonce_crc_table: set[int] = set()
def encrypt_data(self, data: bytes) -> bytes:
"""
Encrypt data using AES-GCM, emitting ``b"V3|" + nonce + ciphertext + tag``.
A fresh 96-bit nonce is generated for each call and tracked via a CRC
table to detect accidental reuse during batch operations.
(2) Encrypts data using the NEW AES-GCM format, prepending a version
header and the nonce. All new data will be in this format.
"""
try:
nonce = os.urandom(12) # 96-bit nonce is recommended for AES-GCM
crc = zlib.crc32(nonce)
if crc in self.nonce_crc_table:
raise ValueError("Nonce reuse detected")
self.nonce_crc_table.add(crc)
ciphertext = self.cipher.encrypt(nonce, data, None)
return b"V3|" + nonce + ciphertext
return b"V2:" + nonce + ciphertext
except Exception as e:
logger.error(f"Failed to encrypt data: {e}", exc_info=True)
raise
@@ -130,21 +120,7 @@ class EncryptionManager:
ctx = f" {context}" if context else ""
try:
# Try the new V3 format first
if encrypted_data.startswith(b"V3|"):
try:
nonce = encrypted_data[3:15]
ciphertext = encrypted_data[15:]
if len(ciphertext) < 16:
logger.error("AES-GCM payload too short")
raise InvalidToken("AES-GCM payload too short")
return self.cipher.decrypt(nonce, ciphertext, None)
except InvalidTag as e:
msg = f"Failed to decrypt{ctx}: invalid key or corrupt file"
logger.error(msg)
raise InvalidToken(msg) from e
# Next try the older V2 format
# Try the new V2 format first
if encrypted_data.startswith(b"V2:"):
try:
nonce = encrypted_data[3:15]
@@ -168,18 +144,19 @@ class EncryptionManager:
logger.error(msg)
raise InvalidToken(msg) from e
# If it's neither V3 nor V2, assume legacy Fernet format
logger.warning("Data is in legacy Fernet format. Attempting migration.")
try:
return self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
)
raise e
# If it's not V2, it must be the legacy Fernet format
else:
logger.warning("Data is in legacy Fernet format. Attempting migration.")
try:
return self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
)
raise e
except (InvalidToken, InvalidTag) as e:
if encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:"):
if encrypted_data.startswith(b"V2:"):
# Already determined not to be legacy; re-raise
raise
if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short":
@@ -254,78 +231,40 @@ class EncryptionManager:
raise ValueError("Invalid path outside fingerprint directory")
return candidate
def encrypt_parent_seed(
self, parent_seed: str, kdf: Optional[KdfConfig] = None
) -> None:
def encrypt_parent_seed(self, parent_seed: str) -> None:
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
data = parent_seed.encode("utf-8")
self.encrypt_and_save_file(data, self.parent_seed_file, kdf=kdf)
encrypted_data = self.encrypt_data(data) # This now creates V2 format
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
os.chmod(self.parent_seed_file, 0o600)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
def decrypt_parent_seed(self) -> str:
"""Decrypts and returns the parent seed, handling migration."""
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
blob = fh.read()
encrypted_data = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not (
encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:")
)
is_legacy = not encrypted_data.startswith(b"V2:")
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
if is_legacy:
logger.info("Parent seed was in legacy format. Re-encrypting to V3 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip())
return decrypted_data.decode("utf-8").strip()
def _serialize(self, kdf: KdfConfig, ciphertext: bytes) -> bytes:
payload = {"kdf": asdict(kdf), "ct": base64.b64encode(ciphertext).decode()}
if USE_ORJSON:
return json_lib.dumps(payload)
return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8")
def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]:
"""Return ``(KdfConfig, ciphertext)`` from serialized *blob*.
Legacy files stored the raw ciphertext without a JSON wrapper. If
decoding the wrapper fails, treat ``blob`` as the ciphertext and return
a default HKDF configuration.
"""
try:
if USE_ORJSON:
obj = json_lib.loads(blob)
else:
obj = json_lib.loads(blob.decode("utf-8"))
kdf = KdfConfig(**obj.get("kdf", {}))
ct_b64 = obj.get("ct", "")
ciphertext = base64.b64decode(ct_b64)
if ciphertext:
return kdf, ciphertext
except Exception: # pragma: no cover - fall back to legacy path
pass
# Legacy format: ``blob`` already contains the ciphertext
return (
KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""),
blob,
)
def encrypt_and_save_file(
self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None
) -> None:
if kdf is None:
kdf = KdfConfig()
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
file_path = self.resolve_relative_path(relative_path)
file_path.parent.mkdir(parents=True, exist_ok=True)
encrypted_data = self.encrypt_data(data)
payload = self._serialize(kdf, encrypted_data)
with exclusive_lock(file_path) as fh:
fh.seek(0)
fh.truncate()
fh.write(payload)
fh.write(encrypted_data)
fh.flush()
os.fsync(fh.fileno())
os.chmod(file_path, 0o600)
@@ -334,37 +273,20 @@ class EncryptionManager:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
blob = fh.read()
_, encrypted_data = self._deserialize(blob)
encrypted_data = fh.read()
return self.decrypt_data(encrypted_data, context=str(relative_path))
def get_file_kdf(self, relative_path: Path) -> KdfConfig:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
blob = fh.read()
kdf, _ = self._deserialize(blob)
return kdf
def save_json_data(
self,
data: dict,
relative_path: Optional[Path] = None,
*,
kdf: Optional[KdfConfig] = None,
) -> None:
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
if USE_ORJSON:
json_data = json_lib.dumps(data)
else:
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path, kdf=kdf)
self.encrypt_and_save_file(json_data, relative_path)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
def load_json_data(
self, relative_path: Optional[Path] = None, *, return_kdf: bool = False
) -> dict | Tuple[dict, KdfConfig]:
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
"""
Loads and decrypts JSON data, automatically migrating and re-saving
if it's in the legacy format.
@@ -373,21 +295,13 @@ class EncryptionManager:
relative_path = Path("seedpass_entries_db.json.enc")
file_path = self.resolve_relative_path(relative_path)
if not file_path.exists():
empty: dict = {"entries": {}}
if return_kdf:
return empty, KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
return empty
return {"entries": {}}
with exclusive_lock(file_path) as fh:
fh.seek(0)
blob = fh.read()
encrypted_data = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not (
encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:")
)
is_legacy = not encrypted_data.startswith(b"V2:")
self.last_migration_performed = False
try:
@@ -402,20 +316,16 @@ class EncryptionManager:
# If it was a legacy file, re-save it in the new format now
if is_legacy and self._legacy_migrate_flag:
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
self.save_json_data(data, relative_path, kdf=kdf)
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
self.last_migration_performed = True
if return_kdf:
return data, kdf
return data
except (InvalidToken, InvalidTag) as e:
msg = f"Failed to decrypt or parse data from {file_path}: {e}"
logger.error(msg)
raise InvalidToken(msg) from e
except JSONDecodeError as e:
msg = f"Failed to parse JSON data from {file_path}: {e}"
logger.error(msg)
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
logger.error(
f"FATAL: Could not decrypt or parse data from {file_path}: {e}",
exc_info=True,
)
raise
def get_encrypted_index(self) -> Optional[bytes]:
@@ -450,8 +360,7 @@ class EncryptionManager:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
kdf, ciphertext = self._deserialize(encrypted_data)
is_legacy = not (ciphertext.startswith(b"V3|") or ciphertext.startswith(b"V2:"))
is_legacy = not encrypted_data.startswith(b"V2:")
self.last_migration_performed = False
def _process(decrypted: bytes) -> dict:
@@ -477,9 +386,11 @@ class EncryptionManager:
return data
try:
decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path))
decrypted_data = self.decrypt_data(
encrypted_data, context=str(relative_path)
)
data = _process(decrypted_data)
self.save_json_data(data, relative_path, kdf=kdf)
self.save_json_data(data, relative_path) # This always saves in V2 format
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")
self.last_migration_performed = is_legacy
@@ -490,10 +401,10 @@ class EncryptionManager:
"Enter your master password for legacy decryption: "
)
decrypted_data = self.decrypt_legacy(
ciphertext, password, context=str(relative_path)
encrypted_data, password, context=str(relative_path)
)
data = _process(decrypted_data)
self.save_json_data(data, relative_path, kdf=kdf)
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
logger.warning(
"Index decrypted using legacy password-only key derivation."

View File

@@ -34,7 +34,7 @@ from pathlib import Path
from termcolor import colored
from .migrations import LATEST_VERSION
from .entry_types import EntryType, ALL_ENTRY_TYPES
from .totp import TotpManager, random_totp_secret
from .totp import TotpManager
from utils.fingerprint import generate_fingerprint
from utils.checksum import canonical_json_dumps
from utils.atomic_write import atomic_write
@@ -257,7 +257,7 @@ class EntryManager:
def add_totp(
self,
label: str,
parent_seed: str | bytes | None = None,
parent_seed: str,
*,
archived: bool = False,
secret: str | None = None,
@@ -266,16 +266,13 @@ class EntryManager:
digits: int = 6,
notes: str = "",
tags: list[str] | None = None,
deterministic: bool = False,
) -> str:
"""Add a new TOTP entry and return the provisioning URI."""
entry_id = self.get_next_index()
data = self._load_index()
data.setdefault("entries", {})
if deterministic:
if parent_seed is None:
raise ValueError("Seed required for deterministic TOTP")
if secret is None:
if index is None:
index = self.get_next_totp_index()
secret = TotpManager.derive_secret(parent_seed, index)
@@ -292,11 +289,8 @@ class EntryManager:
"archived": archived,
"notes": notes,
"tags": tags or [],
"deterministic": True,
}
else:
if secret is None:
secret = random_totp_secret()
if not validate_totp_secret(secret):
raise ValueError("Invalid TOTP secret")
entry = {
@@ -310,7 +304,6 @@ class EntryManager:
"archived": archived,
"notes": notes,
"tags": tags or [],
"deterministic": False,
}
data["entries"][str(entry_id)] = entry
@@ -696,10 +689,7 @@ class EntryManager:
return derive_seed_phrase(bip85, seed_index, words)
def get_totp_code(
self,
index: int,
parent_seed: str | bytes | None = None,
timestamp: int | None = None,
self, index: int, parent_seed: str | None = None, timestamp: int | None = None
) -> str:
"""Return the current TOTP code for the specified entry."""
entry = self.retrieve_entry(index)
@@ -709,12 +699,12 @@ class EntryManager:
etype != EntryType.TOTP.value and kind != EntryType.TOTP.value
):
raise ValueError("Entry is not a TOTP entry")
if entry.get("deterministic", False) or "secret" not in entry:
if parent_seed is None:
raise ValueError("Seed required for derived TOTP")
totp_index = int(entry.get("index", 0))
return TotpManager.current_code(parent_seed, totp_index, timestamp)
return TotpManager.current_code_from_secret(entry["secret"], timestamp)
if "secret" in entry:
return TotpManager.current_code_from_secret(entry["secret"], timestamp)
if parent_seed is None:
raise ValueError("Seed required for derived TOTP")
totp_index = int(entry.get("index", 0))
return TotpManager.current_code(parent_seed, totp_index, timestamp)
def get_totp_time_remaining(self, index: int) -> int:
"""Return seconds remaining in the TOTP period for the given entry."""
@@ -729,9 +719,7 @@ class EntryManager:
period = int(entry.get("period", 30))
return TotpManager.time_remaining(period)
def export_totp_entries(
self, parent_seed: str | bytes | None
) -> dict[str, list[dict[str, Any]]]:
def export_totp_entries(self, parent_seed: str) -> dict[str, list[dict[str, Any]]]:
"""Return all TOTP secrets and metadata for external use."""
data = self._load_index()
entries = data.get("entries", {})
@@ -743,13 +731,11 @@ class EntryManager:
label = entry.get("label", "")
period = int(entry.get("period", 30))
digits = int(entry.get("digits", 6))
if entry.get("deterministic", False) or "secret" not in entry:
if parent_seed is None:
raise ValueError("Seed required for deterministic TOTP export")
if "secret" in entry:
secret = entry["secret"]
else:
idx = int(entry.get("index", 0))
secret = TotpManager.derive_secret(parent_seed, idx)
else:
secret = entry["secret"]
uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
exported.append(
{

View File

@@ -15,7 +15,6 @@ import logging
import os
import hashlib
import hmac
import base64
from typing import Optional, Literal, Any
import shutil
import time
@@ -36,7 +35,7 @@ from .entry_management import EntryManager
from .password_generation import PasswordGenerator
from .backup import BackupManager
from .vault import Vault
from .portable_backup import export_backup, import_backup, PortableMode
from .portable_backup import export_backup, import_backup
from cryptography.fernet import InvalidToken
from .totp import TotpManager
from .entry_types import EntryType
@@ -47,9 +46,7 @@ from utils.key_derivation import (
derive_key_from_password_argon2,
derive_index_key,
EncryptionMode,
KdfConfig,
)
from utils.key_hierarchy import kd
from utils.checksum import (
calculate_checksum,
verify_checksum,
@@ -106,6 +103,7 @@ from utils.fingerprint_manager import FingerprintManager
# Import NostrClient
from nostr.client import NostrClient
from nostr.connection import DEFAULT_RELAYS
from nostr.snapshot import MANIFEST_ID_PREFIX
from .config_manager import ConfigManager
from .state_manager import StateManager
from .stats_manager import StatsManager
@@ -232,14 +230,6 @@ class PasswordManager:
verification, ensuring the integrity and confidentiality of the stored password database.
"""
# Class-level fallbacks so attributes exist even if ``__init__`` is bypassed
master_key: bytes | None = None
KEY_STORAGE: bytes | None = None
KEY_INDEX: bytes | None = None
KEY_PW_DERIVE: bytes | None = None
KEY_TOTP_DET: bytes | None = None
deterministic_totp: bool = False
def __init__(
self, fingerprint: Optional[str] = None, *, password: Optional[str] = None
) -> None:
@@ -272,13 +262,6 @@ class PasswordManager:
self._bip85_cache: dict[tuple[int, int], bytes] = {}
self.audit_logger: Optional[AuditLogger] = None
# Derived key hierarchy
self.master_key: bytes | None = None
self.KEY_STORAGE: bytes | None = None
self.KEY_INDEX: bytes | None = None
self.KEY_PW_DERIVE: bytes | None = None
self.KEY_TOTP_DET: bytes | None = None
# Track changes to trigger periodic Nostr sync
self.is_dirty: bool = False
self.last_update: float = time.time()
@@ -287,16 +270,14 @@ class PasswordManager:
self.is_locked: bool = False
self.inactivity_timeout: float = INACTIVITY_TIMEOUT
self.secret_mode_enabled: bool = False
self.deterministic_totp: bool = False
self.clipboard_clear_delay: int = 45
self.offline_mode: bool = True
self.offline_mode: bool = False
self.profile_stack: list[tuple[str, Path, str]] = []
self.last_unlock_duration: float | None = None
self.verbose_timing: bool = False
self._suppress_entry_actions_menu: bool = False
self.last_bip85_idx: int = 0
self.last_sync_ts: int = 0
self.nostr_account_idx: int = 0
self.auth_guard = AuthGuard(self)
# Service composition
@@ -341,30 +322,6 @@ class PasswordManager:
self._bip85_cache.clear()
def derive_key_hierarchy(self, seed_bytes: bytes) -> None:
"""Populate sub-keys from ``seed_bytes`` using HKDF."""
master = kd(seed_bytes, b"seedpass:v1:master")
self.master_key = master
self.KEY_STORAGE = kd(master, b"seedpass:v1:storage")
self.KEY_INDEX = kd(master, b"seedpass:v1:index")
self.KEY_PW_DERIVE = kd(master, b"seedpass:v1:pw")
self.KEY_TOTP_DET = kd(master, b"seedpass:v1:totp")
def ensure_key_hierarchy(self) -> None:
"""Ensure sub-keys are derived from the current parent seed."""
if (
self.KEY_STORAGE is None
or self.KEY_INDEX is None
or self.KEY_PW_DERIVE is None
or self.KEY_TOTP_DET is None
) and getattr(self, "parent_seed", None):
try:
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
except Exception:
seed_bytes = hashlib.sha256(self.parent_seed.encode()).digest()
self.derive_key_hierarchy(seed_bytes)
def ensure_script_checksum(self) -> None:
"""Initialize or verify the checksum of the manager script."""
script_path = Path(__file__).resolve()
@@ -522,12 +479,15 @@ class PasswordManager:
self.setup_encryption_manager(self.fingerprint_dir, password)
self.initialize_bip85()
self.initialize_managers()
self.ensure_key_hierarchy()
self.is_locked = False
self.locked = False
self.update_activity()
if getattr(self, "audit_logger", None) is None and self.KEY_INDEX is not None:
self.audit_logger = AuditLogger(self.KEY_INDEX)
if (
getattr(self, "audit_logger", None) is None
and getattr(self, "_parent_seed_secret", None) is not None
):
key = hashlib.sha256(self.parent_seed.encode("utf-8")).digest()
self.audit_logger = AuditLogger(key)
if (
getattr(self, "config_manager", None)
and self.config_manager.get_quick_unlock()
@@ -624,11 +584,9 @@ class PasswordManager:
"""
try:
choice = input(
"Choose an option:\n"
"1. Paste in an existing seed in full\n"
"2. Enter an existing seed one word at a time\n"
"3. Generate a new seed\n"
"Enter choice (1/2/3): "
"Do you want to (1) Paste in an existing seed in full "
"(2) Enter an existing seed one word at a time or "
"(3) Generate a new seed? (1/2/3): "
).strip()
if choice == "1":
fingerprint = self.setup_existing_seed(method="paste")
@@ -731,9 +689,9 @@ class PasswordManager:
for iter_try in dict.fromkeys(iter_candidates):
try:
if mode == "argon2":
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
seed_key = derive_key_from_password_argon2(
password, salt_fp
)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iter_try
@@ -760,10 +718,9 @@ class PasswordManager:
password = None
continue
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
self.encryption_manager = EncryptionManager(key_b64, fingerprint_dir)
key = derive_index_key(self.parent_seed)
self.encryption_manager = EncryptionManager(key, fingerprint_dir)
self.vault = Vault(self.encryption_manager, fingerprint_dir)
self.config_manager = ConfigManager(
@@ -814,9 +771,7 @@ class PasswordManager:
)
salt_fp = fingerprint_dir.name
if mode == "argon2":
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
seed_key = derive_key_from_password_argon2(password, salt_fp)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iterations
@@ -824,7 +779,6 @@ class PasswordManager:
seed_mgr = EncryptionManager(seed_key, fingerprint_dir)
self.parent_seed = seed_mgr.decrypt_parent_seed()
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
self.bip85 = BIP85(seed_bytes)
except Exception as e:
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
@@ -854,10 +808,8 @@ class PasswordManager:
self.fingerprint_dir = account_dir
self.parent_seed = seed
seed_bytes = Bip39SeedGenerator(seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
self.encryption_manager = EncryptionManager(key_b64, account_dir)
key = derive_index_key(seed)
self.encryption_manager = EncryptionManager(key, account_dir)
self.vault = Vault(self.encryption_manager, account_dir)
self.initialize_bip85()
@@ -876,13 +828,9 @@ class PasswordManager:
self.current_fingerprint = fp
self.fingerprint_dir = path
self.parent_seed = seed
try:
seed_bytes = Bip39SeedGenerator(seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
except Exception:
key_b64 = derive_index_key(seed)
self.encryption_manager = EncryptionManager(key_b64, path)
key = derive_index_key(seed)
self.encryption_manager = EncryptionManager(key, path)
self.vault = Vault(self.encryption_manager, path)
self.initialize_bip85()
@@ -948,14 +896,10 @@ class PasswordManager:
password, selected_fingerprint, iterations=iterations
)
seed_mgr = EncryptionManager(key, fingerprint_dir)
self.vault = Vault(seed_mgr, fingerprint_dir)
self.parent_seed = seed_mgr.decrypt_parent_seed()
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
self.encryption_manager = EncryptionManager(key_b64, fingerprint_dir)
# Initialize EncryptionManager with key and fingerprint_dir
self.encryption_manager = EncryptionManager(key, fingerprint_dir)
self.vault = Vault(self.encryption_manager, fingerprint_dir)
self.parent_seed = self.encryption_manager.decrypt_parent_seed()
# Log the type and content of parent_seed
logger.debug(
@@ -983,12 +927,9 @@ class PasswordManager:
self.notify("No existing seed found. Let's set up a new one!", level="WARNING")
choice = input(
"Choose an option:\n"
"1. Paste in an existing seed in full\n"
"2. Enter an existing seed one word at a time\n"
"3. Generate a new seed\n"
"4. Restore from Nostr\n"
"Enter choice (1/2/3/4): "
"Do you want to (1) Paste in an existing seed in full "
"(2) Enter an existing seed one word at a time, "
"(3) Generate a new seed, or (4) Restore from Nostr? (1/2/3/4): "
).strip()
if choice == "1":
@@ -1094,15 +1035,13 @@ class PasswordManager:
self.fingerprint_manager.current_fingerprint = fingerprint
self.fingerprint_dir = fingerprint_dir
if not getattr(self, "manifest_id", None):
self.manifest_id = None
self.manifest_id = f"{MANIFEST_ID_PREFIX}{fingerprint}"
logging.info(f"Current seed profile set to {fingerprint}")
try:
if password is None:
password = prompt_for_password()
seed_bytes = Bip39SeedGenerator(parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
index_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
index_key = derive_index_key(parent_seed)
iterations = (
self.config_manager.get_kdf_iterations()
if getattr(self, "config_manager", None)
@@ -1158,14 +1097,6 @@ class PasswordManager:
print(colored("Please write this down and keep it in a safe place!", "red"))
if confirm_action("Do you want to use this generated seed? (Y/N): "):
# Determine next account index if state manager is available
next_idx = 0
if getattr(self, "state_manager", None) is not None:
try:
next_idx = self.state_manager.state.get("nostr_account_idx", 0) + 1
except Exception:
next_idx = 0
# Add a new fingerprint using the generated seed
try:
fingerprint = self.fingerprint_manager.add_fingerprint(new_seed)
@@ -1198,15 +1129,6 @@ class PasswordManager:
)
sys.exit(1)
# Persist the assigned account index for the new profile
try:
StateManager(fingerprint_dir).update_state(nostr_account_idx=next_idx)
if getattr(self, "state_manager", None) is not None:
self.state_manager.update_state(nostr_account_idx=next_idx)
self.nostr_account_idx = next_idx
except Exception:
pass
# Set the current fingerprint in both PasswordManager and FingerprintManager
self.current_fingerprint = fingerprint
self.fingerprint_manager.current_fingerprint = fingerprint
@@ -1298,9 +1220,7 @@ class PasswordManager:
if password is None:
password = prompt_for_password()
seed_bytes = Bip39SeedGenerator(seed).Generate()
self.derive_key_hierarchy(seed_bytes)
index_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
index_key = derive_index_key(seed)
iterations = (
self.config_manager.get_kdf_iterations()
if getattr(self, "config_manager", None)
@@ -1346,7 +1266,6 @@ class PasswordManager:
"""
try:
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
self.bip85 = BIP85(seed_bytes)
self._bip85_cache = {}
orig_derive = self.bip85.derive_entropy
@@ -1379,9 +1298,6 @@ class PasswordManager:
if not self.encryption_manager:
raise ValueError("EncryptionManager is not initialized.")
# Derive sub-keys if needed
self.ensure_key_hierarchy()
# Reinitialize the managers with the updated EncryptionManager and current fingerprint context
self.config_manager = ConfigManager(
vault=self.vault,
@@ -1414,11 +1330,10 @@ class PasswordManager:
backup_manager=self.backup_manager,
)
pw_bip85 = BIP85(self.KEY_PW_DERIVE)
self.password_generator = PasswordGenerator(
encryption_manager=self.encryption_manager,
parent_seed=self.KEY_PW_DERIVE,
bip85=pw_bip85,
parent_seed=self.parent_seed,
bip85=self.bip85,
policy=self.config_manager.get_password_policy(),
)
@@ -1431,15 +1346,13 @@ class PasswordManager:
self.last_sync_ts = state.get("last_sync_ts", 0)
self.manifest_id = state.get("manifest_id")
self.delta_since = state.get("delta_since", 0)
self.nostr_account_idx = state.get("nostr_account_idx", 0)
else:
relay_list = list(DEFAULT_RELAYS)
self.last_bip85_idx = 0
self.last_sync_ts = 0
self.manifest_id = None
self.delta_since = 0
self.nostr_account_idx = 0
self.offline_mode = bool(config.get("offline_mode", True))
self.offline_mode = bool(config.get("offline_mode", False))
self.inactivity_timeout = config.get(
"inactivity_timeout", INACTIVITY_TIMEOUT
)
@@ -1455,8 +1368,6 @@ class PasswordManager:
offline_mode=self.offline_mode,
config_manager=self.config_manager,
parent_seed=getattr(self, "parent_seed", None),
key_index=self.KEY_INDEX,
account_index=self.nostr_account_idx,
)
if getattr(self, "manifest_id", None) and hasattr(
@@ -1880,7 +1791,7 @@ class PasswordManager:
child_fingerprint=child_fp,
)
print("\nAdd TOTP:")
print("1. Make 2FA")
print("1. Make 2FA (derive from seed)")
print("2. Import 2FA (paste otpauth URI or secret)")
choice = input("Select option or press Enter to go back: ").strip()
if choice == "1":
@@ -1904,29 +1815,18 @@ class PasswordManager:
if tags_input
else []
)
totp_index = self.entry_manager.get_next_totp_index()
entry_id = self.entry_manager.get_next_index()
key = self.KEY_TOTP_DET if self.deterministic_totp else None
totp_index = (
self.entry_manager.get_next_totp_index()
if self.deterministic_totp
else None
)
uri = self.entry_manager.add_totp(
label,
key,
self.parent_seed,
index=totp_index,
period=int(period),
digits=int(digits),
notes=notes,
tags=tags,
deterministic=self.deterministic_totp,
)
if self.deterministic_totp:
secret = TotpManager.derive_secret(key, totp_index or 0)
color_cat = "deterministic"
else:
_lbl, secret, _, _ = TotpManager.parse_otpauth(uri)
color_cat = "default"
secret = TotpManager.derive_secret(self.parent_seed, totp_index)
self.is_dirty = True
self.last_update = time.time()
print(
@@ -1937,7 +1837,7 @@ class PasswordManager:
print(colored("Add this URI to your authenticator app:", "cyan"))
print(colored(uri, "yellow"))
TotpManager.print_qr_code(uri)
print(color_text(f"Secret: {secret}\n", color_cat))
print(color_text(f"Secret: {secret}\n", "deterministic"))
try:
self.start_background_vault_sync()
except Exception as nostr_error:
@@ -1971,13 +1871,12 @@ class PasswordManager:
entry_id = self.entry_manager.get_next_index()
uri = self.entry_manager.add_totp(
label,
None,
self.parent_seed,
secret=secret,
period=period,
digits=digits,
notes=notes,
tags=tags,
deterministic=False,
)
self.is_dirty = True
self.last_update = time.time()
@@ -2734,8 +2633,7 @@ class PasswordManager:
print(colored("Press Enter to return to the menu.", "cyan"))
try:
while True:
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
code = self.entry_manager.get_totp_code(index, key)
code = self.entry_manager.get_totp_code(index, self.parent_seed)
if self.secret_mode_enabled:
if copy_to_clipboard(code, self.clipboard_clear_delay):
print(
@@ -4128,15 +4026,8 @@ class PasswordManager:
def handle_export_database(
self,
dest: Path | None = None,
*,
encrypt: bool | None = None,
) -> Path | None:
"""Export the current database to a portable file.
If ``encrypt`` is ``True`` (default) the payload is encrypted. When
``encrypt`` is ``False`` the export contains plaintext data. When
``encrypt`` is ``None`` the user is prompted interactively.
"""
"""Export the current database to an encrypted portable file."""
try:
fp, parent_fp, child_fp = self.header_fingerprint_args
clear_header_with_notification(
@@ -4146,16 +4037,11 @@ class PasswordManager:
parent_fingerprint=parent_fp,
child_fingerprint=child_fp,
)
if encrypt is None:
encrypt = not confirm_action(
"Export database without encryption? (Y/N): "
)
path = export_backup(
self.vault,
self.backup_manager,
dest,
parent_seed=self.parent_seed,
encrypt=encrypt,
)
print(colored(f"Database exported to '{path}'.", "green"))
audit_logger = getattr(self, "audit_logger", None)
@@ -4170,26 +4056,15 @@ class PasswordManager:
def handle_import_database(self, src: Path) -> None:
"""Import a portable database file, replacing the current index."""
if not (src.name.endswith(".json.enc") or src.name.endswith(".json")):
if not src.name.endswith(".json.enc"):
print(
colored(
"Error: Selected file must be a SeedPass database backup (.json or .json.enc).",
"Error: Selected file must be a SeedPass database backup (.json.enc).",
"red",
)
)
return
# Determine encryption mode for post-processing
mode = None
try:
raw = src.read_bytes()
if src.suffix.endswith(".enc"):
raw = self.vault.encryption_manager.decrypt_data(raw, context=str(src))
wrapper = json.loads(raw.decode("utf-8"))
mode = wrapper.get("encryption_mode")
except Exception:
mode = None
fp, parent_fp, child_fp = self.header_fingerprint_args
clear_header_with_notification(
self,
@@ -4229,30 +4104,12 @@ class PasswordManager:
)
return
if mode == PortableMode.NONE.value:
try:
password = prompt_new_password()
iterations = self.config_manager.get_kdf_iterations()
seed_key = derive_key_from_password(
password, self.current_fingerprint, iterations=iterations
)
seed_mgr = EncryptionManager(seed_key, self.fingerprint_dir)
seed_mgr.encrypt_parent_seed(self.parent_seed)
self.store_hashed_password(password)
except Exception as e:
logging.error(
f"Failed to set master password after import: {e}", exc_info=True
)
print(colored(f"Error: Failed to set master password: {e}", "red"))
return
print(colored("Database imported successfully.", "green"))
self.sync_vault()
def handle_export_totp_codes(self) -> Path | None:
"""Export all 2FA codes to a JSON file for other authenticator apps."""
try:
self.ensure_key_hierarchy()
fp, parent_fp, child_fp = self.header_fingerprint_args
clear_header_with_notification(
self,
@@ -4274,8 +4131,7 @@ class PasswordManager:
secret = entry["secret"]
else:
idx = int(entry.get("index", 0))
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
secret = TotpManager.derive_secret(key, idx)
secret = TotpManager.derive_secret(self.parent_seed, idx)
uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
totp_entries.append(
{
@@ -4446,15 +4302,6 @@ class PasswordManager:
else:
logging.warning("Password verification failed.")
return is_correct
except InvalidToken as e:
logging.error(f"Failed to decrypt config: {e}")
print(
colored(
"Error: Could not decrypt configuration. The password may be incorrect or the file may be corrupted.",
"red",
)
)
return False
except Exception as e:
logging.error(f"Error verifying password: {e}", exc_info=True)
print(colored(f"Error: Failed to verify password: {e}", "red"))
@@ -4521,7 +4368,6 @@ class PasswordManager:
def change_password(self, old_password: str, new_password: str) -> None:
"""Change the master password used for encryption."""
try:
self.ensure_key_hierarchy()
if not self.verify_password(old_password):
raise ValueError("Incorrect password")
@@ -4530,7 +4376,7 @@ class PasswordManager:
config_data = self.config_manager.load_config(require_pin=False)
# Create a new encryption manager with the new password
new_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
new_key = derive_index_key(self.parent_seed)
iterations = self.config_manager.get_kdf_iterations()
seed_key = derive_key_from_password(
@@ -4562,8 +4408,6 @@ class PasswordManager:
relays=relay_list,
config_manager=self.config_manager,
parent_seed=getattr(self, "parent_seed", None),
key_index=self.KEY_INDEX,
account_index=self.nostr_account_idx,
)
if getattr(self, "manifest_id", None) and hasattr(

View File

@@ -131,10 +131,7 @@ class MenuHandler:
if generated:
print(colored("\nGenerated 2FA Codes:", "green"))
for label, idx, period, _ in generated:
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(
pm, "parent_seed", None
)
code = pm.entry_manager.get_totp_code(idx, key)
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
remaining = pm.entry_manager.get_totp_time_remaining(idx)
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
@@ -152,10 +149,7 @@ class MenuHandler:
if imported_list:
print(colored("\nImported 2FA Codes:", "green"))
for label, idx, period, _ in imported_list:
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(
pm, "parent_seed", None
)
code = pm.entry_manager.get_totp_code(idx, key)
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
remaining = pm.entry_manager.get_totp_time_remaining(idx)
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"

View File

@@ -113,12 +113,10 @@ class PasswordGenerator:
self.bip85 = bip85
self.policy = policy or PasswordPolicy()
if isinstance(parent_seed, (bytes, bytearray)):
self.seed_bytes = bytes(parent_seed)
else:
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
# Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
logger.debug("PasswordGenerator initialized successfully.")
except Exception as e:

View File

@@ -21,7 +21,6 @@ from utils.key_derivation import (
)
from .encryption import EncryptionManager
from utils.checksum import json_checksum, canonical_json_dumps
from .state_manager import StateManager
logger = logging.getLogger(__name__)
@@ -33,7 +32,6 @@ class PortableMode(Enum):
"""Encryption mode for portable exports."""
SEED_ONLY = EncryptionMode.SEED_ONLY.value
NONE = "none"
def _derive_export_key(seed: str) -> bytes:
@@ -49,15 +47,8 @@ def export_backup(
*,
publish: bool = False,
parent_seed: str | None = None,
encrypt: bool = True,
) -> Path:
"""Export the current vault state to a portable file.
When ``encrypt`` is ``True`` (the default) the payload is encrypted with a
key derived from the parent seed. When ``encrypt`` is ``False`` the payload
is written in plaintext and the wrapper records an ``encryption_mode`` of
:data:`PortableMode.NONE`.
"""
"""Export the current vault state to a portable encrypted file."""
if dest_path is None:
ts = int(time.time())
@@ -66,32 +57,24 @@ def export_backup(
dest_path = dest_dir / EXPORT_NAME_TEMPLATE.format(ts=ts)
index_data = vault.load_index()
seed = (
parent_seed
if parent_seed is not None
else vault.encryption_manager.decrypt_parent_seed()
)
key = _derive_export_key(seed)
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
canonical = canonical_json_dumps(index_data)
if encrypt:
seed = (
parent_seed
if parent_seed is not None
else vault.encryption_manager.decrypt_parent_seed()
)
key = _derive_export_key(seed)
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
payload_bytes = enc_mgr.encrypt_data(canonical.encode("utf-8"))
mode = PortableMode.SEED_ONLY
cipher = "aes-gcm"
else:
payload_bytes = canonical.encode("utf-8")
mode = PortableMode.NONE
cipher = "none"
payload_bytes = enc_mgr.encrypt_data(canonical.encode("utf-8"))
checksum = json_checksum(index_data)
wrapper = {
"format_version": FORMAT_VERSION,
"created_at": int(time.time()),
"fingerprint": vault.fingerprint_dir.name,
"encryption_mode": mode.value,
"cipher": cipher,
"encryption_mode": PortableMode.SEED_ONLY.value,
"cipher": "aes-gcm",
"checksum": checksum,
"payload": base64.b64encode(payload_bytes).decode("utf-8"),
}
@@ -107,12 +90,10 @@ def export_backup(
enc_file.write_bytes(encrypted)
os.chmod(enc_file, 0o600)
try:
idx = StateManager(vault.fingerprint_dir).state.get("nostr_account_idx", 0)
client = NostrClient(
vault.encryption_manager,
vault.fingerprint_dir.name,
config_manager=backup_manager.config_manager,
account_index=idx,
)
asyncio.run(client.publish_snapshot(encrypted))
except Exception:
@@ -137,24 +118,19 @@ def import_backup(
if wrapper.get("format_version") != FORMAT_VERSION:
raise ValueError("Unsupported backup format")
mode = wrapper.get("encryption_mode")
if wrapper.get("encryption_mode") != PortableMode.SEED_ONLY.value:
raise ValueError("Unsupported encryption mode")
payload = base64.b64decode(wrapper["payload"])
if mode == PortableMode.SEED_ONLY.value:
seed = (
parent_seed
if parent_seed is not None
else vault.encryption_manager.decrypt_parent_seed()
)
key = _derive_export_key(seed)
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
enc_mgr._legacy_migrate_flag = False
index_bytes = enc_mgr.decrypt_data(payload, context="backup payload")
elif mode == PortableMode.NONE.value:
index_bytes = payload
else:
raise ValueError("Unsupported encryption mode")
seed = (
parent_seed
if parent_seed is not None
else vault.encryption_manager.decrypt_parent_seed()
)
key = _derive_export_key(seed)
enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
enc_mgr._legacy_migrate_flag = False
index_bytes = enc_mgr.decrypt_data(payload, context="backup payload")
index = json.loads(index_bytes.decode("utf-8"))
checksum = json_checksum(index)

View File

@@ -6,6 +6,7 @@ from typing import Optional, TYPE_CHECKING
from termcolor import colored
import seedpass.core.manager as manager_module
from nostr.snapshot import MANIFEST_ID_PREFIX
from utils.password_prompt import prompt_existing_password
@@ -43,7 +44,7 @@ class ProfileService:
pm.fingerprint_manager.current_fingerprint = selected_fingerprint
pm.current_fingerprint = selected_fingerprint
if not getattr(pm, "manifest_id", None):
pm.manifest_id = None
pm.manifest_id = f"{MANIFEST_ID_PREFIX}{selected_fingerprint}"
pm.fingerprint_dir = pm.fingerprint_manager.get_current_fingerprint_dir()
if not pm.fingerprint_dir:
@@ -76,8 +77,6 @@ class ProfileService:
fingerprint=pm.current_fingerprint,
config_manager=getattr(pm, "config_manager", None),
parent_seed=getattr(pm, "parent_seed", None),
key_index=pm.KEY_INDEX,
account_index=pm.nostr_account_idx,
)
if getattr(pm, "manifest_id", None) and hasattr(
pm.nostr_client, "_state_lock"

View File

@@ -26,7 +26,6 @@ class StateManager:
"manifest_id": None,
"delta_since": 0,
"relays": list(DEFAULT_RELAYS),
"nostr_account_idx": 0,
}
with shared_lock(self.state_path) as fh:
fh.seek(0)
@@ -38,7 +37,6 @@ class StateManager:
"manifest_id": None,
"delta_since": 0,
"relays": list(DEFAULT_RELAYS),
"nostr_account_idx": 0,
}
try:
obj = json.loads(data.decode())
@@ -49,7 +47,6 @@ class StateManager:
obj.setdefault("manifest_id", None)
obj.setdefault("delta_since", 0)
obj.setdefault("relays", list(DEFAULT_RELAYS))
obj.setdefault("nostr_account_idx", 0)
return obj
def _save(self, data: dict) -> None:

View File

@@ -2,11 +2,8 @@
from __future__ import annotations
import os
import sys
import time
import base64
from typing import Union
from urllib.parse import quote
from urllib.parse import urlparse, parse_qs, unquote
@@ -17,24 +14,17 @@ import pyotp
from utils import key_derivation
def random_totp_secret(length: int = 20) -> str:
"""Return a random Base32 encoded TOTP secret."""
return base64.b32encode(os.urandom(length)).decode("ascii").rstrip("=")
class TotpManager:
"""Helper methods for TOTP secrets and codes."""
@staticmethod
def derive_secret(seed: Union[str, bytes], index: int) -> str:
"""Derive a TOTP secret from a seed or raw key and index."""
def derive_secret(seed: str, index: int) -> str:
"""Derive a TOTP secret from a BIP39 seed and index."""
return key_derivation.derive_totp_secret(seed, index)
@classmethod
def current_code(
cls, seed: Union[str, bytes], index: int, timestamp: int | None = None
) -> str:
"""Return the TOTP code for the given seed/key and index."""
def current_code(cls, seed: str, index: int, timestamp: int | None = None) -> str:
"""Return the TOTP code for the given seed and index."""
secret = cls.derive_secret(seed, index)
totp = pyotp.TOTP(secret)
if timestamp is None:

View File

@@ -14,7 +14,6 @@ from .encryption import (
USE_ORJSON,
json_lib,
)
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
from utils.password_prompt import prompt_existing_password
@@ -39,11 +38,6 @@ class Vault:
"""Replace the internal encryption manager."""
self.encryption_manager = manager
def _hkdf_kdf(self) -> KdfConfig:
return KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
# ----- Password index helpers -----
def load_index(self, *, return_migration_flags: bool = False):
"""Return decrypted password index data, applying migrations.
@@ -108,24 +102,10 @@ class Vault:
)
try:
data, kdf = self.encryption_manager.load_json_data(
self.index_file, return_kdf=True
)
data = self.encryption_manager.load_json_data(self.index_file)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
)
if kdf.version < CURRENT_KDF_VERSION:
new_kdf = KdfConfig(
name=kdf.name,
version=CURRENT_KDF_VERSION,
params=kdf.params,
salt_b64=kdf.salt_b64,
)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=new_kdf
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = True
except LegacyFormatRequiresMigrationError:
print(
colored(
@@ -162,9 +142,7 @@ class Vault:
else:
data = json_lib.loads(decrypted.decode("utf-8"))
if self.encryption_manager._legacy_migrate_flag:
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
@@ -203,9 +181,7 @@ class Vault:
try:
data = apply_migrations(data)
if schema_migrated:
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.update_checksum(self.index_file)
except Exception as exc: # noqa: BLE001 - surface clear error and restore
if legacy_detected and backup_dir is not None:
@@ -238,9 +214,7 @@ class Vault:
def save_index(self, data: dict) -> None:
"""Encrypt and write password index."""
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.save_json_data(data, self.index_file)
def get_encrypted_index(self) -> Optional[bytes]:
"""Return the encrypted index bytes if present."""
@@ -278,6 +252,4 @@ class Vault:
def save_config(self, config: dict) -> None:
"""Encrypt and persist configuration."""
self.encryption_manager.save_json_data(
config, self.config_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.save_json_data(config, self.config_file)

View File

@@ -4,7 +4,7 @@ import sys
import pytest
from httpx import ASGITransport, AsyncClient
import bcrypt
import hashlib
sys.path.append(str(Path(__file__).resolve().parents[1]))
@@ -54,7 +54,7 @@ async def client(monkeypatch):
async def test_token_hashed(client):
_, token = client
assert api.app.state.token_hash != token
assert bcrypt.checkpw(token.encode(), api.app.state.token_hash)
assert api.app.state.token_hash == hashlib.sha256(token.encode()).hexdigest()
@pytest.mark.anyio

View File

@@ -53,7 +53,6 @@ async def test_create_and_modify_totp_entry(client):
"digits": 8,
"notes": "n",
"archived": False,
"deterministic": False,
}
res = await cl.put(
@@ -378,7 +377,7 @@ async def test_vault_export_endpoint(client, tmp_path):
out = tmp_path / "out.json"
out.write_text("data")
api.app.state.pm.handle_export_database = lambda *a, **k: out
api.app.state.pm.handle_export_database = lambda: out
headers = {
"Authorization": f"Bearer {token}",

View File

@@ -36,7 +36,6 @@ def test_audit_logger_records_events(monkeypatch, tmp_path):
monkeypatch.setattr(manager_module, "export_backup", lambda *a, **k: dest)
pm.vault = object()
pm.backup_manager = object()
monkeypatch.setattr("seedpass.core.manager.confirm_action", lambda *_a, **_k: True)
pm.handle_export_database(dest)
confirms = iter([True, False])

View File

@@ -20,7 +20,6 @@ def test_switch_fingerprint_triggers_bg_sync(monkeypatch, tmp_path):
pm.current_fingerprint = None
pm.encryption_manager = object()
pm.config_manager = SimpleNamespace(get_quick_unlock=lambda: False)
pm.nostr_account_idx = 0
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "1")
monkeypatch.setattr(

View File

@@ -25,7 +25,7 @@ class DummyPM:
retrieve_entry=lambda idx: {"type": EntryType.PASSWORD.value, "length": 8},
get_totp_code=lambda idx, seed: "123456",
add_entry=lambda label, length, username, url, **kwargs: 1,
add_totp=lambda label, seed, index=None, secret=None, period=30, digits=6, deterministic=False: "totp://",
add_totp=lambda label, seed, index=None, secret=None, period=30, digits=6: "totp://",
add_ssh_key=lambda label, seed, index=None, notes="": 2,
add_pgp_key=lambda label, seed, index=None, key_type="ed25519", user_id="", notes="": 3,
add_nostr_key=lambda label, seed, index=None, notes="": 4,
@@ -42,7 +42,7 @@ class DummyPM:
)
self.parent_seed = "seed"
self.handle_display_totp_codes = lambda: None
self.handle_export_database = lambda path, **kwargs: None
self.handle_export_database = lambda path: None
self.handle_import_database = lambda path: None
self.change_password = lambda *a, **kw: None
self.lock_vault = lambda: None

View File

@@ -65,14 +65,8 @@ runner = CliRunner()
"--digits",
"7",
],
("Label", None),
{
"index": 1,
"secret": "abc",
"period": 45,
"digits": 7,
"deterministic": False,
},
("Label", "seed"),
{"index": 1, "secret": "abc", "period": 45, "digits": 7},
"otpauth://uri",
),
(

View File

@@ -17,8 +17,8 @@ def _setup_pm(tmp_path: Path):
cfg = ConfigManager(vault, tmp_path)
backup = BackupManager(tmp_path, cfg)
pm = SimpleNamespace(
handle_export_database=lambda p, encrypt=True: export_backup(
vault, backup, p, parent_seed=TEST_SEED, encrypt=encrypt
handle_export_database=lambda p: export_backup(
vault, backup, p, parent_seed=TEST_SEED
),
handle_import_database=lambda p: import_backup(
vault, backup, p, parent_seed=TEST_SEED
@@ -91,36 +91,3 @@ def test_cli_import_round_trip(monkeypatch, tmp_path):
rc = main.main(["import", "--file", str(export_path)])
assert rc == 0
assert vault.load_index() == original
def test_cli_export_import_unencrypted(monkeypatch, tmp_path):
pm, vault = _setup_pm(tmp_path)
data = {
"schema_version": 4,
"entries": {
"0": {
"label": "example",
"type": "password",
"notes": "",
"custom_fields": [],
"origin": "",
"tags": [],
}
},
}
vault.save_index(data)
monkeypatch.setattr(main, "PasswordManager", lambda *a, **k: pm)
monkeypatch.setattr(main, "configure_logging", lambda: None)
monkeypatch.setattr(main, "initialize_app", lambda: None)
monkeypatch.setattr(main.signal, "signal", lambda *a, **k: None)
export_path = tmp_path / "out.json"
rc = main.main(["export", "--file", str(export_path), "--unencrypted"])
assert rc == 0
assert export_path.exists()
vault.save_index({"schema_version": 4, "entries": {}})
rc = main.main(["import", "--file", str(export_path)])
assert rc == 0
assert vault.load_index() == data

View File

@@ -3,15 +3,11 @@ from pathlib import Path
from hypothesis import given, strategies as st, settings, HealthCheck
from mnemonic import Mnemonic
import hashlib
import base64
import os
from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key,
KdfConfig,
)
from utils.fingerprint import generate_fingerprint
from seedpass.core.encryption import EncryptionManager
@@ -40,27 +36,16 @@ def test_fuzz_key_round_trip(password, seed_bytes, config, mode, tmp_path: Path)
seed_phrase = Mnemonic("english").to_mnemonic(seed_bytes)
fp = generate_fingerprint(seed_phrase)
if mode == "argon2":
cfg = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
key = derive_key_from_password_argon2(
password, fp, time_cost=1, memory_cost=8, parallelism=1
)
key = derive_key_from_password_argon2(password, cfg)
else:
key = derive_key_from_password(password, fp, iterations=1)
cfg = KdfConfig(
name="pbkdf2",
params={"iterations": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
enc_mgr = EncryptionManager(key, tmp_path)
# Parent seed round trip
enc_mgr.encrypt_parent_seed(seed_phrase, kdf=cfg)
enc_mgr.encrypt_parent_seed(seed_phrase)
assert enc_mgr.decrypt_parent_seed() == seed_phrase
# JSON data round trip

View File

@@ -33,9 +33,7 @@ class FakeEntries:
self.added.append(("password", label, length, username, url))
return 1
def add_totp(
self, label, deterministic=False, index=None, secret=None, period=30, digits=6
):
def add_totp(self, label):
self.added.append(("totp", label))
return 1

View File

@@ -1,6 +1,4 @@
import bcrypt
import hashlib
import base64
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
@@ -9,7 +7,6 @@ from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key,
KdfConfig,
)
from seedpass.core.encryption import EncryptionManager
from seedpass.core.vault import Vault
@@ -24,24 +21,10 @@ def _setup_profile(tmp: Path, mode: str):
argon_kwargs = dict(time_cost=1, memory_cost=8, parallelism=1)
fp = tmp.name
if mode == "argon2":
cfg = KdfConfig(
params=argon_kwargs,
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, fp, **argon_kwargs)
else:
seed_key = derive_key_from_password(TEST_PASSWORD, fp, iterations=1)
cfg = KdfConfig(
name="pbkdf2",
params={"iterations": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED)
index_key = derive_index_key(TEST_SEED)
enc_mgr = EncryptionManager(index_key, tmp)
@@ -82,9 +65,9 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
)
if mode == "argon2":
monkeypatch.setattr(
"seedpass.core.manager.KdfConfig",
lambda salt_b64, **_: KdfConfig(
params=argon_kwargs, salt_b64=salt_b64
"seedpass.core.manager.derive_key_from_password_argon2",
lambda pw, fp: derive_key_from_password_argon2(
pw, fp, **argon_kwargs
),
)
monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None)
@@ -93,26 +76,3 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
)
assert pm.setup_encryption_manager(path, exit_on_fail=False)
assert pm.parent_seed == TEST_SEED
def test_kdf_param_round_trip(tmp_path):
cfg = KdfConfig(
params={"time_cost": 3, "memory_cost": 32, "parallelism": 1},
salt_b64=base64.b64encode(b"static-salt-1234").decode(),
)
key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
mgr = EncryptionManager(key, tmp_path)
mgr.encrypt_parent_seed(TEST_SEED, kdf=cfg)
stored = mgr.get_file_kdf(Path("parent_seed.enc"))
assert stored.params == cfg.params
def test_vault_kdf_migration(tmp_path):
index_key = derive_index_key(TEST_SEED)
mgr = EncryptionManager(index_key, tmp_path)
vault = Vault(mgr, tmp_path)
old_kdf = KdfConfig(name="hkdf", version=0, params={}, salt_b64="")
mgr.save_json_data({"entries": {}}, vault.index_file, kdf=old_kdf)
vault.load_index()
new_kdf = mgr.get_file_kdf(vault.index_file)
assert new_kdf.version == KdfConfig().version

View File

@@ -1,19 +0,0 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
from seedpass.core.config_manager import ConfigManager
from main import handle_set_kdf_iterations
def test_kdf_strength_slider_persists(monkeypatch):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
pm = SimpleNamespace(config_manager=cfg_mgr)
inputs = iter(["3"])
monkeypatch.setattr("builtins.input", lambda *_: next(inputs))
handle_set_kdf_iterations(pm)
assert cfg_mgr.get_kdf_iterations() == 100_000

View File

@@ -1,15 +1,11 @@
import logging
import pytest
import logging
import hashlib
import base64
from utils.fingerprint import generate_fingerprint
from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key_seed_only,
derive_index_key,
KdfConfig,
)
@@ -52,17 +48,15 @@ def test_argon2_fingerprint_affects_key():
fp1 = generate_fingerprint("seed one")
fp2 = generate_fingerprint("seed two")
cfg1 = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(hashlib.sha256(fp1.encode()).digest()[:16]).decode(),
k1 = derive_key_from_password_argon2(
password, fp1, time_cost=1, memory_cost=8, parallelism=1
)
cfg2 = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(hashlib.sha256(fp2.encode()).digest()[:16]).decode(),
k2 = derive_key_from_password_argon2(
password, fp1, time_cost=1, memory_cost=8, parallelism=1
)
k3 = derive_key_from_password_argon2(
password, fp2, time_cost=1, memory_cost=8, parallelism=1
)
k1 = derive_key_from_password_argon2(password, cfg1)
k2 = derive_key_from_password_argon2(password, cfg1)
k3 = derive_key_from_password_argon2(password, cfg2)
assert k1 == k2
assert k1 != k3

View File

@@ -1,19 +0,0 @@
import base64
from bip_utils import Bip39SeedGenerator
from utils.key_hierarchy import kd
from utils.key_derivation import derive_index_key
def test_kd_distinct_infos():
root = b"root" * 8
k1 = kd(root, b"info1")
k2 = kd(root, b"info2")
assert k1 != k2
def test_derive_index_key_matches_hierarchy():
seed = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
seed_bytes = Bip39SeedGenerator(seed).Generate()
master = kd(seed_bytes, b"seedpass:v1:master")
expected = base64.urlsafe_b64encode(kd(master, b"seedpass:v1:storage"))
assert derive_index_key(seed) == expected

View File

@@ -1,5 +1,4 @@
import json
import base64
import hashlib
from pathlib import Path
@@ -83,7 +82,7 @@ def test_failed_migration_restores_legacy(monkeypatch, tmp_path: Path):
assert not vault.migrated_from_legacy
def test_migrated_index_has_v3_prefix(monkeypatch, tmp_path: Path):
def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
@@ -100,8 +99,7 @@ def test_migrated_index_has_v3_prefix(monkeypatch, tmp_path: Path):
vault.load_index()
new_file = tmp_path / "seedpass_entries_db.json.enc"
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
assert new_file.read_bytes().startswith(b"V2:")
assert vault.migrated_from_legacy
@@ -156,14 +154,6 @@ def test_migration_syncs_when_confirmed(monkeypatch, tmp_path: Path):
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.bip85 = SimpleNamespace()
from seedpass.core.config_manager import ConfigManager
cfg_mgr = ConfigManager(pm.vault, tmp_path)
cfg = cfg_mgr.load_config(require_pin=False)
cfg["offline_mode"] = False
cfg_mgr.save_config(cfg)
pm.config_manager = cfg_mgr
pm.offline_mode = False
calls = {"sync": 0}
pm.sync_vault = lambda *a, **k: calls.__setitem__("sync", calls["sync"] + 1) or {
@@ -287,7 +277,6 @@ def test_legacy_index_reinit_syncs_once_when_confirmed(monkeypatch, tmp_path: Pa
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.bip85 = SimpleNamespace()
pm.offline_mode = True
monkeypatch.setattr(
"seedpass.core.manager.NostrClient", lambda *a, **k: SimpleNamespace()
@@ -305,7 +294,7 @@ def test_legacy_index_reinit_syncs_once_when_confirmed(monkeypatch, tmp_path: Pa
pm.initialize_managers()
pm.initialize_managers()
assert calls["sync"] == 0
assert calls["sync"] == 1
assert enc_mgr.last_migration_performed is False
@@ -325,13 +314,6 @@ def test_schema_migration_no_sync_prompt(monkeypatch, tmp_path: Path):
pm.fingerprint_dir = tmp_path
pm.current_fingerprint = tmp_path.name
pm.bip85 = SimpleNamespace()
from seedpass.core.config_manager import ConfigManager
cfg_mgr = ConfigManager(pm.vault, tmp_path)
cfg = cfg_mgr.load_config(require_pin=False)
cfg["offline_mode"] = False
cfg_mgr.save_config(cfg)
pm.config_manager = cfg_mgr
pm.offline_mode = False
calls = {"sync": 0, "confirm": 0}

View File

@@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
cfg = ConfigManager(vault, tmp_path)
assert cfg.get_kdf_iterations() == iterations
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
assert content.startswith(b"V2:")

View File

@@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
monkeypatch.setattr("builtins.input", lambda _: "2")
vault.load_index()
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
assert content.startswith(b"V2:")
assert vault.encryption_manager.last_migration_performed is True

View File

@@ -1,5 +1,4 @@
import json
import base64
import hashlib
from pathlib import Path
from types import SimpleNamespace
@@ -35,8 +34,7 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
vault.load_index()
new_file = fp_dir / "seedpass_entries_db.json.enc"
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
assert new_file.read_bytes().startswith(b"V2:")
new_enc_mgr = EncryptionManager(key, fp_dir)
new_vault = Vault(new_enc_mgr, fp_dir)
@@ -61,5 +59,4 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
)
pm.initialize_managers()
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
assert new_file.read_bytes().startswith(b"V2:")

View File

@@ -60,11 +60,15 @@ def test_handle_add_totp(monkeypatch, capsys):
out = capsys.readouterr().out
entry = entry_mgr.retrieve_entry(0)
assert entry["type"] == "totp"
assert entry["kind"] == "totp"
assert entry["label"] == "Example"
assert entry["deterministic"] is False
assert "index" not in entry
assert "secret" in entry
assert len(entry["secret"]) >= 16
assert entry == {
"type": "totp",
"kind": "totp",
"label": "Example",
"index": 0,
"period": 30,
"digits": 6,
"archived": False,
"notes": "",
"tags": [],
}
assert "ID 0" in out

View File

@@ -32,7 +32,7 @@ def test_handle_display_totp_codes(monkeypatch, capsys, password_manager):
pm.handle_display_totp_codes()
out = capsys.readouterr().out
assert "Imported 2FA Codes" in out
assert "Generated 2FA Codes" in out
assert "[0] Example" in out
assert "123456" in out

View File

@@ -1,18 +0,0 @@
import asyncio
from helpers import dummy_nostr_client
def test_published_events_no_fingerprint(dummy_nostr_client):
client, relay = dummy_nostr_client
asyncio.run(client.publish_snapshot(b"secret"))
fingerprint = "fp"
events = list(relay.manifests) + list(relay.chunks.values())
seen = set()
for ev in events:
if id(ev) in seen:
continue
seen.add(id(ev))
assert fingerprint not in ev.id
for tag in getattr(ev, "tags", []):
assert fingerprint not in tag

View File

@@ -5,7 +5,6 @@ from tempfile import TemporaryDirectory
from seedpass.core.manager import PasswordManager
from utils.fingerprint_manager import FingerprintManager
from utils.fingerprint import generate_fingerprint
from seedpass.core.state_manager import StateManager
VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
@@ -14,7 +13,6 @@ def setup_pm(tmp_path, monkeypatch):
pm = PasswordManager.__new__(PasswordManager)
pm.fingerprint_manager = FingerprintManager(tmp_path)
pm.config_manager = type("Cfg", (), {"get_kdf_iterations": lambda self: 1})()
pm.state_manager = StateManager(tmp_path)
monkeypatch.setattr("seedpass.core.manager.prompt_for_password", lambda: "pw")
monkeypatch.setattr("seedpass.core.manager.derive_index_key", lambda seed: b"idx")
monkeypatch.setattr(
@@ -51,5 +49,3 @@ def test_generate_new_seed_creates_profile(monkeypatch):
assert fingerprint == generate_fingerprint(VALID_SEED)
assert pm.fingerprint_manager.list_fingerprints() == [fingerprint]
sm = StateManager(tmp_path / fingerprint)
assert sm.state["nostr_account_idx"] == 1

View File

@@ -1,19 +0,0 @@
from pathlib import Path
from helpers import TEST_SEED
from utils.key_derivation import derive_index_key
from seedpass.core.encryption import EncryptionManager
def test_nonce_uniqueness(tmp_path: Path) -> None:
key = derive_index_key(TEST_SEED)
manager = EncryptionManager(key, tmp_path)
plaintext = b"repeat"
nonces = set()
for _ in range(10):
payload = manager.encrypt_data(plaintext)
assert payload.startswith(b"V3|")
nonce = payload[3:15]
assert nonce not in nonces
nonces.add(nonce)
assert len(nonces) == 10

View File

@@ -5,6 +5,7 @@ import json
from helpers import DummyEvent, DummyFilter, dummy_nostr_client
from nostr.backup_models import KIND_MANIFEST, KIND_SNAPSHOT_CHUNK
from nostr.client import MANIFEST_ID_PREFIX
from nostr_sdk import Keys
@@ -54,7 +55,9 @@ def test_fetch_snapshot_legacy_key_fallback(dummy_nostr_client, monkeypatch):
],
}
)
manifest_event = DummyEvent(KIND_MANIFEST, manifest_json, tags=["legacy"])
manifest_event = DummyEvent(
KIND_MANIFEST, manifest_json, tags=[f"{MANIFEST_ID_PREFIX}fp"]
)
chunk_event = DummyEvent(
KIND_SNAPSHOT_CHUNK,
base64.b64encode(chunk_bytes).decode("utf-8"),
@@ -66,9 +69,9 @@ def test_fetch_snapshot_legacy_key_fallback(dummy_nostr_client, monkeypatch):
async def fake_fetch_events(f, _timeout):
call["count"] += 1
call["authors"].append(getattr(f, "author_pk", None))
if call["count"] == 1:
if call["count"] <= 2:
return type("R", (), {"to_vec": lambda self: []})()
elif call["count"] == 2:
elif call["count"] == 3:
return type("R", (), {"to_vec": lambda self: [manifest_event]})()
else:
return type("R", (), {"to_vec": lambda self: [chunk_event]})()

View File

@@ -0,0 +1,49 @@
import asyncio
from helpers import TEST_SEED, dummy_nostr_client
from nostr.backup_models import KIND_MANIFEST
from nostr.client import MANIFEST_ID_PREFIX, NostrClient
def test_fetch_latest_snapshot_legacy_identifier(dummy_nostr_client, monkeypatch):
client, relay = dummy_nostr_client
data = b"legacy"
asyncio.run(client.publish_snapshot(data))
relay.manifests[-1].tags = [MANIFEST_ID_PREFIX.rstrip("-")]
relay.filters.clear()
orig_fetch = relay.fetch_events
async def fetch_events(self, f, timeout):
identifier = f.ids[0] if getattr(f, "ids", None) else None
kind = getattr(f, "kind_val", None)
if kind == KIND_MANIFEST:
events = [m for m in self.manifests if identifier in m.tags]
self.filters.append(f)
class Res:
def __init__(self, evs):
self._evs = evs
def to_vec(self):
return self._evs
return Res(events)
return await orig_fetch(f, timeout)
monkeypatch.setattr(
relay, "fetch_events", fetch_events.__get__(relay, relay.__class__)
)
enc_mgr = client.encryption_manager
monkeypatch.setattr(
enc_mgr, "decrypt_parent_seed", lambda: TEST_SEED, raising=False
)
monkeypatch.setattr("nostr.client.KeyManager", type(client.key_manager))
client2 = NostrClient(enc_mgr, "fp")
relay.filters.clear()
result = asyncio.run(client2.fetch_latest_snapshot())
assert result is not None
ids = [f.ids[0] for f in relay.filters]
assert ids[0] == f"{MANIFEST_ID_PREFIX}fp"
assert MANIFEST_ID_PREFIX.rstrip("-") in ids

View File

@@ -1,14 +0,0 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from seedpass.core.config_manager import ConfigManager
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
def test_offline_mode_default_enabled():
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
config = cfg_mgr.load_config(require_pin=False)
assert config["offline_mode"] is True

View File

@@ -35,7 +35,6 @@ def test_change_password_triggers_nostr_backup(monkeypatch):
pm.parent_seed = TEST_SEED
pm.store_hashed_password = lambda pw: None
pm.verify_password = lambda pw: True
pm.nostr_account_idx = 0
with patch("seedpass.core.manager.NostrClient") as MockClient:
mock_instance = MockClient.return_value

View File

@@ -62,7 +62,6 @@ def test_password_change_and_unlock(monkeypatch):
pm.nostr_client = SimpleNamespace(
publish_snapshot=lambda *a, **k: (None, "abcd")
)
pm.nostr_account_idx = 0
monkeypatch.setattr(
"seedpass.core.manager.prompt_existing_password", lambda *_: old_pw

View File

@@ -15,7 +15,6 @@ from seedpass.core.vault import Vault
from seedpass.core.backup import BackupManager
from seedpass.core.config_manager import ConfigManager
from seedpass.core.portable_backup import export_backup, import_backup
from seedpass.core.portable_backup import PortableMode
from utils.key_derivation import derive_index_key, derive_key_from_password
from utils.fingerprint import generate_fingerprint
@@ -55,22 +54,6 @@ def test_round_trip(monkeypatch):
assert vault.load_index()["pw"] == data["pw"]
def test_round_trip_unencrypted(monkeypatch):
with TemporaryDirectory() as td:
tmp = Path(td)
vault, backup, _ = setup_vault(tmp)
data = {"pw": 1}
vault.save_index(data)
path = export_backup(vault, backup, parent_seed=SEED, encrypt=False)
wrapper = json.loads(path.read_text())
assert wrapper["encryption_mode"] == PortableMode.NONE.value
vault.save_index({"pw": 0})
import_backup(vault, backup, path, parent_seed=SEED)
assert vault.load_index()["pw"] == data["pw"]
from cryptography.fernet import InvalidToken

View File

@@ -20,7 +20,6 @@ def setup_pm(tmp_path):
pm.encryption_mode = manager_module.EncryptionMode.SEED_ONLY
pm.fingerprint_manager = manager_module.FingerprintManager(constants.APP_DIR)
pm.current_fingerprint = None
pm.state_manager = manager_module.StateManager(constants.APP_DIR)
return pm, constants, manager_module
@@ -42,8 +41,8 @@ def test_generate_seed_cleanup_on_failure(monkeypatch):
# fingerprint list should be empty and only fingerprints.json should remain
assert pm.fingerprint_manager.list_fingerprints() == []
contents = sorted(p.name for p in const.APP_DIR.iterdir())
assert contents == ["fingerprints.json", "seedpass_state.json"]
contents = list(const.APP_DIR.iterdir())
assert len(contents) == 1 and contents[0].name == "fingerprints.json"
fp_file = pm.fingerprint_manager.fingerprints_file
with open(fp_file) as f:
data = json.load(f)

View File

@@ -29,7 +29,6 @@ def test_add_and_switch_fingerprint(monkeypatch):
pm.fingerprint_manager = fm
pm.encryption_manager = object()
pm.current_fingerprint = None
pm.nostr_account_idx = 0
monkeypatch.setattr("builtins.input", lambda *_args, **_kwargs: "1")
monkeypatch.setattr(

View File

@@ -82,11 +82,9 @@ def test_publish_snapshot_success():
with patch.object(
client.client, "send_event", side_effect=fake_send
) as mock_send:
with patch("nostr.snapshot.new_manifest_id", return_value=("id", b"nonce")):
manifest, event_id = asyncio.run(client.publish_snapshot(b"data"))
manifest, event_id = asyncio.run(client.publish_snapshot(b"data"))
assert isinstance(manifest, Manifest)
assert event_id == "id"
assert manifest.nonce == base64.b64encode(b"nonce").decode("utf-8")
assert event_id == "seedpass-manifest-fp"
assert mock_send.await_count >= 1

View File

@@ -21,7 +21,6 @@ def setup_password_manager():
pm.fingerprint_manager = manager_module.FingerprintManager(constants.APP_DIR)
pm.current_fingerprint = None
pm.save_and_encrypt_seed = lambda seed, fingerprint_dir: None
pm.state_manager = manager_module.StateManager(constants.APP_DIR)
return pm, constants

View File

@@ -1,6 +1,4 @@
import sys
import json
import base64
from pathlib import Path
from cryptography.fernet import Fernet
@@ -30,5 +28,4 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
assert new_file.exists()
assert new_file.read_bytes() != encrypted
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
assert new_file.read_bytes().startswith(b"V2:")

View File

@@ -120,7 +120,6 @@ def test_profile_service_switch(monkeypatch):
pm.delta_since = None
pm.encryption_manager = SimpleNamespace()
pm.parent_seed = TEST_SEED
pm.nostr_account_idx = 0
service = ProfileService(pm)
monkeypatch.setattr("builtins.input", lambda *_: "2")

View File

@@ -14,7 +14,6 @@ def test_state_manager_round_trip():
assert state["last_sync_ts"] == 0
assert state["manifest_id"] is None
assert state["delta_since"] == 0
assert state["nostr_account_idx"] == 0
sm.add_relay("wss://example.com")
sm.update_state(
@@ -31,7 +30,6 @@ def test_state_manager_round_trip():
assert state2["last_sync_ts"] == 123
assert state2["manifest_id"] == "mid"
assert state2["delta_since"] == 111
assert state2["nostr_account_idx"] == 0
sm2.remove_relay(1) # remove first default relay
assert len(sm2.list_relays()) == len(DEFAULT_RELAYS)

View File

@@ -28,19 +28,23 @@ def test_add_totp_and_get_code():
assert uri.startswith("otpauth://totp/")
entry = entry_mgr.retrieve_entry(0)
assert entry["deterministic"] is False
assert "secret" in entry
assert entry == {
"type": "totp",
"kind": "totp",
"label": "Example",
"index": 0,
"period": 30,
"digits": 6,
"archived": False,
"notes": "",
"tags": [],
}
code = entry_mgr.get_totp_code(0, timestamp=0)
code = entry_mgr.get_totp_code(0, TEST_SEED, timestamp=0)
expected = pyotp.TOTP(entry["secret"]).at(0)
expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0)
assert code == expected
# second entry should have different secret
entry_mgr.add_totp("Other", TEST_SEED)
entry2 = entry_mgr.retrieve_entry(1)
assert entry["secret"] != entry2["secret"]
def test_totp_time_remaining(monkeypatch):
with TemporaryDirectory() as tmpdir:
@@ -64,8 +68,17 @@ def test_add_totp_imported(tmp_path):
secret = "JBSWY3DPEHPK3PXP"
em.add_totp("Imported", TEST_SEED, secret=secret)
entry = em.retrieve_entry(0)
assert entry["secret"] == secret
assert entry["deterministic"] is False
assert entry == {
"type": "totp",
"kind": "totp",
"label": "Imported",
"secret": secret,
"period": 30,
"digits": 6,
"archived": False,
"notes": "",
"tags": [],
}
code = em.get_totp_code(0, timestamp=0)
assert code == pyotp.TOTP(secret).at(0)
@@ -79,23 +92,3 @@ def test_add_totp_with_notes(tmp_path):
em.add_totp("NoteLabel", TEST_SEED, notes="some note")
entry = em.retrieve_entry(0)
assert entry["notes"] == "some note"
def test_legacy_deterministic_entry(tmp_path):
vault, enc = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
em = EntryManager(vault, backup_mgr)
em.add_totp("Legacy", TEST_SEED, deterministic=True)
data = em._load_index()
entry = data["entries"]["0"]
entry.pop("deterministic", None)
em._save_index(data)
code = em.get_totp_code(0, TEST_SEED, timestamp=0)
expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0)
assert code == expected
exported = em.export_totp_entries(TEST_SEED)
assert exported["entries"][0]["secret"] == TotpManager.derive_secret(TEST_SEED, 0)

View File

@@ -3,13 +3,15 @@
"""
Key Derivation Module
This module provides functions to derive cryptographic keys from user-provided
passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet
for symmetric encryption purposes. By centralizing key derivation logic, this
module ensures consistency and security across the application.
Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case.
Ensure that all dependencies are installed and properly configured in your
environment.
This module provides functions to derive cryptographic keys from user-provided passwords
and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption
purposes. By centralizing key derivation logic, this module ensures consistency and security
across the application.
Ensure that all dependencies are installed and properly configured in your environment.
"""
import os
@@ -19,13 +21,11 @@ import unicodedata
import logging
import hmac
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Union, Dict, Any
from typing import Optional, Union
from bip_utils import Bip39SeedGenerator
from local_bip85 import BIP85
from .key_hierarchy import kd
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
@@ -47,27 +47,6 @@ DEFAULT_ENCRYPTION_MODE = EncryptionMode.SEED_ONLY
TOTP_PURPOSE = 39
@dataclass
class KdfConfig:
"""Configuration block describing how a key was derived."""
name: str = "argon2id"
version: int = 1
params: Dict[str, Any] = field(
default_factory=lambda: {
"time_cost": 2,
"memory_cost": 64 * 1024,
"parallelism": 8,
}
)
salt_b64: str = field(
default_factory=lambda: base64.b64encode(os.urandom(16)).decode()
)
CURRENT_KDF_VERSION = 1
def derive_key_from_password(
password: str, fingerprint: Union[str, bytes], iterations: int = 100_000
) -> bytes:
@@ -130,15 +109,18 @@ def derive_key_from_password(
raise
def derive_key_from_password_argon2(password: str, kdf: KdfConfig) -> bytes:
def derive_key_from_password_argon2(
password: str,
fingerprint: Union[str, bytes],
*,
time_cost: int = 2,
memory_cost: int = 64 * 1024,
parallelism: int = 8,
) -> bytes:
"""Derive an encryption key from a password using Argon2id.
Parameters
----------
password:
The user's password.
kdf:
:class:`KdfConfig` instance describing salt and tuning parameters.
The defaults follow recommended parameters but omit a salt for deterministic
output. Smaller values may be supplied for testing.
"""
if not password:
@@ -149,14 +131,17 @@ def derive_key_from_password_argon2(password: str, kdf: KdfConfig) -> bytes:
try:
from argon2.low_level import hash_secret_raw, Type
params = kdf.params or {}
salt = base64.b64decode(kdf.salt_b64)
if isinstance(fingerprint, bytes):
salt = fingerprint
else:
salt = hashlib.sha256(fingerprint.encode()).digest()[:16]
key = hash_secret_raw(
secret=normalized,
salt=salt,
time_cost=int(params.get("time_cost", 2)),
memory_cost=int(params.get("memory_cost", 64 * 1024)),
parallelism=int(params.get("parallelism", 8)),
time_cost=time_cost,
memory_cost=memory_cost,
parallelism=parallelism,
hash_len=32,
type=Type.ID,
)
@@ -209,10 +194,16 @@ def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> by
def derive_index_key_seed_only(seed: str) -> bytes:
"""Derive the index encryption key using the v1 hierarchy."""
"""Derive a deterministic Fernet key from only the BIP-39 seed."""
seed_bytes = Bip39SeedGenerator(seed).Generate()
master = kd(seed_bytes, b"seedpass:v1:master")
key = kd(master, b"seedpass:v1:storage")
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"password-db",
backend=default_backend(),
)
key = hkdf.derive(seed_bytes)
return base64.urlsafe_b64encode(key)
@@ -221,21 +212,23 @@ def derive_index_key(seed: str) -> bytes:
return derive_index_key_seed_only(seed)
def derive_totp_secret(seed: Union[str, bytes], index: int) -> str:
"""Derive a base32-encoded TOTP secret from a seed or raw key."""
def derive_totp_secret(seed: str, index: int) -> str:
"""Derive a base32-encoded TOTP secret from a BIP39 seed."""
try:
if isinstance(seed, (bytes, bytearray)):
seed_bytes = bytes(seed)
else:
seed_bytes = Bip39SeedGenerator(seed).Generate()
# Initialize BIP85 from the BIP39 seed bytes
seed_bytes = Bip39SeedGenerator(seed).Generate()
bip85 = BIP85(seed_bytes)
# Build the BIP32 path m/83696968'/39'/TOTP'/{index}'
totp_int = int.from_bytes(b"TOTP", "big")
path = f"m/83696968'/{TOTP_PURPOSE}'/{totp_int}'/{index}'"
# Derive entropy using the same scheme as BIP85
child_key = bip85.bip32_ctx.DerivePath(path)
key_bytes = child_key.PrivateKey().Raw().ToBytes()
entropy = hmac.new(b"bip-entropy-from-k", key_bytes, hashlib.sha512).digest()
# Hash the first 32 bytes of entropy and encode the first 20 bytes
hashed = hashlib.sha256(entropy[:32]).digest()
secret = base64.b32encode(hashed[:20]).decode("utf-8")
logger.debug(f"Derived TOTP secret for index {index}.")
@@ -274,16 +267,18 @@ def calibrate_argon2_time_cost(
"""
password = "benchmark"
salt = base64.b64encode(b"argon2-calibration").decode()
fingerprint = b"argon2-calibration"
time_cost = 1
elapsed_ms = 0.0
while time_cost <= max_time_cost:
start = time.perf_counter()
cfg = KdfConfig(
params={"time_cost": time_cost, "memory_cost": 8, "parallelism": 1},
salt_b64=salt,
derive_key_from_password_argon2(
password,
fingerprint,
time_cost=time_cost,
memory_cost=8,
parallelism=1,
)
derive_key_from_password_argon2(password, cfg)
elapsed_ms = (time.perf_counter() - start) * 1000
if elapsed_ms >= target_ms:
break

View File

@@ -1,28 +0,0 @@
"""Key hierarchy helper functions."""
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
def kd(root: bytes, info: bytes, length: int = 32) -> bytes:
"""Derive a sub-key from ``root`` using HKDF-SHA256.
Parameters
----------
root:
Root key material.
info:
Domain separation string.
length:
Length of the derived key in bytes. Defaults to 32.
"""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=length,
salt=None,
info=info,
backend=default_backend(),
)
return hkdf.derive(root)

View File

@@ -3,16 +3,9 @@ from __future__ import annotations
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# TODO: Replace this Python implementation with a Rust/WASM module for
# critical cryptographic operations.
class InMemorySecret:
"""Store sensitive data encrypted in RAM using AES-GCM.
Zeroization is best-effort only; Python's memory management may retain
copies of the plaintext.
"""
"""Store sensitive data encrypted in RAM using AES-GCM."""
def __init__(self, data: bytes) -> None:
if not isinstance(data, (bytes, bytearray)):

View File

@@ -33,12 +33,6 @@ logger = logging.getLogger(__name__)
DEFAULT_MAX_ATTEMPTS = 5
def _env_password() -> str | None:
"""Return a password supplied via environment for non-interactive use."""
return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD")
def _get_max_attempts(override: int | None = None) -> int:
"""Return the configured maximum number of prompt attempts."""
@@ -86,13 +80,6 @@ def prompt_new_password(max_retries: int | None = None) -> str:
Raises:
PasswordPromptError: If the user fails to provide a valid password after multiple attempts.
"""
env_pw = _env_password()
if env_pw:
normalized = unicodedata.normalize("NFKD", env_pw)
if len(normalized) < MIN_PASSWORD_LENGTH:
raise PasswordPromptError("Environment password too short")
return normalized
max_retries = _get_max_attempts(max_retries)
attempts = 0
@@ -177,10 +164,6 @@ def prompt_existing_password(
PasswordPromptError: If the user interrupts the operation or exceeds
``max_retries`` attempts.
"""
env_pw = _env_password()
if env_pw:
return unicodedata.normalize("NFKD", env_pw)
max_retries = _get_max_attempts(max_retries)
attempts = 0
while max_retries == 0 or attempts < max_retries:

View File

@@ -102,11 +102,9 @@ def _masked_input_posix(prompt: str) -> str:
def masked_input(prompt: str) -> str:
"""Return input from the user while masking typed characters."""
func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix
try:
return func(prompt)
except Exception: # pragma: no cover - fallback when TTY operations fail
return input(prompt)
if sys.platform == "win32":
return _masked_input_windows(prompt)
return _masked_input_posix(prompt)
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str: