8 Commits

Author SHA1 Message Date
thePR0M3TH3AN
15df3f10a6 Merge pull request #831 from PR0M3TH3AN/codex/update-agents.md-to-specify-deterministic-artifacts
docs: add deterministic artifact guidelines
2025-08-20 18:19:36 -04:00
thePR0M3TH3AN
b451097c65 docs: add deterministic artifact guidelines 2025-08-20 18:19:20 -04:00
thePR0M3TH3AN
9cacd1b13d Merge pull request #830 from PR0M3TH3AN/codex/implement-hkdf-helper-and-update-sub-key-usage
Refactor key derivation with hierarchical HKDF
2025-08-20 18:19:00 -04:00
thePR0M3TH3AN
b97d60778b Restore compatibility for key hierarchy 2025-08-20 18:12:02 -04:00
thePR0M3TH3AN
bbb26ca55a test: add key hierarchy tests 2025-08-20 17:57:50 -04:00
thePR0M3TH3AN
d6e03d5e7a Merge pull request #829 from PR0M3TH3AN/codex/implement-kdfconfig-dataclass-and-update-key-derivation
Introduce KDF config metadata and migration tests
2025-08-20 17:44:50 -04:00
thePR0M3TH3AN
26632c0e70 Fix KDF metadata handling and headless password prompts 2025-08-20 17:23:10 -04:00
thePR0M3TH3AN
06ca51993a Add KDF config with per-file metadata 2025-08-19 09:53:46 -04:00
24 changed files with 483 additions and 160 deletions

View File

@@ -39,6 +39,11 @@ This project is written in **Python**. Follow these instructions when working wi
Following these practices helps keep the code base consistent and secure.
## Deterministic Artifact Generation
- All generated artifacts (passwords, keys, TOTP secrets, etc.) must be fully deterministic across runs and platforms.
- Randomness is only permitted for security primitives (e.g., encryption nonces, in-memory keys) and must never influence derived artifacts.
## Legacy Index Migration
- Always provide a migration path for index archives and import/export routines.

View File

@@ -1415,9 +1415,10 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
if entry.get("type") != EntryType.TOTP.value:
print(colored("Entry is not a TOTP entry.", "red"))
return 1
code = password_manager.entry_manager.get_totp_code(
idx, password_manager.parent_seed
key = getattr(password_manager, "KEY_TOTP_DET", None) or getattr(
password_manager, "parent_seed", None
)
code = password_manager.entry_manager.get_totp_code(idx, key)
print(code)
try:
if copy_to_clipboard(code, password_manager.clipboard_clear_delay):

View File

@@ -464,7 +464,8 @@ def export_totp(
_check_token(request, authorization)
_require_password(request, password)
pm = _get_pm(request)
return pm.entry_manager.export_totp_entries(pm.parent_seed)
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(pm, "parent_seed", None)
return pm.entry_manager.export_totp_entries(key)
@app.get("/api/v1/totp")
@@ -482,7 +483,8 @@ def get_totp_codes(
)
codes = []
for idx, label, _u, _url, _arch in entries:
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(pm, "parent_seed", None)
code = pm.entry_manager.get_totp_code(idx, key)
rem = pm.entry_manager.get_totp_time_remaining(idx)

View File

@@ -305,9 +305,10 @@ class EntryService:
def get_totp_code(self, entry_id: int) -> str:
with self._lock:
return self._manager.entry_manager.get_totp_code(
entry_id, self._manager.parent_seed
key = getattr(self._manager, "KEY_TOTP_DET", None) or getattr(
self._manager, "parent_seed", None
)
return self._manager.entry_manager.get_totp_code(entry_id, key)
def add_entry(
self,
@@ -515,9 +516,10 @@ class EntryService:
def export_totp_entries(self) -> dict:
with self._lock:
return self._manager.entry_manager.export_totp_entries(
self._manager.parent_seed
key = getattr(self._manager, "KEY_TOTP_DET", None) or getattr(
self._manager, "parent_seed", None
)
return self._manager.entry_manager.export_totp_entries(key)
def display_totp_codes(self) -> None:
with self._lock:

View File

@@ -16,8 +16,9 @@ except Exception: # pragma: no cover - fallback for environments without orjson
import hashlib
import os
import base64
from dataclasses import asdict
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
@@ -26,6 +27,7 @@ from termcolor import colored
from utils.file_lock import exclusive_lock
from mnemonic import Mnemonic
from utils.password_prompt import prompt_existing_password
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
# Instantiate the logger
logger = logging.getLogger(__name__)
@@ -231,40 +233,76 @@ class EncryptionManager:
raise ValueError("Invalid path outside fingerprint directory")
return candidate
def encrypt_parent_seed(self, parent_seed: str) -> None:
def encrypt_parent_seed(
self, parent_seed: str, kdf: Optional[KdfConfig] = None
) -> None:
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
data = parent_seed.encode("utf-8")
encrypted_data = self.encrypt_data(data) # This now creates V2 format
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
os.chmod(self.parent_seed_file, 0o600)
self.encrypt_and_save_file(data, self.parent_seed_file, kdf=kdf)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
def decrypt_parent_seed(self) -> str:
"""Decrypts and returns the parent seed, handling migration."""
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
if is_legacy:
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip())
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)
return decrypted_data.decode("utf-8").strip()
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
def _serialize(self, kdf: KdfConfig, ciphertext: bytes) -> bytes:
payload = {"kdf": asdict(kdf), "ct": base64.b64encode(ciphertext).decode()}
if USE_ORJSON:
return json_lib.dumps(payload)
return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8")
def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]:
"""Return ``(KdfConfig, ciphertext)`` from serialized *blob*.
Legacy files stored the raw ciphertext without a JSON wrapper. If
decoding the wrapper fails, treat ``blob`` as the ciphertext and return
a default HKDF configuration.
"""
try:
if USE_ORJSON:
obj = json_lib.loads(blob)
else:
obj = json_lib.loads(blob.decode("utf-8"))
kdf = KdfConfig(**obj.get("kdf", {}))
ct_b64 = obj.get("ct", "")
ciphertext = base64.b64decode(ct_b64)
if ciphertext:
return kdf, ciphertext
except Exception: # pragma: no cover - fall back to legacy path
pass
# Legacy format: ``blob`` already contains the ciphertext
return (
KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""),
blob,
)
def encrypt_and_save_file(
self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None
) -> None:
if kdf is None:
kdf = KdfConfig()
file_path = self.resolve_relative_path(relative_path)
file_path.parent.mkdir(parents=True, exist_ok=True)
encrypted_data = self.encrypt_data(data)
payload = self._serialize(kdf, encrypted_data)
with exclusive_lock(file_path) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
fh.write(payload)
fh.flush()
os.fsync(fh.fileno())
os.chmod(file_path, 0o600)
@@ -273,20 +311,37 @@ class EncryptionManager:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
_, encrypted_data = self._deserialize(blob)
return self.decrypt_data(encrypted_data, context=str(relative_path))
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
def get_file_kdf(self, relative_path: Path) -> KdfConfig:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
blob = fh.read()
kdf, _ = self._deserialize(blob)
return kdf
def save_json_data(
self,
data: dict,
relative_path: Optional[Path] = None,
*,
kdf: Optional[KdfConfig] = None,
) -> None:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
if USE_ORJSON:
json_data = json_lib.dumps(data)
else:
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path)
self.encrypt_and_save_file(json_data, relative_path, kdf=kdf)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
def load_json_data(
self, relative_path: Optional[Path] = None, *, return_kdf: bool = False
) -> dict | Tuple[dict, KdfConfig]:
"""
Loads and decrypts JSON data, automatically migrating and re-saving
if it's in the legacy format.
@@ -295,12 +350,18 @@ class EncryptionManager:
relative_path = Path("seedpass_entries_db.json.enc")
file_path = self.resolve_relative_path(relative_path)
if not file_path.exists():
return {"entries": {}}
empty: dict = {"entries": {}}
if return_kdf:
return empty, KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
return empty
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
self.last_migration_performed = False
@@ -316,10 +377,12 @@ class EncryptionManager:
# If it was a legacy file, re-save it in the new format now
if is_legacy and self._legacy_migrate_flag:
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
self.save_json_data(data, relative_path)
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
self.last_migration_performed = True
if return_kdf:
return data, kdf
return data
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
logger.error(
@@ -360,7 +423,8 @@ class EncryptionManager:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
is_legacy = not encrypted_data.startswith(b"V2:")
kdf, ciphertext = self._deserialize(encrypted_data)
is_legacy = not ciphertext.startswith(b"V2:")
self.last_migration_performed = False
def _process(decrypted: bytes) -> dict:
@@ -386,11 +450,9 @@ class EncryptionManager:
return data
try:
decrypted_data = self.decrypt_data(
encrypted_data, context=str(relative_path)
)
decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path))
data = _process(decrypted_data)
self.save_json_data(data, relative_path) # This always saves in V2 format
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")
self.last_migration_performed = is_legacy
@@ -401,10 +463,10 @@ class EncryptionManager:
"Enter your master password for legacy decryption: "
)
decrypted_data = self.decrypt_legacy(
encrypted_data, password, context=str(relative_path)
ciphertext, password, context=str(relative_path)
)
data = _process(decrypted_data)
self.save_json_data(data, relative_path)
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
logger.warning(
"Index decrypted using legacy password-only key derivation."

View File

@@ -257,7 +257,7 @@ class EntryManager:
def add_totp(
self,
label: str,
parent_seed: str,
parent_seed: str | bytes,
*,
archived: bool = False,
secret: str | None = None,
@@ -689,7 +689,10 @@ class EntryManager:
return derive_seed_phrase(bip85, seed_index, words)
def get_totp_code(
self, index: int, parent_seed: str | None = None, timestamp: int | None = None
self,
index: int,
parent_seed: str | bytes | None = None,
timestamp: int | None = None,
) -> str:
"""Return the current TOTP code for the specified entry."""
entry = self.retrieve_entry(index)
@@ -719,7 +722,9 @@ class EntryManager:
period = int(entry.get("period", 30))
return TotpManager.time_remaining(period)
def export_totp_entries(self, parent_seed: str) -> dict[str, list[dict[str, Any]]]:
def export_totp_entries(
self, parent_seed: str | bytes
) -> dict[str, list[dict[str, Any]]]:
"""Return all TOTP secrets and metadata for external use."""
data = self._load_index()
entries = data.get("entries", {})

View File

@@ -15,6 +15,7 @@ import logging
import os
import hashlib
import hmac
import base64
from typing import Optional, Literal, Any
import shutil
import time
@@ -46,7 +47,9 @@ from utils.key_derivation import (
derive_key_from_password_argon2,
derive_index_key,
EncryptionMode,
KdfConfig,
)
from utils.key_hierarchy import kd
from utils.checksum import (
calculate_checksum,
verify_checksum,
@@ -230,6 +233,13 @@ class PasswordManager:
verification, ensuring the integrity and confidentiality of the stored password database.
"""
# Class-level fallbacks so attributes exist even if ``__init__`` is bypassed
master_key: bytes | None = None
KEY_STORAGE: bytes | None = None
KEY_INDEX: bytes | None = None
KEY_PW_DERIVE: bytes | None = None
KEY_TOTP_DET: bytes | None = None
def __init__(
self, fingerprint: Optional[str] = None, *, password: Optional[str] = None
) -> None:
@@ -262,6 +272,13 @@ class PasswordManager:
self._bip85_cache: dict[tuple[int, int], bytes] = {}
self.audit_logger: Optional[AuditLogger] = None
# Derived key hierarchy
self.master_key: bytes | None = None
self.KEY_STORAGE: bytes | None = None
self.KEY_INDEX: bytes | None = None
self.KEY_PW_DERIVE: bytes | None = None
self.KEY_TOTP_DET: bytes | None = None
# Track changes to trigger periodic Nostr sync
self.is_dirty: bool = False
self.last_update: float = time.time()
@@ -322,6 +339,30 @@ class PasswordManager:
self._bip85_cache.clear()
def derive_key_hierarchy(self, seed_bytes: bytes) -> None:
"""Populate sub-keys from ``seed_bytes`` using HKDF."""
master = kd(seed_bytes, b"seedpass:v1:master")
self.master_key = master
self.KEY_STORAGE = kd(master, b"seedpass:v1:storage")
self.KEY_INDEX = kd(master, b"seedpass:v1:index")
self.KEY_PW_DERIVE = kd(master, b"seedpass:v1:pw")
self.KEY_TOTP_DET = kd(master, b"seedpass:v1:totp")
def ensure_key_hierarchy(self) -> None:
"""Ensure sub-keys are derived from the current parent seed."""
if (
self.KEY_STORAGE is None
or self.KEY_INDEX is None
or self.KEY_PW_DERIVE is None
or self.KEY_TOTP_DET is None
) and getattr(self, "parent_seed", None):
try:
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
except Exception:
seed_bytes = hashlib.sha256(self.parent_seed.encode()).digest()
self.derive_key_hierarchy(seed_bytes)
def ensure_script_checksum(self) -> None:
"""Initialize or verify the checksum of the manager script."""
script_path = Path(__file__).resolve()
@@ -479,15 +520,12 @@ class PasswordManager:
self.setup_encryption_manager(self.fingerprint_dir, password)
self.initialize_bip85()
self.initialize_managers()
self.ensure_key_hierarchy()
self.is_locked = False
self.locked = False
self.update_activity()
if (
getattr(self, "audit_logger", None) is None
and getattr(self, "_parent_seed_secret", None) is not None
):
key = hashlib.sha256(self.parent_seed.encode("utf-8")).digest()
self.audit_logger = AuditLogger(key)
if getattr(self, "audit_logger", None) is None and self.KEY_INDEX is not None:
self.audit_logger = AuditLogger(self.KEY_INDEX)
if (
getattr(self, "config_manager", None)
and self.config_manager.get_quick_unlock()
@@ -689,9 +727,9 @@ class PasswordManager:
for iter_try in dict.fromkeys(iter_candidates):
try:
if mode == "argon2":
seed_key = derive_key_from_password_argon2(
password, salt_fp
)
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iter_try
@@ -718,9 +756,10 @@ class PasswordManager:
password = None
continue
key = derive_index_key(self.parent_seed)
self.encryption_manager = EncryptionManager(key, fingerprint_dir)
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
self.encryption_manager = EncryptionManager(key_b64, fingerprint_dir)
self.vault = Vault(self.encryption_manager, fingerprint_dir)
self.config_manager = ConfigManager(
@@ -771,7 +810,9 @@ class PasswordManager:
)
salt_fp = fingerprint_dir.name
if mode == "argon2":
seed_key = derive_key_from_password_argon2(password, salt_fp)
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iterations
@@ -779,6 +820,7 @@ class PasswordManager:
seed_mgr = EncryptionManager(seed_key, fingerprint_dir)
self.parent_seed = seed_mgr.decrypt_parent_seed()
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
self.bip85 = BIP85(seed_bytes)
except Exception as e:
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
@@ -808,8 +850,10 @@ class PasswordManager:
self.fingerprint_dir = account_dir
self.parent_seed = seed
key = derive_index_key(seed)
self.encryption_manager = EncryptionManager(key, account_dir)
seed_bytes = Bip39SeedGenerator(seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
self.encryption_manager = EncryptionManager(key_b64, account_dir)
self.vault = Vault(self.encryption_manager, account_dir)
self.initialize_bip85()
@@ -828,9 +872,13 @@ class PasswordManager:
self.current_fingerprint = fp
self.fingerprint_dir = path
self.parent_seed = seed
key = derive_index_key(seed)
self.encryption_manager = EncryptionManager(key, path)
try:
seed_bytes = Bip39SeedGenerator(seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
except Exception:
key_b64 = derive_index_key(seed)
self.encryption_manager = EncryptionManager(key_b64, path)
self.vault = Vault(self.encryption_manager, path)
self.initialize_bip85()
@@ -896,10 +944,14 @@ class PasswordManager:
password, selected_fingerprint, iterations=iterations
)
# Initialize EncryptionManager with key and fingerprint_dir
self.encryption_manager = EncryptionManager(key, fingerprint_dir)
seed_mgr = EncryptionManager(key, fingerprint_dir)
self.vault = Vault(seed_mgr, fingerprint_dir)
self.parent_seed = seed_mgr.decrypt_parent_seed()
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
self.encryption_manager = EncryptionManager(key_b64, fingerprint_dir)
self.vault = Vault(self.encryption_manager, fingerprint_dir)
self.parent_seed = self.encryption_manager.decrypt_parent_seed()
# Log the type and content of parent_seed
logger.debug(
@@ -1041,7 +1093,9 @@ class PasswordManager:
try:
if password is None:
password = prompt_for_password()
index_key = derive_index_key(parent_seed)
seed_bytes = Bip39SeedGenerator(parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
index_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
iterations = (
self.config_manager.get_kdf_iterations()
if getattr(self, "config_manager", None)
@@ -1220,7 +1274,9 @@ class PasswordManager:
if password is None:
password = prompt_for_password()
index_key = derive_index_key(seed)
seed_bytes = Bip39SeedGenerator(seed).Generate()
self.derive_key_hierarchy(seed_bytes)
index_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
iterations = (
self.config_manager.get_kdf_iterations()
if getattr(self, "config_manager", None)
@@ -1266,6 +1322,7 @@ class PasswordManager:
"""
try:
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
self.derive_key_hierarchy(seed_bytes)
self.bip85 = BIP85(seed_bytes)
self._bip85_cache = {}
orig_derive = self.bip85.derive_entropy
@@ -1298,6 +1355,9 @@ class PasswordManager:
if not self.encryption_manager:
raise ValueError("EncryptionManager is not initialized.")
# Derive sub-keys if needed
self.ensure_key_hierarchy()
# Reinitialize the managers with the updated EncryptionManager and current fingerprint context
self.config_manager = ConfigManager(
vault=self.vault,
@@ -1330,10 +1390,11 @@ class PasswordManager:
backup_manager=self.backup_manager,
)
pw_bip85 = BIP85(self.KEY_PW_DERIVE)
self.password_generator = PasswordGenerator(
encryption_manager=self.encryption_manager,
parent_seed=self.parent_seed,
bip85=self.bip85,
parent_seed=self.KEY_PW_DERIVE,
bip85=pw_bip85,
policy=self.config_manager.get_password_policy(),
)
@@ -1817,16 +1878,17 @@ class PasswordManager:
)
totp_index = self.entry_manager.get_next_totp_index()
entry_id = self.entry_manager.get_next_index()
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
uri = self.entry_manager.add_totp(
label,
self.parent_seed,
key,
index=totp_index,
period=int(period),
digits=int(digits),
notes=notes,
tags=tags,
)
secret = TotpManager.derive_secret(self.parent_seed, totp_index)
secret = TotpManager.derive_secret(key, totp_index)
self.is_dirty = True
self.last_update = time.time()
print(
@@ -1869,9 +1931,10 @@ class PasswordManager:
else []
)
entry_id = self.entry_manager.get_next_index()
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
uri = self.entry_manager.add_totp(
label,
self.parent_seed,
key,
secret=secret,
period=period,
digits=digits,
@@ -2633,7 +2696,8 @@ class PasswordManager:
print(colored("Press Enter to return to the menu.", "cyan"))
try:
while True:
code = self.entry_manager.get_totp_code(index, self.parent_seed)
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
code = self.entry_manager.get_totp_code(index, key)
if self.secret_mode_enabled:
if copy_to_clipboard(code, self.clipboard_clear_delay):
print(
@@ -4110,6 +4174,7 @@ class PasswordManager:
def handle_export_totp_codes(self) -> Path | None:
"""Export all 2FA codes to a JSON file for other authenticator apps."""
try:
self.ensure_key_hierarchy()
fp, parent_fp, child_fp = self.header_fingerprint_args
clear_header_with_notification(
self,
@@ -4131,7 +4196,8 @@ class PasswordManager:
secret = entry["secret"]
else:
idx = int(entry.get("index", 0))
secret = TotpManager.derive_secret(self.parent_seed, idx)
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
secret = TotpManager.derive_secret(key, idx)
uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
totp_entries.append(
{
@@ -4368,6 +4434,7 @@ class PasswordManager:
def change_password(self, old_password: str, new_password: str) -> None:
"""Change the master password used for encryption."""
try:
self.ensure_key_hierarchy()
if not self.verify_password(old_password):
raise ValueError("Incorrect password")
@@ -4376,7 +4443,7 @@ class PasswordManager:
config_data = self.config_manager.load_config(require_pin=False)
# Create a new encryption manager with the new password
new_key = derive_index_key(self.parent_seed)
new_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
iterations = self.config_manager.get_kdf_iterations()
seed_key = derive_key_from_password(

View File

@@ -131,7 +131,10 @@ class MenuHandler:
if generated:
print(colored("\nGenerated 2FA Codes:", "green"))
for label, idx, period, _ in generated:
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(
pm, "parent_seed", None
)
code = pm.entry_manager.get_totp_code(idx, key)
remaining = pm.entry_manager.get_totp_time_remaining(idx)
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
@@ -149,7 +152,10 @@ class MenuHandler:
if imported_list:
print(colored("\nImported 2FA Codes:", "green"))
for label, idx, period, _ in imported_list:
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(
pm, "parent_seed", None
)
code = pm.entry_manager.get_totp_code(idx, key)
remaining = pm.entry_manager.get_totp_time_remaining(idx)
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"

View File

@@ -113,10 +113,12 @@ class PasswordGenerator:
self.bip85 = bip85
self.policy = policy or PasswordPolicy()
# Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager)
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
if isinstance(parent_seed, (bytes, bytearray)):
self.seed_bytes = bytes(parent_seed)
else:
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
self.parent_seed
)
logger.debug("PasswordGenerator initialized successfully.")
except Exception as e:

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import sys
import time
from typing import Union
from urllib.parse import quote
from urllib.parse import urlparse, parse_qs, unquote
@@ -18,13 +19,15 @@ class TotpManager:
"""Helper methods for TOTP secrets and codes."""
@staticmethod
def derive_secret(seed: str, index: int) -> str:
"""Derive a TOTP secret from a BIP39 seed and index."""
def derive_secret(seed: Union[str, bytes], index: int) -> str:
"""Derive a TOTP secret from a seed or raw key and index."""
return key_derivation.derive_totp_secret(seed, index)
@classmethod
def current_code(cls, seed: str, index: int, timestamp: int | None = None) -> str:
"""Return the TOTP code for the given seed and index."""
def current_code(
cls, seed: Union[str, bytes], index: int, timestamp: int | None = None
) -> str:
"""Return the TOTP code for the given seed/key and index."""
secret = cls.derive_secret(seed, index)
totp = pyotp.TOTP(secret)
if timestamp is None:

View File

@@ -14,6 +14,7 @@ from .encryption import (
USE_ORJSON,
json_lib,
)
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
from utils.password_prompt import prompt_existing_password
@@ -38,6 +39,11 @@ class Vault:
"""Replace the internal encryption manager."""
self.encryption_manager = manager
def _hkdf_kdf(self) -> KdfConfig:
return KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
# ----- Password index helpers -----
def load_index(self, *, return_migration_flags: bool = False):
"""Return decrypted password index data, applying migrations.
@@ -102,10 +108,24 @@ class Vault:
)
try:
data = self.encryption_manager.load_json_data(self.index_file)
data, kdf = self.encryption_manager.load_json_data(
self.index_file, return_kdf=True
)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
)
if kdf.version < CURRENT_KDF_VERSION:
new_kdf = KdfConfig(
name=kdf.name,
version=CURRENT_KDF_VERSION,
params=kdf.params,
salt_b64=kdf.salt_b64,
)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=new_kdf
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = True
except LegacyFormatRequiresMigrationError:
print(
colored(
@@ -142,7 +162,9 @@ class Vault:
else:
data = json_lib.loads(decrypted.decode("utf-8"))
if self.encryption_manager._legacy_migrate_flag:
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
@@ -181,7 +203,9 @@ class Vault:
try:
data = apply_migrations(data)
if schema_migrated:
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.update_checksum(self.index_file)
except Exception as exc: # noqa: BLE001 - surface clear error and restore
if legacy_detected and backup_dir is not None:
@@ -214,7 +238,9 @@ class Vault:
def save_index(self, data: dict) -> None:
"""Encrypt and write password index."""
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
def get_encrypted_index(self) -> Optional[bytes]:
"""Return the encrypted index bytes if present."""
@@ -252,4 +278,6 @@ class Vault:
def save_config(self, config: dict) -> None:
"""Encrypt and persist configuration."""
self.encryption_manager.save_json_data(config, self.config_file)
self.encryption_manager.save_json_data(
config, self.config_file, kdf=self._hkdf_kdf()
)

View File

@@ -3,11 +3,15 @@ from pathlib import Path
from hypothesis import given, strategies as st, settings, HealthCheck
from mnemonic import Mnemonic
import hashlib
import base64
import os
from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key,
KdfConfig,
)
from utils.fingerprint import generate_fingerprint
from seedpass.core.encryption import EncryptionManager
@@ -36,16 +40,27 @@ def test_fuzz_key_round_trip(password, seed_bytes, config, mode, tmp_path: Path)
seed_phrase = Mnemonic("english").to_mnemonic(seed_bytes)
fp = generate_fingerprint(seed_phrase)
if mode == "argon2":
key = derive_key_from_password_argon2(
password, fp, time_cost=1, memory_cost=8, parallelism=1
cfg = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
key = derive_key_from_password_argon2(password, cfg)
else:
key = derive_key_from_password(password, fp, iterations=1)
cfg = KdfConfig(
name="pbkdf2",
params={"iterations": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
enc_mgr = EncryptionManager(key, tmp_path)
# Parent seed round trip
enc_mgr.encrypt_parent_seed(seed_phrase)
enc_mgr.encrypt_parent_seed(seed_phrase, kdf=cfg)
assert enc_mgr.decrypt_parent_seed() == seed_phrase
# JSON data round trip

View File

@@ -1,4 +1,6 @@
import bcrypt
import hashlib
import base64
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
@@ -7,6 +9,7 @@ from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key,
KdfConfig,
)
from seedpass.core.encryption import EncryptionManager
from seedpass.core.vault import Vault
@@ -21,10 +24,24 @@ def _setup_profile(tmp: Path, mode: str):
argon_kwargs = dict(time_cost=1, memory_cost=8, parallelism=1)
fp = tmp.name
if mode == "argon2":
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, fp, **argon_kwargs)
cfg = KdfConfig(
params=argon_kwargs,
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
else:
seed_key = derive_key_from_password(TEST_PASSWORD, fp, iterations=1)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED)
cfg = KdfConfig(
name="pbkdf2",
params={"iterations": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
index_key = derive_index_key(TEST_SEED)
enc_mgr = EncryptionManager(index_key, tmp)
@@ -65,9 +82,9 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
)
if mode == "argon2":
monkeypatch.setattr(
"seedpass.core.manager.derive_key_from_password_argon2",
lambda pw, fp: derive_key_from_password_argon2(
pw, fp, **argon_kwargs
"seedpass.core.manager.KdfConfig",
lambda salt_b64, **_: KdfConfig(
params=argon_kwargs, salt_b64=salt_b64
),
)
monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None)
@@ -76,3 +93,26 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
)
assert pm.setup_encryption_manager(path, exit_on_fail=False)
assert pm.parent_seed == TEST_SEED
def test_kdf_param_round_trip(tmp_path):
cfg = KdfConfig(
params={"time_cost": 3, "memory_cost": 32, "parallelism": 1},
salt_b64=base64.b64encode(b"static-salt-1234").decode(),
)
key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
mgr = EncryptionManager(key, tmp_path)
mgr.encrypt_parent_seed(TEST_SEED, kdf=cfg)
stored = mgr.get_file_kdf(Path("parent_seed.enc"))
assert stored.params == cfg.params
def test_vault_kdf_migration(tmp_path):
index_key = derive_index_key(TEST_SEED)
mgr = EncryptionManager(index_key, tmp_path)
vault = Vault(mgr, tmp_path)
old_kdf = KdfConfig(name="hkdf", version=0, params={}, salt_b64="")
mgr.save_json_data({"entries": {}}, vault.index_file, kdf=old_kdf)
vault.load_index()
new_kdf = mgr.get_file_kdf(vault.index_file)
assert new_kdf.version == KdfConfig().version

View File

@@ -1,11 +1,15 @@
import logging
import pytest
import logging
import hashlib
import base64
from utils.fingerprint import generate_fingerprint
from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key_seed_only,
derive_index_key,
KdfConfig,
)
@@ -48,15 +52,17 @@ def test_argon2_fingerprint_affects_key():
fp1 = generate_fingerprint("seed one")
fp2 = generate_fingerprint("seed two")
k1 = derive_key_from_password_argon2(
password, fp1, time_cost=1, memory_cost=8, parallelism=1
cfg1 = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(hashlib.sha256(fp1.encode()).digest()[:16]).decode(),
)
k2 = derive_key_from_password_argon2(
password, fp1, time_cost=1, memory_cost=8, parallelism=1
)
k3 = derive_key_from_password_argon2(
password, fp2, time_cost=1, memory_cost=8, parallelism=1
cfg2 = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(hashlib.sha256(fp2.encode()).digest()[:16]).decode(),
)
k1 = derive_key_from_password_argon2(password, cfg1)
k2 = derive_key_from_password_argon2(password, cfg1)
k3 = derive_key_from_password_argon2(password, cfg2)
assert k1 == k2
assert k1 != k3

View File

@@ -0,0 +1,19 @@
import base64
from bip_utils import Bip39SeedGenerator
from utils.key_hierarchy import kd
from utils.key_derivation import derive_index_key
def test_kd_distinct_infos():
root = b"root" * 8
k1 = kd(root, b"info1")
k2 = kd(root, b"info2")
assert k1 != k2
def test_derive_index_key_matches_hierarchy():
seed = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
seed_bytes = Bip39SeedGenerator(seed).Generate()
master = kd(seed_bytes, b"seedpass:v1:master")
expected = base64.urlsafe_b64encode(kd(master, b"seedpass:v1:storage"))
assert derive_index_key(seed) == expected

View File

@@ -1,4 +1,5 @@
import json
import base64
import hashlib
from pathlib import Path
@@ -99,7 +100,8 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
vault.load_index()
new_file = tmp_path / "seedpass_entries_db.json.enc"
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert vault.migrated_from_legacy

View File

@@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
cfg = ConfigManager(vault, tmp_path)
assert cfg.get_kdf_iterations() == iterations
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
assert content.startswith(b"V2:")
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")

View File

@@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
monkeypatch.setattr("builtins.input", lambda _: "2")
vault.load_index()
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
assert content.startswith(b"V2:")
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert vault.encryption_manager.last_migration_performed is True

View File

@@ -1,4 +1,5 @@
import json
import base64
import hashlib
from pathlib import Path
from types import SimpleNamespace
@@ -34,7 +35,8 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
vault.load_index()
new_file = fp_dir / "seedpass_entries_db.json.enc"
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
new_enc_mgr = EncryptionManager(key, fp_dir)
new_vault = Vault(new_enc_mgr, fp_dir)
@@ -59,4 +61,5 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
)
pm.initialize_managers()
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")

View File

@@ -1,4 +1,6 @@
import sys
import json
import base64
from pathlib import Path
from cryptography.fernet import Fernet
@@ -28,4 +30,5 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
assert new_file.exists()
assert new_file.read_bytes() != encrypted
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")

View File

@@ -3,15 +3,13 @@
"""
Key Derivation Module
Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case.
This module provides functions to derive cryptographic keys from user-provided
passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet
for symmetric encryption purposes. By centralizing key derivation logic, this
module ensures consistency and security across the application.
This module provides functions to derive cryptographic keys from user-provided passwords
and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption
purposes. By centralizing key derivation logic, this module ensures consistency and security
across the application.
Ensure that all dependencies are installed and properly configured in your environment.
Ensure that all dependencies are installed and properly configured in your
environment.
"""
import os
@@ -21,11 +19,13 @@ import unicodedata
import logging
import hmac
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Union
from typing import Optional, Union, Dict, Any
from bip_utils import Bip39SeedGenerator
from local_bip85 import BIP85
from .key_hierarchy import kd
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
@@ -47,6 +47,27 @@ DEFAULT_ENCRYPTION_MODE = EncryptionMode.SEED_ONLY
TOTP_PURPOSE = 39
@dataclass
class KdfConfig:
"""Configuration block describing how a key was derived."""
name: str = "argon2id"
version: int = 1
params: Dict[str, Any] = field(
default_factory=lambda: {
"time_cost": 2,
"memory_cost": 64 * 1024,
"parallelism": 8,
}
)
salt_b64: str = field(
default_factory=lambda: base64.b64encode(os.urandom(16)).decode()
)
CURRENT_KDF_VERSION = 1
def derive_key_from_password(
password: str, fingerprint: Union[str, bytes], iterations: int = 100_000
) -> bytes:
@@ -109,18 +130,15 @@ def derive_key_from_password(
raise
def derive_key_from_password_argon2(
password: str,
fingerprint: Union[str, bytes],
*,
time_cost: int = 2,
memory_cost: int = 64 * 1024,
parallelism: int = 8,
) -> bytes:
def derive_key_from_password_argon2(password: str, kdf: KdfConfig) -> bytes:
"""Derive an encryption key from a password using Argon2id.
The defaults follow recommended parameters but omit a salt for deterministic
output. Smaller values may be supplied for testing.
Parameters
----------
password:
The user's password.
kdf:
:class:`KdfConfig` instance describing salt and tuning parameters.
"""
if not password:
@@ -131,17 +149,14 @@ def derive_key_from_password_argon2(
try:
from argon2.low_level import hash_secret_raw, Type
if isinstance(fingerprint, bytes):
salt = fingerprint
else:
salt = hashlib.sha256(fingerprint.encode()).digest()[:16]
params = kdf.params or {}
salt = base64.b64decode(kdf.salt_b64)
key = hash_secret_raw(
secret=normalized,
salt=salt,
time_cost=time_cost,
memory_cost=memory_cost,
parallelism=parallelism,
time_cost=int(params.get("time_cost", 2)),
memory_cost=int(params.get("memory_cost", 64 * 1024)),
parallelism=int(params.get("parallelism", 8)),
hash_len=32,
type=Type.ID,
)
@@ -194,16 +209,10 @@ def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> by
def derive_index_key_seed_only(seed: str) -> bytes:
"""Derive a deterministic Fernet key from only the BIP-39 seed."""
"""Derive the index encryption key using the v1 hierarchy."""
seed_bytes = Bip39SeedGenerator(seed).Generate()
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"password-db",
backend=default_backend(),
)
key = hkdf.derive(seed_bytes)
master = kd(seed_bytes, b"seedpass:v1:master")
key = kd(master, b"seedpass:v1:storage")
return base64.urlsafe_b64encode(key)
@@ -212,23 +221,21 @@ def derive_index_key(seed: str) -> bytes:
return derive_index_key_seed_only(seed)
def derive_totp_secret(seed: str, index: int) -> str:
"""Derive a base32-encoded TOTP secret from a BIP39 seed."""
def derive_totp_secret(seed: Union[str, bytes], index: int) -> str:
"""Derive a base32-encoded TOTP secret from a seed or raw key."""
try:
# Initialize BIP85 from the BIP39 seed bytes
seed_bytes = Bip39SeedGenerator(seed).Generate()
if isinstance(seed, (bytes, bytearray)):
seed_bytes = bytes(seed)
else:
seed_bytes = Bip39SeedGenerator(seed).Generate()
bip85 = BIP85(seed_bytes)
# Build the BIP32 path m/83696968'/39'/TOTP'/{index}'
totp_int = int.from_bytes(b"TOTP", "big")
path = f"m/83696968'/{TOTP_PURPOSE}'/{totp_int}'/{index}'"
# Derive entropy using the same scheme as BIP85
child_key = bip85.bip32_ctx.DerivePath(path)
key_bytes = child_key.PrivateKey().Raw().ToBytes()
entropy = hmac.new(b"bip-entropy-from-k", key_bytes, hashlib.sha512).digest()
# Hash the first 32 bytes of entropy and encode the first 20 bytes
hashed = hashlib.sha256(entropy[:32]).digest()
secret = base64.b32encode(hashed[:20]).decode("utf-8")
logger.debug(f"Derived TOTP secret for index {index}.")
@@ -267,18 +274,16 @@ def calibrate_argon2_time_cost(
"""
password = "benchmark"
fingerprint = b"argon2-calibration"
salt = base64.b64encode(b"argon2-calibration").decode()
time_cost = 1
elapsed_ms = 0.0
while time_cost <= max_time_cost:
start = time.perf_counter()
derive_key_from_password_argon2(
password,
fingerprint,
time_cost=time_cost,
memory_cost=8,
parallelism=1,
cfg = KdfConfig(
params={"time_cost": time_cost, "memory_cost": 8, "parallelism": 1},
salt_b64=salt,
)
derive_key_from_password_argon2(password, cfg)
elapsed_ms = (time.perf_counter() - start) * 1000
if elapsed_ms >= target_ms:
break

View File

@@ -0,0 +1,28 @@
"""Key hierarchy helper functions."""
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
def kd(root: bytes, info: bytes, length: int = 32) -> bytes:
"""Derive a sub-key from ``root`` using HKDF-SHA256.
Parameters
----------
root:
Root key material.
info:
Domain separation string.
length:
Length of the derived key in bytes. Defaults to 32.
"""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=length,
salt=None,
info=info,
backend=default_backend(),
)
return hkdf.derive(root)

View File

@@ -33,6 +33,12 @@ logger = logging.getLogger(__name__)
DEFAULT_MAX_ATTEMPTS = 5
def _env_password() -> str | None:
"""Return a password supplied via environment for non-interactive use."""
return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD")
def _get_max_attempts(override: int | None = None) -> int:
"""Return the configured maximum number of prompt attempts."""
@@ -80,6 +86,13 @@ def prompt_new_password(max_retries: int | None = None) -> str:
Raises:
PasswordPromptError: If the user fails to provide a valid password after multiple attempts.
"""
env_pw = _env_password()
if env_pw:
normalized = unicodedata.normalize("NFKD", env_pw)
if len(normalized) < MIN_PASSWORD_LENGTH:
raise PasswordPromptError("Environment password too short")
return normalized
max_retries = _get_max_attempts(max_retries)
attempts = 0
@@ -164,6 +177,10 @@ def prompt_existing_password(
PasswordPromptError: If the user interrupts the operation or exceeds
``max_retries`` attempts.
"""
env_pw = _env_password()
if env_pw:
return unicodedata.normalize("NFKD", env_pw)
max_retries = _get_max_attempts(max_retries)
attempts = 0
while max_retries == 0 or attempts < max_retries:

View File

@@ -102,9 +102,11 @@ def _masked_input_posix(prompt: str) -> str:
def masked_input(prompt: str) -> str:
"""Return input from the user while masking typed characters."""
if sys.platform == "win32":
return _masked_input_windows(prompt)
return _masked_input_posix(prompt)
func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix
try:
return func(prompt)
except Exception: # pragma: no cover - fallback when TTY operations fail
return input(prompt)
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str: