Merge pull request #829 from PR0M3TH3AN/codex/implement-kdfconfig-dataclass-and-update-key-derivation

Introduce KDF config metadata and migration tests
This commit is contained in:
thePR0M3TH3AN
2025-08-20 17:44:50 -04:00
committed by GitHub
14 changed files with 289 additions and 95 deletions

View File

@@ -16,8 +16,9 @@ except Exception: # pragma: no cover - fallback for environments without orjson
import hashlib
import os
import base64
from dataclasses import asdict
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
@@ -26,6 +27,7 @@ from termcolor import colored
from utils.file_lock import exclusive_lock
from mnemonic import Mnemonic
from utils.password_prompt import prompt_existing_password
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
# Instantiate the logger
logger = logging.getLogger(__name__)
@@ -231,40 +233,76 @@ class EncryptionManager:
raise ValueError("Invalid path outside fingerprint directory")
return candidate
def encrypt_parent_seed(self, parent_seed: str) -> None:
def encrypt_parent_seed(
self, parent_seed: str, kdf: Optional[KdfConfig] = None
) -> None:
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
data = parent_seed.encode("utf-8")
encrypted_data = self.encrypt_data(data) # This now creates V2 format
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
os.chmod(self.parent_seed_file, 0o600)
self.encrypt_and_save_file(data, self.parent_seed_file, kdf=kdf)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
def decrypt_parent_seed(self) -> str:
"""Decrypts and returns the parent seed, handling migration."""
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
if is_legacy:
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip())
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)
return decrypted_data.decode("utf-8").strip()
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
def _serialize(self, kdf: KdfConfig, ciphertext: bytes) -> bytes:
payload = {"kdf": asdict(kdf), "ct": base64.b64encode(ciphertext).decode()}
if USE_ORJSON:
return json_lib.dumps(payload)
return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8")
def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]:
"""Return ``(KdfConfig, ciphertext)`` from serialized *blob*.
Legacy files stored the raw ciphertext without a JSON wrapper. If
decoding the wrapper fails, treat ``blob`` as the ciphertext and return
a default HKDF configuration.
"""
try:
if USE_ORJSON:
obj = json_lib.loads(blob)
else:
obj = json_lib.loads(blob.decode("utf-8"))
kdf = KdfConfig(**obj.get("kdf", {}))
ct_b64 = obj.get("ct", "")
ciphertext = base64.b64decode(ct_b64)
if ciphertext:
return kdf, ciphertext
except Exception: # pragma: no cover - fall back to legacy path
pass
# Legacy format: ``blob`` already contains the ciphertext
return (
KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""),
blob,
)
def encrypt_and_save_file(
self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None
) -> None:
if kdf is None:
kdf = KdfConfig()
file_path = self.resolve_relative_path(relative_path)
file_path.parent.mkdir(parents=True, exist_ok=True)
encrypted_data = self.encrypt_data(data)
payload = self._serialize(kdf, encrypted_data)
with exclusive_lock(file_path) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
fh.write(payload)
fh.flush()
os.fsync(fh.fileno())
os.chmod(file_path, 0o600)
@@ -273,20 +311,37 @@ class EncryptionManager:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
_, encrypted_data = self._deserialize(blob)
return self.decrypt_data(encrypted_data, context=str(relative_path))
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
def get_file_kdf(self, relative_path: Path) -> KdfConfig:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
blob = fh.read()
kdf, _ = self._deserialize(blob)
return kdf
def save_json_data(
self,
data: dict,
relative_path: Optional[Path] = None,
*,
kdf: Optional[KdfConfig] = None,
) -> None:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
if USE_ORJSON:
json_data = json_lib.dumps(data)
else:
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path)
self.encrypt_and_save_file(json_data, relative_path, kdf=kdf)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
def load_json_data(
self, relative_path: Optional[Path] = None, *, return_kdf: bool = False
) -> dict | Tuple[dict, KdfConfig]:
"""
Loads and decrypts JSON data, automatically migrating and re-saving
if it's in the legacy format.
@@ -295,12 +350,18 @@ class EncryptionManager:
relative_path = Path("seedpass_entries_db.json.enc")
file_path = self.resolve_relative_path(relative_path)
if not file_path.exists():
return {"entries": {}}
empty: dict = {"entries": {}}
if return_kdf:
return empty, KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
return empty
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
self.last_migration_performed = False
@@ -316,10 +377,12 @@ class EncryptionManager:
# If it was a legacy file, re-save it in the new format now
if is_legacy and self._legacy_migrate_flag:
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
self.save_json_data(data, relative_path)
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
self.last_migration_performed = True
if return_kdf:
return data, kdf
return data
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
logger.error(
@@ -360,7 +423,8 @@ class EncryptionManager:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
is_legacy = not encrypted_data.startswith(b"V2:")
kdf, ciphertext = self._deserialize(encrypted_data)
is_legacy = not ciphertext.startswith(b"V2:")
self.last_migration_performed = False
def _process(decrypted: bytes) -> dict:
@@ -386,11 +450,9 @@ class EncryptionManager:
return data
try:
decrypted_data = self.decrypt_data(
encrypted_data, context=str(relative_path)
)
decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path))
data = _process(decrypted_data)
self.save_json_data(data, relative_path) # This always saves in V2 format
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
logger.info("Index file from Nostr was processed and saved successfully.")
self.last_migration_performed = is_legacy
@@ -401,10 +463,10 @@ class EncryptionManager:
"Enter your master password for legacy decryption: "
)
decrypted_data = self.decrypt_legacy(
encrypted_data, password, context=str(relative_path)
ciphertext, password, context=str(relative_path)
)
data = _process(decrypted_data)
self.save_json_data(data, relative_path)
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
logger.warning(
"Index decrypted using legacy password-only key derivation."

View File

@@ -15,6 +15,7 @@ import logging
import os
import hashlib
import hmac
import base64
from typing import Optional, Literal, Any
import shutil
import time
@@ -46,6 +47,7 @@ from utils.key_derivation import (
derive_key_from_password_argon2,
derive_index_key,
EncryptionMode,
KdfConfig,
)
from utils.checksum import (
calculate_checksum,
@@ -689,9 +691,9 @@ class PasswordManager:
for iter_try in dict.fromkeys(iter_candidates):
try:
if mode == "argon2":
seed_key = derive_key_from_password_argon2(
password, salt_fp
)
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iter_try
@@ -771,7 +773,9 @@ class PasswordManager:
)
salt_fp = fingerprint_dir.name
if mode == "argon2":
seed_key = derive_key_from_password_argon2(password, salt_fp)
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iterations

View File

@@ -14,6 +14,7 @@ from .encryption import (
USE_ORJSON,
json_lib,
)
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
from utils.password_prompt import prompt_existing_password
@@ -38,6 +39,11 @@ class Vault:
"""Replace the internal encryption manager."""
self.encryption_manager = manager
def _hkdf_kdf(self) -> KdfConfig:
return KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
# ----- Password index helpers -----
def load_index(self, *, return_migration_flags: bool = False):
"""Return decrypted password index data, applying migrations.
@@ -102,10 +108,24 @@ class Vault:
)
try:
data = self.encryption_manager.load_json_data(self.index_file)
data, kdf = self.encryption_manager.load_json_data(
self.index_file, return_kdf=True
)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
)
if kdf.version < CURRENT_KDF_VERSION:
new_kdf = KdfConfig(
name=kdf.name,
version=CURRENT_KDF_VERSION,
params=kdf.params,
salt_b64=kdf.salt_b64,
)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=new_kdf
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = True
except LegacyFormatRequiresMigrationError:
print(
colored(
@@ -142,7 +162,9 @@ class Vault:
else:
data = json_lib.loads(decrypted.decode("utf-8"))
if self.encryption_manager._legacy_migrate_flag:
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
@@ -181,7 +203,9 @@ class Vault:
try:
data = apply_migrations(data)
if schema_migrated:
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.update_checksum(self.index_file)
except Exception as exc: # noqa: BLE001 - surface clear error and restore
if legacy_detected and backup_dir is not None:
@@ -214,7 +238,9 @@ class Vault:
def save_index(self, data: dict) -> None:
"""Encrypt and write password index."""
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
def get_encrypted_index(self) -> Optional[bytes]:
"""Return the encrypted index bytes if present."""
@@ -252,4 +278,6 @@ class Vault:
def save_config(self, config: dict) -> None:
"""Encrypt and persist configuration."""
self.encryption_manager.save_json_data(config, self.config_file)
self.encryption_manager.save_json_data(
config, self.config_file, kdf=self._hkdf_kdf()
)

View File

@@ -3,11 +3,15 @@ from pathlib import Path
from hypothesis import given, strategies as st, settings, HealthCheck
from mnemonic import Mnemonic
import hashlib
import base64
import os
from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key,
KdfConfig,
)
from utils.fingerprint import generate_fingerprint
from seedpass.core.encryption import EncryptionManager
@@ -36,16 +40,27 @@ def test_fuzz_key_round_trip(password, seed_bytes, config, mode, tmp_path: Path)
seed_phrase = Mnemonic("english").to_mnemonic(seed_bytes)
fp = generate_fingerprint(seed_phrase)
if mode == "argon2":
key = derive_key_from_password_argon2(
password, fp, time_cost=1, memory_cost=8, parallelism=1
cfg = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
key = derive_key_from_password_argon2(password, cfg)
else:
key = derive_key_from_password(password, fp, iterations=1)
cfg = KdfConfig(
name="pbkdf2",
params={"iterations": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
enc_mgr = EncryptionManager(key, tmp_path)
# Parent seed round trip
enc_mgr.encrypt_parent_seed(seed_phrase)
enc_mgr.encrypt_parent_seed(seed_phrase, kdf=cfg)
assert enc_mgr.decrypt_parent_seed() == seed_phrase
# JSON data round trip

View File

@@ -1,4 +1,6 @@
import bcrypt
import hashlib
import base64
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
@@ -7,6 +9,7 @@ from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key,
KdfConfig,
)
from seedpass.core.encryption import EncryptionManager
from seedpass.core.vault import Vault
@@ -21,10 +24,24 @@ def _setup_profile(tmp: Path, mode: str):
argon_kwargs = dict(time_cost=1, memory_cost=8, parallelism=1)
fp = tmp.name
if mode == "argon2":
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, fp, **argon_kwargs)
cfg = KdfConfig(
params=argon_kwargs,
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
else:
seed_key = derive_key_from_password(TEST_PASSWORD, fp, iterations=1)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED)
cfg = KdfConfig(
name="pbkdf2",
params={"iterations": 1},
salt_b64=base64.b64encode(
hashlib.sha256(fp.encode()).digest()[:16]
).decode(),
)
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
index_key = derive_index_key(TEST_SEED)
enc_mgr = EncryptionManager(index_key, tmp)
@@ -65,9 +82,9 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
)
if mode == "argon2":
monkeypatch.setattr(
"seedpass.core.manager.derive_key_from_password_argon2",
lambda pw, fp: derive_key_from_password_argon2(
pw, fp, **argon_kwargs
"seedpass.core.manager.KdfConfig",
lambda salt_b64, **_: KdfConfig(
params=argon_kwargs, salt_b64=salt_b64
),
)
monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None)
@@ -76,3 +93,26 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
)
assert pm.setup_encryption_manager(path, exit_on_fail=False)
assert pm.parent_seed == TEST_SEED
def test_kdf_param_round_trip(tmp_path):
cfg = KdfConfig(
params={"time_cost": 3, "memory_cost": 32, "parallelism": 1},
salt_b64=base64.b64encode(b"static-salt-1234").decode(),
)
key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
mgr = EncryptionManager(key, tmp_path)
mgr.encrypt_parent_seed(TEST_SEED, kdf=cfg)
stored = mgr.get_file_kdf(Path("parent_seed.enc"))
assert stored.params == cfg.params
def test_vault_kdf_migration(tmp_path):
index_key = derive_index_key(TEST_SEED)
mgr = EncryptionManager(index_key, tmp_path)
vault = Vault(mgr, tmp_path)
old_kdf = KdfConfig(name="hkdf", version=0, params={}, salt_b64="")
mgr.save_json_data({"entries": {}}, vault.index_file, kdf=old_kdf)
vault.load_index()
new_kdf = mgr.get_file_kdf(vault.index_file)
assert new_kdf.version == KdfConfig().version

