Merge pull request #208 from PR0M3TH3AN/beta

Beta
This commit is contained in:
thePR0M3TH3AN
2025-07-03 14:50:46 -04:00
committed by GitHub
50 changed files with 1839 additions and 223 deletions

View File

@@ -46,6 +46,11 @@ SeedPass now uses the `portalocker` library for cross-platform file locking. No
- **Checksum Verification:** Ensure the integrity of the script with checksum verification.
- **Multiple Seed Profiles:** Manage separate seed profiles and switch between them seamlessly.
- **Interactive TUI:** Navigate through menus to add, retrieve, and modify entries as well as configure Nostr settings.
- **SeedPass 2FA:** Generate TOTP codes with a real-time countdown progress bar.
- **2FA Secret Issuance & Import:** Derive new TOTP secrets from your seed or import existing `otpauth://` URIs.
- **Export 2FA Codes:** Save all stored TOTP entries to an encrypted JSON file for use with other apps.
- **Optional External Backup Location:** Configure a second directory where backups are automatically copied.
- **AutoLock on Inactivity:** Vault locks after a configurable timeout for additional security.
## Prerequisites
@@ -118,6 +123,9 @@ seedpass export --file "~/seedpass_backup.json"
# Later you can restore it
seedpass import --file "~/seedpass_backup.json"
# Use the **Settings** menu to configure an extra backup directory
# on an external drive.
```
### Vault JSON Layout
@@ -168,12 +176,31 @@ python src/main.py
1. Add Entry
2. Retrieve Entry
3. Modify an Existing Entry
4. Settings
5. Exit
4. 2FA Codes
5. Settings
6. Exit
Enter your choice (1-5):
Enter your choice (1-6):
```
When choosing **Add Entry**, you can now select **Password** or **2FA (TOTP)**.
### Adding a 2FA Entry
1. From the main menu choose **Add Entry** and select **2FA (TOTP)**.
2. Pick **Make 2FA** to derive a new secret from your seed or **Import 2FA** to paste an existing `otpauth://` URI or secret.
3. Provide a label for the account (for example, `GitHub`).
4. SeedPass automatically chooses the next available derivation index when deriving.
5. Optionally specify the TOTP period and digit count.
6. SeedPass will display the URI and secret so you can add it to your authenticator app.
### Modifying a 2FA Entry
1. From the main menu choose **Modify an Existing Entry** and enter the index of the 2FA code you want to edit.
2. SeedPass will show the current label, period, digit count, and blacklist status.
3. Enter new values or press **Enter** to keep the existing settings.
4. The updated entry is saved back to your encrypted vault.
### Managing Multiple Seeds
@@ -223,7 +250,14 @@ Back in the Settings menu you can:
* Select `3` to change your master password.
* Choose `4` to verify the script checksum.
* Choose `5` to back up the parent seed.
* Choose `6` to lock the vault and require re-entry of your password.
* Select `6` to export the database to an encrypted file.
* Choose `7` to import a database from a backup file.
* Select `8` to export all 2FA codes.
* Choose `9` to set an additional backup location.
* Select `10` to change the inactivity timeout.
* Choose `11` to lock the vault and require re-entry of your password.
* Select `12` to return to the main menu.
* Choose `13` to view seed profile stats.
## Running Tests

15
docs/README.md Normal file
View File

@@ -0,0 +1,15 @@
# SeedPass Documentation
This directory contains supplementary guides for using SeedPass.
## Quick Example: Get a TOTP Code
Run `seedpass get-code` to retrieve a time-based one-time password (TOTP). A progress bar shows the remaining seconds in the current period.
```bash
$ seedpass get-code --index 0
[##########----------] 15s
Code: 123456
```
See [advanced_cli.md](advanced_cli.md) for a full command reference.

View File

@@ -4,6 +4,7 @@ from cryptography.fernet import Fernet
from password_manager.encryption import EncryptionManager
from password_manager.vault import Vault
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from constants import initialize_app
@@ -13,7 +14,8 @@ def main() -> None:
key = Fernet.generate_key()
enc = EncryptionManager(key, Path("."))
vault = Vault(enc, Path("."))
manager = EntryManager(vault, Path("."))
backup_mgr = BackupManager(Path("."))
manager = EntryManager(vault, backup_mgr)
index = manager.add_entry(
"Example Website",

View File

@@ -68,6 +68,10 @@
<li><i class="fas fa-exchange-alt" aria-hidden="true"></i> Seed/Fingerprint switching for managing multiple profiles</li>
<li><i class="fas fa-check" aria-hidden="true"></i> Checksum verification to ensure script integrity</li>
<li><i class="fas fa-terminal" aria-hidden="true"></i> Interactive TUI for managing entries and settings</li>
<li><i class="fas fa-shield-alt" aria-hidden="true"></i> Issue or import TOTP secrets for 2FA</li>
<li><i class="fas fa-file-export" aria-hidden="true"></i> Export your 2FA codes to an encrypted file</li>
<li><i class="fas fa-folder-open" aria-hidden="true"></i> Optional external backup location</li>
<li><i class="fas fa-lock" aria-hidden="true"></i> Auto-lock after inactivity</li>
</ul>
</div>
</section>
@@ -101,10 +105,11 @@ Select an option:
1. Add Entry
2. Retrieve Entry
3. Modify an Existing Entry
4. Settings
5. Exit
4. 2FA Codes
5. Settings
6. Exit
Enter your choice (1-5):
Enter your choice (1-6):
</pre>
</div>
</section>

View File

@@ -90,8 +90,8 @@ body.dark-mode {
/* Dark Mode Toggle */
.dark-mode-toggle {
position: fixed;
top: 12px;
right: 20px;
bottom: 12px;
left: 20px;
z-index: 1000;
}
@@ -856,8 +856,8 @@ footer .social-media a:focus {
}
.dark-mode-toggle {
top: 10px;
right: 10px;
bottom: 10px;
left: 10px;
}
/* Adjust disclaimer container padding on smaller screens */

View File

@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
# -----------------------------------
MAX_RETRIES = 3 # Maximum number of retries for relay connections
RETRY_DELAY = 5 # Seconds to wait before retrying a failed connection
MIN_HEALTHY_RELAYS = 2 # Minimum relays that should return data on startup
# -----------------------------------
# Application Directory and Paths

View File

@@ -222,6 +222,17 @@ def handle_display_npub(password_manager: PasswordManager):
print(colored(f"Error: Failed to display npub: {e}", "red"))
def handle_display_stats(password_manager: PasswordManager) -> None:
"""Print seed profile statistics."""
try:
display_fn = getattr(password_manager, "display_stats", None)
if callable(display_fn):
display_fn()
except Exception as e: # pragma: no cover - display best effort
logging.error(f"Failed to display stats: {e}", exc_info=True)
print(colored(f"Error: Failed to display stats: {e}", "red"))
def handle_post_to_nostr(
password_manager: PasswordManager, alt_summary: str | None = None
):
@@ -422,6 +433,54 @@ def handle_set_inactivity_timeout(password_manager: PasswordManager) -> None:
print(colored(f"Error: {e}", "red"))
def handle_set_additional_backup_location(pm: PasswordManager) -> None:
"""Configure an optional second backup directory."""
cfg_mgr = pm.config_manager
if cfg_mgr is None:
print(colored("Configuration manager unavailable.", "red"))
return
try:
current = cfg_mgr.get_additional_backup_path()
if current:
print(colored(f"Current path: {current}", "cyan"))
else:
print(colored("No additional backup location configured.", "cyan"))
except Exception as e:
logging.error(f"Error loading backup path: {e}")
print(colored(f"Error: {e}", "red"))
return
value = input(
"Enter directory for extra backups (leave blank to disable): "
).strip()
if not value:
try:
cfg_mgr.set_additional_backup_path(None)
print(colored("Additional backup location disabled.", "green"))
except Exception as e:
logging.error(f"Error clearing path: {e}")
print(colored(f"Error: {e}", "red"))
return
try:
path = Path(value).expanduser()
path.mkdir(parents=True, exist_ok=True)
test_file = path / ".seedpass_write_test"
with open(test_file, "w") as f:
f.write("test")
test_file.unlink()
except Exception as e:
print(colored(f"Path not writable: {e}", "red"))
return
try:
cfg_mgr.set_additional_backup_path(str(path))
print(colored(f"Additional backups will be copied to {path}", "green"))
except Exception as e:
logging.error(f"Error saving backup path: {e}")
print(colored(f"Error: {e}", "red"))
def handle_profiles_menu(password_manager: PasswordManager) -> None:
"""Submenu for managing seed profiles."""
while True:
@@ -503,9 +562,12 @@ def handle_settings(password_manager: PasswordManager) -> None:
print("5. Backup Parent Seed")
print("6. Export database")
print("7. Import database")
print("8. Set inactivity timeout")
print("9. Lock Vault")
print("10. Back")
print("8. Export 2FA codes")
print("9. Set additional backup location")
print("10. Set inactivity timeout")
print("11. Lock Vault")
print("12. Back")
print("13. Stats")
choice = input("Select an option: ").strip()
if choice == "1":
handle_profiles_menu(password_manager)
@@ -524,13 +586,19 @@ def handle_settings(password_manager: PasswordManager) -> None:
if path:
password_manager.handle_import_database(Path(path))
elif choice == "8":
handle_set_inactivity_timeout(password_manager)
password_manager.handle_export_totp_codes()
elif choice == "9":
handle_set_additional_backup_location(password_manager)
elif choice == "10":
handle_set_inactivity_timeout(password_manager)
elif choice == "11":
password_manager.lock_vault()
print(colored("Vault locked. Please re-enter your password.", "yellow"))
password_manager.unlock_vault()
elif choice == "10":
elif choice == "12":
break
elif choice == "13":
handle_display_stats(password_manager)
else:
print(colored("Invalid choice.", "red"))
@@ -548,9 +616,13 @@ def display_menu(
1. Add Entry
2. Retrieve Entry
3. Modify an Existing Entry
4. Settings
5. Exit
4. 2FA Codes
5. Settings
6. Exit
"""
display_fn = getattr(password_manager, "display_stats", None)
if callable(display_fn):
display_fn()
while True:
if time.time() - password_manager.last_activity > inactivity_timeout:
print(colored("Session timed out. Vault locked.", "yellow"))
@@ -571,7 +643,7 @@ def display_menu(
print(colored(menu, "cyan"))
try:
choice = timed_input(
"Enter your choice (1-5): ", inactivity_timeout
"Enter your choice (1-6): ", inactivity_timeout
).strip()
except TimeoutError:
print(colored("Session timed out. Vault locked.", "yellow"))
@@ -582,7 +654,7 @@ def display_menu(
if not choice:
print(
colored(
"No input detected. Please enter a number between 1 and 5.",
"No input detected. Please enter a number between 1 and 6.",
"yellow",
)
)
@@ -591,13 +663,17 @@ def display_menu(
while True:
print("\nAdd Entry:")
print("1. Password")
print("2. Back")
print("2. 2FA (TOTP)")
print("3. Back")
sub_choice = input("Select entry type: ").strip()
password_manager.update_activity()
if sub_choice == "1":
password_manager.handle_add_password()
break
elif sub_choice == "2":
password_manager.handle_add_totp()
break
elif sub_choice == "3":
break
else:
print(colored("Invalid choice.", "red"))
@@ -609,8 +685,11 @@ def display_menu(
password_manager.handle_modify_entry()
elif choice == "4":
password_manager.update_activity()
handle_settings(password_manager)
password_manager.handle_display_totp_codes()
elif choice == "5":
password_manager.update_activity()
handle_settings(password_manager)
elif choice == "6":
logging.info("Exiting the program.")
print(colored("Exiting the program.", "green"))
password_manager.nostr_client.close_client_pool()

View File

@@ -8,6 +8,7 @@ from typing import List, Optional, Tuple
import hashlib
import asyncio
import gzip
import websockets
# Imports from the nostr-sdk library
from nostr_sdk import (
@@ -137,6 +138,42 @@ class NostrClient:
await self.client.connect()
logger.info(f"NostrClient connected to relays: {self.relays}")
async def _ping_relay(self, relay: str, timeout: float) -> bool:
"""Attempt to retrieve the latest event from a single relay."""
sub_id = "seedpass-health"
pubkey = self.keys.public_key().to_hex()
req = json.dumps(
["REQ", sub_id, {"kinds": [1], "authors": [pubkey], "limit": 1}]
)
try:
async with websockets.connect(
relay, open_timeout=timeout, close_timeout=timeout
) as ws:
await ws.send(req)
while True:
msg = await asyncio.wait_for(ws.recv(), timeout=timeout)
data = json.loads(msg)
if data[0] == "EVENT":
return True
if data[0] == "EOSE":
return False
except Exception:
return False
async def _check_relay_health(self, min_relays: int, timeout: float) -> int:
tasks = [self._ping_relay(r, timeout) for r in self.relays]
results = await asyncio.gather(*tasks, return_exceptions=True)
healthy = sum(1 for r in results if r is True)
if healthy < min_relays:
logger.warning(
"Only %s relays responded with data; consider adding more.", healthy
)
return healthy
def check_relay_health(self, min_relays: int = 2, timeout: float = 5.0) -> int:
"""Ping relays and return the count of those providing data."""
return asyncio.run(self._check_relay_health(min_relays, timeout))
def publish_json_to_nostr(
self,
encrypted_json: bytes,

View File

@@ -36,7 +36,7 @@ class EventHandler:
# Assuming evt.created_at is always an integer Unix timestamp
if isinstance(evt.created_at, int):
created_at_str = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(evt.created_at)
"%Y-%m-%d %H:%M:%S", time.gmtime(evt.created_at)
)
else:
# Handle unexpected types gracefully

View File

@@ -19,6 +19,8 @@ import traceback
from pathlib import Path
from termcolor import colored
from password_manager.config_manager import ConfigManager
from utils.file_lock import exclusive_lock
from constants import APP_DIR
@@ -37,14 +39,18 @@ class BackupManager:
BACKUP_FILENAME_TEMPLATE = "entries_db_backup_{timestamp}.json.enc"
def __init__(self, fingerprint_dir: Path):
"""
Initializes the BackupManager with the fingerprint directory.
def __init__(self, fingerprint_dir: Path, config_manager: ConfigManager):
"""Initialize BackupManager for a specific profile.
Parameters:
fingerprint_dir (Path): The directory corresponding to the fingerprint.
Parameters
----------
fingerprint_dir : Path
Directory for this profile.
config_manager : ConfigManager
Configuration manager used for retrieving settings.
"""
self.fingerprint_dir = fingerprint_dir
self.config_manager = config_manager
self.backup_dir = self.fingerprint_dir / "backups"
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.index_file = self.fingerprint_dir / "seedpass_entries_db.json.enc"
@@ -72,10 +78,30 @@ class BackupManager:
shutil.copy2(index_file, backup_file)
logger.info(f"Backup created successfully at '{backup_file}'.")
print(colored(f"Backup created successfully at '{backup_file}'.", "green"))
self._create_additional_backup(backup_file)
except Exception as e:
logger.error(f"Failed to create backup: {e}", exc_info=True)
print(colored(f"Error: Failed to create backup: {e}", "red"))
def _create_additional_backup(self, backup_file: Path) -> None:
"""Write a copy of *backup_file* to the configured secondary location."""
path = self.config_manager.get_additional_backup_path()
if not path:
return
try:
dest_dir = Path(path).expanduser()
dest_dir.mkdir(parents=True, exist_ok=True)
dest_file = dest_dir / f"{self.fingerprint_dir.name}_{backup_file.name}"
shutil.copy2(backup_file, dest_file)
logger.info(f"Additional backup created at '{dest_file}'.")
except Exception as e: # pragma: no cover - best-effort logging
logger.error(
f"Failed to write additional backup to '{path}': {e}",
exc_info=True,
)
def restore_latest_backup(self) -> None:
try:
backup_files = sorted(

View File

@@ -44,6 +44,7 @@ class ConfigManager:
"pin_hash": "",
"password_hash": "",
"inactivity_timeout": INACTIVITY_TIMEOUT,
"additional_backup_path": "",
}
try:
data = self.vault.load_config()
@@ -54,6 +55,7 @@ class ConfigManager:
data.setdefault("pin_hash", "")
data.setdefault("password_hash", "")
data.setdefault("inactivity_timeout", INACTIVITY_TIMEOUT)
data.setdefault("additional_backup_path", "")
# Migrate legacy hashed_password.enc if present and password_hash is missing
legacy_file = self.fingerprint_dir / "hashed_password.enc"
@@ -130,3 +132,15 @@ class ConfigManager:
"""Retrieve the inactivity timeout setting in seconds."""
config = self.load_config(require_pin=False)
return float(config.get("inactivity_timeout", INACTIVITY_TIMEOUT))
def set_additional_backup_path(self, path: Optional[str]) -> None:
"""Persist an optional additional backup path in the config."""
config = self.load_config(require_pin=False)
config["additional_backup_path"] = path or ""
self.save_config(config)
def get_additional_backup_path(self) -> Optional[str]:
"""Retrieve the additional backup path if configured."""
config = self.load_config(require_pin=False)
value = config.get("additional_backup_path", "")
return value or None

View File

@@ -19,19 +19,17 @@ import json
import logging
import hashlib
import sys
import os
import shutil
import time
import traceback
from typing import Optional, Tuple, Dict, Any, List
from pathlib import Path
from termcolor import colored
from password_manager.migrations import LATEST_VERSION
from password_manager.entry_types import EntryType
from password_manager.totp import TotpManager
from password_manager.vault import Vault
from utils.file_lock import exclusive_lock
from password_manager.backup import BackupManager
# Instantiate the logger
@@ -39,15 +37,16 @@ logger = logging.getLogger(__name__)
class EntryManager:
def __init__(self, vault: Vault, fingerprint_dir: Path):
"""
Initializes the EntryManager with the EncryptionManager and fingerprint directory.
def __init__(self, vault: Vault, backup_manager: BackupManager):
"""Initialize the EntryManager.
:param vault: The Vault instance for file access.
:param fingerprint_dir: The directory corresponding to the fingerprint.
Parameters:
vault: The Vault instance for file access.
backup_manager: Manages creation of entry database backups.
"""
self.vault = vault
self.fingerprint_dir = fingerprint_dir
self.backup_manager = backup_manager
self.fingerprint_dir = backup_manager.fingerprint_dir
# Use paths relative to the fingerprint directory
self.index_file = self.fingerprint_dir / "seedpass_entries_db.json.enc"
@@ -59,6 +58,9 @@ class EntryManager:
if self.index_file.exists():
try:
data = self.vault.load_index()
# Ensure legacy entries without a type are treated as passwords
for entry in data.get("entries", {}).values():
entry.setdefault("type", EntryType.PASSWORD.value)
logger.debug("Index loaded successfully.")
return data
except Exception as e:
@@ -137,7 +139,7 @@ class EntryManager:
self._save_index(data)
self.update_checksum()
self.backup_index_file()
self.backup_manager.create_backup()
logger.info(f"Entry added successfully at index {index}.")
print(colored(f"[+] Entry added successfully at index {index}.", "green"))
@@ -149,16 +151,63 @@ class EntryManager:
print(colored(f"Error: Failed to add entry: {e}", "red"))
sys.exit(1)
def add_totp(self, notes: str = "") -> int:
"""Placeholder for adding a TOTP entry."""
index = self.get_next_index()
def get_next_totp_index(self) -> int:
"""Return the next available derivation index for TOTP secrets."""
data = self.vault.load_index()
entries = data.get("entries", {})
indices = [
int(v.get("index", 0))
for v in entries.values()
if v.get("type") == EntryType.TOTP.value
]
return (max(indices) + 1) if indices else 0
def add_totp(
self,
label: str,
parent_seed: str,
*,
secret: str | None = None,
index: int | None = None,
period: int = 30,
digits: int = 6,
) -> str:
"""Add a new TOTP entry and return the provisioning URI."""
entry_id = self.get_next_index()
data = self.vault.load_index()
data.setdefault("entries", {})
data["entries"][str(index)] = {"type": EntryType.TOTP.value, "notes": notes}
if secret is None:
if index is None:
index = self.get_next_totp_index()
secret = TotpManager.derive_secret(parent_seed, index)
entry = {
"type": EntryType.TOTP.value,
"label": label,
"index": index,
"period": period,
"digits": digits,
}
else:
entry = {
"type": EntryType.TOTP.value,
"label": label,
"secret": secret,
"period": period,
"digits": digits,
}
data["entries"][str(entry_id)] = entry
self._save_index(data)
self.update_checksum()
self.backup_index_file()
raise NotImplementedError("TOTP entry support not implemented yet")
self.backup_manager.create_backup()
try:
return TotpManager.make_otpauth_uri(label, secret, period, digits)
except Exception as e:
logger.error(f"Failed to generate otpauth URI: {e}")
raise
def add_ssh_key(self, notes: str = "") -> int:
"""Placeholder for adding an SSH key entry."""
@@ -168,7 +217,7 @@ class EntryManager:
data["entries"][str(index)] = {"type": EntryType.SSH.value, "notes": notes}
self._save_index(data)
self.update_checksum()
self.backup_index_file()
self.backup_manager.create_backup()
raise NotImplementedError("SSH key entry support not implemented yet")
def add_seed(self, notes: str = "") -> int:
@@ -179,9 +228,32 @@ class EntryManager:
data["entries"][str(index)] = {"type": EntryType.SEED.value, "notes": notes}
self._save_index(data)
self.update_checksum()
self.backup_index_file()
self.backup_manager.create_backup()
raise NotImplementedError("Seed entry support not implemented yet")
def get_totp_code(
self, index: int, parent_seed: str | None = None, timestamp: int | None = None
) -> str:
"""Return the current TOTP code for the specified entry."""
entry = self.retrieve_entry(index)
if not entry or entry.get("type") != EntryType.TOTP.value:
raise ValueError("Entry is not a TOTP entry")
if "secret" in entry:
return TotpManager.current_code_from_secret(entry["secret"], timestamp)
if parent_seed is None:
raise ValueError("Seed required for derived TOTP")
totp_index = int(entry.get("index", 0))
return TotpManager.current_code(parent_seed, totp_index, timestamp)
def get_totp_time_remaining(self, index: int) -> int:
"""Return seconds remaining in the TOTP period for the given entry."""
entry = self.retrieve_entry(index)
if not entry or entry.get("type") != EntryType.TOTP.value:
raise ValueError("Entry is not a TOTP entry")
period = int(entry.get("period", 30))
return TotpManager.time_remaining(period)
def get_encrypted_index(self) -> Optional[bytes]:
"""
Retrieves the encrypted password index file's contents.
@@ -232,15 +304,22 @@ class EntryManager:
url: Optional[str] = None,
blacklisted: Optional[bool] = None,
notes: Optional[str] = None,
*,
label: Optional[str] = None,
period: Optional[int] = None,
digits: Optional[int] = None,
) -> None:
"""
Modifies an existing entry based on the provided index and new values.
:param index: The index number of the entry to modify.
:param username: (Optional) The new username.
:param url: (Optional) The new URL.
:param username: (Optional) The new username (password entries).
:param url: (Optional) The new URL (password entries).
:param blacklisted: (Optional) The new blacklist status.
:param notes: (Optional) New notes to attach to the entry.
:param label: (Optional) The new label for TOTP entries.
:param period: (Optional) The new TOTP period in seconds.
:param digits: (Optional) The new number of digits for TOTP codes.
"""
try:
data = self.vault.load_index()
@@ -258,13 +337,25 @@ class EntryManager:
)
return
if username is not None:
entry["username"] = username
logger.debug(f"Updated username to '{username}' for index {index}.")
entry_type = entry.get("type", EntryType.PASSWORD.value)
if url is not None:
entry["url"] = url
logger.debug(f"Updated URL to '{url}' for index {index}.")
if entry_type == EntryType.TOTP.value:
if label is not None:
entry["label"] = label
logger.debug(f"Updated label to '{label}' for index {index}.")
if period is not None:
entry["period"] = period
logger.debug(f"Updated period to '{period}' for index {index}.")
if digits is not None:
entry["digits"] = digits
logger.debug(f"Updated digits to '{digits}' for index {index}.")
else:
if username is not None:
entry["username"] = username
logger.debug(f"Updated username to '{username}' for index {index}.")
if url is not None:
entry["url"] = url
logger.debug(f"Updated URL to '{url}' for index {index}.")
if blacklisted is not None:
entry["blacklisted"] = blacklisted
@@ -281,7 +372,7 @@ class EntryManager:
self._save_index(data)
self.update_checksum()
self.backup_index_file()
self.backup_manager.create_backup()
logger.info(f"Entry at index {index} modified successfully.")
print(
@@ -295,11 +386,7 @@ class EntryManager:
)
def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]:
"""
Lists all entries in the index.
:return: A list of tuples containing entry details: (index, website, username, url, blacklisted)
"""
"""List all entries in the index."""
try:
data = self.vault.load_index()
entries_data = data.get("entries", {})
@@ -311,23 +398,48 @@ class EntryManager:
entries = []
for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])):
entries.append(
(
int(idx),
entry.get("website", ""),
entry.get("username", ""),
entry.get("url", ""),
entry.get("blacklisted", False),
etype = entry.get("type", EntryType.PASSWORD.value)
if etype == EntryType.TOTP.value:
entries.append(
(int(idx), entry.get("label", ""), None, None, False)
)
else:
entries.append(
(
int(idx),
entry.get("website", ""),
entry.get("username", ""),
entry.get("url", ""),
entry.get("blacklisted", False),
)
)
)
logger.debug(f"Total entries found: {len(entries)}")
for entry in entries:
print(colored(f"Index: {entry[0]}", "cyan"))
print(colored(f" Website: {entry[1]}", "cyan"))
print(colored(f" Username: {entry[2] or 'N/A'}", "cyan"))
print(colored(f" URL: {entry[3] or 'N/A'}", "cyan"))
print(colored(f" Blacklisted: {'Yes' if entry[4] else 'No'}", "cyan"))
for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])):
etype = entry.get("type", EntryType.PASSWORD.value)
print(colored(f"Index: {idx}", "cyan"))
if etype == EntryType.TOTP.value:
print(colored(" Type: TOTP", "cyan"))
print(colored(f" Label: {entry.get('label', '')}", "cyan"))
print(colored(f" Derivation Index: {entry.get('index')}", "cyan"))
print(
colored(
f" Period: {entry.get('period', 30)}s Digits: {entry.get('digits', 6)}",
"cyan",
)
)
else:
print(colored(f" Website: {entry.get('website', '')}", "cyan"))
print(
colored(f" Username: {entry.get('username') or 'N/A'}", "cyan")
)
print(colored(f" URL: {entry.get('url') or 'N/A'}", "cyan"))
print(
colored(
f" Blacklisted: {'Yes' if entry.get('blacklisted', False) else 'No'}",
"cyan",
)
)
print("-" * 40)
return entries
@@ -350,7 +462,7 @@ class EntryManager:
logger.debug(f"Deleted entry at index {index}.")
self.vault.save_index(data)
self.update_checksum()
self.backup_index_file()
self.backup_manager.create_backup()
logger.info(f"Entry at index {index} deleted successfully.")
print(
colored(
@@ -396,35 +508,6 @@ class EntryManager:
logger.error(f"Failed to update checksum: {e}", exc_info=True)
print(colored(f"Error: Failed to update checksum: {e}", "red"))
def backup_index_file(self) -> None:
"""
Creates a backup of the encrypted JSON index file to prevent data loss.
"""
try:
# self.index_file already includes the fingerprint directory
index_file_path = self.index_file
if not index_file_path.exists():
logger.warning(
f"Index file '{index_file_path}' does not exist. No backup created."
)
return
timestamp = int(time.time())
backup_filename = f"entries_db_backup_{timestamp}.json.enc"
backup_path = self.fingerprint_dir / backup_filename
with open(index_file_path, "rb") as original_file, open(
backup_path, "wb"
) as backup_file:
shutil.copyfileobj(original_file, backup_file)
logger.debug(f"Backup created at '{backup_path}'.")
print(colored(f"[+] Backup created at '{backup_path}'.", "green"))
except Exception as e:
logger.error(f"Failed to create backup: {e}", exc_info=True)
print(colored(f"Warning: Failed to create backup: {e}", "yellow"))
def restore_from_backup(self, backup_path: str) -> None:
"""
Restores the index file from a specified backup file.

View File

@@ -17,6 +17,7 @@ import os
from typing import Optional
import shutil
import time
import select
from termcolor import colored
from password_manager.encryption import EncryptionManager
@@ -25,18 +26,23 @@ from password_manager.password_generation import PasswordGenerator
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.portable_backup import export_backup, import_backup
from password_manager.totp import TotpManager
from password_manager.entry_types import EntryType
from utils.key_derivation import (
derive_key_from_parent_seed,
derive_key_from_password,
derive_index_key,
EncryptionMode,
)
from utils.checksum import calculate_checksum, verify_checksum
from utils.checksum import calculate_checksum, verify_checksum, json_checksum
from utils.password_prompt import (
prompt_for_password,
prompt_existing_password,
prompt_new_password,
confirm_action,
)
from utils.memory_protection import InMemorySecret
from constants import MIN_HEALTHY_RELAYS
from constants import (
APP_DIR,
@@ -89,7 +95,7 @@ class PasswordManager:
self.backup_manager: Optional[BackupManager] = None
self.vault: Optional[Vault] = None
self.fingerprint_manager: Optional[FingerprintManager] = None
self.parent_seed: Optional[str] = None
self._parent_seed_secret: Optional[InMemorySecret] = None
self.bip85: Optional[BIP85] = None
self.nostr_client: Optional[NostrClient] = None
self.config_manager: Optional[ConfigManager] = None
@@ -110,6 +116,22 @@ class PasswordManager:
# Set the current fingerprint directory
self.fingerprint_dir = self.fingerprint_manager.get_current_fingerprint_dir()
@property
def parent_seed(self) -> Optional[str]:
"""Return the decrypted parent seed if set."""
if self._parent_seed_secret is None:
return None
return self._parent_seed_secret.get_str()
@parent_seed.setter
def parent_seed(self, value: Optional[str]) -> None:
if value is None:
if self._parent_seed_secret:
self._parent_seed_secret.wipe()
self._parent_seed_secret = None
else:
self._parent_seed_secret = InMemorySecret(value.encode("utf-8"))
def update_activity(self) -> None:
"""Record the current time as the last user activity."""
self.last_activity = time.time()
@@ -729,10 +751,18 @@ class PasswordManager:
raise ValueError("EncryptionManager is not initialized.")
# Reinitialize the managers with the updated EncryptionManager and current fingerprint context
self.entry_manager = EntryManager(
self.config_manager = ConfigManager(
vault=self.vault,
fingerprint_dir=self.fingerprint_dir,
)
self.backup_manager = BackupManager(
fingerprint_dir=self.fingerprint_dir,
config_manager=self.config_manager,
)
self.entry_manager = EntryManager(
vault=self.vault,
backup_manager=self.backup_manager,
)
self.password_generator = PasswordGenerator(
encryption_manager=self.encryption_manager,
@@ -740,13 +770,7 @@ class PasswordManager:
bip85=self.bip85,
)
self.backup_manager = BackupManager(fingerprint_dir=self.fingerprint_dir)
# Load relay configuration and initialize NostrClient
self.config_manager = ConfigManager(
vault=self.vault,
fingerprint_dir=self.fingerprint_dir,
)
config = self.config_manager.load_config()
relay_list = config.get("relays", list(DEFAULT_RELAYS))
self.inactivity_timeout = config.get(
@@ -760,6 +784,17 @@ class PasswordManager:
parent_seed=getattr(self, "parent_seed", None),
)
if hasattr(self.nostr_client, "check_relay_health"):
healthy = self.nostr_client.check_relay_health(MIN_HEALTHY_RELAYS)
if healthy < MIN_HEALTHY_RELAYS:
print(
colored(
f"Only {healthy} relay(s) responded with your latest event."
" Consider adding more relays via Settings.",
"yellow",
)
)
logger.debug("Managers re-initialized for the new fingerprint.")
except Exception as e:
@@ -862,6 +897,101 @@ class PasswordManager:
logging.error(f"Error during password generation: {e}", exc_info=True)
print(colored(f"Error: Failed to generate password: {e}", "red"))
def handle_add_totp(self) -> None:
"""Add a TOTP entry either derived from the seed or imported."""
try:
while True:
print("\nAdd TOTP:")
print("1. Make 2FA (derive from seed)")
print("2. Import 2FA (paste otpauth URI or secret)")
print("3. Back")
choice = input("Select option: ").strip()
if choice == "1":
label = input("Label: ").strip()
if not label:
print(colored("Error: Label cannot be empty.", "red"))
continue
period = input("Period (default 30): ").strip() or "30"
digits = input("Digits (default 6): ").strip() or "6"
if not period.isdigit() or not digits.isdigit():
print(
colored("Error: Period and digits must be numbers.", "red")
)
continue
totp_index = self.entry_manager.get_next_totp_index()
entry_id = self.entry_manager.get_next_index()
uri = self.entry_manager.add_totp(
label,
self.parent_seed,
index=totp_index,
period=int(period),
digits=int(digits),
)
secret = TotpManager.derive_secret(self.parent_seed, totp_index)
self.is_dirty = True
self.last_update = time.time()
print(
colored(
f"\n[+] TOTP entry added with ID {entry_id}.\n", "green"
)
)
print(colored("Add this URI to your authenticator app:", "cyan"))
print(colored(uri, "yellow"))
print(colored(f"Secret: {secret}\n", "cyan"))
try:
self.sync_vault()
except Exception as nostr_error:
logging.error(
f"Failed to post updated index to Nostr: {nostr_error}",
exc_info=True,
)
break
elif choice == "2":
raw = input("Paste otpauth URI or secret: ").strip()
try:
if raw.lower().startswith("otpauth://"):
label, secret, period, digits = TotpManager.parse_otpauth(
raw
)
else:
label = input("Label: ").strip()
secret = raw.upper()
period = int(input("Period (default 30): ").strip() or 30)
digits = int(input("Digits (default 6): ").strip() or 6)
entry_id = self.entry_manager.get_next_index()
uri = self.entry_manager.add_totp(
label,
self.parent_seed,
secret=secret,
period=period,
digits=digits,
)
self.is_dirty = True
self.last_update = time.time()
print(
colored(
f"\nImported \u2714 Codes for {label} are now stored in SeedPass at ID {entry_id}.",
"green",
)
)
try:
self.sync_vault()
except Exception as nostr_error:
logging.error(
f"Failed to post updated index to Nostr: {nostr_error}",
exc_info=True,
)
break
except ValueError as err:
print(colored(f"Error: {err}", "red"))
elif choice == "3":
return
else:
print(colored("Invalid choice.", "red"))
except Exception as e:
logging.error(f"Error during TOTP setup: {e}", exc_info=True)
print(colored(f"Error: Failed to add TOTP: {e}", "red"))
def handle_retrieve_entry(self) -> None:
"""
Handles retrieving a password from the index by prompting the user for the index number
@@ -876,21 +1006,62 @@ class PasswordManager:
return
index = int(index_input)
# Retrieve entry details
entry = self.entry_manager.retrieve_entry(index)
if not entry:
return
# Display entry details
entry_type = entry.get("type", EntryType.PASSWORD.value)
if entry_type == EntryType.TOTP.value:
label = entry.get("label", "")
period = int(entry.get("period", 30))
notes = entry.get("notes", "")
print(colored(f"Retrieving 2FA code for '{label}'.", "cyan"))
print(colored("Press 'b' then Enter to return to the menu.", "cyan"))
try:
while True:
code = self.entry_manager.get_totp_code(index, self.parent_seed)
print(colored("\n[+] Retrieved 2FA Code:\n", "green"))
print(colored(f"Label: {label}", "cyan"))
print(colored(f"Code: {code}", "yellow"))
if notes:
print(colored(f"Notes: {notes}", "cyan"))
remaining = self.entry_manager.get_totp_time_remaining(index)
exit_loop = False
while remaining > 0:
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
sys.stdout.write(f"\r{bar} {remaining:2d}s")
sys.stdout.flush()
try:
if (
sys.stdin
in select.select([sys.stdin], [], [], 1)[0]
):
user_input = sys.stdin.readline().strip().lower()
if user_input == "b":
exit_loop = True
break
except KeyboardInterrupt:
exit_loop = True
print()
break
remaining -= 1
sys.stdout.write("\n")
sys.stdout.flush()
if exit_loop:
break
except Exception as e:
logging.error(f"Error generating TOTP code: {e}", exc_info=True)
print(colored(f"Error: Failed to generate TOTP code: {e}", "red"))
return
website_name = entry.get("website")
length = entry.get("length")
username = entry.get("username")
url = entry.get("url")
blacklisted = entry.get("blacklisted")
notes = entry.get("notes", "")
notes = entry.get("notes", "")
notes = entry.get("notes", "")
notes = entry.get("notes", "")
print(
colored(
@@ -910,10 +1081,8 @@ class PasswordManager:
)
)
# Generate the password
password = self.password_generator.generate_password(length, index)
# Display the password and associated details
if password:
print(
colored(f"\n[+] Retrieved Password for {website_name}:\n", "green")
@@ -952,76 +1121,165 @@ class PasswordManager:
if not entry:
return
website_name = entry.get("website")
length = entry.get("length")
username = entry.get("username")
url = entry.get("url")
blacklisted = entry.get("blacklisted")
notes = entry.get("notes", "")
entry_type = entry.get("type", EntryType.PASSWORD.value)
# Display current values
print(
colored(
f"Modifying entry for '{website_name}' (Index: {index}):", "cyan"
)
)
print(colored(f"Current Username: {username or 'N/A'}", "cyan"))
print(colored(f"Current URL: {url or 'N/A'}", "cyan"))
print(
colored(
f"Current Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}",
"cyan",
)
)
if entry_type == EntryType.TOTP.value:
label = entry.get("label", "")
period = int(entry.get("period", 30))
digits = int(entry.get("digits", 6))
blacklisted = entry.get("blacklisted", False)
notes = entry.get("notes", "")
# Prompt for new values (optional)
new_username = (
input(
f'Enter new username (leave blank to keep "{username or "N/A"}"): '
).strip()
or username
)
new_url = (
input(f'Enter new URL (leave blank to keep "{url or "N/A"}"): ').strip()
or url
)
blacklist_input = (
input(
f'Is this password blacklisted? (Y/N, current: {"Y" if blacklisted else "N"}): '
)
.strip()
.lower()
)
if blacklist_input == "":
new_blacklisted = blacklisted
elif blacklist_input == "y":
new_blacklisted = True
elif blacklist_input == "n":
new_blacklisted = False
else:
print(
colored(
"Invalid input for blacklist status. Keeping the current status.",
"yellow",
f"Modifying 2FA entry '{label}' (Index: {index}):",
"cyan",
)
)
new_blacklisted = blacklisted
new_notes = (
input(
f'Enter new notes (leave blank to keep "{notes or "N/A"}"): '
print(colored(f"Current Period: {period}s", "cyan"))
print(colored(f"Current Digits: {digits}", "cyan"))
print(
colored(
f"Current Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}",
"cyan",
)
)
new_label = (
input(f'Enter new label (leave blank to keep "{label}"): ').strip()
or label
)
period_input = input(
f"Enter new period in seconds (current: {period}): "
).strip()
or notes
)
new_period = period
if period_input:
if period_input.isdigit():
new_period = int(period_input)
else:
print(
colored("Invalid period value. Keeping current.", "yellow")
)
digits_input = input(
f"Enter new digit count (current: {digits}): "
).strip()
new_digits = digits
if digits_input:
if digits_input.isdigit():
new_digits = int(digits_input)
else:
print(
colored(
"Invalid digits value. Keeping current.",
"yellow",
)
)
blacklist_input = (
input(
f'Is this 2FA code blacklisted? (Y/N, current: {"Y" if blacklisted else "N"}): '
)
.strip()
.lower()
)
if blacklist_input == "":
new_blacklisted = blacklisted
elif blacklist_input == "y":
new_blacklisted = True
elif blacklist_input == "n":
new_blacklisted = False
else:
print(
colored(
"Invalid input for blacklist status. Keeping the current status.",
"yellow",
)
)
new_blacklisted = blacklisted
# Update the entry
self.entry_manager.modify_entry(
index,
new_username,
new_url,
new_blacklisted,
new_notes,
)
new_notes = (
input(
f'Enter new notes (leave blank to keep "{notes or "N/A"}"): '
).strip()
or notes
)
self.entry_manager.modify_entry(
index,
blacklisted=new_blacklisted,
notes=new_notes,
label=new_label,
period=new_period,
digits=new_digits,
)
else:
website_name = entry.get("website")
username = entry.get("username")
url = entry.get("url")
blacklisted = entry.get("blacklisted")
notes = entry.get("notes", "")
print(
colored(
f"Modifying entry for '{website_name}' (Index: {index}):",
"cyan",
)
)
print(colored(f"Current Username: {username or 'N/A'}", "cyan"))
print(colored(f"Current URL: {url or 'N/A'}", "cyan"))
print(
colored(
f"Current Blacklist Status: {'Blacklisted' if blacklisted else 'Not Blacklisted'}",
"cyan",
)
)
new_username = (
input(
f'Enter new username (leave blank to keep "{username or "N/A"}"): '
).strip()
or username
)
new_url = (
input(
f'Enter new URL (leave blank to keep "{url or "N/A"}"): '
).strip()
or url
)
blacklist_input = (
input(
f'Is this password blacklisted? (Y/N, current: {"Y" if blacklisted else "N"}): '
)
.strip()
.lower()
)
if blacklist_input == "":
new_blacklisted = blacklisted
elif blacklist_input == "y":
new_blacklisted = True
elif blacklist_input == "n":
new_blacklisted = False
else:
print(
colored(
"Invalid input for blacklist status. Keeping the current status.",
"yellow",
)
)
new_blacklisted = blacklisted
new_notes = (
input(
f'Enter new notes (leave blank to keep "{notes or "N/A"}"): '
).strip()
or notes
)
self.entry_manager.modify_entry(
index,
new_username,
new_url,
new_blacklisted,
new_notes,
)
# Mark database as dirty for background sync
self.is_dirty = True
@@ -1082,6 +1340,61 @@ class PasswordManager:
logging.error(f"Error during entry deletion: {e}", exc_info=True)
print(colored(f"Error: Failed to delete entry: {e}", "red"))
def handle_display_totp_codes(self) -> None:
"""Display all stored TOTP codes with a countdown progress bar."""
try:
data = self.entry_manager.vault.load_index()
entries = data.get("entries", {})
totp_list: list[tuple[str, int, int, bool]] = []
for idx_str, entry in entries.items():
if entry.get("type") == EntryType.TOTP.value and not entry.get(
"blacklisted", False
):
label = entry.get("label", "")
period = int(entry.get("period", 30))
imported = "secret" in entry
totp_list.append((label, int(idx_str), period, imported))
if not totp_list:
print(colored("No 2FA entries found.", "yellow"))
return
totp_list.sort(key=lambda t: t[0].lower())
print(colored("Press 'b' then Enter to return to the menu.", "cyan"))
while True:
print("\033c", end="")
print(colored("Press 'b' then Enter to return to the menu.", "cyan"))
generated = [t for t in totp_list if not t[3]]
imported_list = [t for t in totp_list if t[3]]
if generated:
print(colored("\nGenerated 2FA Codes:", "green"))
for label, idx, period, _ in generated:
code = self.entry_manager.get_totp_code(idx, self.parent_seed)
remaining = self.entry_manager.get_totp_time_remaining(idx)
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
print(f"[{idx}] {label}: {code} {bar} {remaining:2d}s")
if imported_list:
print(colored("\nImported 2FA Codes:", "green"))
for label, idx, period, _ in imported_list:
code = self.entry_manager.get_totp_code(idx, self.parent_seed)
remaining = self.entry_manager.get_totp_time_remaining(idx)
filled = int(20 * (period - remaining) / period)
bar = "[" + "#" * filled + "-" * (20 - filled) + "]"
print(f"[{idx}] {label}: {code} {bar} {remaining:2d}s")
sys.stdout.flush()
try:
if sys.stdin in select.select([sys.stdin], [], [], 1)[0]:
user_input = sys.stdin.readline().strip().lower()
if user_input == "b":
break
except KeyboardInterrupt:
print()
break
except Exception as e:
logging.error(f"Error displaying TOTP codes: {e}", exc_info=True)
print(colored(f"Error: Failed to display TOTP codes: {e}", "red"))
def handle_verify_checksum(self) -> None:
"""
Handles verifying the script's checksum against the stored checksum to ensure integrity.
@@ -1237,6 +1550,63 @@ class PasswordManager:
logging.error(f"Failed to import database: {e}", exc_info=True)
print(colored(f"Error: Failed to import database: {e}", "red"))
def handle_export_totp_codes(self) -> Path | None:
"""Export all 2FA codes to a JSON file for other authenticator apps."""
try:
data = self.entry_manager.vault.load_index()
entries = data.get("entries", {})
totp_entries = []
for entry in entries.values():
if entry.get("type") == EntryType.TOTP.value:
label = entry.get("label", "")
period = int(entry.get("period", 30))
digits = int(entry.get("digits", 6))
if "secret" in entry:
secret = entry["secret"]
else:
idx = int(entry.get("index", 0))
secret = TotpManager.derive_secret(self.parent_seed, idx)
uri = TotpManager.make_otpauth_uri(label, secret, period, digits)
totp_entries.append(
{
"label": label,
"secret": secret,
"period": period,
"digits": digits,
"uri": uri,
}
)
if not totp_entries:
print(colored("No 2FA codes to export.", "yellow"))
return None
dest_str = input(
"Enter destination file path (default: totp_export.json): "
).strip()
dest = Path(dest_str) if dest_str else Path("totp_export.json")
json_data = json.dumps({"entries": totp_entries}, indent=2)
if confirm_action("Encrypt export with a password? (Y/N): "):
password = prompt_new_password()
key = derive_key_from_password(password)
enc_mgr = EncryptionManager(key, dest.parent)
data_bytes = enc_mgr.encrypt_data(json_data.encode("utf-8"))
dest = dest.with_suffix(dest.suffix + ".enc")
dest.write_bytes(data_bytes)
else:
dest.write_text(json_data)
os.chmod(dest, 0o600)
print(colored(f"2FA codes exported to '{dest}'.", "green"))
return dest
except Exception as e:
logging.error(f"Failed to export TOTP codes: {e}", exc_info=True)
print(colored(f"Error: Failed to export 2FA codes: {e}", "red"))
return None
def handle_backup_reveal_parent_seed(self) -> None:
"""
Handles the backup and reveal of the parent seed.
@@ -1456,3 +1826,93 @@ class PasswordManager:
except Exception as e:
logging.error(f"Failed to change password: {e}", exc_info=True)
print(colored(f"Error: Failed to change password: {e}", "red"))
def get_profile_stats(self) -> dict:
"""Return various statistics about the current seed profile."""
if not all([self.entry_manager, self.config_manager, self.backup_manager]):
return {}
stats: dict[str, object] = {}
# Entry counts by type
data = self.entry_manager.vault.load_index()
entries = data.get("entries", {})
counts: dict[str, int] = {}
for entry in entries.values():
etype = entry.get("type", EntryType.PASSWORD.value)
counts[etype] = counts.get(etype, 0) + 1
stats["entries"] = counts
stats["total_entries"] = len(entries)
# Schema version and checksum status
stats["schema_version"] = data.get("schema_version")
current_checksum = json_checksum(data)
chk_path = self.entry_manager.checksum_file
if chk_path.exists():
stored = chk_path.read_text().strip()
stats["checksum_ok"] = stored == current_checksum
else:
stored = None
stats["checksum_ok"] = False
stats["checksum"] = stored
# Relay info
cfg = self.config_manager.load_config(require_pin=False)
relays = cfg.get("relays", [])
stats["relays"] = relays
stats["relay_count"] = len(relays)
# Backup info
backups = list(
self.backup_manager.backup_dir.glob("entries_db_backup_*.json.enc")
)
stats["backup_count"] = len(backups)
stats["backup_dir"] = str(self.backup_manager.backup_dir)
stats["additional_backup_path"] = (
self.config_manager.get_additional_backup_path()
)
# Nostr sync info
manifest = getattr(self.nostr_client, "current_manifest", None)
if manifest is not None:
stats["chunk_count"] = len(manifest.chunks)
stats["delta_since"] = manifest.delta_since
else:
stats["chunk_count"] = 0
stats["delta_since"] = None
stats["pending_deltas"] = len(getattr(self.nostr_client, "_delta_events", []))
return stats
def display_stats(self) -> None:
"""Print a summary of :meth:`get_profile_stats` to the console."""
stats = self.get_profile_stats()
if not stats:
print(colored("No statistics available.", "red"))
return
print(colored("\n=== Seed Profile Stats ===", "yellow"))
print(colored(f"Total entries: {stats['total_entries']}", "cyan"))
for etype, count in stats["entries"].items():
print(colored(f" {etype}: {count}", "cyan"))
print(colored(f"Relays configured: {stats['relay_count']}", "cyan"))
print(
colored(
f"Backups: {stats['backup_count']} (dir: {stats['backup_dir']})", "cyan"
)
)
if stats.get("additional_backup_path"):
print(
colored(f"Additional backup: {stats['additional_backup_path']}", "cyan")
)
print(colored(f"Schema version: {stats['schema_version']}", "cyan"))
print(
colored(
f"Checksum ok: {'yes' if stats['checksum_ok'] else 'no'}",
"cyan",
)
)
print(colored(f"Snapshot chunks: {stats['chunk_count']}", "cyan"))
print(colored(f"Pending deltas: {stats['pending_deltas']}", "cyan"))
if stats.get("delta_since"):
print(colored(f"Latest delta id: {stats['delta_since']}", "cyan"))

View File

@@ -335,12 +335,6 @@ class PasswordGenerator:
raise
def derive_totp_secret(bip85: BIP85, idx: int) -> str:
"""Derive a TOTP secret for the given index using BIP85."""
entropy = bip85.derive_entropy(index=idx, bytes_len=10, app_no=2)
return base64.b32encode(entropy).decode("utf-8")
def derive_ssh_key(bip85: BIP85, idx: int) -> bytes:
"""Derive 32 bytes of entropy suitable for an SSH key."""
return bip85.derive_entropy(index=idx, bytes_len=32, app_no=32)

View File

@@ -0,0 +1,83 @@
"""TOTP management utilities for SeedPass."""
from __future__ import annotations
import sys
import time
from urllib.parse import quote
from urllib.parse import urlparse, parse_qs, unquote
import pyotp
from utils import key_derivation
class TotpManager:
"""Helper methods for TOTP secrets and codes."""
@staticmethod
def derive_secret(seed: str, index: int) -> str:
"""Derive a TOTP secret from a BIP39 seed and index."""
return key_derivation.derive_totp_secret(seed, index)
@classmethod
def current_code(cls, seed: str, index: int, timestamp: int | None = None) -> str:
"""Return the TOTP code for the given seed and index."""
secret = cls.derive_secret(seed, index)
totp = pyotp.TOTP(secret)
if timestamp is None:
return totp.now()
return totp.at(timestamp)
@staticmethod
def current_code_from_secret(secret: str, timestamp: int | None = None) -> str:
"""Return the TOTP code for a raw secret."""
totp = pyotp.TOTP(secret)
return totp.now() if timestamp is None else totp.at(timestamp)
@staticmethod
def parse_otpauth(uri: str) -> tuple[str, str, int, int]:
"""Parse an otpauth URI and return (label, secret, period, digits)."""
if not uri.startswith("otpauth://"):
raise ValueError("Not an otpauth URI")
parsed = urlparse(uri)
label = unquote(parsed.path.lstrip("/"))
qs = parse_qs(parsed.query)
secret = qs.get("secret", [""])[0].upper()
period = int(qs.get("period", ["30"])[0])
digits = int(qs.get("digits", ["6"])[0])
if not secret:
raise ValueError("Missing secret in URI")
return label, secret, period, digits
@staticmethod
def make_otpauth_uri(
label: str, secret: str, period: int = 30, digits: int = 6
) -> str:
"""Construct an otpauth:// URI for use with authenticator apps."""
label_enc = quote(label)
return f"otpauth://totp/{label_enc}?secret={secret}&period={period}&digits={digits}"
@staticmethod
def time_remaining(period: int = 30, timestamp: int | None = None) -> int:
"""Return seconds remaining until the current TOTP period resets."""
if timestamp is None:
timestamp = int(time.time())
return period - (timestamp % period)
@classmethod
def print_progress_bar(cls, period: int = 30) -> None:
"""Print a simple progress bar for the current TOTP period."""
remaining = cls.time_remaining(period)
total = period
bar_len = 20
while remaining > 0:
progress = total - remaining
filled = int(bar_len * progress / total)
bar = "[" + "#" * filled + "-" * (bar_len - filled) + "]"
sys.stdout.write(f"\r{bar} {remaining:2d}s")
sys.stdout.flush()
time.sleep(1)
remaining -= 1
sys.stdout.write("\n")
sys.stdout.flush()

View File

@@ -18,3 +18,6 @@ websockets>=15.0.0
tomli
hypothesis
mutmut==2.4.4
pyotp>=2.8.0
freezegun

View File

@@ -0,0 +1,37 @@
import time
from pathlib import Path
from tempfile import TemporaryDirectory
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
def test_entry_manager_additional_backup(monkeypatch):
with TemporaryDirectory() as tmpdir, TemporaryDirectory() as extra:
fp_dir = Path(tmpdir)
vault, _ = create_vault(fp_dir, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, fp_dir)
cfg_mgr.set_additional_backup_path(extra)
backup_mgr = BackupManager(fp_dir, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
monkeypatch.setattr(time, "time", lambda: 1111)
entry_mgr.add_entry("example.com", 12)
backup = fp_dir / "backups" / "entries_db_backup_1111.json.enc"
extra_file = Path(extra) / f"{fp_dir.name}_entries_db_backup_1111.json.enc"
assert backup.exists()
assert extra_file.exists()
cfg_mgr.set_additional_backup_path(None)
monkeypatch.setattr(time, "time", lambda: 2222)
entry_mgr.add_entry("example.org", 8)
backup2 = fp_dir / "backups" / "entries_db_backup_2222.json.enc"
assert backup2.exists()
extra_file2 = Path(extra) / f"{fp_dir.name}_entries_db_backup_2222.json.enc"
assert not extra_file2.exists()

View File

@@ -31,7 +31,7 @@ def test_auto_sync_triggers_post(monkeypatch):
called = True
monkeypatch.setattr(main, "handle_post_to_nostr", fake_post)
monkeypatch.setattr(main, "timed_input", lambda *_: "5")
monkeypatch.setattr(main, "timed_input", lambda *_: "6")
with pytest.raises(SystemExit):
main.display_menu(pm, sync_interval=0.1)

View File

@@ -9,13 +9,15 @@ from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
def test_backup_restore_workflow(monkeypatch):
with TemporaryDirectory() as tmpdir:
fp_dir = Path(tmpdir)
vault, enc_mgr = create_vault(fp_dir, TEST_SEED, TEST_PASSWORD)
backup_mgr = BackupManager(fp_dir)
cfg_mgr = ConfigManager(vault, fp_dir)
backup_mgr = BackupManager(fp_dir, cfg_mgr)
index_file = fp_dir / "seedpass_entries_db.json.enc"
@@ -61,3 +63,24 @@ def test_backup_restore_workflow(monkeypatch):
current = vault.load_index()
backup_mgr.restore_backup_by_timestamp(1111)
assert vault.load_index() == current
def test_additional_backup_location(monkeypatch):
with TemporaryDirectory() as tmpdir, TemporaryDirectory() as extra:
fp_dir = Path(tmpdir)
vault, enc_mgr = create_vault(fp_dir, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, fp_dir)
cfg_mgr.set_additional_backup_path(extra)
backup_mgr = BackupManager(fp_dir, cfg_mgr)
vault.save_index({"schema_version": 2, "entries": {"a": {}}})
monkeypatch.setattr(time, "time", lambda: 3333)
backup_mgr.create_backup()
backup = fp_dir / "backups" / "entries_db_backup_3333.json.enc"
assert backup.exists()
extra_file = Path(extra) / f"{fp_dir.name}_entries_db_backup_3333.json.enc"
assert extra_file.exists()
assert extra_file.stat().st_mode & 0o777 == 0o600

View File

@@ -6,10 +6,10 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from local_bip85.bip85 import BIP85, Bip85Error
from password_manager.password_generation import (
derive_totp_secret,
derive_ssh_key,
derive_seed_phrase,
)
from utils.key_derivation import derive_totp_secret
MASTER_XPRV = "xprv9s21ZrQH143K2LBWUUQRFXhucrQqBpKdRRxNVq2zBqsx8HVqFk2uYo8kmbaLLHRdqtQpUm98uKfu3vca1LqdGhUtyoFnCNkfmXRyPXLjbKb"
@@ -18,7 +18,7 @@ EXPECTED_12 = "girl mad pet galaxy egg matter matrix prison refuse sense ordinar
EXPECTED_24 = "puppy ocean match cereal symbol another shed magic wrap hammer bulb intact gadget divorce twin tonight reason outdoor destroy simple truth cigar social volcano"
EXPECTED_SYMM_KEY = "7040bb53104f27367f317558e78a994ada7296c6fde36a364e5baf206e502bb1"
EXPECTED_TOTP_SECRET = "OBALWUYQJ4TTM7ZR"
EXPECTED_TOTP_SECRET = "VQYTWDNEWYBY2G3LOGGCEKR4LZ3LNEYY"
EXPECTED_SSH_KEY = "52405cd0dd21c5be78314a7c1a3c65ffd8d896536cc7dee3157db5824f0c92e2"
@@ -39,8 +39,8 @@ def test_bip85_symmetric_key(bip85):
assert bip85.derive_symmetric_key(index=0).hex() == EXPECTED_SYMM_KEY
def test_derive_totp_secret(bip85):
assert derive_totp_secret(bip85, 0) == EXPECTED_TOTP_SECRET
def test_derive_totp_secret():
assert derive_totp_secret(EXPECTED_24, 0) == EXPECTED_TOTP_SECRET
def test_derive_ssh_key(bip85):

View File

@@ -39,6 +39,7 @@ def _make_pm(called, locked=None):
last_activity=time.time(),
nostr_client=SimpleNamespace(close_client_pool=lambda: None),
handle_add_password=add,
handle_add_totp=lambda: None,
handle_retrieve_entry=retrieve,
handle_modify_entry=modify,
update_activity=update,
@@ -51,7 +52,7 @@ def _make_pm(called, locked=None):
def test_empty_and_non_numeric_choice(monkeypatch, capsys):
called = {"add": False, "retrieve": False, "modify": False}
pm, _ = _make_pm(called)
inputs = iter(["", "abc", "5"])
inputs = iter(["", "abc", "6"])
monkeypatch.setattr(main, "timed_input", lambda *_: next(inputs))
with pytest.raises(SystemExit):
main.display_menu(pm, sync_interval=1000, inactivity_timeout=1000)
@@ -64,7 +65,7 @@ def test_empty_and_non_numeric_choice(monkeypatch, capsys):
def test_out_of_range_menu(monkeypatch, capsys):
called = {"add": False, "retrieve": False, "modify": False}
pm, _ = _make_pm(called)
inputs = iter(["9", "5"])
inputs = iter(["9", "6"])
monkeypatch.setattr(main, "timed_input", lambda *_: next(inputs))
with pytest.raises(SystemExit):
main.display_menu(pm, sync_interval=1000, inactivity_timeout=1000)
@@ -76,7 +77,7 @@ def test_out_of_range_menu(monkeypatch, capsys):
def test_invalid_add_entry_submenu(monkeypatch, capsys):
called = {"add": False, "retrieve": False, "modify": False}
pm, _ = _make_pm(called)
inputs = iter(["1", "3", "2", "5"])
inputs = iter(["1", "4", "3", "6"])
monkeypatch.setattr(main, "timed_input", lambda *_: next(inputs))
monkeypatch.setattr("builtins.input", lambda *_: next(inputs))
with pytest.raises(SystemExit):
@@ -91,7 +92,7 @@ def test_inactivity_timeout_loop(monkeypatch, capsys):
pm, locked = _make_pm(called)
pm.last_activity = 0
monkeypatch.setattr(time, "time", lambda: 100.0)
monkeypatch.setattr(main, "timed_input", lambda *_: "5")
monkeypatch.setattr(main, "timed_input", lambda *_: "6")
with pytest.raises(SystemExit):
main.display_menu(pm, sync_interval=1000, inactivity_timeout=0.1)
out = capsys.readouterr().out

View File

@@ -9,6 +9,7 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.vault import Vault
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
from utils.key_derivation import derive_index_key, derive_key_from_password
@@ -34,9 +35,12 @@ def _reader(index_key: bytes, dir_path: Path, loops: int, out: Queue) -> None:
out.put(repr(e))
def _backup(dir_path: Path, loops: int, out: Queue) -> None:
def _backup(index_key: bytes, dir_path: Path, loops: int, out: Queue) -> None:
try:
bm = BackupManager(dir_path)
enc = EncryptionManager(index_key, dir_path)
vault = Vault(enc, dir_path)
cfg = ConfigManager(vault, dir_path)
bm = BackupManager(dir_path, cfg)
for _ in range(loops):
bm.create_backup()
except Exception as e: # pragma: no cover - capture
@@ -58,7 +62,7 @@ def test_concurrency_stress(tmp_path: Path, loops: int, _):
Process(target=_writer, args=(index_key, tmp_path, loops, q)),
Process(target=_reader, args=(index_key, tmp_path, loops, q)),
Process(target=_reader, args=(index_key, tmp_path, loops, q)),
Process(target=_backup, args=(tmp_path, loops, q)),
Process(target=_backup, args=(index_key, tmp_path, loops, q)),
]
for p in procs:

View File

@@ -22,6 +22,7 @@ def test_config_defaults_and_round_trip():
assert cfg["relays"] == list(DEFAULT_RELAYS)
assert cfg["pin_hash"] == ""
assert cfg["password_hash"] == ""
assert cfg["additional_backup_path"] == ""
cfg_mgr.set_pin("1234")
cfg_mgr.set_relays(["wss://example.com"], require_pin=False)
@@ -111,3 +112,21 @@ def test_password_hash_migrates_from_file(tmp_path):
(tmp_path / "hashed_password.enc").unlink()
cfg2 = cfg_mgr.load_config(require_pin=False)
assert cfg2["password_hash"] == hashed.decode()
def test_additional_backup_path_round_trip():
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, Path(tmpdir))
# default should be empty string
assert cfg_mgr.load_config(require_pin=False)["additional_backup_path"] == ""
cfg_mgr.set_additional_backup_path("/tmp/my_backups")
cfg = cfg_mgr.load_config(require_pin=False)
assert cfg["additional_backup_path"] == "/tmp/my_backups"
assert cfg_mgr.get_additional_backup_path() == "/tmp/my_backups"
cfg_mgr.set_additional_backup_path(None)
cfg2 = cfg_mgr.load_config(require_pin=False)
assert cfg2["additional_backup_path"] == ""

View File

@@ -6,13 +6,17 @@ from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.config_manager import ConfigManager
def test_list_entries_empty():
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, Path(tmpdir))
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
entries = entry_mgr.list_entries()
assert entries == []

View File

@@ -1,19 +1,25 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
import pytest
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.config_manager import ConfigManager
def test_add_and_retrieve_entry():
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, Path(tmpdir))
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
index = entry_mgr.add_entry("example.com", 12, "user")
entry = entry_mgr.retrieve_entry(index)
@@ -45,10 +51,15 @@ def test_add_and_retrieve_entry():
def test_round_trip_entry_types(method, expected_type):
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, Path(tmpdir))
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
if method == "add_entry":
index = entry_mgr.add_entry("example.com", 8)
elif method == "add_totp":
entry_mgr.add_totp("example", TEST_SEED)
index = 0
else:
with pytest.raises(NotImplementedError):
getattr(entry_mgr, method)()
@@ -58,3 +69,20 @@ def test_round_trip_entry_types(method, expected_type):
assert entry["type"] == expected_type
data = enc_mgr.load_json_data(entry_mgr.index_file)
assert data["entries"][str(index)]["type"] == expected_type
def test_legacy_entry_defaults_to_password():
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
index = entry_mgr.add_entry("example.com", 8)
data = enc_mgr.load_json_data(entry_mgr.index_file)
data["entries"][str(index)].pop("type", None)
enc_mgr.save_json_data(data, entry_mgr.index_file)
loaded = entry_mgr._load_index()
assert loaded["entries"][str(index)]["type"] == "password"

View File

@@ -6,14 +6,18 @@ from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.config_manager import ConfigManager
def test_update_checksum_writes_to_expected_path():
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, tmp_path)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
# create an empty index file
vault.save_index({"entries": {}})
@@ -27,10 +31,13 @@ def test_backup_index_file_creates_backup_in_directory():
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, tmp_path)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
vault.save_index({"entries": {}})
entry_mgr.backup_index_file()
entry_mgr.backup_manager.create_backup()
backups = list(tmp_path.glob("entries_db_backup_*.json.enc"))
backup_dir = tmp_path / "backups"
backups = list(backup_dir.glob("entries_db_backup_*.json.enc"))
assert len(backups) == 1

View File

@@ -0,0 +1,43 @@
import logging
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
from nostr import event_handler
class SimpleEvent:
def __init__(self, id: str, created_at: int, content: str) -> None:
self.id = id
self.created_at = created_at
self.content = content
def test_handle_new_event_logs(caplog):
handler = event_handler.EventHandler()
evt = SimpleEvent("1", 0, "hello")
caplog.set_level(logging.INFO, logger=event_handler.logger.name)
handler.handle_new_event(evt)
assert (
"[New Event] ID: 1 | Created At: 1970-01-01 00:00:00 | Content: hello"
in caplog.text
)
def test_handle_new_event_error(monkeypatch, caplog):
handler = event_handler.EventHandler()
evt = SimpleEvent("2", 0, "boom")
def raise_info(*args, **kwargs):
raise RuntimeError("fail")
monkeypatch.setattr(event_handler.logger, "info", raise_info)
caplog.set_level(logging.ERROR, logger=event_handler.logger.name)
handler.handle_new_event(evt)
assert "Error handling new event: fail" in caplog.text

