Implement TOTP entry support

This commit is contained in:
thePR0M3TH3AN
2025-07-03 00:00:43 -04:00
parent aba3ca3bbb
commit d1d20759fd
3 changed files with 143 additions and 24 deletions

View File

@@ -29,6 +29,7 @@ from pathlib import Path
from termcolor import colored from termcolor import colored
from password_manager.migrations import LATEST_VERSION from password_manager.migrations import LATEST_VERSION
from password_manager.entry_types import EntryType from password_manager.entry_types import EntryType
from password_manager.totp import TotpManager
from password_manager.vault import Vault from password_manager.vault import Vault
from utils.file_lock import exclusive_lock from utils.file_lock import exclusive_lock
@@ -59,6 +60,9 @@ class EntryManager:
if self.index_file.exists(): if self.index_file.exists():
try: try:
data = self.vault.load_index() data = self.vault.load_index()
# Ensure legacy entries without a type are treated as passwords
for entry in data.get("entries", {}).values():
entry.setdefault("type", EntryType.PASSWORD.value)
logger.debug("Index loaded successfully.") logger.debug("Index loaded successfully.")
return data return data
except Exception as e: except Exception as e:
@@ -149,16 +153,33 @@ class EntryManager:
print(colored(f"Error: Failed to add entry: {e}", "red")) print(colored(f"Error: Failed to add entry: {e}", "red"))
sys.exit(1) sys.exit(1)
def add_totp(self, notes: str = "") -> int: def add_totp(
"""Placeholder for adding a TOTP entry.""" self, label: str, index: int, period: int = 30, digits: int = 6
index = self.get_next_index() ) -> str:
"""Add a new TOTP entry and return the provisioning URI."""
entry_id = self.get_next_index()
data = self.vault.load_index() data = self.vault.load_index()
data.setdefault("entries", {}) data.setdefault("entries", {})
data["entries"][str(index)] = {"type": EntryType.TOTP.value, "notes": notes}
data["entries"][str(entry_id)] = {
"type": EntryType.TOTP.value,
"label": label,
"index": index,
"period": period,
"digits": digits,
}
self._save_index(data) self._save_index(data)
self.update_checksum() self.update_checksum()
self.backup_index_file() self.backup_index_file()
raise NotImplementedError("TOTP entry support not implemented yet")
try:
seed = self.vault.encryption_manager.decrypt_parent_seed()
secret = TotpManager.derive_secret(seed, index)
return TotpManager.make_otpauth_uri(label, secret, period, digits)
except Exception as e:
logger.error(f"Failed to generate otpauth URI: {e}")
raise
def add_ssh_key(self, notes: str = "") -> int: def add_ssh_key(self, notes: str = "") -> int:
"""Placeholder for adding an SSH key entry.""" """Placeholder for adding an SSH key entry."""
@@ -182,6 +203,25 @@ class EntryManager:
self.backup_index_file() self.backup_index_file()
raise NotImplementedError("Seed entry support not implemented yet") raise NotImplementedError("Seed entry support not implemented yet")
def get_totp_code(self, index: int, timestamp: int | None = None) -> str:
"""Return the current TOTP code for the specified entry."""
entry = self.retrieve_entry(index)
if not entry or entry.get("type") != EntryType.TOTP.value:
raise ValueError("Entry is not a TOTP entry")
seed = self.vault.encryption_manager.decrypt_parent_seed()
totp_index = int(entry.get("index", 0))
return TotpManager.current_code(seed, totp_index, timestamp)
def get_totp_time_remaining(self, index: int) -> int:
"""Return seconds remaining in the TOTP period for the given entry."""
entry = self.retrieve_entry(index)
if not entry or entry.get("type") != EntryType.TOTP.value:
raise ValueError("Entry is not a TOTP entry")
period = int(entry.get("period", 30))
return TotpManager.time_remaining(period)
def get_encrypted_index(self) -> Optional[bytes]: def get_encrypted_index(self) -> Optional[bytes]:
""" """
Retrieves the encrypted password index file's contents. Retrieves the encrypted password index file's contents.
@@ -295,11 +335,7 @@ class EntryManager:
) )
def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]: def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]:
""" """List all entries in the index."""
Lists all entries in the index.
:return: A list of tuples containing entry details: (index, website, username, url, blacklisted)
"""
try: try:
data = self.vault.load_index() data = self.vault.load_index()
entries_data = data.get("entries", {}) entries_data = data.get("entries", {})
@@ -311,23 +347,48 @@ class EntryManager:
entries = [] entries = []
for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])): for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])):
entries.append( etype = entry.get("type", EntryType.PASSWORD.value)
( if etype == EntryType.TOTP.value:
int(idx), entries.append(
entry.get("website", ""), (int(idx), entry.get("label", ""), None, None, False)
entry.get("username", ""), )
entry.get("url", ""), else:
entry.get("blacklisted", False), entries.append(
(
int(idx),
entry.get("website", ""),
entry.get("username", ""),
entry.get("url", ""),
entry.get("blacklisted", False),
)
) )
)
logger.debug(f"Total entries found: {len(entries)}") logger.debug(f"Total entries found: {len(entries)}")
for entry in entries: for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])):
print(colored(f"Index: {entry[0]}", "cyan")) etype = entry.get("type", EntryType.PASSWORD.value)
print(colored(f" Website: {entry[1]}", "cyan")) print(colored(f"Index: {idx}", "cyan"))
print(colored(f" Username: {entry[2] or 'N/A'}", "cyan")) if etype == EntryType.TOTP.value:
print(colored(f" URL: {entry[3] or 'N/A'}", "cyan")) print(colored(" Type: TOTP", "cyan"))
print(colored(f" Blacklisted: {'Yes' if entry[4] else 'No'}", "cyan")) print(colored(f" Label: {entry.get('label', '')}", "cyan"))
print(colored(f" Derivation Index: {entry.get('index')}", "cyan"))
print(
colored(
f" Period: {entry.get('period', 30)}s Digits: {entry.get('digits', 6)}",
"cyan",
)
)
else:
print(colored(f" Website: {entry.get('website', '')}", "cyan"))
print(
colored(f" Username: {entry.get('username') or 'N/A'}", "cyan")
)
print(colored(f" URL: {entry.get('url') or 'N/A'}", "cyan"))
print(
colored(
f" Blacklisted: {'Yes' if entry.get('blacklisted', False) else 'No'}",
"cyan",
)
)
print("-" * 40) print("-" * 40)
return entries return entries