View File

@@ -1,11 +1,15 @@
import logging
import pytest
import logging
import hashlib
import base64
from utils.fingerprint import generate_fingerprint
from utils.key_derivation import (
derive_key_from_password,
derive_key_from_password_argon2,
derive_index_key_seed_only,
derive_index_key,
KdfConfig,
)
@@ -48,15 +52,17 @@ def test_argon2_fingerprint_affects_key():
fp1 = generate_fingerprint("seed one")
fp2 = generate_fingerprint("seed two")
k1 = derive_key_from_password_argon2(
password, fp1, time_cost=1, memory_cost=8, parallelism=1
cfg1 = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(hashlib.sha256(fp1.encode()).digest()[:16]).decode(),
)
k2 = derive_key_from_password_argon2(
password, fp1, time_cost=1, memory_cost=8, parallelism=1
)
k3 = derive_key_from_password_argon2(
password, fp2, time_cost=1, memory_cost=8, parallelism=1
cfg2 = KdfConfig(
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
salt_b64=base64.b64encode(hashlib.sha256(fp2.encode()).digest()[:16]).decode(),
)
k1 = derive_key_from_password_argon2(password, cfg1)
k2 = derive_key_from_password_argon2(password, cfg1)
k3 = derive_key_from_password_argon2(password, cfg2)
assert k1 == k2
assert k1 != k3

View File

@@ -1,4 +1,5 @@
import json
import base64
import hashlib
from pathlib import Path
@@ -99,7 +100,8 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
vault.load_index()
new_file = tmp_path / "seedpass_entries_db.json.enc"
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert vault.migrated_from_legacy

View File

@@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
cfg = ConfigManager(vault, tmp_path)
assert cfg.get_kdf_iterations() == iterations
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
assert content.startswith(b"V2:")
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")

View File

@@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
monkeypatch.setattr("builtins.input", lambda _: "2")
vault.load_index()
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
assert content.startswith(b"V2:")
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert vault.encryption_manager.last_migration_performed is True

View File

@@ -1,4 +1,5 @@
import json
import base64
import hashlib
from pathlib import Path
from types import SimpleNamespace
@@ -34,7 +35,8 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
vault.load_index()
new_file = fp_dir / "seedpass_entries_db.json.enc"
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
new_enc_mgr = EncryptionManager(key, fp_dir)
new_vault = Vault(new_enc_mgr, fp_dir)
@@ -59,4 +61,5 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
)
pm.initialize_managers()
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")

