mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 15:28:44 +00:00
Compare commits
8 Commits
1b6b0ab5c5
...
15df3f10a6
Author | SHA1 | Date | |
---|---|---|---|
![]() |
15df3f10a6 | ||
![]() |
b451097c65 | ||
![]() |
9cacd1b13d | ||
![]() |
b97d60778b | ||
![]() |
bbb26ca55a | ||
![]() |
d6e03d5e7a | ||
![]() |
26632c0e70 | ||
![]() |
06ca51993a |
@@ -39,6 +39,11 @@ This project is written in **Python**. Follow these instructions when working wi
|
|||||||
|
|
||||||
Following these practices helps keep the code base consistent and secure.
|
Following these practices helps keep the code base consistent and secure.
|
||||||
|
|
||||||
|
## Deterministic Artifact Generation
|
||||||
|
|
||||||
|
- All generated artifacts (passwords, keys, TOTP secrets, etc.) must be fully deterministic across runs and platforms.
|
||||||
|
- Randomness is only permitted for security primitives (e.g., encryption nonces, in-memory keys) and must never influence derived artifacts.
|
||||||
|
|
||||||
## Legacy Index Migration
|
## Legacy Index Migration
|
||||||
|
|
||||||
- Always provide a migration path for index archives and import/export routines.
|
- Always provide a migration path for index archives and import/export routines.
|
||||||
|
@@ -1415,9 +1415,10 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
|
|||||||
if entry.get("type") != EntryType.TOTP.value:
|
if entry.get("type") != EntryType.TOTP.value:
|
||||||
print(colored("Entry is not a TOTP entry.", "red"))
|
print(colored("Entry is not a TOTP entry.", "red"))
|
||||||
return 1
|
return 1
|
||||||
code = password_manager.entry_manager.get_totp_code(
|
key = getattr(password_manager, "KEY_TOTP_DET", None) or getattr(
|
||||||
idx, password_manager.parent_seed
|
password_manager, "parent_seed", None
|
||||||
)
|
)
|
||||||
|
code = password_manager.entry_manager.get_totp_code(idx, key)
|
||||||
print(code)
|
print(code)
|
||||||
try:
|
try:
|
||||||
if copy_to_clipboard(code, password_manager.clipboard_clear_delay):
|
if copy_to_clipboard(code, password_manager.clipboard_clear_delay):
|
||||||
|
@@ -464,7 +464,8 @@ def export_totp(
|
|||||||
_check_token(request, authorization)
|
_check_token(request, authorization)
|
||||||
_require_password(request, password)
|
_require_password(request, password)
|
||||||
pm = _get_pm(request)
|
pm = _get_pm(request)
|
||||||
return pm.entry_manager.export_totp_entries(pm.parent_seed)
|
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(pm, "parent_seed", None)
|
||||||
|
return pm.entry_manager.export_totp_entries(key)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/v1/totp")
|
@app.get("/api/v1/totp")
|
||||||
@@ -482,7 +483,8 @@ def get_totp_codes(
|
|||||||
)
|
)
|
||||||
codes = []
|
codes = []
|
||||||
for idx, label, _u, _url, _arch in entries:
|
for idx, label, _u, _url, _arch in entries:
|
||||||
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
|
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(pm, "parent_seed", None)
|
||||||
|
code = pm.entry_manager.get_totp_code(idx, key)
|
||||||
|
|
||||||
rem = pm.entry_manager.get_totp_time_remaining(idx)
|
rem = pm.entry_manager.get_totp_time_remaining(idx)
|
||||||
|
|
||||||
|
@@ -305,9 +305,10 @@ class EntryService:
|
|||||||
|
|
||||||
def get_totp_code(self, entry_id: int) -> str:
|
def get_totp_code(self, entry_id: int) -> str:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
return self._manager.entry_manager.get_totp_code(
|
key = getattr(self._manager, "KEY_TOTP_DET", None) or getattr(
|
||||||
entry_id, self._manager.parent_seed
|
self._manager, "parent_seed", None
|
||||||
)
|
)
|
||||||
|
return self._manager.entry_manager.get_totp_code(entry_id, key)
|
||||||
|
|
||||||
def add_entry(
|
def add_entry(
|
||||||
self,
|
self,
|
||||||
@@ -515,9 +516,10 @@ class EntryService:
|
|||||||
|
|
||||||
def export_totp_entries(self) -> dict:
|
def export_totp_entries(self) -> dict:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
return self._manager.entry_manager.export_totp_entries(
|
key = getattr(self._manager, "KEY_TOTP_DET", None) or getattr(
|
||||||
self._manager.parent_seed
|
self._manager, "parent_seed", None
|
||||||
)
|
)
|
||||||
|
return self._manager.entry_manager.export_totp_entries(key)
|
||||||
|
|
||||||
def display_totp_codes(self) -> None:
|
def display_totp_codes(self) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
@@ -16,8 +16,9 @@ except Exception: # pragma: no cover - fallback for environments without orjson
|
|||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import base64
|
import base64
|
||||||
|
from dataclasses import asdict
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||||
from cryptography.exceptions import InvalidTag
|
from cryptography.exceptions import InvalidTag
|
||||||
@@ -26,6 +27,7 @@ from termcolor import colored
|
|||||||
from utils.file_lock import exclusive_lock
|
from utils.file_lock import exclusive_lock
|
||||||
from mnemonic import Mnemonic
|
from mnemonic import Mnemonic
|
||||||
from utils.password_prompt import prompt_existing_password
|
from utils.password_prompt import prompt_existing_password
|
||||||
|
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
|
||||||
|
|
||||||
# Instantiate the logger
|
# Instantiate the logger
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -231,40 +233,76 @@ class EncryptionManager:
|
|||||||
raise ValueError("Invalid path outside fingerprint directory")
|
raise ValueError("Invalid path outside fingerprint directory")
|
||||||
return candidate
|
return candidate
|
||||||
|
|
||||||
def encrypt_parent_seed(self, parent_seed: str) -> None:
|
def encrypt_parent_seed(
|
||||||
|
self, parent_seed: str, kdf: Optional[KdfConfig] = None
|
||||||
|
) -> None:
|
||||||
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
|
"""Encrypts and saves the parent seed to 'parent_seed.enc'."""
|
||||||
data = parent_seed.encode("utf-8")
|
data = parent_seed.encode("utf-8")
|
||||||
encrypted_data = self.encrypt_data(data) # This now creates V2 format
|
self.encrypt_and_save_file(data, self.parent_seed_file, kdf=kdf)
|
||||||
with exclusive_lock(self.parent_seed_file) as fh:
|
|
||||||
fh.seek(0)
|
|
||||||
fh.truncate()
|
|
||||||
fh.write(encrypted_data)
|
|
||||||
os.chmod(self.parent_seed_file, 0o600)
|
|
||||||
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
|
logger.info(f"Parent seed encrypted and saved to '{self.parent_seed_file}'.")
|
||||||
|
|
||||||
def decrypt_parent_seed(self) -> str:
|
def decrypt_parent_seed(self) -> str:
|
||||||
"""Decrypts and returns the parent seed, handling migration."""
|
"""Decrypts and returns the parent seed, handling migration."""
|
||||||
with exclusive_lock(self.parent_seed_file) as fh:
|
with exclusive_lock(self.parent_seed_file) as fh:
|
||||||
fh.seek(0)
|
fh.seek(0)
|
||||||
encrypted_data = fh.read()
|
blob = fh.read()
|
||||||
|
|
||||||
|
kdf, encrypted_data = self._deserialize(blob)
|
||||||
is_legacy = not encrypted_data.startswith(b"V2:")
|
is_legacy = not encrypted_data.startswith(b"V2:")
|
||||||
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
|
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
|
||||||
|
|
||||||
if is_legacy:
|
if is_legacy:
|
||||||
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
|
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
|
||||||
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip())
|
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)
|
||||||
|
|
||||||
return decrypted_data.decode("utf-8").strip()
|
return decrypted_data.decode("utf-8").strip()
|
||||||
|
|
||||||
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
|
def _serialize(self, kdf: KdfConfig, ciphertext: bytes) -> bytes:
|
||||||
|
payload = {"kdf": asdict(kdf), "ct": base64.b64encode(ciphertext).decode()}
|
||||||
|
if USE_ORJSON:
|
||||||
|
return json_lib.dumps(payload)
|
||||||
|
return json_lib.dumps(payload, separators=(",", ":")).encode("utf-8")
|
||||||
|
|
||||||
|
def _deserialize(self, blob: bytes) -> Tuple[KdfConfig, bytes]:
|
||||||
|
"""Return ``(KdfConfig, ciphertext)`` from serialized *blob*.
|
||||||
|
|
||||||
|
Legacy files stored the raw ciphertext without a JSON wrapper. If
|
||||||
|
decoding the wrapper fails, treat ``blob`` as the ciphertext and return
|
||||||
|
a default HKDF configuration.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
if USE_ORJSON:
|
||||||
|
obj = json_lib.loads(blob)
|
||||||
|
else:
|
||||||
|
obj = json_lib.loads(blob.decode("utf-8"))
|
||||||
|
kdf = KdfConfig(**obj.get("kdf", {}))
|
||||||
|
ct_b64 = obj.get("ct", "")
|
||||||
|
ciphertext = base64.b64decode(ct_b64)
|
||||||
|
if ciphertext:
|
||||||
|
return kdf, ciphertext
|
||||||
|
except Exception: # pragma: no cover - fall back to legacy path
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Legacy format: ``blob`` already contains the ciphertext
|
||||||
|
return (
|
||||||
|
KdfConfig(name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""),
|
||||||
|
blob,
|
||||||
|
)
|
||||||
|
|
||||||
|
def encrypt_and_save_file(
|
||||||
|
self, data: bytes, relative_path: Path, *, kdf: Optional[KdfConfig] = None
|
||||||
|
) -> None:
|
||||||
|
if kdf is None:
|
||||||
|
kdf = KdfConfig()
|
||||||
file_path = self.resolve_relative_path(relative_path)
|
file_path = self.resolve_relative_path(relative_path)
|
||||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
encrypted_data = self.encrypt_data(data)
|
encrypted_data = self.encrypt_data(data)
|
||||||
|
payload = self._serialize(kdf, encrypted_data)
|
||||||
with exclusive_lock(file_path) as fh:
|
with exclusive_lock(file_path) as fh:
|
||||||
fh.seek(0)
|
fh.seek(0)
|
||||||
fh.truncate()
|
fh.truncate()
|
||||||
fh.write(encrypted_data)
|
fh.write(payload)
|
||||||
fh.flush()
|
fh.flush()
|
||||||
os.fsync(fh.fileno())
|
os.fsync(fh.fileno())
|
||||||
os.chmod(file_path, 0o600)
|
os.chmod(file_path, 0o600)
|
||||||
@@ -273,20 +311,37 @@ class EncryptionManager:
|
|||||||
file_path = self.resolve_relative_path(relative_path)
|
file_path = self.resolve_relative_path(relative_path)
|
||||||
with exclusive_lock(file_path) as fh:
|
with exclusive_lock(file_path) as fh:
|
||||||
fh.seek(0)
|
fh.seek(0)
|
||||||
encrypted_data = fh.read()
|
blob = fh.read()
|
||||||
|
_, encrypted_data = self._deserialize(blob)
|
||||||
return self.decrypt_data(encrypted_data, context=str(relative_path))
|
return self.decrypt_data(encrypted_data, context=str(relative_path))
|
||||||
|
|
||||||
def save_json_data(self, data: dict, relative_path: Optional[Path] = None) -> None:
|
def get_file_kdf(self, relative_path: Path) -> KdfConfig:
|
||||||
|
file_path = self.resolve_relative_path(relative_path)
|
||||||
|
with exclusive_lock(file_path) as fh:
|
||||||
|
fh.seek(0)
|
||||||
|
blob = fh.read()
|
||||||
|
kdf, _ = self._deserialize(blob)
|
||||||
|
return kdf
|
||||||
|
|
||||||
|
def save_json_data(
|
||||||
|
self,
|
||||||
|
data: dict,
|
||||||
|
relative_path: Optional[Path] = None,
|
||||||
|
*,
|
||||||
|
kdf: Optional[KdfConfig] = None,
|
||||||
|
) -> None:
|
||||||
if relative_path is None:
|
if relative_path is None:
|
||||||
relative_path = Path("seedpass_entries_db.json.enc")
|
relative_path = Path("seedpass_entries_db.json.enc")
|
||||||
if USE_ORJSON:
|
if USE_ORJSON:
|
||||||
json_data = json_lib.dumps(data)
|
json_data = json_lib.dumps(data)
|
||||||
else:
|
else:
|
||||||
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
|
json_data = json_lib.dumps(data, separators=(",", ":")).encode("utf-8")
|
||||||
self.encrypt_and_save_file(json_data, relative_path)
|
self.encrypt_and_save_file(json_data, relative_path, kdf=kdf)
|
||||||
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
|
logger.debug(f"JSON data encrypted and saved to '{relative_path}'.")
|
||||||
|
|
||||||
def load_json_data(self, relative_path: Optional[Path] = None) -> dict:
|
def load_json_data(
|
||||||
|
self, relative_path: Optional[Path] = None, *, return_kdf: bool = False
|
||||||
|
) -> dict | Tuple[dict, KdfConfig]:
|
||||||
"""
|
"""
|
||||||
Loads and decrypts JSON data, automatically migrating and re-saving
|
Loads and decrypts JSON data, automatically migrating and re-saving
|
||||||
if it's in the legacy format.
|
if it's in the legacy format.
|
||||||
@@ -295,12 +350,18 @@ class EncryptionManager:
|
|||||||
relative_path = Path("seedpass_entries_db.json.enc")
|
relative_path = Path("seedpass_entries_db.json.enc")
|
||||||
file_path = self.resolve_relative_path(relative_path)
|
file_path = self.resolve_relative_path(relative_path)
|
||||||
if not file_path.exists():
|
if not file_path.exists():
|
||||||
return {"entries": {}}
|
empty: dict = {"entries": {}}
|
||||||
|
if return_kdf:
|
||||||
|
return empty, KdfConfig(
|
||||||
|
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
|
||||||
|
)
|
||||||
|
return empty
|
||||||
|
|
||||||
with exclusive_lock(file_path) as fh:
|
with exclusive_lock(file_path) as fh:
|
||||||
fh.seek(0)
|
fh.seek(0)
|
||||||
encrypted_data = fh.read()
|
blob = fh.read()
|
||||||
|
|
||||||
|
kdf, encrypted_data = self._deserialize(blob)
|
||||||
is_legacy = not encrypted_data.startswith(b"V2:")
|
is_legacy = not encrypted_data.startswith(b"V2:")
|
||||||
self.last_migration_performed = False
|
self.last_migration_performed = False
|
||||||
|
|
||||||
@@ -316,10 +377,12 @@ class EncryptionManager:
|
|||||||
# If it was a legacy file, re-save it in the new format now
|
# If it was a legacy file, re-save it in the new format now
|
||||||
if is_legacy and self._legacy_migrate_flag:
|
if is_legacy and self._legacy_migrate_flag:
|
||||||
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
|
logger.info(f"Migrating and re-saving legacy vault file: {file_path}")
|
||||||
self.save_json_data(data, relative_path)
|
self.save_json_data(data, relative_path, kdf=kdf)
|
||||||
self.update_checksum(relative_path)
|
self.update_checksum(relative_path)
|
||||||
self.last_migration_performed = True
|
self.last_migration_performed = True
|
||||||
|
|
||||||
|
if return_kdf:
|
||||||
|
return data, kdf
|
||||||
return data
|
return data
|
||||||
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
|
except (InvalidToken, InvalidTag, JSONDecodeError) as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
@@ -360,7 +423,8 @@ class EncryptionManager:
|
|||||||
if relative_path is None:
|
if relative_path is None:
|
||||||
relative_path = Path("seedpass_entries_db.json.enc")
|
relative_path = Path("seedpass_entries_db.json.enc")
|
||||||
|
|
||||||
is_legacy = not encrypted_data.startswith(b"V2:")
|
kdf, ciphertext = self._deserialize(encrypted_data)
|
||||||
|
is_legacy = not ciphertext.startswith(b"V2:")
|
||||||
self.last_migration_performed = False
|
self.last_migration_performed = False
|
||||||
|
|
||||||
def _process(decrypted: bytes) -> dict:
|
def _process(decrypted: bytes) -> dict:
|
||||||
@@ -386,11 +450,9 @@ class EncryptionManager:
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
try:
|
try:
|
||||||
decrypted_data = self.decrypt_data(
|
decrypted_data = self.decrypt_data(ciphertext, context=str(relative_path))
|
||||||
encrypted_data, context=str(relative_path)
|
|
||||||
)
|
|
||||||
data = _process(decrypted_data)
|
data = _process(decrypted_data)
|
||||||
self.save_json_data(data, relative_path) # This always saves in V2 format
|
self.save_json_data(data, relative_path, kdf=kdf)
|
||||||
self.update_checksum(relative_path)
|
self.update_checksum(relative_path)
|
||||||
logger.info("Index file from Nostr was processed and saved successfully.")
|
logger.info("Index file from Nostr was processed and saved successfully.")
|
||||||
self.last_migration_performed = is_legacy
|
self.last_migration_performed = is_legacy
|
||||||
@@ -401,10 +463,10 @@ class EncryptionManager:
|
|||||||
"Enter your master password for legacy decryption: "
|
"Enter your master password for legacy decryption: "
|
||||||
)
|
)
|
||||||
decrypted_data = self.decrypt_legacy(
|
decrypted_data = self.decrypt_legacy(
|
||||||
encrypted_data, password, context=str(relative_path)
|
ciphertext, password, context=str(relative_path)
|
||||||
)
|
)
|
||||||
data = _process(decrypted_data)
|
data = _process(decrypted_data)
|
||||||
self.save_json_data(data, relative_path)
|
self.save_json_data(data, relative_path, kdf=kdf)
|
||||||
self.update_checksum(relative_path)
|
self.update_checksum(relative_path)
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Index decrypted using legacy password-only key derivation."
|
"Index decrypted using legacy password-only key derivation."
|
||||||
|
@@ -257,7 +257,7 @@ class EntryManager:
|
|||||||
def add_totp(
|
def add_totp(
|
||||||
self,
|
self,
|
||||||
label: str,
|
label: str,
|
||||||
parent_seed: str,
|
parent_seed: str | bytes,
|
||||||
*,
|
*,
|
||||||
archived: bool = False,
|
archived: bool = False,
|
||||||
secret: str | None = None,
|
secret: str | None = None,
|
||||||
@@ -689,7 +689,10 @@ class EntryManager:
|
|||||||
return derive_seed_phrase(bip85, seed_index, words)
|
return derive_seed_phrase(bip85, seed_index, words)
|
||||||
|
|
||||||
def get_totp_code(
|
def get_totp_code(
|
||||||
self, index: int, parent_seed: str | None = None, timestamp: int | None = None
|
self,
|
||||||
|
index: int,
|
||||||
|
parent_seed: str | bytes | None = None,
|
||||||
|
timestamp: int | None = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Return the current TOTP code for the specified entry."""
|
"""Return the current TOTP code for the specified entry."""
|
||||||
entry = self.retrieve_entry(index)
|
entry = self.retrieve_entry(index)
|
||||||
@@ -719,7 +722,9 @@ class EntryManager:
|
|||||||
period = int(entry.get("period", 30))
|
period = int(entry.get("period", 30))
|
||||||
return TotpManager.time_remaining(period)
|
return TotpManager.time_remaining(period)
|
||||||
|
|
||||||
def export_totp_entries(self, parent_seed: str) -> dict[str, list[dict[str, Any]]]:
|
def export_totp_entries(
|
||||||
|
self, parent_seed: str | bytes
|
||||||
|
) -> dict[str, list[dict[str, Any]]]:
|
||||||
"""Return all TOTP secrets and metadata for external use."""
|
"""Return all TOTP secrets and metadata for external use."""
|
||||||
data = self._load_index()
|
data = self._load_index()
|
||||||
entries = data.get("entries", {})
|
entries = data.get("entries", {})
|
||||||
|
@@ -15,6 +15,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import hashlib
|
import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
|
import base64
|
||||||
from typing import Optional, Literal, Any
|
from typing import Optional, Literal, Any
|
||||||
import shutil
|
import shutil
|
||||||
import time
|
import time
|
||||||
@@ -46,7 +47,9 @@ from utils.key_derivation import (
|
|||||||
derive_key_from_password_argon2,
|
derive_key_from_password_argon2,
|
||||||
derive_index_key,
|
derive_index_key,
|
||||||
EncryptionMode,
|
EncryptionMode,
|
||||||
|
KdfConfig,
|
||||||
)
|
)
|
||||||
|
from utils.key_hierarchy import kd
|
||||||
from utils.checksum import (
|
from utils.checksum import (
|
||||||
calculate_checksum,
|
calculate_checksum,
|
||||||
verify_checksum,
|
verify_checksum,
|
||||||
@@ -230,6 +233,13 @@ class PasswordManager:
|
|||||||
verification, ensuring the integrity and confidentiality of the stored password database.
|
verification, ensuring the integrity and confidentiality of the stored password database.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Class-level fallbacks so attributes exist even if ``__init__`` is bypassed
|
||||||
|
master_key: bytes | None = None
|
||||||
|
KEY_STORAGE: bytes | None = None
|
||||||
|
KEY_INDEX: bytes | None = None
|
||||||
|
KEY_PW_DERIVE: bytes | None = None
|
||||||
|
KEY_TOTP_DET: bytes | None = None
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, fingerprint: Optional[str] = None, *, password: Optional[str] = None
|
self, fingerprint: Optional[str] = None, *, password: Optional[str] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -262,6 +272,13 @@ class PasswordManager:
|
|||||||
self._bip85_cache: dict[tuple[int, int], bytes] = {}
|
self._bip85_cache: dict[tuple[int, int], bytes] = {}
|
||||||
self.audit_logger: Optional[AuditLogger] = None
|
self.audit_logger: Optional[AuditLogger] = None
|
||||||
|
|
||||||
|
# Derived key hierarchy
|
||||||
|
self.master_key: bytes | None = None
|
||||||
|
self.KEY_STORAGE: bytes | None = None
|
||||||
|
self.KEY_INDEX: bytes | None = None
|
||||||
|
self.KEY_PW_DERIVE: bytes | None = None
|
||||||
|
self.KEY_TOTP_DET: bytes | None = None
|
||||||
|
|
||||||
# Track changes to trigger periodic Nostr sync
|
# Track changes to trigger periodic Nostr sync
|
||||||
self.is_dirty: bool = False
|
self.is_dirty: bool = False
|
||||||
self.last_update: float = time.time()
|
self.last_update: float = time.time()
|
||||||
@@ -322,6 +339,30 @@ class PasswordManager:
|
|||||||
|
|
||||||
self._bip85_cache.clear()
|
self._bip85_cache.clear()
|
||||||
|
|
||||||
|
def derive_key_hierarchy(self, seed_bytes: bytes) -> None:
|
||||||
|
"""Populate sub-keys from ``seed_bytes`` using HKDF."""
|
||||||
|
|
||||||
|
master = kd(seed_bytes, b"seedpass:v1:master")
|
||||||
|
self.master_key = master
|
||||||
|
self.KEY_STORAGE = kd(master, b"seedpass:v1:storage")
|
||||||
|
self.KEY_INDEX = kd(master, b"seedpass:v1:index")
|
||||||
|
self.KEY_PW_DERIVE = kd(master, b"seedpass:v1:pw")
|
||||||
|
self.KEY_TOTP_DET = kd(master, b"seedpass:v1:totp")
|
||||||
|
|
||||||
|
def ensure_key_hierarchy(self) -> None:
|
||||||
|
"""Ensure sub-keys are derived from the current parent seed."""
|
||||||
|
if (
|
||||||
|
self.KEY_STORAGE is None
|
||||||
|
or self.KEY_INDEX is None
|
||||||
|
or self.KEY_PW_DERIVE is None
|
||||||
|
or self.KEY_TOTP_DET is None
|
||||||
|
) and getattr(self, "parent_seed", None):
|
||||||
|
try:
|
||||||
|
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
||||||
|
except Exception:
|
||||||
|
seed_bytes = hashlib.sha256(self.parent_seed.encode()).digest()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
|
|
||||||
def ensure_script_checksum(self) -> None:
|
def ensure_script_checksum(self) -> None:
|
||||||
"""Initialize or verify the checksum of the manager script."""
|
"""Initialize or verify the checksum of the manager script."""
|
||||||
script_path = Path(__file__).resolve()
|
script_path = Path(__file__).resolve()
|
||||||
@@ -479,15 +520,12 @@ class PasswordManager:
|
|||||||
self.setup_encryption_manager(self.fingerprint_dir, password)
|
self.setup_encryption_manager(self.fingerprint_dir, password)
|
||||||
self.initialize_bip85()
|
self.initialize_bip85()
|
||||||
self.initialize_managers()
|
self.initialize_managers()
|
||||||
|
self.ensure_key_hierarchy()
|
||||||
self.is_locked = False
|
self.is_locked = False
|
||||||
self.locked = False
|
self.locked = False
|
||||||
self.update_activity()
|
self.update_activity()
|
||||||
if (
|
if getattr(self, "audit_logger", None) is None and self.KEY_INDEX is not None:
|
||||||
getattr(self, "audit_logger", None) is None
|
self.audit_logger = AuditLogger(self.KEY_INDEX)
|
||||||
and getattr(self, "_parent_seed_secret", None) is not None
|
|
||||||
):
|
|
||||||
key = hashlib.sha256(self.parent_seed.encode("utf-8")).digest()
|
|
||||||
self.audit_logger = AuditLogger(key)
|
|
||||||
if (
|
if (
|
||||||
getattr(self, "config_manager", None)
|
getattr(self, "config_manager", None)
|
||||||
and self.config_manager.get_quick_unlock()
|
and self.config_manager.get_quick_unlock()
|
||||||
@@ -689,9 +727,9 @@ class PasswordManager:
|
|||||||
for iter_try in dict.fromkeys(iter_candidates):
|
for iter_try in dict.fromkeys(iter_candidates):
|
||||||
try:
|
try:
|
||||||
if mode == "argon2":
|
if mode == "argon2":
|
||||||
seed_key = derive_key_from_password_argon2(
|
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
|
||||||
password, salt_fp
|
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
|
||||||
)
|
seed_key = derive_key_from_password_argon2(password, cfg)
|
||||||
else:
|
else:
|
||||||
seed_key = derive_key_from_password(
|
seed_key = derive_key_from_password(
|
||||||
password, salt_fp, iterations=iter_try
|
password, salt_fp, iterations=iter_try
|
||||||
@@ -718,9 +756,10 @@ class PasswordManager:
|
|||||||
password = None
|
password = None
|
||||||
continue
|
continue
|
||||||
|
|
||||||
key = derive_index_key(self.parent_seed)
|
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
self.encryption_manager = EncryptionManager(key, fingerprint_dir)
|
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
|
self.encryption_manager = EncryptionManager(key_b64, fingerprint_dir)
|
||||||
self.vault = Vault(self.encryption_manager, fingerprint_dir)
|
self.vault = Vault(self.encryption_manager, fingerprint_dir)
|
||||||
|
|
||||||
self.config_manager = ConfigManager(
|
self.config_manager = ConfigManager(
|
||||||
@@ -771,7 +810,9 @@ class PasswordManager:
|
|||||||
)
|
)
|
||||||
salt_fp = fingerprint_dir.name
|
salt_fp = fingerprint_dir.name
|
||||||
if mode == "argon2":
|
if mode == "argon2":
|
||||||
seed_key = derive_key_from_password_argon2(password, salt_fp)
|
salt = hashlib.sha256(salt_fp.encode()).digest()[:16]
|
||||||
|
cfg = KdfConfig(salt_b64=base64.b64encode(salt).decode())
|
||||||
|
seed_key = derive_key_from_password_argon2(password, cfg)
|
||||||
else:
|
else:
|
||||||
seed_key = derive_key_from_password(
|
seed_key = derive_key_from_password(
|
||||||
password, salt_fp, iterations=iterations
|
password, salt_fp, iterations=iterations
|
||||||
@@ -779,6 +820,7 @@ class PasswordManager:
|
|||||||
seed_mgr = EncryptionManager(seed_key, fingerprint_dir)
|
seed_mgr = EncryptionManager(seed_key, fingerprint_dir)
|
||||||
self.parent_seed = seed_mgr.decrypt_parent_seed()
|
self.parent_seed = seed_mgr.decrypt_parent_seed()
|
||||||
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
self.bip85 = BIP85(seed_bytes)
|
self.bip85 = BIP85(seed_bytes)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
|
logger.error(f"Failed to load parent seed: {e}", exc_info=True)
|
||||||
@@ -808,8 +850,10 @@ class PasswordManager:
|
|||||||
self.fingerprint_dir = account_dir
|
self.fingerprint_dir = account_dir
|
||||||
self.parent_seed = seed
|
self.parent_seed = seed
|
||||||
|
|
||||||
key = derive_index_key(seed)
|
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
||||||
self.encryption_manager = EncryptionManager(key, account_dir)
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
|
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
|
self.encryption_manager = EncryptionManager(key_b64, account_dir)
|
||||||
self.vault = Vault(self.encryption_manager, account_dir)
|
self.vault = Vault(self.encryption_manager, account_dir)
|
||||||
|
|
||||||
self.initialize_bip85()
|
self.initialize_bip85()
|
||||||
@@ -828,9 +872,13 @@ class PasswordManager:
|
|||||||
self.current_fingerprint = fp
|
self.current_fingerprint = fp
|
||||||
self.fingerprint_dir = path
|
self.fingerprint_dir = path
|
||||||
self.parent_seed = seed
|
self.parent_seed = seed
|
||||||
|
try:
|
||||||
key = derive_index_key(seed)
|
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
||||||
self.encryption_manager = EncryptionManager(key, path)
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
|
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
|
except Exception:
|
||||||
|
key_b64 = derive_index_key(seed)
|
||||||
|
self.encryption_manager = EncryptionManager(key_b64, path)
|
||||||
self.vault = Vault(self.encryption_manager, path)
|
self.vault = Vault(self.encryption_manager, path)
|
||||||
|
|
||||||
self.initialize_bip85()
|
self.initialize_bip85()
|
||||||
@@ -896,10 +944,14 @@ class PasswordManager:
|
|||||||
password, selected_fingerprint, iterations=iterations
|
password, selected_fingerprint, iterations=iterations
|
||||||
)
|
)
|
||||||
|
|
||||||
# Initialize EncryptionManager with key and fingerprint_dir
|
seed_mgr = EncryptionManager(key, fingerprint_dir)
|
||||||
self.encryption_manager = EncryptionManager(key, fingerprint_dir)
|
self.vault = Vault(seed_mgr, fingerprint_dir)
|
||||||
|
self.parent_seed = seed_mgr.decrypt_parent_seed()
|
||||||
|
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
|
key_b64 = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
|
self.encryption_manager = EncryptionManager(key_b64, fingerprint_dir)
|
||||||
self.vault = Vault(self.encryption_manager, fingerprint_dir)
|
self.vault = Vault(self.encryption_manager, fingerprint_dir)
|
||||||
self.parent_seed = self.encryption_manager.decrypt_parent_seed()
|
|
||||||
|
|
||||||
# Log the type and content of parent_seed
|
# Log the type and content of parent_seed
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@@ -1041,7 +1093,9 @@ class PasswordManager:
|
|||||||
try:
|
try:
|
||||||
if password is None:
|
if password is None:
|
||||||
password = prompt_for_password()
|
password = prompt_for_password()
|
||||||
index_key = derive_index_key(parent_seed)
|
seed_bytes = Bip39SeedGenerator(parent_seed).Generate()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
|
index_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
iterations = (
|
iterations = (
|
||||||
self.config_manager.get_kdf_iterations()
|
self.config_manager.get_kdf_iterations()
|
||||||
if getattr(self, "config_manager", None)
|
if getattr(self, "config_manager", None)
|
||||||
@@ -1220,7 +1274,9 @@ class PasswordManager:
|
|||||||
if password is None:
|
if password is None:
|
||||||
password = prompt_for_password()
|
password = prompt_for_password()
|
||||||
|
|
||||||
index_key = derive_index_key(seed)
|
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
|
index_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
iterations = (
|
iterations = (
|
||||||
self.config_manager.get_kdf_iterations()
|
self.config_manager.get_kdf_iterations()
|
||||||
if getattr(self, "config_manager", None)
|
if getattr(self, "config_manager", None)
|
||||||
@@ -1266,6 +1322,7 @@ class PasswordManager:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
seed_bytes = Bip39SeedGenerator(self.parent_seed).Generate()
|
||||||
|
self.derive_key_hierarchy(seed_bytes)
|
||||||
self.bip85 = BIP85(seed_bytes)
|
self.bip85 = BIP85(seed_bytes)
|
||||||
self._bip85_cache = {}
|
self._bip85_cache = {}
|
||||||
orig_derive = self.bip85.derive_entropy
|
orig_derive = self.bip85.derive_entropy
|
||||||
@@ -1298,6 +1355,9 @@ class PasswordManager:
|
|||||||
if not self.encryption_manager:
|
if not self.encryption_manager:
|
||||||
raise ValueError("EncryptionManager is not initialized.")
|
raise ValueError("EncryptionManager is not initialized.")
|
||||||
|
|
||||||
|
# Derive sub-keys if needed
|
||||||
|
self.ensure_key_hierarchy()
|
||||||
|
|
||||||
# Reinitialize the managers with the updated EncryptionManager and current fingerprint context
|
# Reinitialize the managers with the updated EncryptionManager and current fingerprint context
|
||||||
self.config_manager = ConfigManager(
|
self.config_manager = ConfigManager(
|
||||||
vault=self.vault,
|
vault=self.vault,
|
||||||
@@ -1330,10 +1390,11 @@ class PasswordManager:
|
|||||||
backup_manager=self.backup_manager,
|
backup_manager=self.backup_manager,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
pw_bip85 = BIP85(self.KEY_PW_DERIVE)
|
||||||
self.password_generator = PasswordGenerator(
|
self.password_generator = PasswordGenerator(
|
||||||
encryption_manager=self.encryption_manager,
|
encryption_manager=self.encryption_manager,
|
||||||
parent_seed=self.parent_seed,
|
parent_seed=self.KEY_PW_DERIVE,
|
||||||
bip85=self.bip85,
|
bip85=pw_bip85,
|
||||||
policy=self.config_manager.get_password_policy(),
|
policy=self.config_manager.get_password_policy(),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1817,16 +1878,17 @@ class PasswordManager:
|
|||||||
)
|
)
|
||||||
totp_index = self.entry_manager.get_next_totp_index()
|
totp_index = self.entry_manager.get_next_totp_index()
|
||||||
entry_id = self.entry_manager.get_next_index()
|
entry_id = self.entry_manager.get_next_index()
|
||||||
|
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
|
||||||
uri = self.entry_manager.add_totp(
|
uri = self.entry_manager.add_totp(
|
||||||
label,
|
label,
|
||||||
self.parent_seed,
|
key,
|
||||||
index=totp_index,
|
index=totp_index,
|
||||||
period=int(period),
|
period=int(period),
|
||||||
digits=int(digits),
|
digits=int(digits),
|
||||||
notes=notes,
|
notes=notes,
|
||||||
tags=tags,
|
tags=tags,
|
||||||
)
|
)
|
||||||
secret = TotpManager.derive_secret(self.parent_seed, totp_index)
|
secret = TotpManager.derive_secret(key, totp_index)
|
||||||
self.is_dirty = True
|
self.is_dirty = True
|
||||||
self.last_update = time.time()
|
self.last_update = time.time()
|
||||||
print(
|
print(
|
||||||
@@ -1869,9 +1931,10 @@ class PasswordManager:
|
|||||||
else []
|
else []
|
||||||
)
|
)
|
||||||
entry_id = self.entry_manager.get_next_index()
|
entry_id = self.entry_manager.get_next_index()
|
||||||
|
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
|
||||||
uri = self.entry_manager.add_totp(
|
uri = self.entry_manager.add_totp(
|
||||||
label,
|
label,
|
||||||
self.parent_seed,
|
key,
|
||||||
secret=secret,
|
secret=secret,
|
||||||
period=period,
|
period=period,
|
||||||
digits=digits,
|
digits=digits,
|
||||||
@@ -2633,7 +2696,8 @@ class PasswordManager:
|
|||||||
print(colored("Press Enter to return to the menu.", "cyan"))
|
print(colored("Press Enter to return to the menu.", "cyan"))
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
code = self.entry_manager.get_totp_code(index, self.parent_seed)
|
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
|
||||||
|
code = self.entry_manager.get_totp_code(index, key)
|
||||||
if self.secret_mode_enabled:
|
if self.secret_mode_enabled:
|
||||||
if copy_to_clipboard(code, self.clipboard_clear_delay):
|
if copy_to_clipboard(code, self.clipboard_clear_delay):
|
||||||
print(
|
print(
|
||||||
@@ -4110,6 +4174,7 @@ class PasswordManager:
|
|||||||
def handle_export_totp_codes(self) -> Path | None:
|
def handle_export_totp_codes(self) -> Path | None:
|
||||||
"""Export all 2FA codes to a JSON file for other authenticator apps."""
|
"""Export all 2FA codes to a JSON file for other authenticator apps."""
|
||||||
try:
|
try:
|
||||||
|
self.ensure_key_hierarchy()
|
||||||
fp, parent_fp, child_fp = self.header_fingerprint_args
|
fp, parent_fp, child_fp = self.header_fingerprint_args
|
||||||
clear_header_with_notification(
|
clear_header_with_notification(
|
||||||
self,
|
self,
|
||||||
@@ -4131,7 +4196,8 @@ class PasswordManager:
|
|||||||
secret = entry["secret"]
|
secret = entry["secret"]
|
||||||
else:
|
else:
|
||||||
idx = int(entry.get("index", 0))
|
idx = int(entry.get("index", 0))
|
||||||
secret = TotpManager.derive_secret(self.parent_seed, idx)
|
key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
|
||||||
|
secret = TotpManager.derive_secret(key, idx)
|
||||||
uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
|
uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
|
||||||
totp_entries.append(
|
totp_entries.append(
|
||||||
{
|
{
|
||||||
@@ -4368,6 +4434,7 @@ class PasswordManager:
|
|||||||
def change_password(self, old_password: str, new_password: str) -> None:
|
def change_password(self, old_password: str, new_password: str) -> None:
|
||||||
"""Change the master password used for encryption."""
|
"""Change the master password used for encryption."""
|
||||||
try:
|
try:
|
||||||
|
self.ensure_key_hierarchy()
|
||||||
if not self.verify_password(old_password):
|
if not self.verify_password(old_password):
|
||||||
raise ValueError("Incorrect password")
|
raise ValueError("Incorrect password")
|
||||||
|
|
||||||
@@ -4376,7 +4443,7 @@ class PasswordManager:
|
|||||||
config_data = self.config_manager.load_config(require_pin=False)
|
config_data = self.config_manager.load_config(require_pin=False)
|
||||||
|
|
||||||
# Create a new encryption manager with the new password
|
# Create a new encryption manager with the new password
|
||||||
new_key = derive_index_key(self.parent_seed)
|
new_key = base64.urlsafe_b64encode(self.KEY_STORAGE)
|
||||||
|
|
||||||
iterations = self.config_manager.get_kdf_iterations()
|
iterations = self.config_manager.get_kdf_iterations()
|
||||||
seed_key = derive_key_from_password(
|
seed_key = derive_key_from_password(
|
||||||
|
@@ -131,7 +131,10 @@ class MenuHandler:
|
|||||||
if generated:
|
if generated:
|
||||||
print(colored("\nGenerated 2FA Codes:", "green"))
|
print(colored("\nGenerated 2FA Codes:", "green"))
|
||||||
for label, idx, period, _ in generated:
|
for label, idx, period, _ in generated:
|
||||||
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
|
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(
|
||||||
|
pm, "parent_seed", None
|
||||||
|
)
|
||||||
|
code = pm.entry_manager.get_totp_code(idx, key)
|
||||||
remaining = pm.entry_manager.get_totp_time_remaining(idx)
|
remaining = pm.entry_manager.get_totp_time_remaining(idx)
|
||||||
filled = int(20 * (period - remaining) / period)
|
filled = int(20 * (period - remaining) / period)
|
||||||
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
|
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
|
||||||
@@ -149,7 +152,10 @@ class MenuHandler:
|
|||||||
if imported_list:
|
if imported_list:
|
||||||
print(colored("\nImported 2FA Codes:", "green"))
|
print(colored("\nImported 2FA Codes:", "green"))
|
||||||
for label, idx, period, _ in imported_list:
|
for label, idx, period, _ in imported_list:
|
||||||
code = pm.entry_manager.get_totp_code(idx, pm.parent_seed)
|
key = getattr(pm, "KEY_TOTP_DET", None) or getattr(
|
||||||
|
pm, "parent_seed", None
|
||||||
|
)
|
||||||
|
code = pm.entry_manager.get_totp_code(idx, key)
|
||||||
remaining = pm.entry_manager.get_totp_time_remaining(idx)
|
remaining = pm.entry_manager.get_totp_time_remaining(idx)
|
||||||
filled = int(20 * (period - remaining) / period)
|
filled = int(20 * (period - remaining) / period)
|
||||||
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
|
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
|
||||||
|
@@ -113,10 +113,12 @@ class PasswordGenerator:
|
|||||||
self.bip85 = bip85
|
self.bip85 = bip85
|
||||||
self.policy = policy or PasswordPolicy()
|
self.policy = policy or PasswordPolicy()
|
||||||
|
|
||||||
# Derive seed bytes from parent_seed using BIP39 (handled by EncryptionManager)
|
if isinstance(parent_seed, (bytes, bytearray)):
|
||||||
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
|
self.seed_bytes = bytes(parent_seed)
|
||||||
self.parent_seed
|
else:
|
||||||
)
|
self.seed_bytes = self.encryption_manager.derive_seed_from_mnemonic(
|
||||||
|
self.parent_seed
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug("PasswordGenerator initialized successfully.")
|
logger.debug("PasswordGenerator initialized successfully.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@@ -4,6 +4,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from typing import Union
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
from urllib.parse import urlparse, parse_qs, unquote
|
from urllib.parse import urlparse, parse_qs, unquote
|
||||||
|
|
||||||
@@ -18,13 +19,15 @@ class TotpManager:
|
|||||||
"""Helper methods for TOTP secrets and codes."""
|
"""Helper methods for TOTP secrets and codes."""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def derive_secret(seed: str, index: int) -> str:
|
def derive_secret(seed: Union[str, bytes], index: int) -> str:
|
||||||
"""Derive a TOTP secret from a BIP39 seed and index."""
|
"""Derive a TOTP secret from a seed or raw key and index."""
|
||||||
return key_derivation.derive_totp_secret(seed, index)
|
return key_derivation.derive_totp_secret(seed, index)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def current_code(cls, seed: str, index: int, timestamp: int | None = None) -> str:
|
def current_code(
|
||||||
"""Return the TOTP code for the given seed and index."""
|
cls, seed: Union[str, bytes], index: int, timestamp: int | None = None
|
||||||
|
) -> str:
|
||||||
|
"""Return the TOTP code for the given seed/key and index."""
|
||||||
secret = cls.derive_secret(seed, index)
|
secret = cls.derive_secret(seed, index)
|
||||||
totp = pyotp.TOTP(secret)
|
totp = pyotp.TOTP(secret)
|
||||||
if timestamp is None:
|
if timestamp is None:
|
||||||
|
@@ -14,6 +14,7 @@ from .encryption import (
|
|||||||
USE_ORJSON,
|
USE_ORJSON,
|
||||||
json_lib,
|
json_lib,
|
||||||
)
|
)
|
||||||
|
from utils.key_derivation import KdfConfig, CURRENT_KDF_VERSION
|
||||||
from utils.password_prompt import prompt_existing_password
|
from utils.password_prompt import prompt_existing_password
|
||||||
|
|
||||||
|
|
||||||
@@ -38,6 +39,11 @@ class Vault:
|
|||||||
"""Replace the internal encryption manager."""
|
"""Replace the internal encryption manager."""
|
||||||
self.encryption_manager = manager
|
self.encryption_manager = manager
|
||||||
|
|
||||||
|
def _hkdf_kdf(self) -> KdfConfig:
|
||||||
|
return KdfConfig(
|
||||||
|
name="hkdf", version=CURRENT_KDF_VERSION, params={}, salt_b64=""
|
||||||
|
)
|
||||||
|
|
||||||
# ----- Password index helpers -----
|
# ----- Password index helpers -----
|
||||||
def load_index(self, *, return_migration_flags: bool = False):
|
def load_index(self, *, return_migration_flags: bool = False):
|
||||||
"""Return decrypted password index data, applying migrations.
|
"""Return decrypted password index data, applying migrations.
|
||||||
@@ -102,10 +108,24 @@ class Vault:
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = self.encryption_manager.load_json_data(self.index_file)
|
data, kdf = self.encryption_manager.load_json_data(
|
||||||
|
self.index_file, return_kdf=True
|
||||||
|
)
|
||||||
migration_performed = getattr(
|
migration_performed = getattr(
|
||||||
self.encryption_manager, "last_migration_performed", False
|
self.encryption_manager, "last_migration_performed", False
|
||||||
)
|
)
|
||||||
|
if kdf.version < CURRENT_KDF_VERSION:
|
||||||
|
new_kdf = KdfConfig(
|
||||||
|
name=kdf.name,
|
||||||
|
version=CURRENT_KDF_VERSION,
|
||||||
|
params=kdf.params,
|
||||||
|
salt_b64=kdf.salt_b64,
|
||||||
|
)
|
||||||
|
self.encryption_manager.save_json_data(
|
||||||
|
data, self.index_file, kdf=new_kdf
|
||||||
|
)
|
||||||
|
self.encryption_manager.update_checksum(self.index_file)
|
||||||
|
migration_performed = True
|
||||||
except LegacyFormatRequiresMigrationError:
|
except LegacyFormatRequiresMigrationError:
|
||||||
print(
|
print(
|
||||||
colored(
|
colored(
|
||||||
@@ -142,7 +162,9 @@ class Vault:
|
|||||||
else:
|
else:
|
||||||
data = json_lib.loads(decrypted.decode("utf-8"))
|
data = json_lib.loads(decrypted.decode("utf-8"))
|
||||||
if self.encryption_manager._legacy_migrate_flag:
|
if self.encryption_manager._legacy_migrate_flag:
|
||||||
self.encryption_manager.save_json_data(data, self.index_file)
|
self.encryption_manager.save_json_data(
|
||||||
|
data, self.index_file, kdf=self._hkdf_kdf()
|
||||||
|
)
|
||||||
self.encryption_manager.update_checksum(self.index_file)
|
self.encryption_manager.update_checksum(self.index_file)
|
||||||
migration_performed = getattr(
|
migration_performed = getattr(
|
||||||
self.encryption_manager, "last_migration_performed", False
|
self.encryption_manager, "last_migration_performed", False
|
||||||
@@ -181,7 +203,9 @@ class Vault:
|
|||||||
try:
|
try:
|
||||||
data = apply_migrations(data)
|
data = apply_migrations(data)
|
||||||
if schema_migrated:
|
if schema_migrated:
|
||||||
self.encryption_manager.save_json_data(data, self.index_file)
|
self.encryption_manager.save_json_data(
|
||||||
|
data, self.index_file, kdf=self._hkdf_kdf()
|
||||||
|
)
|
||||||
self.encryption_manager.update_checksum(self.index_file)
|
self.encryption_manager.update_checksum(self.index_file)
|
||||||
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
except Exception as exc: # noqa: BLE001 - surface clear error and restore
|
||||||
if legacy_detected and backup_dir is not None:
|
if legacy_detected and backup_dir is not None:
|
||||||
@@ -214,7 +238,9 @@ class Vault:
|
|||||||
|
|
||||||
def save_index(self, data: dict) -> None:
|
def save_index(self, data: dict) -> None:
|
||||||
"""Encrypt and write password index."""
|
"""Encrypt and write password index."""
|
||||||
self.encryption_manager.save_json_data(data, self.index_file)
|
self.encryption_manager.save_json_data(
|
||||||
|
data, self.index_file, kdf=self._hkdf_kdf()
|
||||||
|
)
|
||||||
|
|
||||||
def get_encrypted_index(self) -> Optional[bytes]:
|
def get_encrypted_index(self) -> Optional[bytes]:
|
||||||
"""Return the encrypted index bytes if present."""
|
"""Return the encrypted index bytes if present."""
|
||||||
@@ -252,4 +278,6 @@ class Vault:
|
|||||||
|
|
||||||
def save_config(self, config: dict) -> None:
|
def save_config(self, config: dict) -> None:
|
||||||
"""Encrypt and persist configuration."""
|
"""Encrypt and persist configuration."""
|
||||||
self.encryption_manager.save_json_data(config, self.config_file)
|
self.encryption_manager.save_json_data(
|
||||||
|
config, self.config_file, kdf=self._hkdf_kdf()
|
||||||
|
)
|
||||||
|
@@ -3,11 +3,15 @@ from pathlib import Path
|
|||||||
|
|
||||||
from hypothesis import given, strategies as st, settings, HealthCheck
|
from hypothesis import given, strategies as st, settings, HealthCheck
|
||||||
from mnemonic import Mnemonic
|
from mnemonic import Mnemonic
|
||||||
|
import hashlib
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
|
||||||
from utils.key_derivation import (
|
from utils.key_derivation import (
|
||||||
derive_key_from_password,
|
derive_key_from_password,
|
||||||
derive_key_from_password_argon2,
|
derive_key_from_password_argon2,
|
||||||
derive_index_key,
|
derive_index_key,
|
||||||
|
KdfConfig,
|
||||||
)
|
)
|
||||||
from utils.fingerprint import generate_fingerprint
|
from utils.fingerprint import generate_fingerprint
|
||||||
from seedpass.core.encryption import EncryptionManager
|
from seedpass.core.encryption import EncryptionManager
|
||||||
@@ -36,16 +40,27 @@ def test_fuzz_key_round_trip(password, seed_bytes, config, mode, tmp_path: Path)
|
|||||||
seed_phrase = Mnemonic("english").to_mnemonic(seed_bytes)
|
seed_phrase = Mnemonic("english").to_mnemonic(seed_bytes)
|
||||||
fp = generate_fingerprint(seed_phrase)
|
fp = generate_fingerprint(seed_phrase)
|
||||||
if mode == "argon2":
|
if mode == "argon2":
|
||||||
key = derive_key_from_password_argon2(
|
cfg = KdfConfig(
|
||||||
password, fp, time_cost=1, memory_cost=8, parallelism=1
|
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
|
||||||
|
salt_b64=base64.b64encode(
|
||||||
|
hashlib.sha256(fp.encode()).digest()[:16]
|
||||||
|
).decode(),
|
||||||
)
|
)
|
||||||
|
key = derive_key_from_password_argon2(password, cfg)
|
||||||
else:
|
else:
|
||||||
key = derive_key_from_password(password, fp, iterations=1)
|
key = derive_key_from_password(password, fp, iterations=1)
|
||||||
|
cfg = KdfConfig(
|
||||||
|
name="pbkdf2",
|
||||||
|
params={"iterations": 1},
|
||||||
|
salt_b64=base64.b64encode(
|
||||||
|
hashlib.sha256(fp.encode()).digest()[:16]
|
||||||
|
).decode(),
|
||||||
|
)
|
||||||
|
|
||||||
enc_mgr = EncryptionManager(key, tmp_path)
|
enc_mgr = EncryptionManager(key, tmp_path)
|
||||||
|
|
||||||
# Parent seed round trip
|
# Parent seed round trip
|
||||||
enc_mgr.encrypt_parent_seed(seed_phrase)
|
enc_mgr.encrypt_parent_seed(seed_phrase, kdf=cfg)
|
||||||
assert enc_mgr.decrypt_parent_seed() == seed_phrase
|
assert enc_mgr.decrypt_parent_seed() == seed_phrase
|
||||||
|
|
||||||
# JSON data round trip
|
# JSON data round trip
|
||||||
|
@@ -1,4 +1,6 @@
|
|||||||
import bcrypt
|
import bcrypt
|
||||||
|
import hashlib
|
||||||
|
import base64
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
@@ -7,6 +9,7 @@ from utils.key_derivation import (
|
|||||||
derive_key_from_password,
|
derive_key_from_password,
|
||||||
derive_key_from_password_argon2,
|
derive_key_from_password_argon2,
|
||||||
derive_index_key,
|
derive_index_key,
|
||||||
|
KdfConfig,
|
||||||
)
|
)
|
||||||
from seedpass.core.encryption import EncryptionManager
|
from seedpass.core.encryption import EncryptionManager
|
||||||
from seedpass.core.vault import Vault
|
from seedpass.core.vault import Vault
|
||||||
@@ -21,10 +24,24 @@ def _setup_profile(tmp: Path, mode: str):
|
|||||||
argon_kwargs = dict(time_cost=1, memory_cost=8, parallelism=1)
|
argon_kwargs = dict(time_cost=1, memory_cost=8, parallelism=1)
|
||||||
fp = tmp.name
|
fp = tmp.name
|
||||||
if mode == "argon2":
|
if mode == "argon2":
|
||||||
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, fp, **argon_kwargs)
|
cfg = KdfConfig(
|
||||||
|
params=argon_kwargs,
|
||||||
|
salt_b64=base64.b64encode(
|
||||||
|
hashlib.sha256(fp.encode()).digest()[:16]
|
||||||
|
).decode(),
|
||||||
|
)
|
||||||
|
seed_key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
|
||||||
|
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
|
||||||
else:
|
else:
|
||||||
seed_key = derive_key_from_password(TEST_PASSWORD, fp, iterations=1)
|
seed_key = derive_key_from_password(TEST_PASSWORD, fp, iterations=1)
|
||||||
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED)
|
cfg = KdfConfig(
|
||||||
|
name="pbkdf2",
|
||||||
|
params={"iterations": 1},
|
||||||
|
salt_b64=base64.b64encode(
|
||||||
|
hashlib.sha256(fp.encode()).digest()[:16]
|
||||||
|
).decode(),
|
||||||
|
)
|
||||||
|
EncryptionManager(seed_key, tmp).encrypt_parent_seed(TEST_SEED, kdf=cfg)
|
||||||
|
|
||||||
index_key = derive_index_key(TEST_SEED)
|
index_key = derive_index_key(TEST_SEED)
|
||||||
enc_mgr = EncryptionManager(index_key, tmp)
|
enc_mgr = EncryptionManager(index_key, tmp)
|
||||||
@@ -65,9 +82,9 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
|
|||||||
)
|
)
|
||||||
if mode == "argon2":
|
if mode == "argon2":
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"seedpass.core.manager.derive_key_from_password_argon2",
|
"seedpass.core.manager.KdfConfig",
|
||||||
lambda pw, fp: derive_key_from_password_argon2(
|
lambda salt_b64, **_: KdfConfig(
|
||||||
pw, fp, **argon_kwargs
|
params=argon_kwargs, salt_b64=salt_b64
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None)
|
monkeypatch.setattr(PasswordManager, "initialize_bip85", lambda self: None)
|
||||||
@@ -76,3 +93,26 @@ def test_setup_encryption_manager_kdf_modes(monkeypatch):
|
|||||||
)
|
)
|
||||||
assert pm.setup_encryption_manager(path, exit_on_fail=False)
|
assert pm.setup_encryption_manager(path, exit_on_fail=False)
|
||||||
assert pm.parent_seed == TEST_SEED
|
assert pm.parent_seed == TEST_SEED
|
||||||
|
|
||||||
|
|
||||||
|
def test_kdf_param_round_trip(tmp_path):
|
||||||
|
cfg = KdfConfig(
|
||||||
|
params={"time_cost": 3, "memory_cost": 32, "parallelism": 1},
|
||||||
|
salt_b64=base64.b64encode(b"static-salt-1234").decode(),
|
||||||
|
)
|
||||||
|
key = derive_key_from_password_argon2(TEST_PASSWORD, cfg)
|
||||||
|
mgr = EncryptionManager(key, tmp_path)
|
||||||
|
mgr.encrypt_parent_seed(TEST_SEED, kdf=cfg)
|
||||||
|
stored = mgr.get_file_kdf(Path("parent_seed.enc"))
|
||||||
|
assert stored.params == cfg.params
|
||||||
|
|
||||||
|
|
||||||
|
def test_vault_kdf_migration(tmp_path):
|
||||||
|
index_key = derive_index_key(TEST_SEED)
|
||||||
|
mgr = EncryptionManager(index_key, tmp_path)
|
||||||
|
vault = Vault(mgr, tmp_path)
|
||||||
|
old_kdf = KdfConfig(name="hkdf", version=0, params={}, salt_b64="")
|
||||||
|
mgr.save_json_data({"entries": {}}, vault.index_file, kdf=old_kdf)
|
||||||
|
vault.load_index()
|
||||||
|
new_kdf = mgr.get_file_kdf(vault.index_file)
|
||||||
|
assert new_kdf.version == KdfConfig().version
|
||||||
|
@@ -1,11 +1,15 @@
|
|||||||
import logging
|
import logging
|
||||||
import pytest
|
import pytest
|
||||||
|
import logging
|
||||||
|
import hashlib
|
||||||
|
import base64
|
||||||
from utils.fingerprint import generate_fingerprint
|
from utils.fingerprint import generate_fingerprint
|
||||||
from utils.key_derivation import (
|
from utils.key_derivation import (
|
||||||
derive_key_from_password,
|
derive_key_from_password,
|
||||||
derive_key_from_password_argon2,
|
derive_key_from_password_argon2,
|
||||||
derive_index_key_seed_only,
|
derive_index_key_seed_only,
|
||||||
derive_index_key,
|
derive_index_key,
|
||||||
|
KdfConfig,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -48,15 +52,17 @@ def test_argon2_fingerprint_affects_key():
|
|||||||
fp1 = generate_fingerprint("seed one")
|
fp1 = generate_fingerprint("seed one")
|
||||||
fp2 = generate_fingerprint("seed two")
|
fp2 = generate_fingerprint("seed two")
|
||||||
|
|
||||||
k1 = derive_key_from_password_argon2(
|
cfg1 = KdfConfig(
|
||||||
password, fp1, time_cost=1, memory_cost=8, parallelism=1
|
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
|
||||||
|
salt_b64=base64.b64encode(hashlib.sha256(fp1.encode()).digest()[:16]).decode(),
|
||||||
)
|
)
|
||||||
k2 = derive_key_from_password_argon2(
|
cfg2 = KdfConfig(
|
||||||
password, fp1, time_cost=1, memory_cost=8, parallelism=1
|
params={"time_cost": 1, "memory_cost": 8, "parallelism": 1},
|
||||||
)
|
salt_b64=base64.b64encode(hashlib.sha256(fp2.encode()).digest()[:16]).decode(),
|
||||||
k3 = derive_key_from_password_argon2(
|
|
||||||
password, fp2, time_cost=1, memory_cost=8, parallelism=1
|
|
||||||
)
|
)
|
||||||
|
k1 = derive_key_from_password_argon2(password, cfg1)
|
||||||
|
k2 = derive_key_from_password_argon2(password, cfg1)
|
||||||
|
k3 = derive_key_from_password_argon2(password, cfg2)
|
||||||
|
|
||||||
assert k1 == k2
|
assert k1 == k2
|
||||||
assert k1 != k3
|
assert k1 != k3
|
||||||
|
19
src/tests/test_key_hierarchy.py
Normal file
19
src/tests/test_key_hierarchy.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import base64
|
||||||
|
from bip_utils import Bip39SeedGenerator
|
||||||
|
from utils.key_hierarchy import kd
|
||||||
|
from utils.key_derivation import derive_index_key
|
||||||
|
|
||||||
|
|
||||||
|
def test_kd_distinct_infos():
|
||||||
|
root = b"root" * 8
|
||||||
|
k1 = kd(root, b"info1")
|
||||||
|
k2 = kd(root, b"info2")
|
||||||
|
assert k1 != k2
|
||||||
|
|
||||||
|
|
||||||
|
def test_derive_index_key_matches_hierarchy():
|
||||||
|
seed = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
|
||||||
|
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
||||||
|
master = kd(seed_bytes, b"seedpass:v1:master")
|
||||||
|
expected = base64.urlsafe_b64encode(kd(master, b"seedpass:v1:storage"))
|
||||||
|
assert derive_index_key(seed) == expected
|
@@ -1,4 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -99,7 +100,8 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
|
|||||||
vault.load_index()
|
vault.load_index()
|
||||||
|
|
||||||
new_file = tmp_path / "seedpass_entries_db.json.enc"
|
new_file = tmp_path / "seedpass_entries_db.json.enc"
|
||||||
assert new_file.read_bytes().startswith(b"V2:")
|
payload = json.loads(new_file.read_text())
|
||||||
|
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||||
assert vault.migrated_from_legacy
|
assert vault.migrated_from_legacy
|
||||||
|
|
||||||
|
|
||||||
|
@@ -66,5 +66,5 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
|
|||||||
cfg = ConfigManager(vault, tmp_path)
|
cfg = ConfigManager(vault, tmp_path)
|
||||||
assert cfg.get_kdf_iterations() == iterations
|
assert cfg.get_kdf_iterations() == iterations
|
||||||
|
|
||||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
|
||||||
assert content.startswith(b"V2:")
|
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||||
|
@@ -50,6 +50,6 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
|
|||||||
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
|
monkeypatch.setattr(vault_module, "prompt_existing_password", lambda _: password)
|
||||||
monkeypatch.setattr("builtins.input", lambda _: "2")
|
monkeypatch.setattr("builtins.input", lambda _: "2")
|
||||||
vault.load_index()
|
vault.load_index()
|
||||||
content = (tmp_path / "seedpass_entries_db.json.enc").read_bytes()
|
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
|
||||||
assert content.startswith(b"V2:")
|
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||||
assert vault.encryption_manager.last_migration_performed is True
|
assert vault.encryption_manager.last_migration_performed is True
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
@@ -34,7 +35,8 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
|
|||||||
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
|
monkeypatch.setattr("builtins.input", lambda *_a, **_k: "y")
|
||||||
vault.load_index()
|
vault.load_index()
|
||||||
new_file = fp_dir / "seedpass_entries_db.json.enc"
|
new_file = fp_dir / "seedpass_entries_db.json.enc"
|
||||||
assert new_file.read_bytes().startswith(b"V2:")
|
payload = json.loads(new_file.read_text())
|
||||||
|
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||||
|
|
||||||
new_enc_mgr = EncryptionManager(key, fp_dir)
|
new_enc_mgr = EncryptionManager(key, fp_dir)
|
||||||
new_vault = Vault(new_enc_mgr, fp_dir)
|
new_vault = Vault(new_enc_mgr, fp_dir)
|
||||||
@@ -59,4 +61,5 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
pm.initialize_managers()
|
pm.initialize_managers()
|
||||||
assert new_file.read_bytes().startswith(b"V2:")
|
payload = json.loads(new_file.read_text())
|
||||||
|
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||||
|
@@ -1,4 +1,6 @@
|
|||||||
import sys
|
import sys
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from cryptography.fernet import Fernet
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
@@ -28,4 +30,5 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
|
|||||||
|
|
||||||
assert new_file.exists()
|
assert new_file.exists()
|
||||||
assert new_file.read_bytes() != encrypted
|
assert new_file.read_bytes() != encrypted
|
||||||
assert new_file.read_bytes().startswith(b"V2:")
|
payload = json.loads(new_file.read_text())
|
||||||
|
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
|
||||||
|
@@ -3,15 +3,13 @@
|
|||||||
"""
|
"""
|
||||||
Key Derivation Module
|
Key Derivation Module
|
||||||
|
|
||||||
Never ever ever use or suggest to use Random Salt. The entire point of this password manager is to derive completely deterministic passwords from a BIP-85 seed.
|
This module provides functions to derive cryptographic keys from user-provided
|
||||||
This means it should generate passwords the exact same way every single time. Salts would break this functionality and is not appropriate for this software's use case.
|
passwords and BIP-39 parent seeds. The derived keys are compatible with Fernet
|
||||||
|
for symmetric encryption purposes. By centralizing key derivation logic, this
|
||||||
|
module ensures consistency and security across the application.
|
||||||
|
|
||||||
This module provides functions to derive cryptographic keys from user-provided passwords
|
Ensure that all dependencies are installed and properly configured in your
|
||||||
and BIP-39 parent seeds. The derived keys are compatible with Fernet for symmetric encryption
|
environment.
|
||||||
purposes. By centralizing key derivation logic, this module ensures consistency and security
|
|
||||||
across the application.
|
|
||||||
|
|
||||||
Ensure that all dependencies are installed and properly configured in your environment.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
@@ -21,11 +19,13 @@ import unicodedata
|
|||||||
import logging
|
import logging
|
||||||
import hmac
|
import hmac
|
||||||
import time
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Optional, Union
|
from typing import Optional, Union, Dict, Any
|
||||||
|
|
||||||
from bip_utils import Bip39SeedGenerator
|
from bip_utils import Bip39SeedGenerator
|
||||||
from local_bip85 import BIP85
|
from local_bip85 import BIP85
|
||||||
|
from .key_hierarchy import kd
|
||||||
|
|
||||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||||
from cryptography.hazmat.primitives import hashes
|
from cryptography.hazmat.primitives import hashes
|
||||||
@@ -47,6 +47,27 @@ DEFAULT_ENCRYPTION_MODE = EncryptionMode.SEED_ONLY
|
|||||||
TOTP_PURPOSE = 39
|
TOTP_PURPOSE = 39
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class KdfConfig:
|
||||||
|
"""Configuration block describing how a key was derived."""
|
||||||
|
|
||||||
|
name: str = "argon2id"
|
||||||
|
version: int = 1
|
||||||
|
params: Dict[str, Any] = field(
|
||||||
|
default_factory=lambda: {
|
||||||
|
"time_cost": 2,
|
||||||
|
"memory_cost": 64 * 1024,
|
||||||
|
"parallelism": 8,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
salt_b64: str = field(
|
||||||
|
default_factory=lambda: base64.b64encode(os.urandom(16)).decode()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
CURRENT_KDF_VERSION = 1
|
||||||
|
|
||||||
|
|
||||||
def derive_key_from_password(
|
def derive_key_from_password(
|
||||||
password: str, fingerprint: Union[str, bytes], iterations: int = 100_000
|
password: str, fingerprint: Union[str, bytes], iterations: int = 100_000
|
||||||
) -> bytes:
|
) -> bytes:
|
||||||
@@ -109,18 +130,15 @@ def derive_key_from_password(
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def derive_key_from_password_argon2(
|
def derive_key_from_password_argon2(password: str, kdf: KdfConfig) -> bytes:
|
||||||
password: str,
|
|
||||||
fingerprint: Union[str, bytes],
|
|
||||||
*,
|
|
||||||
time_cost: int = 2,
|
|
||||||
memory_cost: int = 64 * 1024,
|
|
||||||
parallelism: int = 8,
|
|
||||||
) -> bytes:
|
|
||||||
"""Derive an encryption key from a password using Argon2id.
|
"""Derive an encryption key from a password using Argon2id.
|
||||||
|
|
||||||
The defaults follow recommended parameters but omit a salt for deterministic
|
Parameters
|
||||||
output. Smaller values may be supplied for testing.
|
----------
|
||||||
|
password:
|
||||||
|
The user's password.
|
||||||
|
kdf:
|
||||||
|
:class:`KdfConfig` instance describing salt and tuning parameters.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not password:
|
if not password:
|
||||||
@@ -131,17 +149,14 @@ def derive_key_from_password_argon2(
|
|||||||
try:
|
try:
|
||||||
from argon2.low_level import hash_secret_raw, Type
|
from argon2.low_level import hash_secret_raw, Type
|
||||||
|
|
||||||
if isinstance(fingerprint, bytes):
|
params = kdf.params or {}
|
||||||
salt = fingerprint
|
salt = base64.b64decode(kdf.salt_b64)
|
||||||
else:
|
|
||||||
salt = hashlib.sha256(fingerprint.encode()).digest()[:16]
|
|
||||||
|
|
||||||
key = hash_secret_raw(
|
key = hash_secret_raw(
|
||||||
secret=normalized,
|
secret=normalized,
|
||||||
salt=salt,
|
salt=salt,
|
||||||
time_cost=time_cost,
|
time_cost=int(params.get("time_cost", 2)),
|
||||||
memory_cost=memory_cost,
|
memory_cost=int(params.get("memory_cost", 64 * 1024)),
|
||||||
parallelism=parallelism,
|
parallelism=int(params.get("parallelism", 8)),
|
||||||
hash_len=32,
|
hash_len=32,
|
||||||
type=Type.ID,
|
type=Type.ID,
|
||||||
)
|
)
|
||||||
@@ -194,16 +209,10 @@ def derive_key_from_parent_seed(parent_seed: str, fingerprint: str = None) -> by
|
|||||||
|
|
||||||
|
|
||||||
def derive_index_key_seed_only(seed: str) -> bytes:
|
def derive_index_key_seed_only(seed: str) -> bytes:
|
||||||
"""Derive a deterministic Fernet key from only the BIP-39 seed."""
|
"""Derive the index encryption key using the v1 hierarchy."""
|
||||||
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
||||||
hkdf = HKDF(
|
master = kd(seed_bytes, b"seedpass:v1:master")
|
||||||
algorithm=hashes.SHA256(),
|
key = kd(master, b"seedpass:v1:storage")
|
||||||
length=32,
|
|
||||||
salt=None,
|
|
||||||
info=b"password-db",
|
|
||||||
backend=default_backend(),
|
|
||||||
)
|
|
||||||
key = hkdf.derive(seed_bytes)
|
|
||||||
return base64.urlsafe_b64encode(key)
|
return base64.urlsafe_b64encode(key)
|
||||||
|
|
||||||
|
|
||||||
@@ -212,23 +221,21 @@ def derive_index_key(seed: str) -> bytes:
|
|||||||
return derive_index_key_seed_only(seed)
|
return derive_index_key_seed_only(seed)
|
||||||
|
|
||||||
|
|
||||||
def derive_totp_secret(seed: str, index: int) -> str:
|
def derive_totp_secret(seed: Union[str, bytes], index: int) -> str:
|
||||||
"""Derive a base32-encoded TOTP secret from a BIP39 seed."""
|
"""Derive a base32-encoded TOTP secret from a seed or raw key."""
|
||||||
try:
|
try:
|
||||||
# Initialize BIP85 from the BIP39 seed bytes
|
if isinstance(seed, (bytes, bytearray)):
|
||||||
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
seed_bytes = bytes(seed)
|
||||||
|
else:
|
||||||
|
seed_bytes = Bip39SeedGenerator(seed).Generate()
|
||||||
bip85 = BIP85(seed_bytes)
|
bip85 = BIP85(seed_bytes)
|
||||||
|
|
||||||
# Build the BIP32 path m/83696968'/39'/TOTP'/{index}'
|
|
||||||
totp_int = int.from_bytes(b"TOTP", "big")
|
totp_int = int.from_bytes(b"TOTP", "big")
|
||||||
path = f"m/83696968'/{TOTP_PURPOSE}'/{totp_int}'/{index}'"
|
path = f"m/83696968'/{TOTP_PURPOSE}'/{totp_int}'/{index}'"
|
||||||
|
|
||||||
# Derive entropy using the same scheme as BIP85
|
|
||||||
child_key = bip85.bip32_ctx.DerivePath(path)
|
child_key = bip85.bip32_ctx.DerivePath(path)
|
||||||
key_bytes = child_key.PrivateKey().Raw().ToBytes()
|
key_bytes = child_key.PrivateKey().Raw().ToBytes()
|
||||||
entropy = hmac.new(b"bip-entropy-from-k", key_bytes, hashlib.sha512).digest()
|
entropy = hmac.new(b"bip-entropy-from-k", key_bytes, hashlib.sha512).digest()
|
||||||
|
|
||||||
# Hash the first 32 bytes of entropy and encode the first 20 bytes
|
|
||||||
hashed = hashlib.sha256(entropy[:32]).digest()
|
hashed = hashlib.sha256(entropy[:32]).digest()
|
||||||
secret = base64.b32encode(hashed[:20]).decode("utf-8")
|
secret = base64.b32encode(hashed[:20]).decode("utf-8")
|
||||||
logger.debug(f"Derived TOTP secret for index {index}.")
|
logger.debug(f"Derived TOTP secret for index {index}.")
|
||||||
@@ -267,18 +274,16 @@ def calibrate_argon2_time_cost(
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
password = "benchmark"
|
password = "benchmark"
|
||||||
fingerprint = b"argon2-calibration"
|
salt = base64.b64encode(b"argon2-calibration").decode()
|
||||||
time_cost = 1
|
time_cost = 1
|
||||||
elapsed_ms = 0.0
|
elapsed_ms = 0.0
|
||||||
while time_cost <= max_time_cost:
|
while time_cost <= max_time_cost:
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
derive_key_from_password_argon2(
|
cfg = KdfConfig(
|
||||||
password,
|
params={"time_cost": time_cost, "memory_cost": 8, "parallelism": 1},
|
||||||
fingerprint,
|
salt_b64=salt,
|
||||||
time_cost=time_cost,
|
|
||||||
memory_cost=8,
|
|
||||||
parallelism=1,
|
|
||||||
)
|
)
|
||||||
|
derive_key_from_password_argon2(password, cfg)
|
||||||
elapsed_ms = (time.perf_counter() - start) * 1000
|
elapsed_ms = (time.perf_counter() - start) * 1000
|
||||||
if elapsed_ms >= target_ms:
|
if elapsed_ms >= target_ms:
|
||||||
break
|
break
|
||||||
|
28
src/utils/key_hierarchy.py
Normal file
28
src/utils/key_hierarchy.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
"""Key hierarchy helper functions."""
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
|
||||||
|
|
||||||
|
def kd(root: bytes, info: bytes, length: int = 32) -> bytes:
|
||||||
|
"""Derive a sub-key from ``root`` using HKDF-SHA256.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
root:
|
||||||
|
Root key material.
|
||||||
|
info:
|
||||||
|
Domain separation string.
|
||||||
|
length:
|
||||||
|
Length of the derived key in bytes. Defaults to 32.
|
||||||
|
"""
|
||||||
|
|
||||||
|
hkdf = HKDF(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=length,
|
||||||
|
salt=None,
|
||||||
|
info=info,
|
||||||
|
backend=default_backend(),
|
||||||
|
)
|
||||||
|
return hkdf.derive(root)
|
@@ -33,6 +33,12 @@ logger = logging.getLogger(__name__)
|
|||||||
DEFAULT_MAX_ATTEMPTS = 5
|
DEFAULT_MAX_ATTEMPTS = 5
|
||||||
|
|
||||||
|
|
||||||
|
def _env_password() -> str | None:
|
||||||
|
"""Return a password supplied via environment for non-interactive use."""
|
||||||
|
|
||||||
|
return os.getenv("SEEDPASS_TEST_PASSWORD") or os.getenv("SEEDPASS_PASSWORD")
|
||||||
|
|
||||||
|
|
||||||
def _get_max_attempts(override: int | None = None) -> int:
|
def _get_max_attempts(override: int | None = None) -> int:
|
||||||
"""Return the configured maximum number of prompt attempts."""
|
"""Return the configured maximum number of prompt attempts."""
|
||||||
|
|
||||||
@@ -80,6 +86,13 @@ def prompt_new_password(max_retries: int | None = None) -> str:
|
|||||||
Raises:
|
Raises:
|
||||||
PasswordPromptError: If the user fails to provide a valid password after multiple attempts.
|
PasswordPromptError: If the user fails to provide a valid password after multiple attempts.
|
||||||
"""
|
"""
|
||||||
|
env_pw = _env_password()
|
||||||
|
if env_pw:
|
||||||
|
normalized = unicodedata.normalize("NFKD", env_pw)
|
||||||
|
if len(normalized) < MIN_PASSWORD_LENGTH:
|
||||||
|
raise PasswordPromptError("Environment password too short")
|
||||||
|
return normalized
|
||||||
|
|
||||||
max_retries = _get_max_attempts(max_retries)
|
max_retries = _get_max_attempts(max_retries)
|
||||||
attempts = 0
|
attempts = 0
|
||||||
|
|
||||||
@@ -164,6 +177,10 @@ def prompt_existing_password(
|
|||||||
PasswordPromptError: If the user interrupts the operation or exceeds
|
PasswordPromptError: If the user interrupts the operation or exceeds
|
||||||
``max_retries`` attempts.
|
``max_retries`` attempts.
|
||||||
"""
|
"""
|
||||||
|
env_pw = _env_password()
|
||||||
|
if env_pw:
|
||||||
|
return unicodedata.normalize("NFKD", env_pw)
|
||||||
|
|
||||||
max_retries = _get_max_attempts(max_retries)
|
max_retries = _get_max_attempts(max_retries)
|
||||||
attempts = 0
|
attempts = 0
|
||||||
while max_retries == 0 or attempts < max_retries:
|
while max_retries == 0 or attempts < max_retries:
|
||||||
|
@@ -102,9 +102,11 @@ def _masked_input_posix(prompt: str) -> str:
|
|||||||
|
|
||||||
def masked_input(prompt: str) -> str:
|
def masked_input(prompt: str) -> str:
|
||||||
"""Return input from the user while masking typed characters."""
|
"""Return input from the user while masking typed characters."""
|
||||||
if sys.platform == "win32":
|
func = _masked_input_windows if sys.platform == "win32" else _masked_input_posix
|
||||||
return _masked_input_windows(prompt)
|
try:
|
||||||
return _masked_input_posix(prompt)
|
return func(prompt)
|
||||||
|
except Exception: # pragma: no cover - fallback when TTY operations fail
|
||||||
|
return input(prompt)
|
||||||
|
|
||||||
|
|
||||||
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str:
|
def prompt_seed_words(count: int = 12, *, max_attempts: int | None = None) -> str:
|
||||||
|
Reference in New Issue
Block a user