View File

@@ -1,6 +1,8 @@
import sys import sys
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from unittest.mock import patch
import pytest import pytest
from helpers import create_vault, TEST_SEED, TEST_PASSWORD from helpers import create_vault, TEST_SEED, TEST_PASSWORD
@@ -49,6 +51,10 @@ def test_round_trip_entry_types(method, expected_type):
if method == "add_entry": if method == "add_entry":
index = entry_mgr.add_entry("example.com", 8) index = entry_mgr.add_entry("example.com", 8)
elif method == "add_totp":
with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED):
entry_mgr.add_totp("example", 0)
index = 0
else: else:
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
getattr(entry_mgr, method)() getattr(entry_mgr, method)()

View File

@@ -0,0 +1,52 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
import pytest
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
sys.path.append(str(Path(__file__).resolve().parents[1]))
from password_manager.entry_management import EntryManager
from password_manager.vault import Vault
from password_manager.totp import TotpManager
def test_add_totp_and_get_code():
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, Path(tmpdir))
with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED):
uri = entry_mgr.add_totp("Example", 0)
assert uri.startswith("otpauth://totp/")
entry = entry_mgr.retrieve_entry(0)
assert entry == {
"type": "totp",
"label": "Example",
"index": 0,
"period": 30,
"digits": 6,
}
with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED):
code = entry_mgr.get_totp_code(0, timestamp=0)
expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0)
assert code == expected
def test_totp_time_remaining(monkeypatch):
with TemporaryDirectory() as tmpdir:
vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, Path(tmpdir))
with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED):
entry_mgr.add_totp("Example", 0)
monkeypatch.setattr(TotpManager, "time_remaining", lambda period: 7)
remaining = entry_mgr.get_totp_time_remaining(0)
assert remaining == 7