Add KDF config with per-file metadata

This commit is contained in:
thePR0M3TH3AN
2025-08-19 09:53:46 -04:00
parent 1b6b0ab5c5
commit 06ca51993a
7 changed files with 221 additions and 76 deletions

View File

@@ -16,8 +16,9 @@ except Exception: # pragma: no cover - fallback for environments without orjson
import hashlib
import os
import base64
from dataclasses import asdict
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
@@ -26,6 +27,7 @@ from termcolor import colored
from utils.file_lock import exclusive_lock
from mnemonic import Mnemonic
from utils.password_prompt import prompt_existing_password
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
# Instantiate the logger
logger = logging.getLogger(__name__)
@@ -231,40 +233,58 @@ class EncryptionManager:
raise ValueError("Invalid path outside fingerprint directory")
return candidate
def encrypt_parent_seed(self, parent_seed: str) -> None:
def encrypt_parent_seed(
self, parent_seed: str, kdf: Optional[KdfConfig] = None
) -> None:
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
data = parent_seed.encode("utf-8")
encrypted_data = self.encrypt_data(data) # This now creates V2 format
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
os.chmod(self.parent_seed_file, 0o600)
self.encrypt_and_save_file(data, self.parent_seed_file, kdf=kdf)
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
def decrypt_parent_seed(self) -> str:
"""Decrypts and returns the parent seed, handling migration."""
with exclusive_lock(self.parent_seed_file) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
if is_legacy:
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip())
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)
return decrypted_data.decode("utf-8").strip()
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
def _serialize(self, kdf: KdfConfig, ciphertext: bytes) -> bytes:
payload = {"kdf": asdict(kdf), "ct": base64.b64encode(ciphertext).decode()}
if USE_ORJSON:
return json_lib.dumps(payload)
return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8")
def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]:
if USE_ORJSON:
obj = json_lib.loads(blob)
else:
obj = json_lib.loads(blob.decode("utf-8"))
kdf = KdfConfig(**obj.get("kdf", {}))
ct = base64.b64decode(obj.get("ct", ""))
return kdf, ct
def encrypt_and_save_file(
self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None
) -> None:
if kdf is None:
kdf = KdfConfig()
file_path = self.resolve_relative_path(relative_path)
file_path.parent.mkdir(parents=True, exist_ok=True)
encrypted_data = self.encrypt_data(data)
payload = self._serialize(kdf, encrypted_data)
with exclusive_lock(file_path) as fh:
fh.seek(0)
fh.truncate()
fh.write(encrypted_data)
fh.write(payload)
fh.flush()
os.fsync(fh.fileno())
os.chmod(file_path, 0o600)
@@ -273,20 +293,37 @@ class EncryptionManager:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
_, encrypted_data = self._deserialize(blob)
return self.decrypt_data(encrypted_data, context=str(relative_path))
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
def get_file_kdf(self, relative_path: Path) -> KdfConfig:
file_path = self.resolve_relative_path(relative_path)
with exclusive_lock(file_path) as fh:
fh.seek(0)
blob = fh.read()
kdf, _ = self._deserialize(blob)
return kdf
def save_json_data(
self,
data: dict,
relative_path: Optional[Path] = None,
*,
kdf: Optional[KdfConfig] = None,
) -> None:
if relative_path is None:
relative_path = Path("seedpass_entries_db.json.enc")
if USE_ORJSON:
json_data = json_lib.dumps(data)
else:
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
self.encrypt_and_save_file(json_data, relative_path)
self.encrypt_and_save_file(json_data, relative_path, kdf=kdf)
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
def load_json_data(
self, relative_path: Optional[Path] = None, *, return_kdf: bool = False
) -> dict | Tuple[dict, KdfConfig]:
"""
Loads and decrypts JSON data, automatically migrating and re-saving
if it's in the legacy format.
@@ -299,8 +336,9 @@ class EncryptionManager:
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_data = fh.read()
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
self.last_migration_performed = False
@@ -316,10 +354,12 @@ class EncryptionManager:
# If it was a legacy file, re-save it in the new format now
if is_legacy and self._legacy_migrate_flag:
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
self.save_json_data(data, relative_path)
self.save_json_data(data, relative_path, kdf=kdf)
self.update_checksum(relative_path)
self.last_migration_performed = True
if return_kdf:
return data, kdf
return data
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
logger.error(

View File

@@ -15,6 +15,7 @@ import logging
import os
import hashlib
import hmac
import base64
from typing import Optional, Literal, Any
import shutil
import time
@@ -46,6 +47,7 @@ from utils.key_derivation import (
derive_key_from_password_argon2,
derive_index_key,
EncryptionMode,
KdfConfig,
)
from utils.checksum import (
calculate_checksum,
@@ -689,9 +691,9 @@ class PasswordManager:
for iter_try in dict.fromkeys(iter_candidates):
try:
if mode == "argon2":
seed_key = derive_key_from_password_argon2(
password, salt_fp
)
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iter_try
@@ -771,7 +773,9 @@ class PasswordManager:
)
salt_fp = fingerprint_dir.name
if mode == "argon2":
seed_key = derive_key_from_password_argon2(password, salt_fp)
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
seed_key = derive_key_from_password_argon2(password, cfg)
else:
seed_key = derive_key_from_password(
password, salt_fp, iterations=iterations

View File

@@ -14,6 +14,7 @@ from .encryption import (
USE_ORJSON,
json_lib,
)
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
from utils.password_prompt import prompt_existing_password
@@ -38,6 +39,11 @@ class Vault:
"""Replace the internal encryption manager."""
self.encryption_manager = manager
def _hkdf_kdf(self) -> KdfConfig:
return KdfConfig(
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
)
# ----- Password index helpers -----
def load_index(self, *, return_migration_flags: bool = False):
"""Return decrypted password index data, applying migrations.
@@ -102,10 +108,24 @@ class Vault:
)
try:
data = self.encryption_manager.load_json_data(self.index_file)
data, kdf = self.encryption_manager.load_json_data(
self.index_file, return_kdf=True
)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
)
if kdf.version < CURRENT_KDF_VERSION:
new_kdf = KdfConfig(
name=kdf.name,
version=CURRENT_KDF_VERSION,
params=kdf.params,
salt_b64=kdf.salt_b64,
)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=new_kdf
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = True
except LegacyFormatRequiresMigrationError:
print(
colored(
@@ -142,7 +162,9 @@ class Vault:
else:
data = json_lib.loads(decrypted.decode("utf-8"))
if self.encryption_manager._legacy_migrate_flag:
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.update_checksum(self.index_file)
migration_performed = getattr(
self.encryption_manager, "last_migration_performed", False
@@ -181,7 +203,9 @@ class Vault:
try:
data = apply_migrations(data)
if schema_migrated:
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
self.encryption_manager.update_checksum(self.index_file)
except Exception as exc: # noqa: BLE001 - surface clear error and restore
if legacy_detected and backup_dir is not None:
@@ -214,7 +238,9 @@ class Vault:
def save_index(self, data: dict) -> None:
"""Encrypt and write password index."""
self.encryption_manager.save_json_data(data, self.index_file)
self.encryption_manager.save_json_data(
data, self.index_file, kdf=self._hkdf_kdf()
)
def get_encrypted_index(self) -> Optional[bytes]:
"""Return the encrypted index bytes if present."""
@@ -252,4 +278,6 @@ class Vault:
def save_config(self, config: dict) -> None:
"""Encrypt and persist configuration."""
self.encryption_manager.save_json_data(config, self.config_file)
self.encryption_manager.save_json_data(
config, self.config_file, kdf=self._hkdf_kdf()
)