mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-09 07:48:57 +00:00
Introduce vault layer
This commit is contained in:
@@ -28,7 +28,7 @@ from pathlib import Path
|
||||
|
||||
from termcolor import colored
|
||||
|
||||
from password_manager.encryption import EncryptionManager
|
||||
from password_manager.vault import Vault
|
||||
from utils.file_lock import exclusive_lock
|
||||
|
||||
|
||||
@@ -37,14 +37,14 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EntryManager:
|
||||
def __init__(self, encryption_manager: EncryptionManager, fingerprint_dir: Path):
|
||||
def __init__(self, vault: Vault, fingerprint_dir: Path):
|
||||
"""
|
||||
Initializes the EntryManager with the EncryptionManager and fingerprint directory.
|
||||
|
||||
:param encryption_manager: The encryption manager instance.
|
||||
:param vault: The Vault instance for file access.
|
||||
:param fingerprint_dir: The directory corresponding to the fingerprint.
|
||||
"""
|
||||
self.encryption_manager = encryption_manager
|
||||
self.vault = vault
|
||||
self.fingerprint_dir = fingerprint_dir
|
||||
|
||||
# Use paths relative to the fingerprint directory
|
||||
@@ -56,7 +56,7 @@ class EntryManager:
|
||||
def _load_index(self) -> Dict[str, Any]:
|
||||
if self.index_file.exists():
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
data = self.vault.load_index()
|
||||
logger.debug("Index loaded successfully.")
|
||||
return data
|
||||
except Exception as e:
|
||||
@@ -70,7 +70,7 @@ class EntryManager:
|
||||
|
||||
def _save_index(self, data: Dict[str, Any]) -> None:
|
||||
try:
|
||||
self.encryption_manager.save_json_data(data, self.index_file)
|
||||
self.vault.save_index(data)
|
||||
logger.debug("Index saved successfully.")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save index: {e}")
|
||||
@@ -83,7 +83,7 @@ class EntryManager:
|
||||
:return: The next index number as an integer.
|
||||
"""
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
data = self.vault.load_index()
|
||||
if "passwords" in data and isinstance(data["passwords"], dict):
|
||||
indices = [int(idx) for idx in data["passwords"].keys()]
|
||||
next_index = max(indices) + 1 if indices else 0
|
||||
@@ -117,7 +117,7 @@ class EntryManager:
|
||||
"""
|
||||
try:
|
||||
index = self.get_next_index()
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
data = self.vault.load_index()
|
||||
|
||||
data["passwords"][str(index)] = {
|
||||
"website": website_name,
|
||||
@@ -153,19 +153,7 @@ class EntryManager:
|
||||
:return: The encrypted data as bytes, or None if retrieval fails.
|
||||
"""
|
||||
try:
|
||||
if not self.index_file.exists():
|
||||
logger.error(f"Index file '{self.index_file}' does not exist.")
|
||||
print(
|
||||
colored(
|
||||
f"Error: Index file '{self.index_file}' does not exist.", "red"
|
||||
)
|
||||
)
|
||||
return None
|
||||
|
||||
with open(self.index_file, "rb") as file:
|
||||
encrypted_data = file.read()
|
||||
logger.debug("Encrypted index file data retrieved successfully.")
|
||||
return encrypted_data
|
||||
return self.vault.get_encrypted_index()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to retrieve encrypted index file: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
@@ -182,7 +170,7 @@ class EntryManager:
|
||||
:return: A dictionary containing the entry details or None if not found.
|
||||
"""
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
data = self.vault.load_index()
|
||||
entry = data.get("passwords", {}).get(str(index))
|
||||
|
||||
if entry:
|
||||
@@ -217,7 +205,7 @@ class EntryManager:
|
||||
:param blacklisted: (Optional) The new blacklist status.
|
||||
"""
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
data = self.vault.load_index()
|
||||
entry = data.get("passwords", {}).get(str(index))
|
||||
|
||||
if not entry:
|
||||
@@ -272,7 +260,7 @@ class EntryManager:
|
||||
:return: A list of tuples containing entry details: (index, website, username, url, blacklisted)
|
||||
"""
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data()
|
||||
data = self.vault.load_index()
|
||||
passwords = data.get("passwords", {})
|
||||
|
||||
if not passwords:
|
||||
@@ -316,11 +304,11 @@ class EntryManager:
|
||||
:param index: The index number of the password entry to delete.
|
||||
"""
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data()
|
||||
data = self.vault.load_index()
|
||||
if "passwords" in data and str(index) in data["passwords"]:
|
||||
del data["passwords"][str(index)]
|
||||
logger.debug(f"Deleted entry at index {index}.")
|
||||
self.encryption_manager.save_json_data(data)
|
||||
self.vault.save_index(data)
|
||||
self.update_checksum()
|
||||
self.backup_index_file()
|
||||
logger.info(f"Entry at index {index} deleted successfully.")
|
||||
@@ -352,7 +340,7 @@ class EntryManager:
|
||||
Updates the checksum file for the password database to ensure data integrity.
|
||||
"""
|
||||
try:
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
data = self.vault.load_index()
|
||||
json_content = json.dumps(data, indent=4)
|
||||
checksum = hashlib.sha256(json_content.encode("utf-8")).hexdigest()
|
||||
|
||||
@@ -470,15 +458,15 @@ class EntryManager:
|
||||
|
||||
# Example usage (this part should be removed or commented out when integrating into the larger application)
|
||||
if __name__ == "__main__":
|
||||
from password_manager.encryption import (
|
||||
EncryptionManager,
|
||||
) # Ensure this import is correct based on your project structure
|
||||
from password_manager.encryption import EncryptionManager
|
||||
from password_manager.vault import Vault
|
||||
|
||||
# Initialize EncryptionManager with a dummy key for demonstration purposes
|
||||
# Replace 'your-fernet-key' with your actual Fernet key
|
||||
try:
|
||||
dummy_key = Fernet.generate_key()
|
||||
encryption_manager = EncryptionManager(dummy_key)
|
||||
encryption_manager = EncryptionManager(dummy_key, Path("."))
|
||||
vault = Vault(encryption_manager, Path("."))
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize EncryptionManager: {e}")
|
||||
print(colored(f"Error: Failed to initialize EncryptionManager: {e}", "red"))
|
||||
@@ -486,7 +474,7 @@ if __name__ == "__main__":
|
||||
|
||||
# Initialize EntryManager
|
||||
try:
|
||||
entry_manager = EntryManager(encryption_manager)
|
||||
entry_manager = EntryManager(vault, Path("."))
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize EntryManager: {e}")
|
||||
print(colored(f"Error: Failed to initialize EntryManager: {e}", "red"))
|
||||
|
Reference in New Issue
Block a user