mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Compare commits
20 Commits
15df3f10a6
...
45c112b26b
45c112b26b
4df6ff639e
108fcfcb04
505cf1a950
e701a1c1cb
cb9a068e40
c13742f3f3
6c8b1928b8
b1b31eeb8a
492bfba3fb
b33565e7f3
857b1ef0f9
7a039171a0
dd513cf964
16de0a82c7
d99af30d9f
da37ec2e61
0315562d80
e75e197270
619226d336
@@ -16,6 +16,10 @@ This software was not developed by an experienced security expert and should be

 Recent releases derive passwords and other artifacts using a fully deterministic algorithm that behaves consistently across Python versions. This improvement means artifacts generated with earlier versions of SeedPass will not match those produced now. Regenerate any previously derived data or retain the old version if you need to reproduce older passwords or keys.

+**⚠️ First Run Warning**
+
+Use a dedicated BIP-39 seed phrase exclusively for SeedPass. Offline Mode is **ON by default**, keeping all Nostr syncing disabled until you explicitly opt in.
+
 ---

 ### Supported OS
@@ -785,6 +789,7 @@ You can also launch the GUI directly with `seedpass gui` or `seedpass-gui`.

 - **No PBKDF2 Salt Needed:** SeedPass deliberately omits an explicit PBKDF2 salt. Every password is derived from a unique 512-bit BIP-85 child seed, which already provides stronger per-password uniqueness than a conventional 128-bit salt.
 - **Checksum Verification:** Always verify the script's checksum to ensure its integrity and protect against unauthorized modifications.
 - **Potential Bugs and Limitations:** Be aware that the software may contain bugs and lacks certain features. Snapshot chunks are capped at 50 KB and the client rotates snapshots after enough delta events accumulate. The security of memory management and logs has not been thoroughly evaluated and may pose risks of leaking sensitive information.
+- **Best-Effort Memory Zeroization:** Sensitive data is wiped from memory when possible, but Python may retain copies of decrypted values.
 - **Multiple Seeds Management:** While managing multiple seeds adds flexibility, it also increases the responsibility to secure each seed and its associated password.
 - **No PBKDF2 Salt Required:** SeedPass deliberately omits an explicit PBKDF2 salt. Every password is derived from a unique 512-bit BIP-85 child seed, which already provides stronger per-password uniqueness than a conventional 128-bit salt.
 - **Default KDF Iterations:** New profiles start with 50,000 PBKDF2 iterations. Adjust this with `seedpass config set kdf_iterations`.
44  docs/SPEC.md  Normal file
@@ -0,0 +1,44 @@

# SeedPass Specification

## Key Hierarchy

SeedPass derives a hierarchy of keys from a single BIP-39 parent seed using HKDF:

- **Master Key** – `HKDF(seed, "seedpass:v1:master")`
- **KEY_STORAGE** – used to encrypt vault data.
- **KEY_INDEX** – protects the metadata index.
- **KEY_PW_DERIVE** – deterministic password generation.
- **KEY_TOTP_DET** – deterministic TOTP secrets.

Each context string keeps derived keys domain separated.
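The domain-separation idea above can be sketched with a minimal HKDF-Expand step (RFC 5869). Only the `seedpass:v1:master` context string appears in the spec; the child context labels and the absence of an extract/salt step here are illustrative assumptions, not SeedPass's actual implementation.

```python
import hashlib
import hmac


def hkdf_expand_sha256(key: bytes, info: bytes, length: int = 32) -> bytes:
    """Minimal HKDF-Expand (RFC 5869) using HMAC-SHA256."""
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(key, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


# Placeholder for the 512-bit seed bytes derived from the BIP-39 phrase.
seed = b"\x00" * 64

# Master key uses the context string from the spec; child contexts are guesses.
master = hkdf_expand_sha256(seed, b"seedpass:v1:master")
key_storage = hkdf_expand_sha256(master, b"seedpass:v1:storage")
key_index = hkdf_expand_sha256(master, b"seedpass:v1:index")
```

Distinct `info` strings guarantee the children are independent: compromising `KEY_STORAGE` reveals nothing about `KEY_INDEX`.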
## KDF Parameters

Passwords are protected with **PBKDF2-HMAC-SHA256**. The default work factor is **50,000 iterations** but may be adjusted via the settings slider. The config stores a `KdfConfig` structure with the chosen iteration count, algorithm name, and the current spec version (`CURRENT_KDF_VERSION = 1`). Argon2 is available with a default `time_cost` of 2 when selected.
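The parameters above can be exercised with the standard library. The `KdfConfig` field names below are an illustrative guess from the spec text (it only says the structure records iteration count, algorithm name, and version); the 32-byte output length is also an assumption.

```python
import hashlib
from dataclasses import dataclass

DEFAULT_KDF_ITERATIONS = 50_000  # spec default, adjustable via the settings slider


@dataclass
class KdfConfig:
    # Field names are hypothetical; the spec names only the three pieces of data.
    iterations: int = DEFAULT_KDF_ITERATIONS
    algo: str = "pbkdf2-hmac-sha256"
    version: int = 1  # CURRENT_KDF_VERSION


def derive_key(password: str, salt: bytes, cfg: KdfConfig) -> bytes:
    # PBKDF2-HMAC-SHA256 with the configured work factor.
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, cfg.iterations, dklen=32)


key = derive_key("correct horse battery staple", b"\x00" * 16, KdfConfig())
```

Raising `iterations` increases the attacker's cost linearly, which is why it is exposed as a user-tunable slider rather than a fixed constant.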
## Message Formats

SeedPass synchronizes profiles over Nostr using three event kinds:

- **Manifest (`30070`)** – high-level snapshot description and current version.
- **Snapshot Chunk (`30071`)** – compressed, encrypted portions of the vault.
- **Delta (`30072`)** – incremental changes since the last snapshot.

Events encode JSON and include tags for checksums, fingerprints, and timestamps.
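A manifest payload might look like the sketch below. The top-level keys (`ver`, `algo`, `chunks`, `delta_since`, `nonce`) mirror what the client's snapshot handler serializes; the chunk fields and the `algo` label are placeholders, not the real schema.

```python
import json

# Event kinds from the spec.
KIND_MANIFEST = 30070
KIND_SNAPSHOT_CHUNK = 30071
KIND_DELTA = 30072

# Hypothetical manifest payload; chunk metadata fields are illustrative only.
manifest = {
    "ver": 1,
    "algo": "example-algo",
    "chunks": [{"id": 0, "hash": "00" * 32, "size": 4096}],
    "delta_since": None,
    "nonce": None,
}
payload = json.dumps(manifest)
```

A client applying a sync would fetch the manifest first, then request each chunk event (`30071`) it lists, and finally replay any deltas (`30072`) newer than `delta_since`.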
## Versioning

Configuration and KDF schemas are versioned so clients can migrate older profiles. Nostr events carry a version field in the manifest, and the software follows semantic versioning for releases.
## Memory Protection

SeedPass encrypts sensitive values in memory and attempts to wipe them when no longer needed. This zeroization is best-effort only; Python's memory management may retain copies of decrypted data. Critical cryptographic operations may move to a Rust/WASM module in the future to provide stronger guarantees.
@@ -127,7 +127,7 @@ Run or stop the local HTTP API.

 | Action | Command | Examples |
 | :--- | :--- | :--- |
 | Start the API | `api start` | `seedpass api start --host 0.0.0.0 --port 8000` |
-| Stop the API | `api stop` | `seedpass api stop` |
+| Stop the API | `api stop --token TOKEN` | `seedpass api stop --token <token>` |

 ---
@@ -214,7 +214,7 @@ Set the `SEEDPASS_CORS_ORIGINS` environment variable to a comma‑separated list

 SEEDPASS_CORS_ORIGINS=http://localhost:3000 seedpass api start
 ```

-Shut down the server with `seedpass api stop`.
+Shut down the server with `seedpass api stop --token <token>`.

 ---
@@ -43,6 +43,7 @@ from seedpass.core.vault import Vault
 from seedpass.core.config_manager import ConfigManager
 from seedpass.core.backup import BackupManager
 from seedpass.core.entry_management import EntryManager
+from seedpass.core.state_manager import StateManager
 from nostr.client import NostrClient
 from utils.fingerprint import generate_fingerprint
 from utils.fingerprint_manager import FingerprintManager
@@ -195,11 +196,13 @@ def main() -> None:

     encrypted = entry_mgr.vault.get_encrypted_index()
     if encrypted:
+        idx = StateManager(dir_path).state.get("nostr_account_idx", 0)
         client = NostrClient(
             entry_mgr.vault.encryption_manager,
             fingerprint or dir_path.name,
             parent_seed=seed,
             config_manager=cfg_mgr,
+            account_index=idx,
         )
         asyncio.run(client.publish_snapshot(encrypted))
         print("[+] Data synchronized to Nostr.")
66  src/main.py
@@ -670,33 +670,49 @@ def handle_set_inactivity_timeout(password_manager: PasswordManager) -> None:


 def handle_set_kdf_iterations(password_manager: PasswordManager) -> None:
-    """Change the PBKDF2 iteration count."""
+    """Interactive slider for PBKDF2 iteration strength with benchmarking."""
+    import hashlib
+    import time
+
     cfg_mgr = password_manager.config_manager
     if cfg_mgr is None:
         print(colored("Configuration manager unavailable.", "red"))
         return
+    levels = [
+        ("1", "Very Fast", 10_000),
+        ("2", "Fast", 50_000),
+        ("3", "Balanced", 100_000),
+        ("4", "Slow", 200_000),
+        ("5", "Paranoid", 500_000),
+    ]
     try:
         current = cfg_mgr.get_kdf_iterations()
-        print(colored(f"Current iterations: {current}", "cyan"))
     except Exception as e:
         logging.error(f"Error loading iterations: {e}")
         print(colored(f"Error: {e}", "red"))
         return
-    value = input("Enter new iteration count: ").strip()
-    if not value:
-        print(colored("No iteration count entered.", "yellow"))
+    print(colored(f"Current iterations: {current}", "cyan"))
+    for key, label, iters in levels:
+        marker = "*" if iters == current else " "
+        print(colored(f"{key}. {label} ({iters}) {marker}", "menu"))
+    print(colored("b. Benchmark current setting", "menu"))
+    choice = input("Select strength or 'b' to benchmark: ").strip().lower()
+    if not choice:
+        print(colored("No change made.", "yellow"))
+        return
+    if choice == "b":
+        start = time.perf_counter()
+        hashlib.pbkdf2_hmac("sha256", b"bench", b"salt", current)
+        elapsed = time.perf_counter() - start
+        print(colored(f"{current} iterations took {elapsed:.2f}s", "green"))
+        return
+    selected = {k: v for k, _, v in levels}.get(choice)
+    if not selected:
+        print(colored("Invalid choice.", "red"))
         return
     try:
-        iterations = int(value)
-        if iterations <= 0:
-            print(colored("Iterations must be positive.", "red"))
-            return
-    except ValueError:
-        print(colored("Invalid number.", "red"))
-        return
-    try:
-        cfg_mgr.set_kdf_iterations(iterations)
-        print(colored("KDF iteration count updated.", "green"))
+        cfg_mgr.set_kdf_iterations(selected)
+        print(colored(f"KDF iteration count set to {selected}.", "green"))
     except Exception as e:
         logging.error(f"Error saving iterations: {e}")
         print(colored(f"Error: {e}", "red"))
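The benchmark path above can be exercised standalone: the timing call is the same one-shot `hashlib.pbkdf2_hmac` measurement the handler runs for the `b` option, so you can estimate how a given strength level will feel on your hardware.

```python
import hashlib
import time


def benchmark_pbkdf2(iterations: int) -> float:
    """Time a single PBKDF2-HMAC-SHA256 derivation, as the 'b' option does."""
    start = time.perf_counter()
    hashlib.pbkdf2_hmac("sha256", b"bench", b"salt", iterations)
    return time.perf_counter() - start


# Cost grows linearly with the iteration count, so "Paranoid" (500,000)
# should take roughly 10x as long as "Fast" (50,000) on the same machine.
for label, iters in [("Fast", 50_000), ("Balanced", 100_000)]:
    print(f"{label} ({iters} iterations): {benchmark_pbkdf2(iters):.3f}s")
```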
@@ -1014,12 +1030,12 @@ def handle_settings(password_manager: PasswordManager) -> None:
     print(color_text("8. Import database", "menu"))
     print(color_text("9. Export 2FA codes", "menu"))
     print(color_text("10. Set additional backup location", "menu"))
-    print(color_text("11. Set KDF iterations", "menu"))
+    print(color_text("11. KDF strength & benchmark", "menu"))
     print(color_text("12. Set inactivity timeout", "menu"))
     print(color_text("13. Lock Vault", "menu"))
     print(color_text("14. Stats", "menu"))
     print(color_text("15. Toggle Secret Mode", "menu"))
-    print(color_text("16. Toggle Offline Mode", "menu"))
+    print(color_text("16. Toggle Offline Mode (default ON)", "menu"))
     print(color_text("17. Toggle Quick Unlock", "menu"))
     choice = input("Select an option or press Enter to go back: ").strip()
     if choice == "1":
@@ -1294,6 +1310,11 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in
         action="store_true",
         help="Disable clipboard support and print secrets",
     )
+    parser.add_argument(
+        "--deterministic-totp",
+        action="store_true",
+        help="Derive TOTP secrets deterministically",
+    )
     parser.add_argument(
         "--max-prompt-attempts",
         type=int,
@@ -1304,6 +1325,11 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in

     exp = sub.add_parser("export")
     exp.add_argument("--file")
+    exp.add_argument(
+        "--unencrypted",
+        action="store_true",
+        help="Export without encryption",
+    )

     imp = sub.add_parser("import")
     imp.add_argument("--file")
@@ -1371,9 +1397,13 @@ def main(argv: list[str] | None = None, *, fingerprint: str | None = None) -> in

     if args.no_clipboard:
         password_manager.secret_mode_enabled = False
+    if args.deterministic_totp:
+        password_manager.deterministic_totp = True

     if args.command == "export":
-        password_manager.handle_export_database(Path(args.file))
+        password_manager.handle_export_database(
+            Path(args.file), encrypt=not args.unencrypted
+        )
         return 0
     if args.command == "import":
         password_manager.handle_import_database(Path(args.file))
@@ -25,3 +25,4 @@ class Manifest:
     algo: str
     chunks: List[ChunkMeta]
     delta_since: Optional[int] = None
+    nonce: Optional[str] = None
@@ -33,7 +33,7 @@ from .backup_models import (
 )
 from .connection import ConnectionHandler, DEFAULT_RELAYS
 from .key_manager import KeyManager as SeedPassKeyManager
-from .snapshot import MANIFEST_ID_PREFIX, SnapshotHandler, prepare_snapshot
+from .snapshot import SnapshotHandler, prepare_snapshot

 if TYPE_CHECKING:  # pragma: no cover - imported for type hints
     from seedpass.core.config_manager import ConfigManager
@@ -57,6 +57,8 @@ class NostrClient(ConnectionHandler, SnapshotHandler):
         parent_seed: Optional[str] = None,
         offline_mode: bool = False,
         config_manager: Optional["ConfigManager"] = None,
+        key_index: bytes | None = None,
+        account_index: int | None = None,
     ) -> None:
         self.encryption_manager = encryption_manager
         self.fingerprint = fingerprint
@@ -68,7 +70,7 @@ class NostrClient(ConnectionHandler, SnapshotHandler):
             parent_seed = self.encryption_manager.decrypt_parent_seed()

         # Use our project's KeyManager to derive the private key
-        self.key_manager = KeyManager(parent_seed, fingerprint)
+        self.key_manager = KeyManager(parent_seed, fingerprint, account_index)

         # Create a nostr-sdk Keys object from our derived private key
         private_key_hex = self.key_manager.keys.private_key_hex()
@@ -99,6 +101,7 @@ class NostrClient(ConnectionHandler, SnapshotHandler):
         self.current_manifest: Manifest | None = None
         self.current_manifest_id: str | None = None
         self._delta_events: list[str] = []
+        self.key_index = key_index or b""

         # Configure and initialize the nostr-sdk Client
         signer = NostrSigner.keys(self.keys)
@@ -111,5 +114,4 @@ __all__ = [
     "NostrClient",
     "prepare_snapshot",
     "DEFAULT_RELAYS",
-    "MANIFEST_ID_PREFIX",
 ]
@@ -16,17 +16,22 @@ logger = logging.getLogger(__name__)


 class KeyManager:
-    """
-    Manages key generation, encoding, and derivation for NostrClient.
-    """
+    """Manages key generation, encoding, and derivation for ``NostrClient``."""

-    def __init__(self, parent_seed: str, fingerprint: str):
-        """
-        Initializes the KeyManager with the provided parent_seed and fingerprint.
-
-        Parameters:
-            parent_seed (str): The parent seed used for key derivation.
-            fingerprint (str): The fingerprint to differentiate key derivations.
+    def __init__(
+        self, parent_seed: str, fingerprint: str, account_index: int | None = None
+    ):
+        """Initialize the key manager.
+
+        Parameters
+        ----------
+        parent_seed:
+            The BIP-39 seed used as the root for derivations.
+        fingerprint:
+            Seed profile fingerprint used for legacy derivations and logging.
+        account_index:
+            Optional explicit index for BIP-85 Nostr key derivation. When ``None``
+            the index defaults to ``0``.
         """
         try:
             if not isinstance(parent_seed, str):
@@ -40,12 +45,15 @@ class KeyManager:

         self.parent_seed = parent_seed
         self.fingerprint = fingerprint
-        logger.debug(f"KeyManager initialized with parent_seed and fingerprint.")
+        self.account_index = account_index
+        logger.debug(
+            "KeyManager initialized with parent_seed, fingerprint and account index."
+        )

         # Initialize BIP85
         self.bip85 = self.initialize_bip85()

-        # Generate Nostr keys using the fingerprint
+        # Generate Nostr keys using the provided account index
         self.keys = self.generate_nostr_keys()
         logger.debug("Nostr Keys initialized successfully.")
@@ -70,34 +78,36 @@ class KeyManager:
             raise

     def generate_nostr_keys(self) -> Keys:
-        """
-        Derives a unique Nostr key pair for the given fingerprint using BIP-85.
-
-        Returns:
-            Keys: An instance of Keys containing the Nostr key pair.
-        """
+        """Derive a Nostr key pair using the configured ``account_index``."""
         try:
-            # Convert fingerprint to an integer index (using a hash function)
-            index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (
-                2**31
-            )
-
-            # Derive entropy for Nostr key (32 bytes)
+            index = self.account_index if self.account_index is not None else 0
             entropy_bytes = self.bip85.derive_entropy(
-                index=index,
-                entropy_bytes=32,
-                app_no=NOSTR_KEY_APP_ID,
+                index=index, entropy_bytes=32, app_no=NOSTR_KEY_APP_ID
             )
-
-            # Generate Nostr key pair from entropy
             private_key_hex = entropy_bytes.hex()
             keys = Keys(priv_k=private_key_hex)
-            logger.debug(f"Nostr keys generated for fingerprint {self.fingerprint}.")
+            logger.debug("Nostr keys generated for account index %s", index)
             return keys
         except Exception as e:
             logger.error(f"Failed to generate Nostr keys: {e}", exc_info=True)
             raise

+    def generate_v1_nostr_keys(self) -> Keys:
+        """Derive keys using the legacy fingerprint-hash method."""
+        try:
+            index = int(hashlib.sha256(self.fingerprint.encode()).hexdigest(), 16) % (
+                2**31
+            )
+            entropy_bytes = self.bip85.derive_entropy(
+                index=index, entropy_bytes=32, app_no=NOSTR_KEY_APP_ID
+            )
+            return Keys(priv_k=entropy_bytes.hex())
+        except Exception as e:
+            logger.error(f"Failed to generate v1 Nostr keys: {e}", exc_info=True)
+            raise
+
     def generate_legacy_nostr_keys(self) -> Keys:
         """Derive Nostr keys using the legacy application ID."""
         try:
@@ -2,8 +2,10 @@ import asyncio
 import base64
 import gzip
 import hashlib
+import hmac
 import json
 import logging
+import os
 import time
 from datetime import timedelta
 from typing import Tuple
@@ -23,9 +25,6 @@ from .backup_models import (
 logger = logging.getLogger("nostr.client")
 logger.setLevel(logging.WARNING)

-# Identifier prefix for replaceable manifest events
-MANIFEST_ID_PREFIX = "seedpass-manifest-"
-

 def prepare_snapshot(
     encrypted_bytes: bytes, limit: int
@@ -47,6 +46,19 @@ def prepare_snapshot(
     return manifest, chunks


+def new_manifest_id(key_index: bytes) -> tuple[str, bytes]:
+    """Return a new manifest identifier and nonce.
+
+    The identifier is computed as HMAC-SHA256 of ``b"manifest|" + nonce``
+    using ``key_index`` as the HMAC key. The nonce is returned so it can be
+    embedded inside the manifest itself.
+    """
+
+    nonce = os.urandom(16)
+    digest = hmac.new(key_index, b"manifest|" + nonce, hashlib.sha256).hexdigest()
+    return digest, nonce
+
+
 class SnapshotHandler:
     """Mixin providing chunk and manifest handling."""
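The design choice here is worth spelling out: because the identifier is an HMAC keyed by `key_index`, it looks random to relays (no fingerprint leaks into the event identifier), yet any client holding `key_index` and the nonce embedded in the manifest can recompute and verify it. A self-contained sketch using the same construction:

```python
import hashlib
import hmac
import os


def new_manifest_id(key_index: bytes) -> tuple[str, bytes]:
    """HMAC-SHA256 of b"manifest|" + nonce, keyed by key_index."""
    nonce = os.urandom(16)
    digest = hmac.new(key_index, b"manifest|" + nonce, hashlib.sha256).hexdigest()
    return digest, nonce


key_index = os.urandom(32)  # stands in for the derived KEY_INDEX material
manifest_id, nonce = new_manifest_id(key_index)

# A syncing client that knows key_index and reads the nonce out of the
# manifest can recompute the identifier and confirm the event is theirs.
recomputed = hmac.new(key_index, b"manifest|" + nonce, hashlib.sha256).hexdigest()
```

Without `key_index`, an observer cannot link two manifest identifiers to the same vault, since each is an HMAC over a fresh random nonce.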
@@ -84,34 +96,43 @@ class SnapshotHandler:
             except Exception:
                 meta.event_id = None

+        if (
+            self.current_manifest_id
+            and self.current_manifest
+            and getattr(self.current_manifest, "nonce", None)
+        ):
+            manifest_id = self.current_manifest_id
+            manifest.nonce = self.current_manifest.nonce
+        else:
+            manifest_id, nonce = new_manifest_id(self.key_index)
+            manifest.nonce = base64.b64encode(nonce).decode("utf-8")
+
         manifest_json = json.dumps(
             {
                 "ver": manifest.ver,
                 "algo": manifest.algo,
                 "chunks": [meta.__dict__ for meta in manifest.chunks],
                 "delta_since": manifest.delta_since,
+                "nonce": manifest.nonce,
             }
         )
-        manifest_identifier = (
-            self.current_manifest_id or f"{MANIFEST_ID_PREFIX}{self.fingerprint}"
-        )
         manifest_event = (
             nostr_client.EventBuilder(nostr_client.Kind(KIND_MANIFEST), manifest_json)
-            .tags([nostr_client.Tag.identifier(manifest_identifier)])
+            .tags([nostr_client.Tag.identifier(manifest_id)])
             .build(self.keys.public_key())
             .sign_with_keys(self.keys)
         )
         await self.client.send_event(manifest_event)
         with self._state_lock:
             self.current_manifest = manifest
-            self.current_manifest_id = manifest_identifier
+            self.current_manifest_id = manifest_id
             self.current_manifest.delta_since = int(time.time())
             self._delta_events = []
         if getattr(self, "verbose_timing", False):
             duration = time.perf_counter() - start
             logger.info("publish_snapshot completed in %.2f seconds", duration)
-        return manifest, manifest_identifier
+        return manifest, manifest_id

     async def _fetch_chunks_with_retry(
         self, manifest_event
@@ -129,6 +150,7 @@ class SnapshotHandler:
                     if data.get("delta_since") is not None
                     else None
                 ),
+                nonce=data.get("nonce"),
             )
         except Exception:
             return None
@@ -204,14 +226,11 @@ class SnapshotHandler:
         pubkey = self.keys.public_key()
         timeout = timedelta(seconds=10)

-        ident = f"{MANIFEST_ID_PREFIX}{self.fingerprint}"
-        f = (
-            nostr_client.Filter()
-            .author(pubkey)
-            .kind(nostr_client.Kind(KIND_MANIFEST))
-            .identifier(ident)
-            .limit(1)
-        )
+        ident = self.current_manifest_id
+        f = nostr_client.Filter().author(pubkey).kind(nostr_client.Kind(KIND_MANIFEST))
+        if ident:
+            f = f.identifier(ident)
+        f = f.limit(1)
         try:
             events = (await self.client.fetch_events(f, timeout)).to_vec()
         except Exception as e:  # pragma: no cover - network errors
@@ -223,13 +242,11 @@ class SnapshotHandler:
             )
             return None

-        if not events:
-            ident = MANIFEST_ID_PREFIX.rstrip("-")
+        if not events and ident:
             f = (
                 nostr_client.Filter()
                 .author(pubkey)
                 .kind(nostr_client.Kind(KIND_MANIFEST))
-                .identifier(ident)
                 .limit(1)
             )
             try:
@@ -245,8 +262,6 @@ class SnapshotHandler:
         if not events:
             return None

-        logger.info("Fetched manifest using identifier %s", ident)
-
         for manifest_event in events:
             try:
                 result = await self._fetch_chunks_with_retry(manifest_event)
@@ -300,7 +315,9 @@ class SnapshotHandler:
             return
         await self._connect_async()
         pubkey = self.keys.public_key()
-        ident = self.current_manifest_id or f"{MANIFEST_ID_PREFIX}{self.fingerprint}"
+        ident = self.current_manifest_id
+        if ident is None:
+            return
         f = (
             nostr_client.Filter()
             .author(pubkey)
@@ -358,6 +375,7 @@ class SnapshotHandler:
|
|||||||
meta.__dict__ for meta in self.current_manifest.chunks
|
meta.__dict__ for meta in self.current_manifest.chunks
|
||||||
],
|
],
|
||||||
"delta_since": self.current_manifest.delta_since,
|
"delta_since": self.current_manifest.delta_since,
|
||||||
|
"nonce": self.current_manifest.nonce,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
manifest_event = (
|
manifest_event = (
|
||||||
```diff
@@ -9,8 +9,6 @@ import secrets
 import queue
 from typing import Any, List, Optional

-from datetime import datetime, timedelta, timezone
-import jwt
 import logging

 from fastapi import FastAPI, Header, HTTPException, Request, Response
@@ -18,8 +16,8 @@ from fastapi.concurrency import run_in_threadpool
 import asyncio
 import sys
 from fastapi.middleware.cors import CORSMiddleware
-import hashlib
-import hmac
+import bcrypt

 from slowapi import Limiter, _rate_limit_exceeded_handler
 from slowapi.errors import RateLimitExceeded
@@ -50,16 +48,9 @@ def _get_pm(request: Request) -> PasswordManager:
 def _check_token(request: Request, auth: str | None) -> None:
     if auth is None or not auth.startswith("Bearer "):
         raise HTTPException(status_code=401, detail="Unauthorized")
-    token = auth.split(" ", 1)[1]
-    jwt_secret = getattr(request.app.state, "jwt_secret", "")
-    token_hash = getattr(request.app.state, "token_hash", "")
-    try:
-        jwt.decode(token, jwt_secret, algorithms=["HS256"])
-    except jwt.ExpiredSignatureError:
-        raise HTTPException(status_code=401, detail="Token expired")
-    except jwt.InvalidTokenError:
-        raise HTTPException(status_code=401, detail="Unauthorized")
-    if not hmac.compare_digest(hashlib.sha256(token.encode()).hexdigest(), token_hash):
+    token = auth.split(" ", 1)[1].encode()
+    token_hash = getattr(request.app.state, "token_hash", b"")
+    if not token_hash or not bcrypt.checkpw(token, token_hash):
         raise HTTPException(status_code=401, detail="Unauthorized")


@@ -78,7 +69,7 @@ def _reload_relays(request: Request, relays: list[str]) -> None:


 def start_server(fingerprint: str | None = None) -> str:
-    """Initialize global state and return a short-lived JWT token.
+    """Initialize global state and return a random API token.

     Parameters
     ----------
@@ -90,10 +81,8 @@ def start_server(fingerprint: str | None = None) -> str:
     else:
         pm = PasswordManager(fingerprint=fingerprint)
     app.state.pm = pm
-    app.state.jwt_secret = secrets.token_urlsafe(32)
-    payload = {"exp": datetime.now(timezone.utc) + timedelta(minutes=5)}
-    raw_token = jwt.encode(payload, app.state.jwt_secret, algorithm="HS256")
-    app.state.token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
+    raw_token = secrets.token_urlsafe(32)
+    app.state.token_hash = bcrypt.hashpw(raw_token.encode(), bcrypt.gensalt())
     if not getattr(app.state, "limiter", None):
         app.state.limiter = limiter
         app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@@ -214,13 +203,14 @@ async def create_entry(
         uri = await run_in_threadpool(
             pm.entry_manager.add_totp,
             entry.get("label"),
-            pm.parent_seed,
+            pm.KEY_TOTP_DET if entry.get("deterministic", False) else None,
             secret=entry.get("secret"),
             index=entry.get("index"),
             period=int(entry.get("period", 30)),
             digits=int(entry.get("digits", 6)),
             notes=entry.get("notes", ""),
             archived=entry.get("archived", False),
+            deterministic=entry.get("deterministic", False),
         )
         return {"id": index, "uri": uri}

```
```diff
@@ -30,6 +30,13 @@ no_clipboard_option = typer.Option(
     is_flag=True,
 )

+deterministic_totp_option = typer.Option(
+    False,
+    "--deterministic-totp",
+    help="Derive TOTP secrets deterministically",
+    is_flag=True,
+)
+
 # Sub command groups
 from . import entry, vault, nostr, config, fingerprint, util, api

@@ -55,12 +62,17 @@ def main(
     ctx: typer.Context,
     fingerprint: Optional[str] = fingerprint_option,
     no_clipboard: bool = no_clipboard_option,
+    deterministic_totp: bool = deterministic_totp_option,
 ) -> None:
     """SeedPass CLI entry point.

     When called without a subcommand this launches the interactive TUI.
     """
-    ctx.obj = {"fingerprint": fingerprint, "no_clipboard": no_clipboard}
+    ctx.obj = {
+        "fingerprint": fingerprint,
+        "no_clipboard": no_clipboard,
+        "deterministic_totp": deterministic_totp,
+    }
     if ctx.invoked_subcommand is None:
         tui = importlib.import_module("main")
         raise typer.Exit(tui.main(fingerprint=fingerprint))
```
```diff
@@ -13,19 +13,25 @@ app = typer.Typer(help="Run the API server")
 def api_start(ctx: typer.Context, host: str = "127.0.0.1", port: int = 8000) -> None:
     """Start the SeedPass API server."""
     token = api_module.start_server(ctx.obj.get("fingerprint"))
-    typer.echo(f"API token: {token}")
+    typer.echo(
+        f"API token: {token}\nWARNING: Store this token securely; it cannot be recovered."
+    )
     uvicorn.run(api_module.app, host=host, port=port)


 @app.command("stop")
-def api_stop(ctx: typer.Context, host: str = "127.0.0.1", port: int = 8000) -> None:
+def api_stop(
+    token: str = typer.Option(..., help="API token"),
+    host: str = "127.0.0.1",
+    port: int = 8000,
+) -> None:
     """Stop the SeedPass API server."""
     import requests

     try:
         requests.post(
             f"http://{host}:{port}/api/v1/shutdown",
-            headers={"Authorization": f"Bearer {api_module.app.state.token_hash}"},
+            headers={"Authorization": f"Bearer {token}"},
             timeout=2,
         )
     except Exception as exc:  # pragma: no cover - best effort
```
```diff
@@ -29,6 +29,8 @@ def _get_pm(ctx: typer.Context) -> PasswordManager:
     pm = PasswordManager(fingerprint=fp)
     if ctx.obj.get("no_clipboard"):
         pm.secret_mode_enabled = False
+    if ctx.obj.get("deterministic_totp"):
+        pm.deterministic_totp = True
     return pm


```
```diff
@@ -177,6 +177,9 @@ def entry_add_totp(
     secret: Optional[str] = typer.Option(None, "--secret", help="Import secret"),
     period: int = typer.Option(30, "--period", help="TOTP period in seconds"),
     digits: int = typer.Option(6, "--digits", help="Number of TOTP digits"),
+    deterministic_totp: bool = typer.Option(
+        False, "--deterministic-totp", help="Derive secret deterministically"
+    ),
 ) -> None:
     """Add a TOTP entry and output the otpauth URI."""
     service = _get_entry_service(ctx)
@@ -186,6 +189,7 @@ def entry_add_totp(
         secret=secret,
         period=period,
         digits=digits,
+        deterministic=deterministic_totp,
     )
     typer.echo(uri)

```
```diff
@@ -363,15 +363,18 @@ class EntryService:
         secret: str | None = None,
         period: int = 30,
         digits: int = 6,
+        deterministic: bool = False,
     ) -> str:
         with self._lock:
+            key = self._manager.KEY_TOTP_DET if deterministic else None
             uri = self._manager.entry_manager.add_totp(
                 label,
-                self._manager.parent_seed,
+                key,
                 index=index,
                 secret=secret,
                 period=period,
                 digits=digits,
+                deterministic=deterministic,
             )
             self._manager.start_background_vault_sync()
             return uri
```
```diff
@@ -41,7 +41,7 @@ class ConfigManager:
             logger.info("Config file not found; returning defaults")
             return {
                 "relays": list(DEFAULT_NOSTR_RELAYS),
-                "offline_mode": False,
+                "offline_mode": True,
                 "pin_hash": "",
                 "password_hash": "",
                 "inactivity_timeout": INACTIVITY_TIMEOUT,
@@ -71,7 +71,7 @@ class ConfigManager:
             raise ValueError("Config data must be a dictionary")
         # Ensure defaults for missing keys
         data.setdefault("relays", list(DEFAULT_NOSTR_RELAYS))
-        data.setdefault("offline_mode", False)
+        data.setdefault("offline_mode", True)
         data.setdefault("pin_hash", "")
         data.setdefault("password_hash", "")
         data.setdefault("inactivity_timeout", INACTIVITY_TIMEOUT)
```
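The `offline_mode` default flip above relies on `dict.setdefault`, which fills a key only when it is absent, so a profile that explicitly saved `False` keeps syncing enabled while fresh profiles start offline. A small sketch of that defaulting behavior (`DEFAULTS` and `apply_defaults` are illustrative names, not the real config schema):

```python
# Illustrative defaults; the real config also carries pin_hash, timeouts, etc.
DEFAULTS = {"relays": ["wss://relay.example"], "offline_mode": True, "pin_hash": ""}


def apply_defaults(data: dict) -> dict:
    """Fill only the keys the stored config is missing, as load_config does."""
    if not isinstance(data, dict):
        raise ValueError("Config data must be a dictionary")
    for key, value in DEFAULTS.items():
        # Copy mutable defaults so profiles never share one list instance.
        data.setdefault(key, list(value) if isinstance(value, list) else value)
    return data


assert apply_defaults({})["offline_mode"] is True          # new profile: offline
assert apply_defaults({"offline_mode": False})["offline_mode"] is False  # opt-in kept
```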
```diff
@@ -16,6 +16,7 @@ except Exception:  # pragma: no cover - fallback for environments without orjson
 import hashlib
 import os
 import base64
+import zlib
 from dataclasses import asdict
 from pathlib import Path
 from typing import Optional, Tuple
@@ -91,16 +92,23 @@ class EncryptionManager:
         # Track user preference for handling legacy indexes
         self._legacy_migrate_flag = True
         self.last_migration_performed = False
+        # Track nonces to detect accidental reuse
+        self.nonce_crc_table: set[int] = set()

     def encrypt_data(self, data: bytes) -> bytes:
         """
-        (2) Encrypts data using the NEW AES-GCM format, prepending a version
-        header and the nonce. All new data will be in this format.
+        Encrypt data using AES-GCM, emitting ``b"V3|" + nonce + ciphertext + tag``.
+        A fresh 96-bit nonce is generated for each call and tracked via a CRC
+        table to detect accidental reuse during batch operations.
         """
         try:
             nonce = os.urandom(12)  # 96-bit nonce is recommended for AES-GCM
+            crc = zlib.crc32(nonce)
+            if crc in self.nonce_crc_table:
+                raise ValueError("Nonce reuse detected")
+            self.nonce_crc_table.add(crc)
             ciphertext = self.cipher.encrypt(nonce, data, None)
-            return b"V2:" + nonce + ciphertext
+            return b"V3|" + nonce + ciphertext
         except Exception as e:
             logger.error(f"Failed to encrypt data: {e}", exc_info=True)
             raise
@@ -122,7 +130,21 @@ class EncryptionManager:
         ctx = f" {context}" if context else ""

         try:
-            # Try the new V2 format first
+            # Try the new V3 format first
+            if encrypted_data.startswith(b"V3|"):
+                try:
+                    nonce = encrypted_data[3:15]
+                    ciphertext = encrypted_data[15:]
+                    if len(ciphertext) < 16:
+                        logger.error("AES-GCM payload too short")
+                        raise InvalidToken("AES-GCM payload too short")
+                    return self.cipher.decrypt(nonce, ciphertext, None)
+                except InvalidTag as e:
+                    msg = f"Failed to decrypt{ctx}: invalid key or corrupt file"
+                    logger.error(msg)
+                    raise InvalidToken(msg) from e
+
+            # Next try the older V2 format
             if encrypted_data.startswith(b"V2:"):
                 try:
                     nonce = encrypted_data[3:15]
@@ -146,19 +168,18 @@ class EncryptionManager:
                     logger.error(msg)
                     raise InvalidToken(msg) from e

-            # If it's not V2, it must be the legacy Fernet format
-            else:
-                logger.warning("Data is in legacy Fernet format. Attempting migration.")
-                try:
-                    return self.fernet.decrypt(encrypted_data)
-                except InvalidToken as e:
-                    logger.error(
-                        "Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
-                    )
-                    raise e
+            # If it's neither V3 nor V2, assume legacy Fernet format
+            logger.warning("Data is in legacy Fernet format. Attempting migration.")
+            try:
+                return self.fernet.decrypt(encrypted_data)
+            except InvalidToken as e:
+                logger.error(
+                    "Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
+                )
+                raise e

         except (InvalidToken, InvalidTag) as e:
-            if encrypted_data.startswith(b"V2:"):
+            if encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:"):
                 # Already determined not to be legacy; re-raise
                 raise
             if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short":
@@ -248,11 +269,13 @@ class EncryptionManager:
             blob = fh.read()

         kdf, encrypted_data = self._deserialize(blob)
-        is_legacy = not encrypted_data.startswith(b"V2:")
+        is_legacy = not (
+            encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:")
+        )
         decrypted_data = self.decrypt_data(encrypted_data, context="seed")

         if is_legacy:
-            logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
+            logger.info("Parent seed was in legacy format. Re-encrypting to V3 format.")
             self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)

         return decrypted_data.decode("utf-8").strip()
@@ -362,7 +385,9 @@ class EncryptionManager:
             blob = fh.read()

         kdf, encrypted_data = self._deserialize(blob)
-        is_legacy = not encrypted_data.startswith(b"V2:")
+        is_legacy = not (
+            encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:")
+        )
         self.last_migration_performed = False

         try:
@@ -384,11 +409,13 @@ class EncryptionManager:
             if return_kdf:
                 return data, kdf
             return data
-        except (InvalidToken, InvalidTag, JSONDecodeError) as e:
-            logger.error(
-                f"FATAL: Could not decrypt or parse data from {file_path}: {e}",
-                exc_info=True,
-            )
+        except (InvalidToken, InvalidTag) as e:
+            msg = f"Failed to decrypt or parse data from {file_path}: {e}"
+            logger.error(msg)
+            raise InvalidToken(msg) from e
+        except JSONDecodeError as e:
+            msg = f"Failed to parse JSON data from {file_path}: {e}"
+            logger.error(msg)
             raise

     def get_encrypted_index(self) -> Optional[bytes]:
@@ -424,7 +451,7 @@ class EncryptionManager:
         relative_path = Path("seedpass_entries_db.json.enc")

         kdf, ciphertext = self._deserialize(encrypted_data)
-        is_legacy = not ciphertext.startswith(b"V2:")
+        is_legacy = not (ciphertext.startswith(b"V3|") or ciphertext.startswith(b"V2:"))
        self.last_migration_performed = False

         def _process(decrypted: bytes) -> dict:
```
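The new `V3|` framing above prepends a 3-byte version tag and a 12-byte nonce, and a CRC table trips on accidental nonce reuse within a session. A toy sketch of the framing and the reuse check, under stated assumptions: a SHA-256 XOR keystream stands in for AES-GCM so the sketch runs without the `cryptography` package (real AES-GCM also appends a 16-byte tag, hence the `len(ciphertext) < 16` guard in the actual code), and `FramingSketch` is an illustrative name:

```python
import hashlib
import os
import zlib


class FramingSketch:
    """Sketch of the V3 framing: b"V3|" + 12-byte nonce + ciphertext."""

    def __init__(self, key: bytes):
        self.key = key
        self.nonce_crc_table: set[int] = set()  # cheap accidental-reuse tripwire

    def _keystream(self, nonce: bytes, n: int) -> bytes:
        # Toy keystream; NOT a real cipher, stands in for AES-GCM here.
        out = b""
        counter = 0
        while len(out) < n:
            out += hashlib.sha256(self.key + nonce + counter.to_bytes(4, "big")).digest()
            counter += 1
        return out[:n]

    def encrypt(self, data: bytes) -> bytes:
        nonce = os.urandom(12)  # fresh 96-bit nonce per call
        crc = zlib.crc32(nonce)
        if crc in self.nonce_crc_table:
            raise ValueError("Nonce reuse detected")
        self.nonce_crc_table.add(crc)
        ct = bytes(a ^ b for a, b in zip(data, self._keystream(nonce, len(data))))
        return b"V3|" + nonce + ct

    def decrypt(self, blob: bytes) -> bytes:
        if not blob.startswith(b"V3|"):
            raise ValueError("Unknown format")
        nonce, ct = blob[3:15], blob[15:]
        return bytes(a ^ b for a, b in zip(ct, self._keystream(nonce, len(ct))))


m = FramingSketch(b"k" * 32)
blob = m.encrypt(b"hello")
assert blob.startswith(b"V3|") and len(blob) == 3 + 12 + 5
assert m.decrypt(blob) == b"hello"
```

The version tag is what lets `decrypt_data` dispatch between V3, the older `V2:` framing, and legacy Fernet blobs, migrating the latter on read.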
```diff
@@ -34,7 +34,7 @@ from pathlib import Path
 from termcolor import colored
 from .migrations import LATEST_VERSION
 from .entry_types import EntryType, ALL_ENTRY_TYPES
-from .totp import TotpManager
+from .totp import TotpManager, random_totp_secret
 from utils.fingerprint import generate_fingerprint
 from utils.checksum import canonical_json_dumps
 from utils.atomic_write import atomic_write
@@ -257,7 +257,7 @@ class EntryManager:
     def add_totp(
         self,
         label: str,
-        parent_seed: str | bytes,
+        parent_seed: str | bytes | None = None,
         *,
         archived: bool = False,
         secret: str | None = None,
@@ -266,13 +266,16 @@ class EntryManager:
         digits: int = 6,
         notes: str = "",
         tags: list[str] | None = None,
+        deterministic: bool = False,
     ) -> str:
         """Add a new TOTP entry and return the provisioning URI."""
         entry_id = self.get_next_index()
         data = self._load_index()
         data.setdefault("entries", {})

-        if secret is None:
+        if deterministic:
+            if parent_seed is None:
+                raise ValueError("Seed required for deterministic TOTP")
             if index is None:
                 index = self.get_next_totp_index()
             secret = TotpManager.derive_secret(parent_seed, index)
@@ -289,8 +292,11 @@ class EntryManager:
                 "archived": archived,
                 "notes": notes,
                 "tags": tags or [],
+                "deterministic": True,
             }
         else:
+            if secret is None:
+                secret = random_totp_secret()
             if not validate_totp_secret(secret):
                 raise ValueError("Invalid TOTP secret")
             entry = {
@@ -304,6 +310,7 @@ class EntryManager:
                 "archived": archived,
                 "notes": notes,
                 "tags": tags or [],
+                "deterministic": False,
             }

         data["entries"][str(entry_id)] = entry
@@ -702,12 +709,12 @@ class EntryManager:
             etype != EntryType.TOTP.value and kind != EntryType.TOTP.value
         ):
             raise ValueError("Entry is not a TOTP entry")
-        if "secret" in entry:
-            return TotpManager.current_code_from_secret(entry["secret"], timestamp)
-        if parent_seed is None:
-            raise ValueError("Seed required for derived TOTP")
-        totp_index = int(entry.get("index", 0))
-        return TotpManager.current_code(parent_seed, totp_index, timestamp)
+        if entry.get("deterministic", False) or "secret" not in entry:
+            if parent_seed is None:
+                raise ValueError("Seed required for derived TOTP")
+            totp_index = int(entry.get("index", 0))
+            return TotpManager.current_code(parent_seed, totp_index, timestamp)
+        return TotpManager.current_code_from_secret(entry["secret"], timestamp)

     def get_totp_time_remaining(self, index: int) -> int:
         """Return seconds remaining in the TOTP period for the given entry."""
@@ -723,7 +730,7 @@ class EntryManager:
         return TotpManager.time_remaining(period)

     def export_totp_entries(
-        self, parent_seed: str | bytes
+        self, parent_seed: str | bytes | None
     ) -> dict[str, list[dict[str, Any]]]:
         """Return all TOTP secrets and metadata for external use."""
         data = self._load_index()
@@ -736,11 +743,13 @@ class EntryManager:
             label = entry.get("label", "")
             period = int(entry.get("period", 30))
             digits = int(entry.get("digits", 6))
-            if "secret" in entry:
-                secret = entry["secret"]
-            else:
+            if entry.get("deterministic", False) or "secret" not in entry:
+                if parent_seed is None:
+                    raise ValueError("Seed required for deterministic TOTP export")
                 idx = int(entry.get("index", 0))
                 secret = TotpManager.derive_secret(parent_seed, idx)
+            else:
+                secret = entry["secret"]
             uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
             exported.append(
                 {
```
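The branching above distinguishes deterministic entries, whose secret is re-derived from the seed per index, from imported or random entries, whose secret is stored in the entry itself. A self-contained sketch of both secret sources plus RFC 6238 code generation; `derive_totp_secret` uses plain HMAC as a stand-in for SeedPass's BIP-85-based `TotpManager.derive_secret`, and the assumed shape of `random_totp_secret` is a base32 string:

```python
import base64
import hashlib
import hmac
import secrets
import struct
import time


def random_totp_secret() -> str:
    """Random 160-bit secret, base32-encoded (assumed shape of random_totp_secret)."""
    return base64.b32encode(secrets.token_bytes(20)).decode().rstrip("=")


def derive_totp_secret(seed: bytes, index: int) -> str:
    """Deterministic per-index secret; HMAC stands in for BIP-85 derivation."""
    raw = hmac.new(seed, b"totp:%d" % index, hashlib.sha256).digest()[:20]
    return base64.b32encode(raw).decode().rstrip("=")


def totp_code(secret: str, timestamp: int, period: int = 30, digits: int = 6) -> str:
    """RFC 6238 TOTP with HMAC-SHA1 and dynamic truncation."""
    key = base64.b32decode(secret + "=" * (-len(secret) % 8))
    msg = struct.pack(">Q", timestamp // period)
    digest = hmac.new(key, msg, hashlib.sha1).digest()
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


# RFC 6238 test vector: secret "12345678901234567890", time 59 -> 94287082.
assert totp_code("GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ", 59, digits=8) == "94287082"
# Deterministic: the same seed and index always yield the same secret.
assert derive_totp_secret(b"seed", 0) == derive_totp_secret(b"seed", 0)
assert len(totp_code(random_totp_secret(), int(time.time()))) == 6
```

This is why exporting or computing codes for deterministic entries now requires the seed, while stored-secret entries can be handled with `parent_seed=None`.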
|
@@ -36,7 +36,7 @@ from .entry_management import EntryManager
|
|||||||
from .password_generation import PasswordGenerator
|
from .password_generation import PasswordGenerator
|
||||||
from .backup import BackupManager
|
from .backup import BackupManager
|
||||||
from .vault import Vault
|
from .vault import Vault
|
||||||
from .portable_backup import export_backup, import_backup
|
from .portable_backup import export_backup, import_backup, PortableMode
|
||||||
from cryptography.fernet import InvalidToken
|
from cryptography.fernet import InvalidToken
|
||||||
from .totp import TotpManager
|
from .totp import TotpManager
|
||||||
from .entry_types import EntryType
|
from .entry_types import EntryType
|
||||||
@@ -106,7 +106,6 @@ from utils.fingerprint_manager import FingerprintManager
|
|||||||
# Import NostrClient
|
# Import NostrClient
|
||||||
from nostr.client import NostrClient
|
from nostr.client import NostrClient
|
||||||
from nostr.connection import DEFAULT_RELAYS
|
from nostr.connection import DEFAULT_RELAYS
|
||||||
from nostr.snapshot import MANIFEST_ID_PREFIX
|
|
||||||
from .config_manager import ConfigManager
|
from .config_manager import ConfigManager
|
||||||
from .state_manager import StateManager
|
from .state_manager import StateManager
|
||||||
from .stats_manager import StatsManager
|
from .stats_manager import StatsManager
|
||||||
@@ -239,6 +238,7 @@ class PasswordManager:
|
|||||||
KEY_INDEX: bytes | None = None
|
KEY_INDEX: bytes | None = None
|
||||||
KEY_PW_DERIVE: bytes | None = None
|
KEY_PW_DERIVE: bytes | None = None
|
||||||
KEY_TOTP_DET: bytes | None = None
|
KEY_TOTP_DET: bytes | None = None
|
||||||
|
deterministic_totp: bool = False
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, fingerprint: Optional[str] = None, *, password: Optional[str] = None
|
self, fingerprint: Optional[str] = None, *, password: Optional[str] = None
|
||||||
@@ -287,14 +287,16 @@ class PasswordManager:
|
|||||||
self.is_locked: bool = False
|
self.is_locked: bool = False
|
||||||
self.inactivity_timeout: float = INACTIVITY_TIMEOUT
|
self.inactivity_timeout: float = INACTIVITY_TIMEOUT
|
||||||
self.secret_mode_enabled: bool = False
|
self.secret_mode_enabled: bool = False
|
||||||
|
self.deterministic_totp: bool = False
|
||||||
self.clipboard_clear_delay: int = 45
|
self.clipboard_clear_delay: int = 45
|
||||||
self.offline_mode: bool = False
|
self.offline_mode: bool = True
|
||||||
self.profile_stack: list[tuple[str, Path, str]] = []
|
self.profile_stack: list[tuple[str, Path, str]] = []
|
||||||
self.last_unlock_duration: float | None = None
|
self.last_unlock_duration: float | None = None
|
||||||
self.verbose_timing: bool = False
|
self.verbose_timing: bool = False
|
||||||
self._suppress_entry_actions_menu: bool = False
|
self._suppress_entry_actions_menu: bool = False
|
||||||
self.last_bip85_idx: int = 0
|
self.last_bip85_idx: int = 0
|
||||||
self.last_sync_ts: int = 0
|
self.last_sync_ts: int = 0
|
||||||
|
self.nostr_account_idx: int = 0
|
||||||
self.auth_guard = AuthGuard(self)
|
self.auth_guard = AuthGuard(self)
|
||||||
|
|
||||||
# Service composition
|
# Service composition
|
||||||
@@ -622,9 +624,11 @@ class PasswordManager:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
choice = input(
|
choice = input(
|
||||||
"Do you want to (1) Paste in an existing seed in full "
|
"Choose an option:\n"
|
||||||
"(2) Enter an existing seed one word at a time or "
|
"1. Paste in an existing seed in full\n"
|
||||||
"(3) Generate a new seed? (1/2/3): "
|
"2. Enter an existing seed one word at a time\n"
|
||||||
|
"3. Generate a new seed\n"
|
||||||
|
"Enter choice (1/2/3): "
|
||||||
).strip()
|
).strip()
|
||||||
if choice == "1":
|
if choice == "1":
|
||||||
fingerprint = self.setup_existing_seed(method="paste")
|
fingerprint = self.setup_existing_seed(method="paste")
|
||||||
@@ -979,9 +983,12 @@ class PasswordManager:
|
|||||||
self.notify("No existing seed found. Let's set up a new one!", level="WARNING")
|
self.notify("No existing seed found. Let's set up a new one!", level="WARNING")
|
||||||
|
|
||||||
choice = input(
|
choice = input(
|
||||||
"Do you want to (1) Paste in an existing seed in full "
|
"Choose an option:\n"
|
||||||
"(2) Enter an existing seed one word at a time, "
|
"1. Paste in an existing seed in full\n"
|
||||||
"(3) Generate a new seed, or (4) Restore from Nostr? (1/2/3/4): "
|
"2. Enter an existing seed one word at a time\n"
|
||||||
|
"3. Generate a new seed\n"
|
||||||
|
"4. Restore from Nostr\n"
|
||||||
|
"Enter choice (1/2/3/4): "
|
||||||
).strip()
|
).strip()
|
||||||
|
|
||||||
if choice == "1":
|
if choice == "1":
|
||||||
@@ -1087,7 +1094,7 @@ class PasswordManager:
|
|||||||
self.fingerprint_manager.current_fingerprint = fingerprint
|
self.fingerprint_manager.current_fingerprint = fingerprint
|
||||||
self.fingerprint_dir = fingerprint_dir
|
self.fingerprint_dir = fingerprint_dir
|
||||||
if not getattr(self, "manifest_id", None):
|
if not getattr(self, "manifest_id", None):
|
||||||
self.manifest_id = f"{MANIFEST_ID_PREFIX}{fingerprint}"
|
self.manifest_id = None
|
||||||
logging.info(f"Current seed profile set to {fingerprint}")
|
logging.info(f"Current seed profile set to {fingerprint}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -1151,6 +1158,14 @@ class PasswordManager:
|
|||||||
print(colored("Please write this down and keep it in a safe place!", "red"))
|
print(colored("Please write this down and keep it in a safe place!", "red"))
|
||||||
|
|
||||||
if confirm_action("Do you want to use this generated seed? (Y/N): "):
|
if confirm_action("Do you want to use this generated seed? (Y/N): "):
|
||||||
|
# Determine next account index if state manager is available
|
||||||
|
next_idx = 0
|
||||||
|
if getattr(self, "state_manager", None) is not None:
|
||||||
|
try:
|
||||||
|
next_idx = self.state_manager.state.get("nostr_account_idx", 0) + 1
|
||||||
|
except Exception:
|
||||||
|
next_idx = 0
|
||||||
|
|
||||||
# Add a new fingerprint using the generated seed
|
# Add a new fingerprint using the generated seed
|
||||||
try:
|
try:
|
||||||
fingerprint = self.fingerprint_manager.add_fingerprint(new_seed)
|
fingerprint = self.fingerprint_manager.add_fingerprint(new_seed)
|
||||||
@@ -1183,6 +1198,15 @@ class PasswordManager:
                 )
                 sys.exit(1)

+            # Persist the assigned account index for the new profile
+            try:
+                StateManager(fingerprint_dir).update_state(nostr_account_idx=next_idx)
+                if getattr(self, "state_manager", None) is not None:
+                    self.state_manager.update_state(nostr_account_idx=next_idx)
+                self.nostr_account_idx = next_idx
+            except Exception:
+                pass
+
             # Set the current fingerprint in both PasswordManager and FingerprintManager
             self.current_fingerprint = fingerprint
             self.fingerprint_manager.current_fingerprint = fingerprint
@@ -1407,13 +1431,15 @@ class PasswordManager:
             self.last_sync_ts = state.get("last_sync_ts", 0)
             self.manifest_id = state.get("manifest_id")
             self.delta_since = state.get("delta_since", 0)
+            self.nostr_account_idx = state.get("nostr_account_idx", 0)
         else:
             relay_list = list(DEFAULT_RELAYS)
             self.last_bip85_idx = 0
             self.last_sync_ts = 0
             self.manifest_id = None
             self.delta_since = 0
-        self.offline_mode = bool(config.get("offline_mode", False))
+            self.nostr_account_idx = 0
+        self.offline_mode = bool(config.get("offline_mode", True))
         self.inactivity_timeout = config.get(
             "inactivity_timeout", INACTIVITY_TIMEOUT
         )
@@ -1429,6 +1455,8 @@ class PasswordManager:
             offline_mode=self.offline_mode,
             config_manager=self.config_manager,
             parent_seed=getattr(self, "parent_seed", None),
+            key_index=self.KEY_INDEX,
+            account_index=self.nostr_account_idx,
         )

         if getattr(self, "manifest_id", None) and hasattr(
@@ -1852,7 +1880,7 @@ class PasswordManager:
             child_fingerprint=child_fp,
         )
         print("\nAdd TOTP:")
-        print("1. Make 2FA (derive from seed)")
+        print("1. Make 2FA")
         print("2. Import 2FA (paste otpauth URI or secret)")
         choice = input("Select option or press Enter to go back: ").strip()
         if choice == "1":
@@ -1876,9 +1904,13 @@ class PasswordManager:
                 if tags_input
                 else []
             )
-            totp_index = self.entry_manager.get_next_totp_index()
             entry_id = self.entry_manager.get_next_index()
-            key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
+            key = self.KEY_TOTP_DET if self.deterministic_totp else None
+            totp_index = (
+                self.entry_manager.get_next_totp_index()
+                if self.deterministic_totp
+                else None
+            )
             uri = self.entry_manager.add_totp(
                 label,
                 key,
@@ -1887,8 +1919,14 @@ class PasswordManager:
                 digits=int(digits),
                 notes=notes,
                 tags=tags,
+                deterministic=self.deterministic_totp,
             )
-            secret = TotpManager.derive_secret(key, totp_index)
+            if self.deterministic_totp:
+                secret = TotpManager.derive_secret(key, totp_index or 0)
+                color_cat = "deterministic"
+            else:
+                _lbl, secret, _, _ = TotpManager.parse_otpauth(uri)
+                color_cat = "default"
             self.is_dirty = True
             self.last_update = time.time()
             print(
@@ -1899,7 +1937,7 @@ class PasswordManager:
             print(colored("Add this URI to your authenticator app:", "cyan"))
             print(colored(uri, "yellow"))
             TotpManager.print_qr_code(uri)
-            print(color_text(f"Secret: {secret}\n", "deterministic"))
+            print(color_text(f"Secret: {secret}\n", color_cat))
             try:
                 self.start_background_vault_sync()
             except Exception as nostr_error:
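The hunks above split TOTP entries into two paths: deterministic secrets derived from a key plus an index, and imported secrets parsed back out of the otpauth URI. SeedPass's actual `TotpManager.derive_secret` is defined elsewhere in the repo; as an illustration only, a deterministic derivation of that shape can map `(key, index)` to a stable Base32 secret via HMAC (function name and construction here are assumptions, not the project's implementation):

```python
import base64
import hashlib
import hmac


def derive_totp_secret(key: bytes, index: int) -> str:
    """Illustrative only: map (key, index) to a stable Base32 TOTP secret."""
    # HMAC the index under the key, then Base32-encode the first 160 bits.
    digest = hmac.new(key, index.to_bytes(4, "big"), hashlib.sha256).digest()
    return base64.b32encode(digest[:20]).decode("ascii").rstrip("=")


a = derive_totp_secret(b"parent-key", 0)
b = derive_totp_secret(b"parent-key", 0)
c = derive_totp_secret(b"parent-key", 1)
print(a == b, a != c)  # same inputs give the same secret; a new index gives a new one
```

The point of the diff's `deterministic` flag is exactly this property: derived secrets can be regenerated from the seed, while imported secrets exist only in the vault.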
@@ -1931,15 +1969,15 @@ class PasswordManager:
                 else []
             )
             entry_id = self.entry_manager.get_next_index()
-            key = self.KEY_TOTP_DET or getattr(self, "parent_seed", None)
             uri = self.entry_manager.add_totp(
                 label,
-                key,
+                None,
                 secret=secret,
                 period=period,
                 digits=digits,
                 notes=notes,
                 tags=tags,
+                deterministic=False,
             )
             self.is_dirty = True
             self.last_update = time.time()
@@ -4090,8 +4128,15 @@ class PasswordManager:
     def handle_export_database(
         self,
         dest: Path | None = None,
+        *,
+        encrypt: bool | None = None,
     ) -> Path | None:
-        """Export the current database to an encrypted portable file."""
+        """Export the current database to a portable file.
+
+        If ``encrypt`` is ``True`` (default) the payload is encrypted. When
+        ``encrypt`` is ``False`` the export contains plaintext data. When
+        ``encrypt`` is ``None`` the user is prompted interactively.
+        """
         try:
             fp, parent_fp, child_fp = self.header_fingerprint_args
             clear_header_with_notification(
@@ -4101,11 +4146,16 @@ class PasswordManager:
                 parent_fingerprint=parent_fp,
                 child_fingerprint=child_fp,
             )
+            if encrypt is None:
+                encrypt = not confirm_action(
+                    "Export database without encryption? (Y/N): "
+                )
             path = export_backup(
                 self.vault,
                 self.backup_manager,
                 dest,
                 parent_seed=self.parent_seed,
+                encrypt=encrypt,
             )
             print(colored(f"Database exported to '{path}'.", "green"))
             audit_logger = getattr(self, "audit_logger", None)
@@ -4120,15 +4170,26 @@ class PasswordManager:
     def handle_import_database(self, src: Path) -> None:
         """Import a portable database file, replacing the current index."""

-        if not src.name.endswith(".json.enc"):
+        if not (src.name.endswith(".json.enc") or src.name.endswith(".json")):
             print(
                 colored(
-                    "Error: Selected file must be a SeedPass database backup (.json.enc).",
+                    "Error: Selected file must be a SeedPass database backup (.json or .json.enc).",
                     "red",
                 )
             )
             return

+        # Determine encryption mode for post-processing
+        mode = None
+        try:
+            raw = src.read_bytes()
+            if src.suffix.endswith(".enc"):
+                raw = self.vault.encryption_manager.decrypt_data(raw, context=str(src))
+            wrapper = json.loads(raw.decode("utf-8"))
+            mode = wrapper.get("encryption_mode")
+        except Exception:
+            mode = None
+
         fp, parent_fp, child_fp = self.header_fingerprint_args
         clear_header_with_notification(
             self,
@@ -4168,6 +4229,23 @@ class PasswordManager:
             )
             return

+        if mode == PortableMode.NONE.value:
+            try:
+                password = prompt_new_password()
+                iterations = self.config_manager.get_kdf_iterations()
+                seed_key = derive_key_from_password(
+                    password, self.current_fingerprint, iterations=iterations
+                )
+                seed_mgr = EncryptionManager(seed_key, self.fingerprint_dir)
+                seed_mgr.encrypt_parent_seed(self.parent_seed)
+                self.store_hashed_password(password)
+            except Exception as e:
+                logging.error(
+                    f"Failed to set master password after import: {e}", exc_info=True
+                )
+                print(colored(f"Error: Failed to set master password: {e}", "red"))
+                return
+
         print(colored("Database imported successfully.", "green"))
         self.sync_vault()

@@ -4368,6 +4446,15 @@ class PasswordManager:
             else:
                 logging.warning("Password verification failed.")
             return is_correct
+        except InvalidToken as e:
+            logging.error(f"Failed to decrypt config: {e}")
+            print(
+                colored(
+                    "Error: Could not decrypt configuration. The password may be incorrect or the file may be corrupted.",
+                    "red",
+                )
+            )
+            return False
         except Exception as e:
             logging.error(f"Error verifying password: {e}", exc_info=True)
             print(colored(f"Error: Failed to verify password: {e}", "red"))
@@ -4475,6 +4562,8 @@ class PasswordManager:
             relays=relay_list,
             config_manager=self.config_manager,
             parent_seed=getattr(self, "parent_seed", None),
+            key_index=self.KEY_INDEX,
+            account_index=self.nostr_account_idx,
         )

         if getattr(self, "manifest_id", None) and hasattr(
@@ -21,6 +21,7 @@ from utils.key_derivation import (
 )
 from .encryption import EncryptionManager
 from utils.checksum import json_checksum, canonical_json_dumps
+from .state_manager import StateManager

 logger = logging.getLogger(__name__)

@@ -32,6 +33,7 @@ class PortableMode(Enum):
     """Encryption mode for portable exports."""

     SEED_ONLY = EncryptionMode.SEED_ONLY.value
+    NONE = "none"


 def _derive_export_key(seed: str) -> bytes:
@@ -47,8 +49,15 @@ def export_backup(
     *,
     publish: bool = False,
     parent_seed: str | None = None,
+    encrypt: bool = True,
 ) -> Path:
-    """Export the current vault state to a portable encrypted file."""
+    """Export the current vault state to a portable file.
+
+    When ``encrypt`` is ``True`` (the default) the payload is encrypted with a
+    key derived from the parent seed. When ``encrypt`` is ``False`` the payload
+    is written in plaintext and the wrapper records an ``encryption_mode`` of
+    :data:`PortableMode.NONE`.
+    """

     if dest_path is None:
         ts = int(time.time())
@@ -57,24 +66,32 @@ def export_backup(
         dest_path = dest_dir / EXPORT_NAME_TEMPLATE.format(ts=ts)

     index_data = vault.load_index()
-    seed = (
-        parent_seed
-        if parent_seed is not None
-        else vault.encryption_manager.decrypt_parent_seed()
-    )
-    key = _derive_export_key(seed)
-    enc_mgr = EncryptionManager(key, vault.fingerprint_dir)

     canonical = canonical_json_dumps(index_data)
-    payload_bytes = enc_mgr.encrypt_data(canonical.encode("utf-8"))
+    if encrypt:
+        seed = (
+            parent_seed
+            if parent_seed is not None
+            else vault.encryption_manager.decrypt_parent_seed()
+        )
+        key = _derive_export_key(seed)
+        enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
+        payload_bytes = enc_mgr.encrypt_data(canonical.encode("utf-8"))
+        mode = PortableMode.SEED_ONLY
+        cipher = "aes-gcm"
+    else:
+        payload_bytes = canonical.encode("utf-8")
+        mode = PortableMode.NONE
+        cipher = "none"
+
     checksum = json_checksum(index_data)

     wrapper = {
         "format_version": FORMAT_VERSION,
         "created_at": int(time.time()),
         "fingerprint": vault.fingerprint_dir.name,
-        "encryption_mode": PortableMode.SEED_ONLY.value,
-        "cipher": "aes-gcm",
+        "encryption_mode": mode.value,
+        "cipher": cipher,
         "checksum": checksum,
         "payload": base64.b64encode(payload_bytes).decode("utf-8"),
     }
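In the plaintext branch above, the wrapper's payload is simply base64 of the canonical JSON index. A minimal sketch of a `mode == "none"` wrapper and its round trip follows; the field names come from the diff, while the concrete `FORMAT_VERSION`, the fingerprint value, and the assumption that `json_checksum` is SHA-256 over the canonical serialization are illustrative placeholders:

```python
import base64
import hashlib
import json
import time

index_data = {"schema_version": 4, "entries": {}}
# Stand-in for canonical_json_dumps: deterministic key order and separators.
canonical = json.dumps(index_data, sort_keys=True, separators=(",", ":"))

wrapper = {
    "format_version": 1,          # placeholder for the module's FORMAT_VERSION
    "created_at": int(time.time()),
    "fingerprint": "example-fp",  # placeholder profile fingerprint
    "encryption_mode": "none",    # PortableMode.NONE.value
    "cipher": "none",
    "checksum": hashlib.sha256(canonical.encode()).hexdigest(),  # assumed checksum scheme
    "payload": base64.b64encode(canonical.encode("utf-8")).decode("utf-8"),
}

# Import side for mode "none": the payload decodes straight back to the index.
restored = json.loads(base64.b64decode(wrapper["payload"]).decode("utf-8"))
print(restored == index_data)
```

Because nothing is encrypted, the `import_backup` counterpart in the next hunks only has to base64-decode the payload before verifying the checksum.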
@@ -90,10 +107,12 @@ def export_backup(
         enc_file.write_bytes(encrypted)
         os.chmod(enc_file, 0o600)
         try:
+            idx = StateManager(vault.fingerprint_dir).state.get("nostr_account_idx", 0)
             client = NostrClient(
                 vault.encryption_manager,
                 vault.fingerprint_dir.name,
                 config_manager=backup_manager.config_manager,
+                account_index=idx,
             )
             asyncio.run(client.publish_snapshot(encrypted))
         except Exception:
@@ -118,19 +137,24 @@ def import_backup(
     if wrapper.get("format_version") != FORMAT_VERSION:
         raise ValueError("Unsupported backup format")

-    if wrapper.get("encryption_mode") != PortableMode.SEED_ONLY.value:
-        raise ValueError("Unsupported encryption mode")
+    mode = wrapper.get("encryption_mode")
     payload = base64.b64decode(wrapper["payload"])

-    seed = (
-        parent_seed
-        if parent_seed is not None
-        else vault.encryption_manager.decrypt_parent_seed()
-    )
-    key = _derive_export_key(seed)
-    enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
-    enc_mgr._legacy_migrate_flag = False
-    index_bytes = enc_mgr.decrypt_data(payload, context="backup payload")
+    if mode == PortableMode.SEED_ONLY.value:
+        seed = (
+            parent_seed
+            if parent_seed is not None
+            else vault.encryption_manager.decrypt_parent_seed()
+        )
+        key = _derive_export_key(seed)
+        enc_mgr = EncryptionManager(key, vault.fingerprint_dir)
+        enc_mgr._legacy_migrate_flag = False
+        index_bytes = enc_mgr.decrypt_data(payload, context="backup payload")
+    elif mode == PortableMode.NONE.value:
+        index_bytes = payload
+    else:
+        raise ValueError("Unsupported encryption mode")

     index = json.loads(index_bytes.decode("utf-8"))

     checksum = json_checksum(index)
@@ -6,7 +6,6 @@ from typing import Optional, TYPE_CHECKING
 from termcolor import colored

 import seedpass.core.manager as manager_module
-from nostr.snapshot import MANIFEST_ID_PREFIX

 from utils.password_prompt import prompt_existing_password

@@ -44,7 +43,7 @@ class ProfileService:
         pm.fingerprint_manager.current_fingerprint = selected_fingerprint
         pm.current_fingerprint = selected_fingerprint
         if not getattr(pm, "manifest_id", None):
-            pm.manifest_id = f"{MANIFEST_ID_PREFIX}{selected_fingerprint}"
+            pm.manifest_id = None

         pm.fingerprint_dir = pm.fingerprint_manager.get_current_fingerprint_dir()
         if not pm.fingerprint_dir:
@@ -77,6 +76,8 @@ class ProfileService:
             fingerprint=pm.current_fingerprint,
             config_manager=getattr(pm, "config_manager", None),
             parent_seed=getattr(pm, "parent_seed", None),
+            key_index=pm.KEY_INDEX,
+            account_index=pm.nostr_account_idx,
         )
         if getattr(pm, "manifest_id", None) and hasattr(
             pm.nostr_client, "_state_lock"
@@ -26,6 +26,7 @@ class StateManager:
             "manifest_id": None,
             "delta_since": 0,
             "relays": list(DEFAULT_RELAYS),
+            "nostr_account_idx": 0,
         }
         with shared_lock(self.state_path) as fh:
             fh.seek(0)
@@ -37,6 +38,7 @@ class StateManager:
                 "manifest_id": None,
                 "delta_since": 0,
                 "relays": list(DEFAULT_RELAYS),
+                "nostr_account_idx": 0,
             }
         try:
             obj = json.loads(data.decode())
@@ -47,6 +49,7 @@ class StateManager:
         obj.setdefault("manifest_id", None)
         obj.setdefault("delta_since", 0)
         obj.setdefault("relays", list(DEFAULT_RELAYS))
+        obj.setdefault("nostr_account_idx", 0)
         return obj

     def _save(self, data: dict) -> None:
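The `StateManager` hunks all lean on `dict.setdefault`, which fills a key only when it is absent, so older state files without `nostr_account_idx` load cleanly while any value already stored is preserved. A quick sketch of that upgrade behavior:

```python
# State loaded from an older profile that predates the account index.
old_state = {"manifest_id": None, "delta_since": 0}
old_state.setdefault("nostr_account_idx", 0)

# State from a newer profile already carrying an index: setdefault is a no-op.
newer_state = {"manifest_id": None, "delta_since": 0, "nostr_account_idx": 3}
newer_state.setdefault("nostr_account_idx", 0)

print(old_state["nostr_account_idx"], newer_state["nostr_account_idx"])  # 0 3
```

This is why the same default also has to be written into both in-memory fallback dicts in the earlier hunks: every load path must agree on the field.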
@@ -2,8 +2,10 @@

 from __future__ import annotations

+import os
 import sys
 import time
+import base64
 from typing import Union
 from urllib.parse import quote
 from urllib.parse import urlparse, parse_qs, unquote
@@ -15,6 +17,11 @@ import pyotp
 from utils import key_derivation


+def random_totp_secret(length: int = 20) -> str:
+    """Return a random Base32 encoded TOTP secret."""
+    return base64.b32encode(os.urandom(length)).decode("ascii").rstrip("=")
+
+
 class TotpManager:
     """Helper methods for TOTP secrets and codes."""

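The new `random_totp_secret` helper is stdlib-only: 20 random bytes (160 bits) Base32-encode to exactly 32 characters with no padding, which keeps the result within the usual otpauth secret alphabet. Reproducing it standalone:

```python
import base64
import os


def random_totp_secret(length: int = 20) -> str:
    """Return a random Base32 encoded TOTP secret."""
    return base64.b32encode(os.urandom(length)).decode("ascii").rstrip("=")


secret = random_totp_secret()
alphabet = set("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567")
print(len(secret), set(secret) <= alphabet)  # 32 True
```

Since 20 bytes is a multiple of the 5-byte Base32 group size, `rstrip("=")` is a no-op here; it only matters if a caller passes a `length` that is not a multiple of 5.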
@@ -4,7 +4,7 @@ import sys

 import pytest
 from httpx import ASGITransport, AsyncClient
-import hashlib
+import bcrypt

 sys.path.append(str(Path(__file__).resolve().parents[1]))

@@ -54,7 +54,7 @@ async def client(monkeypatch):
 async def test_token_hashed(client):
     _, token = client
     assert api.app.state.token_hash != token
-    assert api.app.state.token_hash == hashlib.sha256(token.encode()).hexdigest()
+    assert bcrypt.checkpw(token.encode(), api.app.state.token_hash)


 @pytest.mark.anyio
@@ -53,6 +53,7 @@ async def test_create_and_modify_totp_entry(client):
         "digits": 8,
         "notes": "n",
         "archived": False,
+        "deterministic": False,
     }

     res = await cl.put(
@@ -377,7 +378,7 @@ async def test_vault_export_endpoint(client, tmp_path):
     out = tmp_path / "out.json"
     out.write_text("data")

-    api.app.state.pm.handle_export_database = lambda: out
+    api.app.state.pm.handle_export_database = lambda *a, **k: out

     headers = {
         "Authorization": f"Bearer {token}",
@@ -36,6 +36,7 @@ def test_audit_logger_records_events(monkeypatch, tmp_path):
    monkeypatch.setattr(manager_module, "export_backup", lambda *a, **k: dest)
    pm.vault = object()
    pm.backup_manager = object()
+    monkeypatch.setattr("seedpass.core.manager.confirm_action", lambda *_a, **_k: True)
    pm.handle_export_database(dest)

    confirms = iter([True, False])
@@ -20,6 +20,7 @@ def test_switch_fingerprint_triggers_bg_sync(monkeypatch, tmp_path):
    pm.current_fingerprint = None
    pm.encryption_manager = object()
    pm.config_manager = SimpleNamespace(get_quick_unlock=lambda: False)
+    pm.nostr_account_idx = 0

    monkeypatch.setattr("builtins.input", lambda *_a, **_k: "1")
    monkeypatch.setattr(
@@ -25,7 +25,7 @@ class DummyPM:
             retrieve_entry=lambda idx: {"type": EntryType.PASSWORD.value, "length": 8},
             get_totp_code=lambda idx, seed: "123456",
             add_entry=lambda label, length, username, url, **kwargs: 1,
-            add_totp=lambda label, seed, index=None, secret=None, period=30, digits=6: "totp://",
+            add_totp=lambda label, seed, index=None, secret=None, period=30, digits=6, deterministic=False: "totp://",
             add_ssh_key=lambda label, seed, index=None, notes="": 2,
             add_pgp_key=lambda label, seed, index=None, key_type="ed25519", user_id="", notes="": 3,
             add_nostr_key=lambda label, seed, index=None, notes="": 4,
|
|||||||
)
|
)
|
||||||
self.parent_seed = "seed"
|
self.parent_seed = "seed"
|
||||||
self.handle_display_totp_codes = lambda: None
|
self.handle_display_totp_codes = lambda: None
|
||||||
self.handle_export_database = lambda path: None
|
self.handle_export_database = lambda path, **kwargs: None
|
||||||
self.handle_import_database = lambda path: None
|
self.handle_import_database = lambda path: None
|
||||||
self.change_password = lambda *a, **kw: None
|
self.change_password = lambda *a, **kw: None
|
||||||
self.lock_vault = lambda: None
|
self.lock_vault = lambda: None
|
||||||
|
@@ -65,8 +65,14 @@ runner = CliRunner()
|
|||||||
"--digits",
|
"--digits",
|
||||||
"7",
|
"7",
|
||||||
],
|
],
|
||||||
("Label", "seed"),
|
("Label", None),
|
||||||
{"index": 1, "secret": "abc", "period": 45, "digits": 7},
|
{
|
||||||
|
"index": 1,
|
||||||
|
"secret": "abc",
|
||||||
|
"period": 45,
|
||||||
|
"digits": 7,
|
||||||
|
"deterministic": False,
|
||||||
|
},
|
||||||
"otpauth://uri",
|
"otpauth://uri",
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
|
@@ -17,8 +17,8 @@ def _setup_pm(tmp_path: Path):
     cfg = ConfigManager(vault, tmp_path)
     backup = BackupManager(tmp_path, cfg)
     pm = SimpleNamespace(
-        handle_export_database=lambda p: export_backup(
-            vault, backup, p, parent_seed=TEST_SEED
+        handle_export_database=lambda p, encrypt=True: export_backup(
+            vault, backup, p, parent_seed=TEST_SEED, encrypt=encrypt
         ),
         handle_import_database=lambda p: import_backup(
             vault, backup, p, parent_seed=TEST_SEED
@@ -91,3 +91,36 @@ def test_cli_import_round_trip(monkeypatch, tmp_path):
     rc = main.main(["import", "--file", str(export_path)])
     assert rc == 0
     assert vault.load_index() == original
+
+
+def test_cli_export_import_unencrypted(monkeypatch, tmp_path):
+    pm, vault = _setup_pm(tmp_path)
+    data = {
+        "schema_version": 4,
+        "entries": {
+            "0": {
+                "label": "example",
+                "type": "password",
+                "notes": "",
+                "custom_fields": [],
+                "origin": "",
+                "tags": [],
+            }
+        },
+    }
+    vault.save_index(data)
+
+    monkeypatch.setattr(main, "PasswordManager", lambda *a, **k: pm)
+    monkeypatch.setattr(main, "configure_logging", lambda: None)
+    monkeypatch.setattr(main, "initialize_app", lambda: None)
+    monkeypatch.setattr(main.signal, "signal", lambda *a, **k: None)
+
+    export_path = tmp_path / "out.json"
+    rc = main.main(["export", "--file", str(export_path), "--unencrypted"])
+    assert rc == 0
+    assert export_path.exists()
+
+    vault.save_index({"schema_version": 4, "entries": {}})
+    rc = main.main(["import", "--file", str(export_path)])
+    assert rc == 0
+    assert vault.load_index() == data
@@ -33,7 +33,9 @@ class FakeEntries:
         self.added.append(("password", label, length, username, url))
         return 1

-    def add_totp(self, label):
+    def add_totp(
+        self, label, deterministic=False, index=None, secret=None, period=30, digits=6
+    ):
         self.added.append(("totp", label))
         return 1

src/tests/test_kdf_strength_slider.py (new file, 19 lines)
@@ -0,0 +1,19 @@
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from types import SimpleNamespace
+
+from helpers import create_vault, TEST_SEED, TEST_PASSWORD
+from seedpass.core.config_manager import ConfigManager
+from main import handle_set_kdf_iterations
+
+
+def test_kdf_strength_slider_persists(monkeypatch):
+    with TemporaryDirectory() as tmpdir:
+        tmp_path = Path(tmpdir)
+        vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
+        cfg_mgr = ConfigManager(vault, tmp_path)
+        pm = SimpleNamespace(config_manager=cfg_mgr)
+        inputs = iter(["3"])
+        monkeypatch.setattr("builtins.input", lambda *_: next(inputs))
+        handle_set_kdf_iterations(pm)
+        assert cfg_mgr.get_kdf_iterations() == 100_000
@@ -83,7 +83,7 @@ def test_failed_migration_restores_legacy(monkeypatch, tmp_path: Path):
     assert not vault.migrated_from_legacy


-def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
+def test_migrated_index_has_v3_prefix(monkeypatch, tmp_path: Path):
     vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)

     key = derive_index_key(TEST_SEED)
@@ -101,7 +101,7 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):

     new_file = tmp_path / "seedpass_entries_db.json.enc"
     payload = json.loads(new_file.read_text())
-    assert base64.b64decode(payload["ct"]).startswith(b"V2:")
+    assert base64.b64decode(payload["ct"]).startswith(b"V3|")
     assert vault.migrated_from_legacy


@@ -156,6 +156,14 @@ def test_migration_syncs_when_confirmed(monkeypatch, tmp_path: Path):
     pm.fingerprint_dir = tmp_path
     pm.current_fingerprint = tmp_path.name
     pm.bip85 = SimpleNamespace()
+    from seedpass.core.config_manager import ConfigManager
+
+    cfg_mgr = ConfigManager(pm.vault, tmp_path)
+    cfg = cfg_mgr.load_config(require_pin=False)
+    cfg["offline_mode"] = False
+    cfg_mgr.save_config(cfg)
+    pm.config_manager = cfg_mgr
+    pm.offline_mode = False

     calls = {"sync": 0}
     pm.sync_vault = lambda *a, **k: calls.__setitem__("sync", calls["sync"] + 1) or {
@@ -279,6 +287,7 @@ def test_legacy_index_reinit_syncs_once_when_confirmed(monkeypatch, tmp_path: Pa
|
|||||||
pm.fingerprint_dir = tmp_path
|
pm.fingerprint_dir = tmp_path
|
||||||
pm.current_fingerprint = tmp_path.name
|
pm.current_fingerprint = tmp_path.name
|
||||||
pm.bip85 = SimpleNamespace()
|
pm.bip85 = SimpleNamespace()
|
||||||
|
pm.offline_mode = True
|
||||||
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"seedpass.core.manager.NostrClient", lambda *a, **k: SimpleNamespace()
|
"seedpass.core.manager.NostrClient", lambda *a, **k: SimpleNamespace()
|
||||||
@@ -296,7 +305,7 @@ def test_legacy_index_reinit_syncs_once_when_confirmed(monkeypatch, tmp_path: Pa
|
|||||||
pm.initialize_managers()
|
pm.initialize_managers()
|
||||||
pm.initialize_managers()
|
pm.initialize_managers()
|
||||||
|
|
||||||
assert calls["sync"] == 1
|
assert calls["sync"] == 0
|
||||||
assert enc_mgr.last_migration_performed is False
|
assert enc_mgr.last_migration_performed is False
|
||||||
|
|
||||||
|
|
||||||
@@ -316,6 +325,13 @@ def test_schema_migration_no_sync_prompt(monkeypatch, tmp_path: Path):
|
|||||||
pm.fingerprint_dir = tmp_path
|
pm.fingerprint_dir = tmp_path
|
||||||
pm.current_fingerprint = tmp_path.name
|
pm.current_fingerprint = tmp_path.name
|
||||||
pm.bip85 = SimpleNamespace()
|
pm.bip85 = SimpleNamespace()
|
||||||
|
from seedpass.core.config_manager import ConfigManager
|
||||||
|
|
||||||
|
cfg_mgr = ConfigManager(pm.vault, tmp_path)
|
||||||
|
cfg = cfg_mgr.load_config(require_pin=False)
|
||||||
|
cfg["offline_mode"] = False
|
||||||
|
cfg_mgr.save_config(cfg)
|
||||||
|
pm.config_manager = cfg_mgr
|
||||||
pm.offline_mode = False
|
pm.offline_mode = False
|
||||||
|
|
||||||
calls = {"sync": 0, "confirm": 0}
|
calls = {"sync": 0, "confirm": 0}
|
||||||
|
```diff
@@ -67,4 +67,4 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
     assert cfg.get_kdf_iterations() == iterations
 
     payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
-    assert base64.b64decode(payload["ct"]).startswith(b"V2:")
+    assert base64.b64decode(payload["ct"]).startswith(b"V3|")
```
```diff
@@ -51,5 +51,5 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
     monkeypatch.setattr("builtins.input", lambda _: "2")
     vault.load_index()
     payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
-    assert base64.b64decode(payload["ct"]).startswith(b"V2:")
+    assert base64.b64decode(payload["ct"]).startswith(b"V3|")
     assert vault.encryption_manager.last_migration_performed is True
```
```diff
@@ -36,7 +36,7 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
     vault.load_index()
     new_file = fp_dir / "seedpass_entries_db.json.enc"
     payload = json.loads(new_file.read_text())
-    assert base64.b64decode(payload["ct"]).startswith(b"V2:")
+    assert base64.b64decode(payload["ct"]).startswith(b"V3|")
 
     new_enc_mgr = EncryptionManager(key, fp_dir)
     new_vault = Vault(new_enc_mgr, fp_dir)
@@ -62,4 +62,4 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
 
     pm.initialize_managers()
     payload = json.loads(new_file.read_text())
-    assert base64.b64decode(payload["ct"]).startswith(b"V2:")
+    assert base64.b64decode(payload["ct"]).startswith(b"V3|")
```
```diff
@@ -60,15 +60,11 @@ def test_handle_add_totp(monkeypatch, capsys):
     out = capsys.readouterr().out
 
     entry = entry_mgr.retrieve_entry(0)
-    assert entry == {
-        "type": "totp",
-        "kind": "totp",
-        "label": "Example",
-        "index": 0,
-        "period": 30,
-        "digits": 6,
-        "archived": False,
-        "notes": "",
-        "tags": [],
-    }
+    assert entry["type"] == "totp"
+    assert entry["kind"] == "totp"
+    assert entry["label"] == "Example"
+    assert entry["deterministic"] is False
+    assert "index" not in entry
+    assert "secret" in entry
+    assert len(entry["secret"]) >= 16
     assert "ID 0" in out
```
```diff
@@ -32,7 +32,7 @@ def test_handle_display_totp_codes(monkeypatch, capsys, password_manager):
 
     pm.handle_display_totp_codes()
     out = capsys.readouterr().out
-    assert "Generated 2FA Codes" in out
+    assert "Imported 2FA Codes" in out
     assert "[0] Example" in out
     assert "123456" in out
 
```
src/tests/test_manifest_id_privacy.py (new file)

```diff
@@ -0,0 +1,18 @@
+import asyncio
+
+from helpers import dummy_nostr_client
+
+
+def test_published_events_no_fingerprint(dummy_nostr_client):
+    client, relay = dummy_nostr_client
+    asyncio.run(client.publish_snapshot(b"secret"))
+    fingerprint = "fp"
+    events = list(relay.manifests) + list(relay.chunks.values())
+    seen = set()
+    for ev in events:
+        if id(ev) in seen:
+            continue
+        seen.add(id(ev))
+        assert fingerprint not in ev.id
+        for tag in getattr(ev, "tags", []):
+            assert fingerprint not in tag
```
```diff
@@ -5,6 +5,7 @@ from tempfile import TemporaryDirectory
 from seedpass.core.manager import PasswordManager
 from utils.fingerprint_manager import FingerprintManager
 from utils.fingerprint import generate_fingerprint
+from seedpass.core.state_manager import StateManager
 
 VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
 
@@ -13,6 +14,7 @@ def setup_pm(tmp_path, monkeypatch):
     pm = PasswordManager.__new__(PasswordManager)
     pm.fingerprint_manager = FingerprintManager(tmp_path)
     pm.config_manager = type("Cfg", (), {"get_kdf_iterations": lambda self: 1})()
+    pm.state_manager = StateManager(tmp_path)
     monkeypatch.setattr("seedpass.core.manager.prompt_for_password", lambda: "pw")
     monkeypatch.setattr("seedpass.core.manager.derive_index_key", lambda seed: b"idx")
     monkeypatch.setattr(
@@ -49,3 +51,5 @@ def test_generate_new_seed_creates_profile(monkeypatch):
 
     assert fingerprint == generate_fingerprint(VALID_SEED)
     assert pm.fingerprint_manager.list_fingerprints() == [fingerprint]
+    sm = StateManager(tmp_path / fingerprint)
+    assert sm.state["nostr_account_idx"] == 1
```
src/tests/test_nonce_uniqueness.py (new file)

```diff
@@ -0,0 +1,19 @@
+from pathlib import Path
+
+from helpers import TEST_SEED
+from utils.key_derivation import derive_index_key
+from seedpass.core.encryption import EncryptionManager
+
+
+def test_nonce_uniqueness(tmp_path: Path) -> None:
+    key = derive_index_key(TEST_SEED)
+    manager = EncryptionManager(key, tmp_path)
+    plaintext = b"repeat"
+    nonces = set()
+    for _ in range(10):
+        payload = manager.encrypt_data(plaintext)
+        assert payload.startswith(b"V3|")
+        nonce = payload[3:15]
+        assert nonce not in nonces
+        nonces.add(nonce)
+    assert len(nonces) == 10
```
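The nonce test above slices `payload[3:15]`, which implies an envelope of a 3-byte `V3|` magic prefix, a 12-byte (96-bit) AES-GCM nonce, then the ciphertext. A minimal sketch of such an envelope follows; the helper names are hypothetical and this is not SeedPass's actual `EncryptionManager`, just an illustration of the assumed layout (requires the `cryptography` package):

```python
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

MAGIC = b"V3|"  # 3-byte version prefix assumed from the tests


def encrypt_v3(key: bytes, plaintext: bytes) -> bytes:
    # Fresh 96-bit nonce per message, as AES-GCM requires for safety
    nonce = os.urandom(12)
    ct = AESGCM(key).encrypt(nonce, plaintext, None)
    return MAGIC + nonce + ct


def decrypt_v3(key: bytes, payload: bytes) -> bytes:
    assert payload.startswith(MAGIC)
    nonce, ct = payload[3:15], payload[15:]  # same slice the test uses
    return AESGCM(key).decrypt(nonce, ct, None)


key = AESGCM.generate_key(bit_length=256)
payload = encrypt_v3(key, b"repeat")
# 3 magic + 12 nonce + plaintext + 16-byte GCM tag
assert len(payload) == 3 + 12 + len(b"repeat") + 16
assert decrypt_v3(key, payload) == b"repeat"
```

Because the nonce is drawn fresh per call, encrypting the same plaintext repeatedly yields distinct payloads, which is exactly what the uniqueness test checks.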
```diff
@@ -5,7 +5,6 @@ import json
 
 from helpers import DummyEvent, DummyFilter, dummy_nostr_client
 from nostr.backup_models import KIND_MANIFEST, KIND_SNAPSHOT_CHUNK
-from nostr.client import MANIFEST_ID_PREFIX
 from nostr_sdk import Keys
 
 
@@ -55,9 +54,7 @@ def test_fetch_snapshot_legacy_key_fallback(dummy_nostr_client, monkeypatch):
             ],
         }
     )
-    manifest_event = DummyEvent(
-        KIND_MANIFEST, manifest_json, tags=[f"{MANIFEST_ID_PREFIX}fp"]
-    )
+    manifest_event = DummyEvent(KIND_MANIFEST, manifest_json, tags=["legacy"])
     chunk_event = DummyEvent(
         KIND_SNAPSHOT_CHUNK,
         base64.b64encode(chunk_bytes).decode("utf-8"),
@@ -69,9 +66,9 @@ def test_fetch_snapshot_legacy_key_fallback(dummy_nostr_client, monkeypatch):
     async def fake_fetch_events(f, _timeout):
         call["count"] += 1
         call["authors"].append(getattr(f, "author_pk", None))
-        if call["count"] <= 2:
+        if call["count"] == 1:
             return type("R", (), {"to_vec": lambda self: []})()
-        elif call["count"] == 3:
+        elif call["count"] == 2:
             return type("R", (), {"to_vec": lambda self: [manifest_event]})()
         else:
             return type("R", (), {"to_vec": lambda self: [chunk_event]})()
```
```diff
@@ -1,49 +0,0 @@
-import asyncio
-
-from helpers import TEST_SEED, dummy_nostr_client
-from nostr.backup_models import KIND_MANIFEST
-from nostr.client import MANIFEST_ID_PREFIX, NostrClient
-
-
-def test_fetch_latest_snapshot_legacy_identifier(dummy_nostr_client, monkeypatch):
-    client, relay = dummy_nostr_client
-    data = b"legacy"
-    asyncio.run(client.publish_snapshot(data))
-    relay.manifests[-1].tags = [MANIFEST_ID_PREFIX.rstrip("-")]
-    relay.filters.clear()
-
-    orig_fetch = relay.fetch_events
-
-    async def fetch_events(self, f, timeout):
-        identifier = f.ids[0] if getattr(f, "ids", None) else None
-        kind = getattr(f, "kind_val", None)
-        if kind == KIND_MANIFEST:
-            events = [m for m in self.manifests if identifier in m.tags]
-            self.filters.append(f)
-
-            class Res:
-                def __init__(self, evs):
-                    self._evs = evs
-
-                def to_vec(self):
-                    return self._evs
-
-            return Res(events)
-        return await orig_fetch(f, timeout)
-
-    monkeypatch.setattr(
-        relay, "fetch_events", fetch_events.__get__(relay, relay.__class__)
-    )
-
-    enc_mgr = client.encryption_manager
-    monkeypatch.setattr(
-        enc_mgr, "decrypt_parent_seed", lambda: TEST_SEED, raising=False
-    )
-    monkeypatch.setattr("nostr.client.KeyManager", type(client.key_manager))
-    client2 = NostrClient(enc_mgr, "fp")
-    relay.filters.clear()
-    result = asyncio.run(client2.fetch_latest_snapshot())
-    assert result is not None
-    ids = [f.ids[0] for f in relay.filters]
-    assert ids[0] == f"{MANIFEST_ID_PREFIX}fp"
-    assert MANIFEST_ID_PREFIX.rstrip("-") in ids
```
src/tests/test_offline_mode_default_enabled.py (new file)

```diff
@@ -0,0 +1,14 @@
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+from seedpass.core.config_manager import ConfigManager
+from helpers import create_vault, TEST_SEED, TEST_PASSWORD
+
+
+def test_offline_mode_default_enabled():
+    with TemporaryDirectory() as tmpdir:
+        tmp_path = Path(tmpdir)
+        vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
+        cfg_mgr = ConfigManager(vault, tmp_path)
+        config = cfg_mgr.load_config(require_pin=False)
+        assert config["offline_mode"] is True
```
```diff
@@ -35,6 +35,7 @@ def test_change_password_triggers_nostr_backup(monkeypatch):
     pm.parent_seed = TEST_SEED
     pm.store_hashed_password = lambda pw: None
     pm.verify_password = lambda pw: True
+    pm.nostr_account_idx = 0
 
     with patch("seedpass.core.manager.NostrClient") as MockClient:
         mock_instance = MockClient.return_value
@@ -62,6 +62,7 @@ def test_password_change_and_unlock(monkeypatch):
     pm.nostr_client = SimpleNamespace(
         publish_snapshot=lambda *a, **k: (None, "abcd")
     )
+    pm.nostr_account_idx = 0
 
     monkeypatch.setattr(
         "seedpass.core.manager.prompt_existing_password", lambda *_: old_pw
```
```diff
@@ -15,6 +15,7 @@ from seedpass.core.vault import Vault
 from seedpass.core.backup import BackupManager
 from seedpass.core.config_manager import ConfigManager
 from seedpass.core.portable_backup import export_backup, import_backup
+from seedpass.core.portable_backup import PortableMode
 from utils.key_derivation import derive_index_key, derive_key_from_password
 from utils.fingerprint import generate_fingerprint
 
@@ -54,6 +55,22 @@ def test_round_trip(monkeypatch):
     assert vault.load_index()["pw"] == data["pw"]
 
 
+def test_round_trip_unencrypted(monkeypatch):
+    with TemporaryDirectory() as td:
+        tmp = Path(td)
+        vault, backup, _ = setup_vault(tmp)
+        data = {"pw": 1}
+        vault.save_index(data)
+
+        path = export_backup(vault, backup, parent_seed=SEED, encrypt=False)
+        wrapper = json.loads(path.read_text())
+        assert wrapper["encryption_mode"] == PortableMode.NONE.value
+
+        vault.save_index({"pw": 0})
+        import_backup(vault, backup, path, parent_seed=SEED)
+        assert vault.load_index()["pw"] == data["pw"]
+
+
 from cryptography.fernet import InvalidToken
 
 
```
```diff
@@ -20,6 +20,7 @@ def setup_pm(tmp_path):
     pm.encryption_mode = manager_module.EncryptionMode.SEED_ONLY
     pm.fingerprint_manager = manager_module.FingerprintManager(constants.APP_DIR)
     pm.current_fingerprint = None
+    pm.state_manager = manager_module.StateManager(constants.APP_DIR)
     return pm, constants, manager_module
 
 
@@ -41,8 +42,8 @@ def test_generate_seed_cleanup_on_failure(monkeypatch):
 
     # fingerprint list should be empty and only fingerprints.json should remain
     assert pm.fingerprint_manager.list_fingerprints() == []
-    contents = list(const.APP_DIR.iterdir())
-    assert len(contents) == 1 and contents[0].name == "fingerprints.json"
+    contents = sorted(p.name for p in const.APP_DIR.iterdir())
+    assert contents == ["fingerprints.json", "seedpass_state.json"]
     fp_file = pm.fingerprint_manager.fingerprints_file
     with open(fp_file) as f:
         data = json.load(f)
```
```diff
@@ -29,6 +29,7 @@ def test_add_and_switch_fingerprint(monkeypatch):
     pm.fingerprint_manager = fm
     pm.encryption_manager = object()
     pm.current_fingerprint = None
+    pm.nostr_account_idx = 0
 
     monkeypatch.setattr("builtins.input", lambda *_args, **_kwargs: "1")
     monkeypatch.setattr(
```
```diff
@@ -82,9 +82,11 @@ def test_publish_snapshot_success():
     with patch.object(
         client.client, "send_event", side_effect=fake_send
     ) as mock_send:
-        manifest, event_id = asyncio.run(client.publish_snapshot(b"data"))
+        with patch("nostr.snapshot.new_manifest_id", return_value=("id", b"nonce")):
+            manifest, event_id = asyncio.run(client.publish_snapshot(b"data"))
     assert isinstance(manifest, Manifest)
-    assert event_id == "seedpass-manifest-fp"
+    assert event_id == "id"
+    assert manifest.nonce == base64.b64encode(b"nonce").decode("utf-8")
     assert mock_send.await_count >= 1
```
```diff
@@ -21,6 +21,7 @@ def setup_password_manager():
     pm.fingerprint_manager = manager_module.FingerprintManager(constants.APP_DIR)
     pm.current_fingerprint = None
     pm.save_and_encrypt_seed = lambda seed, fingerprint_dir: None
+    pm.state_manager = manager_module.StateManager(constants.APP_DIR)
     return pm, constants
 
 
```
```diff
@@ -31,4 +31,4 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
     assert new_file.exists()
     assert new_file.read_bytes() != encrypted
     payload = json.loads(new_file.read_text())
-    assert base64.b64decode(payload["ct"]).startswith(b"V2:")
+    assert base64.b64decode(payload["ct"]).startswith(b"V3|")
```
```diff
@@ -120,6 +120,7 @@ def test_profile_service_switch(monkeypatch):
     pm.delta_since = None
     pm.encryption_manager = SimpleNamespace()
     pm.parent_seed = TEST_SEED
+    pm.nostr_account_idx = 0
 
     service = ProfileService(pm)
     monkeypatch.setattr("builtins.input", lambda *_: "2")
```
```diff
@@ -14,6 +14,7 @@ def test_state_manager_round_trip():
     assert state["last_sync_ts"] == 0
     assert state["manifest_id"] is None
     assert state["delta_since"] == 0
+    assert state["nostr_account_idx"] == 0
 
     sm.add_relay("wss://example.com")
     sm.update_state(
@@ -30,6 +31,7 @@ def test_state_manager_round_trip():
     assert state2["last_sync_ts"] == 123
     assert state2["manifest_id"] == "mid"
     assert state2["delta_since"] == 111
+    assert state2["nostr_account_idx"] == 0
 
     sm2.remove_relay(1)  # remove first default relay
     assert len(sm2.list_relays()) == len(DEFAULT_RELAYS)
```
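The round-trip test asserts a `nostr_account_idx` default of 0 alongside the existing state keys, and that updates survive a re-load. A stdlib-only sketch of the JSON-backed state file those assertions imply; `TinyState` and its file name are hypothetical stand-ins, not the real `StateManager`:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

# Hypothetical defaults mirroring the keys the tests assert on.
DEFAULTS = {"last_sync_ts": 0, "manifest_id": None, "delta_since": 0, "nostr_account_idx": 0}


class TinyState:
    """Toy JSON-backed state store, illustrating the round-trip behavior."""

    def __init__(self, dir_path: Path) -> None:
        self.path = dir_path / "seedpass_state.json"
        self.state = dict(DEFAULTS)
        if self.path.exists():
            # Merge persisted values over the defaults
            self.state.update(json.loads(self.path.read_text()))

    def update_state(self, **kwargs) -> None:
        self.state.update(kwargs)
        self.path.write_text(json.dumps(self.state))


with TemporaryDirectory() as td:
    sm = TinyState(Path(td))
    assert sm.state["nostr_account_idx"] == 0  # default present on first load
    sm.update_state(last_sync_ts=123, manifest_id="mid", delta_since=111)
    sm2 = TinyState(Path(td))  # second instance re-reads from disk
    assert sm2.state["last_sync_ts"] == 123 and sm2.state["manifest_id"] == "mid"
```

Merging persisted values over a defaults dict is what lets a new key like `nostr_account_idx` appear with a sane default even when loading a state file written by an older version.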
```diff
@@ -28,23 +28,19 @@ def test_add_totp_and_get_code():
     assert uri.startswith("otpauth://totp/")
 
     entry = entry_mgr.retrieve_entry(0)
-    assert entry == {
-        "type": "totp",
-        "kind": "totp",
-        "label": "Example",
-        "index": 0,
-        "period": 30,
-        "digits": 6,
-        "archived": False,
-        "notes": "",
-        "tags": [],
-    }
+    assert entry["deterministic"] is False
+    assert "secret" in entry
 
-    code = entry_mgr.get_totp_code(0, TEST_SEED, timestamp=0)
+    code = entry_mgr.get_totp_code(0, timestamp=0)
 
-    expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0)
+    expected = pyotp.TOTP(entry["secret"]).at(0)
     assert code == expected
 
+    # second entry should have different secret
+    entry_mgr.add_totp("Other", TEST_SEED)
+    entry2 = entry_mgr.retrieve_entry(1)
+    assert entry["secret"] != entry2["secret"]
+
 
 def test_totp_time_remaining(monkeypatch):
     with TemporaryDirectory() as tmpdir:
@@ -68,17 +64,8 @@ def test_add_totp_imported(tmp_path):
     secret = "JBSWY3DPEHPK3PXP"
     em.add_totp("Imported", TEST_SEED, secret=secret)
     entry = em.retrieve_entry(0)
-    assert entry == {
-        "type": "totp",
-        "kind": "totp",
-        "label": "Imported",
-        "secret": secret,
-        "period": 30,
-        "digits": 6,
-        "archived": False,
-        "notes": "",
-        "tags": [],
-    }
+    assert entry["secret"] == secret
+    assert entry["deterministic"] is False
     code = em.get_totp_code(0, timestamp=0)
     assert code == pyotp.TOTP(secret).at(0)
 
@@ -92,3 +79,23 @@ def test_add_totp_with_notes(tmp_path):
     em.add_totp("NoteLabel", TEST_SEED, notes="some note")
     entry = em.retrieve_entry(0)
     assert entry["notes"] == "some note"
+
+
+def test_legacy_deterministic_entry(tmp_path):
+    vault, enc = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
+    cfg_mgr = ConfigManager(vault, tmp_path)
+    backup_mgr = BackupManager(tmp_path, cfg_mgr)
+    em = EntryManager(vault, backup_mgr)
+
+    em.add_totp("Legacy", TEST_SEED, deterministic=True)
+    data = em._load_index()
+    entry = data["entries"]["0"]
+    entry.pop("deterministic", None)
+    em._save_index(data)
+
+    code = em.get_totp_code(0, TEST_SEED, timestamp=0)
+    expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0)
+    assert code == expected
+
+    exported = em.export_totp_entries(TEST_SEED)
+    assert exported["entries"][0]["secret"] == TotpManager.derive_secret(TEST_SEED, 0)
```
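The rewritten TOTP tests compare codes against `pyotp.TOTP(secret).at(0)` rather than seed-derived values. For reference, the RFC 6238 computation that such a comparison relies on can be sketched in pure stdlib; this is an illustration of the algorithm, not SeedPass's `TotpManager`:

```python
import base64
import hashlib
import hmac
import struct


def totp(secret_b32: str, timestamp: int, period: int = 30, digits: int = 6) -> str:
    """Compute an RFC 6238 TOTP code: HMAC-SHA1 over the time-step counter."""
    key = base64.b32decode(secret_b32)
    counter = struct.pack(">Q", timestamp // period)  # big-endian 64-bit step
    mac = hmac.new(key, counter, hashlib.sha1).digest()
    offset = mac[-1] & 0x0F  # dynamic truncation offset
    code = (struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF) % (10 ** digits)
    return str(code).zfill(digits)


code = totp("JBSWY3DPEHPK3PXP", timestamp=0)
assert len(code) == 6 and code.isdigit()
# Timestamps inside the same 30-second window map to the same counter, hence the same code
assert code == totp("JBSWY3DPEHPK3PXP", timestamp=29)
```

Because the code depends only on the secret and the time step, any two entries with distinct secrets (as the new `add_totp` test asserts) will almost always produce different codes for the same window.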
|
@@ -3,9 +3,16 @@ from __future__ import annotations
|
|||||||
import os
|
import os
|
||||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||||
|
|
||||||
|
# TODO: Replace this Python implementation with a Rust/WASM module for
|
||||||
|
# critical cryptographic operations.
|
||||||
|
|
||||||
|
|
||||||
class InMemorySecret:
|
class InMemorySecret:
|
||||||
"""Store sensitive data encrypted in RAM using AES-GCM."""
|
"""Store sensitive data encrypted in RAM using AES-GCM.
|
||||||
|
|
||||||
|
Zeroization is best-effort only; Python's memory management may retain
|
||||||
|
copies of the plaintext.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, data: bytes) -> None:
|
def __init__(self, data: bytes) -> None:
|
||||||
if not isinstance(data, (bytes, bytearray)):
|
if not isinstance(data, (bytes, bytearray)):
|
||||||
|
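The amended docstring stresses that zeroization in Python is best-effort. A reduced sketch of the keep-it-encrypted-in-RAM idea behind `InMemorySecret`; `TinySecret` is hypothetical and assumes the `cryptography` package, mirroring only the encrypt-on-store, decrypt-on-demand pattern:

```python
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class TinySecret:
    """Hold plaintext encrypted in RAM; decrypt only when explicitly revealed."""

    def __init__(self, data: bytes) -> None:
        # Ephemeral per-object key and nonce; both live only in this process
        self._key = AESGCM.generate_key(bit_length=128)
        self._nonce = os.urandom(12)
        self._blob = AESGCM(self._key).encrypt(self._nonce, data, None)

    def reveal(self) -> bytes:
        return AESGCM(self._key).decrypt(self._nonce, self._blob, None)


s = TinySecret(b"parent seed words")
assert s._blob != b"parent seed words"  # plaintext is not stored directly
assert s.reveal() == b"parent seed words"
```

This narrows the window in which the plaintext sits in memory, but as the docstring warns, interpreter-level copies (interning, garbage collection) can still outlive the object, which is why the TODO suggests moving the critical path to Rust/WASM.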