View File

@@ -1,4 +1,6 @@
import sys
import json
import base64
from pathlib import Path
from cryptography.fernet import Fernet
@@ -28,4 +30,5 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
assert new_file.exists()
assert new_file.read_bytes() != encrypted
assert new_file.read_bytes().startswith(b"V2:")
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")

View File

@@ -3,15 +3,13 @@
"""
Key Derivation Module
Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case.
This module provides functions to derive cryptographic keys from user-provided
passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet
for symmetric encryption purposes. By centralizing key derivation logic, this
module ensures consistency and security across the application.
This module provides functions to derive cryptographic keys from user-provided passwords
and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption
purposes. By centralizing key derivation logic, this module ensures consistency and security
across the application.
Ensure that all dependencies are installed and properly configured in your environment.
Ensure that all dependencies are installed and properly configured in your
environment.
"""
import os
@@ -21,8 +19,9 @@ import unicodedata
import logging
import hmac
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Union
from typing import Optional, Union, Dict, Any
from bip_utils import Bip39SeedGenerator
from local_bip85 import BIP85
@@ -47,6 +46,27 @@ DEFAULT_ENCRYPTION_MODE = EncryptionMode.SEED_ONLY
TOTP_PURPOSE = 39
@dataclass
class KdfConfig:
"""Configuration block describing how a key was derived."""
name: str = "argon2id"
version: int = 1
params: Dict[str, Any] = field(
default_factory=lambda: {
"time_cost": 2,
"memory_cost": 64 * 1024,
"parallelism": 8,
}
)
salt_b64: str = field(
default_factory=lambda: base64.b64encode(os.urandom(16)).decode()
)
CURRENT_KDF_VERSION = 1
def derive_key_from_password(
password: str, fingerprint: Union[str, bytes], iterations: int = 100_000
) -> bytes:
@@ -109,18 +129,15 @@ def derive_key_from_password(
raise
def derive_key_from_password_argon2(
password: str,
fingerprint: Union[str, bytes],
*,
time_cost: int = 2,
memory_cost: int = 64 * 1024,
parallelism: int = 8,
) -> bytes:
def derive_key_from_password_argon2(password: str, kdf: KdfConfig) -> bytes:
"""Derive an encryption key from a password using Argon2id.
The defaults follow recommended parameters but omit a salt for deterministic
output. Smaller values may be supplied for testing.
Parameters
----------
password:
The user's password.
kdf:
:class:`KdfConfig` instance describing salt and tuning parameters.
"""
if not password:
@@ -131,17 +148,14 @@ def derive_key_from_password_argon2(
try:
from argon2.low_level import hash_secret_raw, Type
if isinstance(fingerprint, bytes):
salt = fingerprint
else:
salt = hashlib.sha256(fingerprint.encode()).digest()[:16]
params = kdf.params or {}
salt = base64.b64decode(kdf.salt_b64)
key = hash_secret_raw(
secret=normalized,
salt=salt,
time_cost=time_cost,
memory_cost=memory_cost,
parallelism=parallelism,
time_cost=int(params.get("time_cost", 2)),
memory_cost=int(params.get("memory_cost", 64 * 1024)),
parallelism=int(params.get("parallelism", 8)),
hash_len=32,
type=Type.ID,
)
@@ -267,18 +281,16 @@ def calibrate_argon2_time_cost(
"""
password = "benchmark"
fingerprint = b"argon2-calibration"
salt = base64.b64encode(b"argon2-calibration").decode()
time_cost = 1
elapsed_ms = 0.0
while time_cost <= max_time_cost:
start = time.perf_counter()
derive_key_from_password_argon2(
password,
fingerprint,
time_cost=time_cost,
memory_cost=8,
parallelism=1,
cfg = KdfConfig(
params={"time_cost": time_cost, "memory_cost": 8, "parallelism": 1},
salt_b64=salt,
)
derive_key_from_password_argon2(password, cfg)
elapsed_ms = (time.perf_counter() - start) * 1000
if elapsed_ms >= target_ms:
break

View File

@@ -33,6 +33,12 @@ logger = logging.getLogger(__name__)
DEFAULT_MAX_ATTEMPTS = 5
def _env_password() -> str | None:
"""Return a password supplied via environment for non-interactive use."""
return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD")
def _get_max_attempts(override: int | None = None) -> int:
"""Return the configured maximum number of prompt attempts."""
@@ -80,6 +86,13 @@ def prompt_new_password(max_retries: int | None = None) -> str:
Raises:
PasswordPromptError: If the user fails to provide a valid password after multiple attempts.
"""
env_pw = _env_password()
if env_pw:
normalized = unicodedata.normalize("NFKD", env_pw)
if len(normalized) < MIN_PASSWORD_LENGTH:
raise PasswordPromptError("Environment password too short")
return normalized
max_retries = _get_max_attempts(max_retries)
attempts = 0
@@ -164,6 +177,10 @@ def prompt_existing_password(
PasswordPromptError: If the user interrupts the operation or exceeds
``max_retries`` attempts.
"""
env_pw = _env_password()
if env_pw:
return unicodedata.normalize("NFKD", env_pw)
max_retries = _get_max_attempts(max_retries)
attempts = 0
while max_retries == 0 or attempts < max_retries:

View File

@@ -102,9 +102,11 @@ def _masked_input_posix(prompt: str) -> str:
def masked_input(prompt: str) -> str:
"""Return input from the user while masking typed characters."""
if sys.platform == "win32":
return _masked_input_windows(prompt)
return _masked_input_posix(prompt)
func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix
try:
return func(prompt)
except Exception: # pragma: no cover - fallback when TTY operations fail
return input(prompt)
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str: