diff --git a/src/password_manager/entry_management.py b/src/password_manager/entry_management.py index 9c9fc16..42043b2 100644 --- a/src/password_manager/entry_management.py +++ b/src/password_manager/entry_management.py @@ -29,6 +29,7 @@ from pathlib import Path from termcolor import colored from password_manager.migrations import LATEST_VERSION from password_manager.entry_types import EntryType +from password_manager.totp import TotpManager from password_manager.vault import Vault from utils.file_lock import exclusive_lock @@ -59,6 +60,9 @@ class EntryManager: if self.index_file.exists(): try: data = self.vault.load_index() + # Ensure legacy entries without a type are treated as passwords + for entry in data.get("entries", {}).values(): + entry.setdefault("type", EntryType.PASSWORD.value) logger.debug("Index loaded successfully.") return data except Exception as e: @@ -149,16 +153,33 @@ class EntryManager: print(colored(f"Error: Failed to add entry: {e}", "red")) sys.exit(1) - def add_totp(self, notes: str = "") -> int: - """Placeholder for adding a TOTP entry.""" - index = self.get_next_index() + def add_totp( + self, label: str, index: int, period: int = 30, digits: int = 6 + ) -> str: + """Add a new TOTP entry and return the provisioning URI.""" + entry_id = self.get_next_index() data = self.vault.load_index() data.setdefault("entries", {}) - data["entries"][str(index)] = {"type": EntryType.TOTP.value, "notes": notes} + + data["entries"][str(entry_id)] = { + "type": EntryType.TOTP.value, + "label": label, + "index": index, + "period": period, + "digits": digits, + } + self._save_index(data) self.update_checksum() self.backup_index_file() - raise NotImplementedError("TOTP entry support not implemented yet") + + try: + seed = self.vault.encryption_manager.decrypt_parent_seed() + secret = TotpManager.derive_secret(seed, index) + return TotpManager.make_otpauth_uri(label, secret, period, digits) + except Exception as e: + logger.error(f"Failed to generate otpauth URI: {e}") + raise def add_ssh_key(self, notes: str = "") -> int: """Placeholder for adding an SSH key entry.""" @@ -182,6 +203,25 @@ class EntryManager: self.backup_index_file() raise NotImplementedError("Seed entry support not implemented yet") + def get_totp_code(self, index: int, timestamp: int | None = None) -> str: + """Return the current TOTP code for the specified entry.""" + entry = self.retrieve_entry(index) + if not entry or entry.get("type") != EntryType.TOTP.value: + raise ValueError("Entry is not a TOTP entry") + + seed = self.vault.encryption_manager.decrypt_parent_seed() + totp_index = int(entry.get("index", 0)) + return TotpManager.current_code(seed, totp_index, timestamp) + + def get_totp_time_remaining(self, index: int) -> int: + """Return seconds remaining in the TOTP period for the given entry.""" + entry = self.retrieve_entry(index) + if not entry or entry.get("type") != EntryType.TOTP.value: + raise ValueError("Entry is not a TOTP entry") + + period = int(entry.get("period", 30)) + return TotpManager.time_remaining(period) + def get_encrypted_index(self) -> Optional[bytes]: """ Retrieves the encrypted password index file's contents. @@ -295,11 +335,7 @@ class EntryManager: ) def list_entries(self) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]: - """ - Lists all entries in the index. - - :return: A list of tuples containing entry details: (index, website, username, url, blacklisted) - """ + """List all entries in the index.""" try: data = self.vault.load_index() entries_data = data.get("entries", {}) @@ -311,23 +347,48 @@ class EntryManager: entries = [] for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])): - entries.append( - ( - int(idx), - entry.get("website", ""), - entry.get("username", ""), - entry.get("url", ""), - entry.get("blacklisted", False), + etype = entry.get("type", EntryType.PASSWORD.value) + if etype == EntryType.TOTP.value: + entries.append( + (int(idx), entry.get("label", ""), None, None, False) + ) + else: + entries.append( + ( + int(idx), + entry.get("website", ""), + entry.get("username", ""), + entry.get("url", ""), + entry.get("blacklisted", False), + ) ) - ) logger.debug(f"Total entries found: {len(entries)}") - for entry in entries: - print(colored(f"Index: {entry[0]}", "cyan")) - print(colored(f" Website: {entry[1]}", "cyan")) - print(colored(f" Username: {entry[2] or 'N/A'}", "cyan")) - print(colored(f" URL: {entry[3] or 'N/A'}", "cyan")) - print(colored(f" Blacklisted: {'Yes' if entry[4] else 'No'}", "cyan")) + for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])): + etype = entry.get("type", EntryType.PASSWORD.value) + print(colored(f"Index: {idx}", "cyan")) + if etype == EntryType.TOTP.value: + print(colored(" Type: TOTP", "cyan")) + print(colored(f" Label: {entry.get('label', '')}", "cyan")) + print(colored(f" Derivation Index: {entry.get('index')}", "cyan")) + print( + colored( + f" Period: {entry.get('period', 30)}s Digits: {entry.get('digits', 6)}", + "cyan", + ) + ) + else: + print(colored(f" Website: {entry.get('website', '')}", "cyan")) + print( + colored(f" Username: {entry.get('username') or 'N/A'}", "cyan") + ) + print(colored(f" URL: {entry.get('url') or 'N/A'}", "cyan")) + print( + colored( + f" Blacklisted: {'Yes' if entry.get('blacklisted', False) else 'No'}", + "cyan", + ) + ) print("-" * 40) return entries diff --git a/src/tests/test_entry_add.py b/src/tests/test_entry_add.py index fce73f4..d96f1c3 100644 --- a/src/tests/test_entry_add.py +++ b/src/tests/test_entry_add.py @@ -1,6 +1,8 @@ import sys from pathlib import Path from tempfile import TemporaryDirectory +from unittest.mock import patch + import pytest from helpers import create_vault, TEST_SEED, TEST_PASSWORD @@ -49,6 +51,10 @@ def test_round_trip_entry_types(method, expected_type): if method == "add_entry": index = entry_mgr.add_entry("example.com", 8) + elif method == "add_totp": + with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED): + entry_mgr.add_totp("example", 0) + index = 0 else: with pytest.raises(NotImplementedError): getattr(entry_mgr, method)() diff --git a/src/tests/test_totp_entry.py b/src/tests/test_totp_entry.py new file mode 100644 index 0000000..371ceaa --- /dev/null +++ b/src/tests/test_totp_entry.py @@ -0,0 +1,52 @@ +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch + +import pytest + +from helpers import create_vault, TEST_SEED, TEST_PASSWORD + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from password_manager.entry_management import EntryManager +from password_manager.vault import Vault +from password_manager.totp import TotpManager + + +def test_add_totp_and_get_code(): + with TemporaryDirectory() as tmpdir: + vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD) + entry_mgr = EntryManager(vault, Path(tmpdir)) + + with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED): + uri = entry_mgr.add_totp("Example", 0) + assert uri.startswith("otpauth://totp/") + + entry = entry_mgr.retrieve_entry(0) + assert entry == { + "type": "totp", + "label": "Example", + "index": 0, + "period": 30, + "digits": 6, + } + + with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED): + code = entry_mgr.get_totp_code(0, timestamp=0) + + expected = TotpManager.current_code(TEST_SEED, 0, timestamp=0) + assert code == expected + + +def test_totp_time_remaining(monkeypatch): + with TemporaryDirectory() as tmpdir: + vault, enc_mgr = create_vault(Path(tmpdir), TEST_SEED, TEST_PASSWORD) + entry_mgr = EntryManager(vault, Path(tmpdir)) + + with patch.object(enc_mgr, "decrypt_parent_seed", return_value=TEST_SEED): + entry_mgr.add_totp("Example", 0) + + monkeypatch.setattr(TotpManager, "time_remaining", lambda period: 7) + remaining = entry_mgr.get_totp_time_remaining(0) + assert remaining == 7