View File

@@ -0,0 +1,57 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
import json
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.manager import PasswordManager, EncryptionMode
from password_manager.config_manager import ConfigManager
from password_manager.totp import TotpManager
class FakeNostrClient:
def publish_snapshot(self, data: bytes):
return None, "abcd"
def test_handle_export_totp_codes(monkeypatch, tmp_path):
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = vault
pm.entry_manager = entry_mgr
pm.backup_manager = backup_mgr
pm.parent_seed = TEST_SEED
pm.nostr_client = FakeNostrClient()
pm.fingerprint_dir = tmp_path
# add totp entries
entry_mgr.add_totp("Example", TEST_SEED)
entry_mgr.add_totp("Imported", TEST_SEED, secret="JBSWY3DPEHPK3PXP")
export_path = tmp_path / "out.json"
monkeypatch.setattr("builtins.input", lambda *a, **k: str(export_path))
monkeypatch.setattr(
"password_manager.manager.confirm_action", lambda *_a, **_k: False
)
pm.handle_export_totp_codes()
data = json.loads(export_path.read_text())
assert len(data["entries"]) == 2
labels = {e["label"] for e in data["entries"]}
assert {"Example", "Imported"} == labels
# check URI format
uri = data["entries"][0]["uri"]
assert uri.startswith("otpauth://totp/")

