mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Fix KDF metadata handling and headless password prompts
This commit is contained in:
@@ -264,13 +264,31 @@ class EncryptionManager:
|
||||
return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]:
|
||||
if USE_ORJSON:
|
||||
obj = json_lib.loads(blob)
|
||||
else:
|
||||
obj = json_lib.loads(blob.decode("utf-8"))
|
||||
kdf = KdfConfig(**obj.get("kdf", {}))
|
||||
ct = base64.b64decode(obj.get("ct", ""))
|
||||
return kdf, ct
|
||||
"""Return ``(KdfConfig, ciphertext)`` from serialized *blob*.
|
||||
|
||||
Legacy files stored the raw ciphertext without a JSON wrapper. If
|
||||
decoding the wrapper fails, treat ``blob`` as the ciphertext and return
|
||||
a default HKDF configuration.
|
||||
"""
|
||||
|
||||
try:
|
||||
if USE_ORJSON:
|
||||
obj = json_lib.loads(blob)
|
||||
else:
|
||||
obj = json_lib.loads(blob.decode("utf-8"))
|
||||
kdf = KdfConfig(**obj.get("kdf", {}))
|
||||
ct_b64 = obj.get("ct", "")
|
||||
ciphertext = base64.b64decode(ct_b64)
|
||||
if ciphertext:
|
||||
return kdf, ciphertext
|
||||
except Exception: # pragma: no cover - fall back to legacy path
|
||||
pass
|
||||
|
||||
# Legacy format: ``blob`` already contains the ciphertext
|
||||
return (
|
||||
KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""),
|
||||
blob,
|
||||
)
|
||||
|
||||
def encrypt_and_save_file(
|
||||
self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None
|
||||
@@ -332,7 +350,12 @@ class EncryptionManager:
|
||||
relative_path = Path("seedpass_entries_db.json.enc")
|
||||
file_path = self.resolve_relative_path(relative_path)
|
||||
if not file_path.exists():
|
||||
return {"entries": {}}
|
||||
empty: dict = {"entries": {}}
|
||||
if return_kdf:
|
||||
return empty, KdfConfig(
|
||||
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
|
||||
)
|
||||
return empty
|
||||
|
||||
with exclusive_lock(file_path) as fh:
|
||||
fh.seek(0)
|
||||
@@ -400,7 +423,8 @@ class EncryptionManager:
|
||||
if relative_path is None:
|
||||
relative_path = Path("seedpass_entries_db.json.enc")
|
||||
|
||||
is_legacy = not encrypted_data.startswith(b"V2:")
|
||||
kdf, ciphertext = self._deserialize(encrypted_data)
|
||||
is_legacy = not ciphertext.startswith(b"V2:")
|
||||
self.last_migration_performed = False
|
||||
|
||||
def _process(decrypted: bytes) -> dict:
|
||||
@@ -426,11 +450,9 @@ class EncryptionManager:
|
||||
return data
|
||||
|
||||
try:
|
||||
decrypted_data = self.decrypt_data(
|
||||
encrypted_data, context=str(relative_path)
|
||||
)
|
||||
decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path))
|
||||
data = _process(decrypted_data)
|
||||
self.save_json_data(data, relative_path) # This always saves in V2 format
|
||||
self.save_json_data(data, relative_path, kdf=kdf)
|
||||
self.update_checksum(relative_path)
|
||||
logger.info("Index file from Nostr was processed and saved successfully.")
|
||||
self.last_migration_performed = is_legacy
|
||||
@@ -441,10 +463,10 @@ class EncryptionManager:
|
||||
"Enter your master password for legacy decryption: "
|
||||
)
|
||||
decrypted_data = self.decrypt_legacy(
|
||||
encrypted_data, password, context=str(relative_path)
|
||||
ciphertext, password, context=str(relative_path)
|
||||
)
|
||||
data = _process(decrypted_data)
|
||||
self.save_json_data(data, relative_path)
|
||||
self.save_json_data(data, relative_path, kdf=kdf)
|
||||
self.update_checksum(relative_path)
|
||||
logger.warning(
|
||||
"Index decrypted using legacy password-only key derivation."
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import json
|
||||
import base64
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
@@ -99,7 +100,8 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
|
||||
vault.load_index()
|
||||
|
||||
new_file = tmp_path / "seedpass_entries_db.json.enc"
|
||||
assert new_file.read_bytes().startswith(b"V2:")
|
||||
payload = json.loads(new_file.read_text())
|
||||
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||
assert vault.migrated_from_legacy
|
||||
|
||||
|
||||
|
@@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
|
||||
cfg = ConfigManager(vault, tmp_path)
|
||||
assert cfg.get_kdf_iterations() == iterations
|
||||
|
||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||
assert content.startswith(b"V2:")
|
||||
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
|
||||
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||
|
@@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
|
||||
monkeypatch.setattr("builtins.input", lambda _: "2")
|
||||
vault.load_index()
|
||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
||||
assert content.startswith(b"V2:")
|
||||
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
|
||||
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||
assert vault.encryption_manager.last_migration_performed is True
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import json
|
||||
import base64
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
@@ -34,7 +35,8 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
|
||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
|
||||
vault.load_index()
|
||||
new_file = fp_dir / "seedpass_entries_db.json.enc"
|
||||
assert new_file.read_bytes().startswith(b"V2:")
|
||||
payload = json.loads(new_file.read_text())
|
||||
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||
|
||||
new_enc_mgr = EncryptionManager(key, fp_dir)
|
||||
new_vault = Vault(new_enc_mgr, fp_dir)
|
||||
@@ -59,4 +61,5 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
|
||||
)
|
||||
|
||||
pm.initialize_managers()
|
||||
assert new_file.read_bytes().startswith(b"V2:")
|
||||
payload = json.loads(new_file.read_text())
|
||||
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import sys
|
||||
import json
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
@@ -28,4 +30,5 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
|
||||
|
||||
assert new_file.exists()
|
||||
assert new_file.read_bytes() != encrypted
|
||||
assert new_file.read_bytes().startswith(b"V2:")
|
||||
payload = json.loads(new_file.read_text())
|
||||
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||
|
@@ -33,6 +33,12 @@ logger = logging.getLogger(__name__)
|
||||
DEFAULT_MAX_ATTEMPTS = 5
|
||||
|
||||
|
||||
def _env_password() -> str | None:
|
||||
"""Return a password supplied via environment for non-interactive use."""
|
||||
|
||||
return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD")
|
||||
|
||||
|
||||
def _get_max_attempts(override: int | None = None) -> int:
|
||||
"""Return the configured maximum number of prompt attempts."""
|
||||
|
||||
@@ -80,6 +86,13 @@ def prompt_new_password(max_retries: int | None = None) -> str:
|
||||
Raises:
|
||||
PasswordPromptError: If the user fails to provide a valid password after multiple attempts.
|
||||
"""
|
||||
env_pw = _env_password()
|
||||
if env_pw:
|
||||
normalized = unicodedata.normalize("NFKD", env_pw)
|
||||
if len(normalized) < MIN_PASSWORD_LENGTH:
|
||||
raise PasswordPromptError("Environment password too short")
|
||||
return normalized
|
||||
|
||||
max_retries = _get_max_attempts(max_retries)
|
||||
attempts = 0
|
||||
|
||||
@@ -164,6 +177,10 @@ def prompt_existing_password(
|
||||
PasswordPromptError: If the user interrupts the operation or exceeds
|
||||
``max_retries`` attempts.
|
||||
"""
|
||||
env_pw = _env_password()
|
||||
if env_pw:
|
||||
return unicodedata.normalize("NFKD", env_pw)
|
||||
|
||||
max_retries = _get_max_attempts(max_retries)
|
||||
attempts = 0
|
||||
while max_retries == 0 or attempts < max_retries:
|
||||
|
@@ -102,9 +102,11 @@ def _masked_input_posix(prompt: str) -> str:
|
||||
|
||||
def masked_input(prompt: str) -> str:
|
||||
"""Return input from the user while masking typed characters."""
|
||||
if sys.platform == "win32":
|
||||
return _masked_input_windows(prompt)
|
||||
return _masked_input_posix(prompt)
|
||||
func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix
|
||||
try:
|
||||
return func(prompt)
|
||||
except Exception: # pragma: no cover - fallback when TTY operations fail
|
||||
return input(prompt)
|
||||
|
||||
|
||||
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str:
|
||||
|
Reference in New Issue
Block a user