View File

@@ -36,7 +36,7 @@ def test_inactivity_triggers_lock(monkeypatch):
unlock_vault=unlock_vault,
)
monkeypatch.setattr(main, "timed_input", lambda *_: "5")
monkeypatch.setattr(main, "timed_input", lambda *_: "6")
with pytest.raises(SystemExit):
main.display_menu(pm, sync_interval=1000, inactivity_timeout=0.1)
@@ -72,7 +72,7 @@ def test_input_timeout_triggers_lock(monkeypatch):
unlock_vault=unlock_vault,
)
responses = iter([TimeoutError(), "5"])
responses = iter([TimeoutError(), "6"])
def fake_input(*_args, **_kwargs):
val = next(responses)

View File

@@ -0,0 +1,29 @@
import pytest
from bech32 import bech32_encode, convertbits
from nostr.key_manager import KeyManager
def test_key_manager_getters(monkeypatch):
priv_hex = "1" * 64
pub_hex = "2" * 64
class DummyKeys:
def public_key_hex(self):
return pub_hex
def private_key_hex(self):
return priv_hex
monkeypatch.setattr(KeyManager, "initialize_bip85", lambda self: None)
monkeypatch.setattr(KeyManager, "generate_nostr_keys", lambda self: DummyKeys())
km = KeyManager("seed", "fp")
assert km.get_public_key_hex() == pub_hex
assert km.get_private_key_hex() == priv_hex
expected_npub = bech32_encode(
"npub", convertbits(bytes.fromhex(pub_hex), 8, 5, True)
)
assert km.get_npub() == expected_npub

View File

@@ -0,0 +1,66 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.manager import PasswordManager, EncryptionMode
from password_manager.config_manager import ConfigManager
class FakeNostrClient:
def __init__(self, *args, **kwargs):
self.published = []
def publish_snapshot(self, data: bytes):
self.published.append(data)
return None, "abcd"
def test_handle_add_totp(monkeypatch, capsys):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = vault
pm.entry_manager = entry_mgr
pm.backup_manager = backup_mgr
pm.parent_seed = TEST_SEED
pm.nostr_client = FakeNostrClient()
pm.fingerprint_dir = tmp_path
pm.is_dirty = False
inputs = iter(
[
"1", # choose derive
"Example", # label
"", # period
"", # digits
]
)
monkeypatch.setattr("builtins.input", lambda *args, **kwargs: next(inputs))
monkeypatch.setattr(pm, "sync_vault", lambda: None)
pm.handle_add_totp()
out = capsys.readouterr().out
entry = entry_mgr.retrieve_entry(0)
assert entry == {
"type": "totp",
"label": "Example",
"index": 0,
"period": 30,
"digits": 6,
}
assert "ID 0" in out

View File

@@ -0,0 +1,99 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.manager import PasswordManager, EncryptionMode
from password_manager.config_manager import ConfigManager
class FakeNostrClient:
def __init__(self, *args, **kwargs):
self.published = []
def publish_snapshot(self, data: bytes):
self.published.append(data)
return None, "abcd"
def test_handle_display_totp_codes(monkeypatch, capsys):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = vault
pm.entry_manager = entry_mgr
pm.backup_manager = backup_mgr
pm.parent_seed = TEST_SEED
pm.nostr_client = FakeNostrClient()
pm.fingerprint_dir = tmp_path
pm.is_dirty = False
entry_mgr.add_totp("Example", TEST_SEED)
monkeypatch.setattr(pm.entry_manager, "get_totp_code", lambda *a, **k: "123456")
monkeypatch.setattr(
pm.entry_manager, "get_totp_time_remaining", lambda *a, **k: 30
)
# interrupt the loop after first iteration
monkeypatch.setattr(
"password_manager.manager.select.select",
lambda *a, **k: (_ for _ in ()).throw(KeyboardInterrupt()),
)
pm.handle_display_totp_codes()
out = capsys.readouterr().out
assert "Generated 2FA Codes" in out
assert "[0] Example" in out
assert "123456" in out
def test_display_totp_codes_excludes_blacklisted(monkeypatch, capsys):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = vault
pm.entry_manager = entry_mgr
pm.backup_manager = backup_mgr
pm.parent_seed = TEST_SEED
pm.nostr_client = FakeNostrClient()
pm.fingerprint_dir = tmp_path
pm.is_dirty = False
entry_mgr.add_totp("Visible", TEST_SEED)
entry_mgr.add_totp("Hidden", TEST_SEED)
entry_mgr.modify_entry(1, blacklisted=True)
monkeypatch.setattr(pm.entry_manager, "get_totp_code", lambda *a, **k: "123456")
monkeypatch.setattr(
pm.entry_manager, "get_totp_time_remaining", lambda *a, **k: 30
)
monkeypatch.setattr(
"password_manager.manager.select.select",
lambda *a, **k: (_ for _ in ()).throw(KeyboardInterrupt()),
)
pm.handle_display_totp_codes()
out = capsys.readouterr().out
assert "Visible" in out
assert "Hidden" not in out

View File

@@ -0,0 +1,60 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.manager import PasswordManager, EncryptionMode, TotpManager
from password_manager.config_manager import ConfigManager
class FakeNostrClient:
def __init__(self, *args, **kwargs):
self.published = []
def publish_snapshot(self, data: bytes):
self.published.append(data)
return None, "abcd"
def test_handle_retrieve_totp_entry(monkeypatch, capsys):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY
pm.encryption_manager = enc_mgr
pm.vault = vault
pm.entry_manager = entry_mgr
pm.backup_manager = backup_mgr
pm.parent_seed = TEST_SEED
pm.nostr_client = FakeNostrClient()
pm.fingerprint_dir = tmp_path
pm.is_dirty = False
entry_mgr.add_totp("Example", TEST_SEED)
monkeypatch.setattr("builtins.input", lambda *a, **k: "0")
monkeypatch.setattr(pm.entry_manager, "get_totp_code", lambda *a, **k: "123456")
monkeypatch.setattr(
pm.entry_manager, "get_totp_time_remaining", lambda *a, **k: 1
)
monkeypatch.setattr("password_manager.manager.time.sleep", lambda *a, **k: None)
monkeypatch.setattr(sys.stdin, "readline", lambda *a, **k: "b\n")
monkeypatch.setattr(
"password_manager.manager.select.select",
lambda *a, **k: ([sys.stdin], [], []),
)
pm.handle_retrieve_entry()
out = capsys.readouterr().out
assert "Retrieved 2FA Code" in out
assert "123456" in out

View File

@@ -9,6 +9,7 @@ from password_manager.entry_management import EntryManager
from password_manager.vault import Vault
from password_manager.backup import BackupManager
from password_manager.manager import PasswordManager, EncryptionMode
from password_manager.config_manager import ConfigManager
class FakePasswordGenerator:
@@ -29,8 +30,9 @@ def test_manager_workflow(monkeypatch):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
monkeypatch.setattr("password_manager.manager.NostrClient", FakeNostrClient)
@@ -64,7 +66,7 @@ def test_manager_workflow(monkeypatch):
pm.handle_add_password()
assert pm.is_dirty is False
backups = list(tmp_path.glob("entries_db_backup_*.json.enc"))
backups = list((tmp_path / "backups").glob("entries_db_backup_*.json.enc"))
assert len(backups) == 1
checksum_file = tmp_path / "seedpass_entries_db_checksum.txt"
assert checksum_file.exists()

View File

@@ -0,0 +1,29 @@
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
import pytest
from utils.memory_protection import InMemorySecret
def test_inmemory_secret_round_trip_bytes_and_str():
plaintext = b"super secret"
secret = InMemorySecret(plaintext)
assert secret.get_bytes() == plaintext
assert secret.get_str() == plaintext.decode("utf-8")
def test_inmemory_secret_invalid_type():
with pytest.raises(TypeError):
InMemorySecret("not bytes")
def test_inmemory_secret_wipe_clears_attributes():
secret = InMemorySecret(b"wipe me")
secret.wipe()
assert secret._key is None
assert secret._nonce is None
assert secret._cipher is None
assert secret._encrypted is None

View File

@@ -8,7 +8,9 @@ from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.config_manager import ConfigManager
from nostr.client import NostrClient
@@ -16,7 +18,9 @@ def test_backup_and_publish_to_nostr():
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, tmp_path)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
# create an index by adding an entry
entry_mgr.add_entry("example.com", 12)
@@ -34,7 +38,7 @@ def test_backup_and_publish_to_nostr():
enc_mgr, "decrypt_parent_seed", return_value="seed"
):
nostr_client = NostrClient(enc_mgr, "fp")
entry_mgr.backup_index_file()
entry_mgr.backup_manager.create_backup()
result = asyncio.run(nostr_client.publish_snapshot(encrypted_index))
mock_publish.assert_awaited_with(encrypted_index)

View File

@@ -75,3 +75,19 @@ def test_initialize_client_pool_add_relay_fallback(tmp_path):
fc = client.client
assert fc.added == client.relays
assert fc.connected is True
def test_check_relay_health_runs_async(tmp_path, monkeypatch):
client = _setup_client(tmp_path, FakeAddRelayClient)
recorded = {}
async def fake_check(min_relays, timeout):
recorded["args"] = (min_relays, timeout)
return 1
monkeypatch.setattr(client, "_check_relay_health", fake_check)
result = client.check_relay_health(3, timeout=2)
assert result == 1
assert recorded["args"] == (3, 2)

View File

@@ -4,12 +4,16 @@ import math
from helpers import create_vault, dummy_nostr_client
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
from nostr.client import prepare_snapshot
def test_manifest_generation(tmp_path):
vault, enc_mgr = create_vault(tmp_path)
entry_mgr = EntryManager(vault, tmp_path)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
entry_mgr.add_entry("example.com", 12)
entry_mgr.add_entry("test.com", 12)
encrypted = vault.get_encrypted_index()

View File

@@ -16,7 +16,9 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.config_manager import ConfigManager
from nostr.client import NostrClient, Kind, KindStandard
@@ -40,7 +42,9 @@ def test_nostr_index_size_limits():
)
npub = client.key_manager.get_npub()
vault = Vault(enc_mgr, tmpdir)
entry_mgr = EntryManager(vault, Path(tmpdir))
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
delay = float(os.getenv("NOSTR_TEST_DELAY", "5"))
size = 16

View File

@@ -10,6 +10,7 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.config_manager import ConfigManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.manager import PasswordManager, EncryptionMode
@@ -18,8 +19,9 @@ def test_change_password_triggers_nostr_backup(monkeypatch):
with TemporaryDirectory() as tmpdir:
fp = Path(tmpdir)
vault, enc_mgr = create_vault(fp, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, fp)
cfg_mgr = ConfigManager(vault, fp)
backup_mgr = BackupManager(fp, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm = PasswordManager.__new__(PasswordManager)
pm.encryption_mode = EncryptionMode.SEED_ONLY

View File

@@ -10,6 +10,7 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.vault import Vault
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
from password_manager.manager import PasswordManager, EncryptionMode
from utils.key_derivation import derive_index_key, derive_key_from_password
@@ -29,8 +30,9 @@ def test_password_change_and_unlock(monkeypatch):
enc_mgr = EncryptionManager(index_key, fp)
seed_mgr = EncryptionManager(seed_key, fp)
vault = Vault(enc_mgr, fp)
entry_mgr = EntryManager(vault, fp)
cfg_mgr = ConfigManager(vault, fp)
backup_mgr = BackupManager(fp, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
vault.save_index({"entries": {}})
cfg_mgr.save_config(

View File

@@ -11,6 +11,7 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.encryption import EncryptionManager
from password_manager.vault import Vault
from password_manager.backup import BackupManager
from password_manager.config_manager import ConfigManager
from password_manager.portable_backup import export_backup, import_backup
from utils.key_derivation import derive_index_key, derive_key_from_password
@@ -27,7 +28,8 @@ def setup_vault(tmp: Path):
index_key = derive_index_key(SEED)
enc_mgr = EncryptionManager(index_key, tmp)
vault = Vault(enc_mgr, tmp)
backup = BackupManager(tmp)
cfg = ConfigManager(vault, tmp)
backup = BackupManager(tmp, cfg)
return vault, backup

View File

@@ -14,7 +14,9 @@ import constants
import password_manager.manager as manager_module
from password_manager.vault import Vault
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.manager import EncryptionMode
from password_manager.config_manager import ConfigManager
def test_add_and_delete_entry(monkeypatch):
@@ -51,7 +53,9 @@ def test_add_and_delete_entry(monkeypatch):
assert pm.fingerprint_manager.current_fingerprint == fingerprint
vault, enc_mgr = create_vault(fingerprint_dir, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, fingerprint_dir)
cfg_mgr = ConfigManager(vault, fingerprint_dir)
backup_mgr = BackupManager(fingerprint_dir, cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
pm.encryption_manager = enc_mgr
pm.vault = vault

View File

@@ -86,3 +86,15 @@ def test_relay_and_profile_actions(monkeypatch, capsys):
out = capsys.readouterr().out
assert fp1 in out
assert fp2 in out
def test_settings_menu_additional_backup(monkeypatch):
with TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
pm, cfg_mgr, fp_mgr = setup_pm(tmp_path, monkeypatch)
inputs = iter(["9", "12"])
with patch("main.handle_set_additional_backup_location") as handler:
with patch("builtins.input", side_effect=lambda *_: next(inputs)):
main.handle_settings(pm)
handler.assert_called_once_with(pm)

30
src/tests/test_totp.py Normal file
View File

@@ -0,0 +1,30 @@
import sys
from pathlib import Path
import pyotp
from freezegun import freeze_time
sys.path.append(str(Path(__file__).resolve().parents[1]))
from helpers import TEST_SEED
from password_manager.totp import TotpManager
@freeze_time("1970-01-01 00:16:40")
def test_current_code_matches_pyotp():
secret = TotpManager.derive_secret(TEST_SEED, 0)
expected = pyotp.TOTP(secret).now()
assert TotpManager.current_code(TEST_SEED, 0) == expected
@freeze_time("1970-01-01 00:00:15")
def test_time_remaining():
assert TotpManager.time_remaining(period=30) == 15
def test_print_progress_bar_terminates(monkeypatch):
monkeypatch.setattr(TotpManager, "time_remaining", lambda period: 0)
calls = []
monkeypatch.setattr("password_manager.totp.time.sleep", lambda s: calls.append(s))
TotpManager.print_progress_bar(period=30)
assert calls == []

View File

@@ -0,0 +1,75 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
import pytest
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.backup import BackupManager
from password_manager.vault import Vault
from password_manager.config_manager import ConfigManager
from password_manager.totp import TotpManager
import pyotp
def test_add_totp_and_get_code():
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
uri = entry_mgr.add_totp("Example", TEST_SEED)
assert uri.startswith("otpauth://totp/")
entry = entry_mgr.retrieve_entry(0)
assert entry == {
"type": "totp",
"label": "Example",
"index": 0,
"period": 30,
"digits": 6,
}
code = entry_mgr.get_totp_code(0, TEST_SEED, timestamp=0)
expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0)
assert code == expected
def test_totp_time_remaining(monkeypatch):
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, Path(tmpdir))
backup_mgr = BackupManager(Path(tmpdir), cfg_mgr)
entry_mgr = EntryManager(vault, backup_mgr)
entry_mgr.add_totp("Example", TEST_SEED)
monkeypatch.setattr(TotpManager, "time_remaining", lambda period: 7)
remaining = entry_mgr.get_totp_time_remaining(0)
assert remaining == 7
def test_add_totp_imported(tmp_path):
vault, enc = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
cfg_mgr = ConfigManager(vault, tmp_path)
backup_mgr = BackupManager(tmp_path, cfg_mgr)
em = EntryManager(vault, backup_mgr)
secret = "JBSWY3DPEHPK3PXP"
em.add_totp("Imported", TEST_SEED, secret=secret)
entry = em.retrieve_entry(0)
assert entry == {
"type": "totp",
"label": "Imported",
"secret": secret,
"period": 30,
"digits": 6,
}
code = em.get_totp_code(0, timestamp=0)
assert code == pyotp.TOTP(secret).at(0)

View File

@@ -0,0 +1,48 @@
import sys
from pathlib import Path
import pytest
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.totp import TotpManager
# Test parsing a normal otpauth URI with custom period and digits
def test_parse_otpauth_normal():
uri = "otpauth://totp/Example?secret=JBSWY3DPEHPK3PXP&period=45&digits=8"
label, secret, period, digits = TotpManager.parse_otpauth(uri)
assert label == "Example"
assert secret == "JBSWY3DPEHPK3PXP"
assert period == 45
assert digits == 8
# URI missing the otpauth:// prefix should raise ValueError
def test_parse_otpauth_missing_prefix():
with pytest.raises(ValueError):
TotpManager.parse_otpauth("totp/Example?secret=ABC")
# URI without a secret parameter should raise ValueError
def test_parse_otpauth_missing_secret():
uri = "otpauth://totp/Example?period=30"
with pytest.raises(ValueError):
TotpManager.parse_otpauth(uri)
# Round-trip make_otpauth_uri -> parse_otpauth with label containing spaces
def test_make_otpauth_uri_roundtrip():
label = "Example Label"
secret = "JBSWY3DPEHPK3PXP"
uri = TotpManager.make_otpauth_uri(label, secret, period=30, digits=6)
parsed = TotpManager.parse_otpauth(uri)
assert parsed == (label, secret, 30, 6)

View File

@@ -11,8 +11,10 @@ try:
derive_key_from_password,
derive_key_from_parent_seed,
derive_index_key,
derive_totp_secret,
EncryptionMode,
DEFAULT_ENCRYPTION_MODE,
TOTP_PURPOSE,
)
from .checksum import (
calculate_checksum,
@@ -22,6 +24,7 @@ try:
)
from .password_prompt import prompt_for_password
from .input_utils import timed_input
from .memory_protection import InMemorySecret
if logger.isEnabledFor(logging.DEBUG):
logger.info("Modules imported successfully.")
@@ -33,8 +36,10 @@ __all__ = [
"derive_key_from_password",
"derive_key_from_parent_seed",
"derive_index_key",
"derive_totp_secret",
"EncryptionMode",
"DEFAULT_ENCRYPTION_MODE",
"TOTP_PURPOSE",
"calculate_checksum",
"verify_checksum",
"json_checksum",
@@ -43,4 +48,5 @@ __all__ = [
"shared_lock",
"prompt_for_password",
"timed_input",
"InMemorySecret",
]

View File

@@ -20,6 +20,7 @@ import base64
import unicodedata
import logging
import traceback
import hmac
from enum import Enum
from typing import Optional, Union
from bip_utils import Bip39SeedGenerator
@@ -40,6 +41,9 @@ class EncryptionMode(Enum):
DEFAULT_ENCRYPTION_MODE = EncryptionMode.SEED_ONLY
# Purpose constant for TOTP secret derivation using BIP85
TOTP_PURPOSE = 39
def derive_key_from_password(password: str, iterations: int = 100_000) -> bytes:
"""
@@ -152,3 +156,31 @@ def derive_index_key_seed_only(seed: str) -> bytes:
def derive_index_key(seed: str) -> bytes:
"""Derive the index encryption key."""
return derive_index_key_seed_only(seed)
def derive_totp_secret(seed: str, index: int) -> str:
"""Derive a base32-encoded TOTP secret from a BIP39 seed."""
try:
from local_bip85 import BIP85
# Initialize BIP85 from the BIP39 seed bytes
seed_bytes = Bip39SeedGenerator(seed).Generate()
bip85 = BIP85(seed_bytes)
# Build the BIP32 path m/83696968'/39'/TOTP'/{index}'
totp_int = int.from_bytes(b"TOTP", "big")
path = f"m/83696968'/{TOTP_PURPOSE}'/{totp_int}'/{index}'"
# Derive entropy using the same scheme as BIP85
child_key = bip85.bip32_ctx.DerivePath(path)
key_bytes = child_key.PrivateKey().Raw().ToBytes()
entropy = hmac.new(b"bip-entropy-from-k", key_bytes, hashlib.sha512).digest()
# Hash the first 32 bytes of entropy and encode the first 20 bytes
hashed = hashlib.sha256(entropy[:32]).digest()
secret = base64.b32encode(hashed[:20]).decode("utf-8")
logger.debug(f"Derived TOTP secret for index {index}: {secret}")
return secret
except Exception as e:
logger.error(f"Failed to derive TOTP secret: {e}", exc_info=True)
raise

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
class InMemorySecret:
"""Store sensitive data encrypted in RAM using AES-GCM."""
def __init__(self, data: bytes) -> None:
if not isinstance(data, (bytes, bytearray)):
raise TypeError("data must be bytes")
self._key = AESGCM.generate_key(bit_length=128)
self._nonce = os.urandom(12)
self._cipher = AESGCM(self._key)
self._encrypted = self._cipher.encrypt(self._nonce, bytes(data), None)
def get_bytes(self) -> bytes:
"""Decrypt and return the plaintext bytes."""
return self._cipher.decrypt(self._nonce, self._encrypted, None)
def wipe(self) -> None:
"""Zero out internal data."""
self._key = None
self._nonce = None
self._cipher = None
self._encrypted = None
def get_str(self) -> str:
"""Return the decrypted plaintext as a UTF-8 string."""
return self.get_bytes().decode("utf-8")