mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
11
.github/workflows/python-ci.yml
vendored
11
.github/workflows/python-ci.yml
vendored
@@ -81,10 +81,15 @@ jobs:
|
||||
if: github.ref == 'refs/heads/main' || github.event_name == 'schedule'
|
||||
run: echo "NOSTR_E2E=1" >> $GITHUB_ENV
|
||||
- name: Run tests with coverage
|
||||
timeout-minutes: 16
|
||||
shell: bash
|
||||
run: |
|
||||
pytest ${STRESS_ARGS} --cov=src --cov-report=xml --cov-report=term-missing \
|
||||
--cov-fail-under=20 src/tests
|
||||
run: scripts/run_ci_tests.sh
|
||||
- name: Upload pytest log
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pytest-log-${{ matrix.os }}
|
||||
path: pytest.log
|
||||
- name: Upload coverage report
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
|
@@ -473,7 +473,9 @@ subfolder (or adjust `APP_DIR` in `constants.py`) if you want to load it with
|
||||
the main application. The fingerprint is printed after creation and the
|
||||
encrypted index is published to Nostr. Use that same seed phrase to load
|
||||
SeedPass. The app checks Nostr on startup and pulls any newer snapshot so your
|
||||
vault stays in sync across machines.
|
||||
vault stays in sync across machines. If no snapshot exists or the download
|
||||
cannot be decrypted (for example when using a brand-new seed), SeedPass
|
||||
automatically initializes an empty index instead of exiting.
|
||||
|
||||
### Automatically Updating the Script Checksum
|
||||
|
||||
|
36
scripts/run_ci_tests.sh
Executable file
36
scripts/run_ci_tests.sh
Executable file
@@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env bash
|
||||
set -eo pipefail
|
||||
|
||||
pytest_args=(-vv)
|
||||
if [[ -n "${STRESS_ARGS:-}" ]]; then
|
||||
pytest_args+=(${STRESS_ARGS})
|
||||
fi
|
||||
if [[ "${RUNNER_OS:-}" == "Windows" ]]; then
|
||||
pytest_args+=(-n 1)
|
||||
fi
|
||||
pytest_args+=(--cov=src --cov-report=xml --cov-report=term-missing --cov-fail-under=20 src/tests)
|
||||
|
||||
timeout_bin="timeout"
|
||||
if ! command -v "$timeout_bin" >/dev/null 2>&1; then
|
||||
if command -v gtimeout >/dev/null 2>&1; then
|
||||
timeout_bin="gtimeout"
|
||||
else
|
||||
timeout_bin=""
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ -n "$timeout_bin" ]]; then
|
||||
$timeout_bin 15m pytest "${pytest_args[@]}" 2>&1 | tee pytest.log
|
||||
status=${PIPESTATUS[0]}
|
||||
else
|
||||
echo "timeout command not found; running tests without timeout" >&2
|
||||
pytest "${pytest_args[@]}" 2>&1 | tee pytest.log
|
||||
status=${PIPESTATUS[0]}
|
||||
fi
|
||||
|
||||
if [[ $status -eq 124 ]]; then
|
||||
echo "::error::Tests exceeded 15-minute limit"
|
||||
tail -n 20 pytest.log
|
||||
exit 1
|
||||
fi
|
||||
exit $status
|
@@ -223,15 +223,28 @@ class EncryptionManager:
|
||||
return fh.read()
|
||||
|
||||
def decrypt_and_save_index_from_nostr(
|
||||
self, encrypted_data: bytes, relative_path: Optional[Path] = None
|
||||
) -> None:
|
||||
"""Decrypts data from Nostr and saves it, automatically using the new format."""
|
||||
self,
|
||||
encrypted_data: bytes,
|
||||
relative_path: Optional[Path] = None,
|
||||
*,
|
||||
strict: bool = True,
|
||||
) -> bool:
|
||||
"""Decrypts data from Nostr and saves it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
encrypted_data:
|
||||
The payload downloaded from Nostr.
|
||||
relative_path:
|
||||
Destination filename under the profile directory.
|
||||
strict:
|
||||
When ``True`` (default) re-raise any decryption error. When ``False``
|
||||
return ``False`` if decryption fails.
|
||||
"""
|
||||
if relative_path is None:
|
||||
relative_path = Path("seedpass_entries_db.json.enc")
|
||||
try:
|
||||
decrypted_data = self.decrypt_data(
|
||||
encrypted_data
|
||||
) # This now handles both formats
|
||||
decrypted_data = self.decrypt_data(encrypted_data)
|
||||
if USE_ORJSON:
|
||||
data = json_lib.loads(decrypted_data)
|
||||
else:
|
||||
@@ -240,18 +253,22 @@ class EncryptionManager:
|
||||
self.update_checksum(relative_path)
|
||||
logger.info("Index file from Nostr was processed and saved successfully.")
|
||||
print(colored("Index file updated from Nostr successfully.", "green"))
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to decrypt and save data from Nostr: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
print(
|
||||
colored(
|
||||
f"Error: Failed to decrypt and save data from Nostr: {e}",
|
||||
"red",
|
||||
return True
|
||||
except Exception as e: # pragma: no cover - error handling
|
||||
if strict:
|
||||
logger.error(
|
||||
f"Failed to decrypt and save data from Nostr: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
)
|
||||
raise
|
||||
print(
|
||||
colored(
|
||||
f"Error: Failed to decrypt and save data from Nostr: {e}",
|
||||
"red",
|
||||
)
|
||||
)
|
||||
raise
|
||||
logger.warning(f"Failed to decrypt index from Nostr: {e}")
|
||||
return False
|
||||
|
||||
def update_checksum(self, relative_path: Optional[Path] = None) -> None:
|
||||
"""Updates the checksum file for the specified file."""
|
||||
|
@@ -920,6 +920,7 @@ class EntryManager:
|
||||
filter_kind: str | None = None,
|
||||
*,
|
||||
include_archived: bool = False,
|
||||
verbose: bool = True,
|
||||
) -> List[Tuple[int, str, Optional[str], Optional[str], bool]]:
|
||||
"""List entries in the index with optional sorting and filtering.
|
||||
|
||||
@@ -932,7 +933,8 @@ class EntryManager:
|
||||
|
||||
if not entries_data:
|
||||
logger.info("No entries found.")
|
||||
print(colored("No entries found.", "yellow"))
|
||||
if verbose:
|
||||
print(colored("No entries found.", "yellow"))
|
||||
return []
|
||||
|
||||
def sort_key(item: Tuple[str, Dict[str, Any]]):
|
||||
@@ -987,51 +989,59 @@ class EntryManager:
|
||||
)
|
||||
|
||||
logger.debug(f"Total entries found: {len(entries)}")
|
||||
for idx, entry in filtered_items:
|
||||
etype = entry.get("type", entry.get("kind", EntryType.PASSWORD.value))
|
||||
print(colored(f"Index: {idx}", "cyan"))
|
||||
if etype == EntryType.TOTP.value:
|
||||
print(colored(" Type: TOTP", "cyan"))
|
||||
print(colored(f" Label: {entry.get('label', '')}", "cyan"))
|
||||
print(colored(f" Derivation Index: {entry.get('index')}", "cyan"))
|
||||
print(
|
||||
colored(
|
||||
f" Period: {entry.get('period', 30)}s Digits: {entry.get('digits', 6)}",
|
||||
"cyan",
|
||||
if verbose:
|
||||
for idx, entry in filtered_items:
|
||||
etype = entry.get(
|
||||
"type", entry.get("kind", EntryType.PASSWORD.value)
|
||||
)
|
||||
print(colored(f"Index: {idx}", "cyan"))
|
||||
if etype == EntryType.TOTP.value:
|
||||
print(colored(" Type: TOTP", "cyan"))
|
||||
print(colored(f" Label: {entry.get('label', '')}", "cyan"))
|
||||
print(
|
||||
colored(f" Derivation Index: {entry.get('index')}", "cyan")
|
||||
)
|
||||
)
|
||||
elif etype == EntryType.PASSWORD.value:
|
||||
print(
|
||||
colored(
|
||||
f" Label: {entry.get('label', entry.get('website', ''))}",
|
||||
"cyan",
|
||||
print(
|
||||
colored(
|
||||
f" Period: {entry.get('period', 30)}s Digits: {entry.get('digits', 6)}",
|
||||
"cyan",
|
||||
)
|
||||
)
|
||||
)
|
||||
print(
|
||||
colored(f" Username: {entry.get('username') or 'N/A'}", "cyan")
|
||||
)
|
||||
print(colored(f" URL: {entry.get('url') or 'N/A'}", "cyan"))
|
||||
print(
|
||||
colored(
|
||||
f" Archived: {'Yes' if entry.get('archived', entry.get('blacklisted', False)) else 'No'}",
|
||||
"cyan",
|
||||
elif etype == EntryType.PASSWORD.value:
|
||||
print(
|
||||
colored(
|
||||
f" Label: {entry.get('label', entry.get('website', ''))}",
|
||||
"cyan",
|
||||
)
|
||||
)
|
||||
)
|
||||
else:
|
||||
print(colored(f" Label: {entry.get('label', '')}", "cyan"))
|
||||
print(
|
||||
colored(
|
||||
f" Derivation Index: {entry.get('index', idx)}",
|
||||
"cyan",
|
||||
print(
|
||||
colored(
|
||||
f" Username: {entry.get('username') or 'N/A'}", "cyan"
|
||||
)
|
||||
)
|
||||
)
|
||||
print("-" * 40)
|
||||
print(colored(f" URL: {entry.get('url') or 'N/A'}", "cyan"))
|
||||
print(
|
||||
colored(
|
||||
f" Archived: {'Yes' if entry.get('archived', entry.get('blacklisted', False)) else 'No'}",
|
||||
"cyan",
|
||||
)
|
||||
)
|
||||
else:
|
||||
print(colored(f" Label: {entry.get('label', '')}", "cyan"))
|
||||
print(
|
||||
colored(
|
||||
f" Derivation Index: {entry.get('index', idx)}",
|
||||
"cyan",
|
||||
)
|
||||
)
|
||||
print("-" * 40)
|
||||
|
||||
return entries
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list entries: {e}", exc_info=True)
|
||||
print(colored(f"Error: Failed to list entries: {e}", "red"))
|
||||
if verbose:
|
||||
print(colored(f"Error: Failed to list entries: {e}", "red"))
|
||||
return []
|
||||
|
||||
def search_entries(
|
||||
|
@@ -1103,8 +1103,10 @@ class PasswordManager:
|
||||
encrypted = deltas[-1]
|
||||
current = self.vault.get_encrypted_index()
|
||||
if current != encrypted:
|
||||
self.vault.decrypt_and_save_index_from_nostr(encrypted)
|
||||
logger.info("Local database synchronized from Nostr.")
|
||||
if self.vault.decrypt_and_save_index_from_nostr(
|
||||
encrypted, strict=False
|
||||
):
|
||||
logger.info("Local database synchronized from Nostr.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Unable to sync index from Nostr: {e}")
|
||||
finally:
|
||||
@@ -1195,14 +1197,12 @@ class PasswordManager:
|
||||
deltas = asyncio.run(self.nostr_client.fetch_deltas_since(version))
|
||||
if deltas:
|
||||
encrypted = deltas[-1]
|
||||
try:
|
||||
self.vault.decrypt_and_save_index_from_nostr(encrypted)
|
||||
success = self.vault.decrypt_and_save_index_from_nostr(
|
||||
encrypted, strict=False
|
||||
)
|
||||
if success:
|
||||
logger.info("Initialized local database from Nostr.")
|
||||
have_data = True
|
||||
except Exception as err:
|
||||
logger.warning(
|
||||
f"Failed to decrypt Nostr data: {err}; treating as new account."
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Unable to sync index from Nostr: {e}")
|
||||
|
||||
@@ -3240,7 +3240,9 @@ class PasswordManager:
|
||||
def handle_view_archived_entries(self) -> None:
|
||||
"""Display archived entries and optionally view or restore them."""
|
||||
try:
|
||||
archived = self.entry_manager.list_entries(include_archived=True)
|
||||
archived = self.entry_manager.list_entries(
|
||||
include_archived=True, verbose=False
|
||||
)
|
||||
archived = [e for e in archived if e[4]]
|
||||
if not archived:
|
||||
self.notify("No archived entries found.", level="WARNING")
|
||||
@@ -3286,7 +3288,7 @@ class PasswordManager:
|
||||
self.last_update = time.time()
|
||||
pause()
|
||||
archived = self.entry_manager.list_entries(
|
||||
include_archived=True
|
||||
include_archived=True, verbose=False
|
||||
)
|
||||
archived = [e for e in archived if e[4]]
|
||||
if not archived:
|
||||
|
@@ -60,9 +60,13 @@ class Vault:
|
||||
"""Return the encrypted index bytes if present."""
|
||||
return self.encryption_manager.get_encrypted_index()
|
||||
|
||||
def decrypt_and_save_index_from_nostr(self, encrypted_data: bytes) -> None:
|
||||
def decrypt_and_save_index_from_nostr(
|
||||
self, encrypted_data: bytes, *, strict: bool = True
|
||||
) -> bool:
|
||||
"""Decrypt Nostr payload and overwrite the local index."""
|
||||
self.encryption_manager.decrypt_and_save_index_from_nostr(encrypted_data)
|
||||
return self.encryption_manager.decrypt_and_save_index_from_nostr(
|
||||
encrypted_data, strict=strict
|
||||
)
|
||||
|
||||
# ----- Config helpers -----
|
||||
def load_config(self) -> dict:
|
||||
|
@@ -152,3 +152,41 @@ def test_view_archived_entries_removed_after_restore(monkeypatch, capsys):
|
||||
note = pm.notifications.get_nowait()
|
||||
assert note.level == "WARNING"
|
||||
assert note.message == "No archived entries found."
|
||||
|
||||
|
||||
def test_archived_entries_menu_hides_active(monkeypatch, capsys):
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
tmp_path = Path(tmpdir)
|
||||
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
|
||||
cfg_mgr = ConfigManager(vault, tmp_path)
|
||||
backup_mgr = BackupManager(tmp_path, cfg_mgr)
|
||||
entry_mgr = EntryManager(vault, backup_mgr)
|
||||
|
||||
pm = PasswordManager.__new__(PasswordManager)
|
||||
pm.encryption_mode = EncryptionMode.SEED_ONLY
|
||||
pm.encryption_manager = enc_mgr
|
||||
pm.vault = vault
|
||||
pm.entry_manager = entry_mgr
|
||||
pm.backup_manager = backup_mgr
|
||||
pm.parent_seed = TEST_SEED
|
||||
pm.nostr_client = SimpleNamespace()
|
||||
pm.fingerprint_dir = tmp_path
|
||||
pm.is_dirty = False
|
||||
pm.notifications = queue.Queue()
|
||||
|
||||
archived_idx = entry_mgr.add_entry("archived.com", 8)
|
||||
active_idx = entry_mgr.add_entry("active.com", 8)
|
||||
|
||||
# Archive only the first entry
|
||||
monkeypatch.setattr("builtins.input", lambda *_: str(archived_idx))
|
||||
pm.handle_archive_entry()
|
||||
assert entry_mgr.retrieve_entry(archived_idx)["archived"] is True
|
||||
assert entry_mgr.retrieve_entry(active_idx)["archived"] is False
|
||||
|
||||
# View archived entries and immediately exit
|
||||
inputs = iter([""])
|
||||
monkeypatch.setattr("builtins.input", lambda *_: next(inputs))
|
||||
pm.handle_view_archived_entries()
|
||||
out = capsys.readouterr().out
|
||||
assert "archived.com" in out
|
||||
assert "active.com" not in out
|
||||
|
@@ -63,7 +63,7 @@ def test_index_export_import_round_trip():
|
||||
},
|
||||
}
|
||||
)
|
||||
vault.decrypt_and_save_index_from_nostr(encrypted)
|
||||
assert vault.decrypt_and_save_index_from_nostr(encrypted)
|
||||
|
||||
loaded = vault.load_index()
|
||||
assert loaded["entries"] == original["entries"]
|
||||
|
@@ -6,6 +6,9 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
from utils.fingerprint_manager import FingerprintManager
|
||||
from password_manager.manager import PasswordManager, EncryptionMode
|
||||
from helpers import create_vault, dummy_nostr_client
|
||||
import gzip
|
||||
from nostr.backup_models import Manifest, ChunkMeta
|
||||
|
||||
|
||||
VALID_SEED = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
|
||||
@@ -51,3 +54,33 @@ def test_add_and_switch_fingerprint(monkeypatch):
|
||||
assert pm.current_fingerprint == fingerprint
|
||||
assert fm.current_fingerprint == fingerprint
|
||||
assert pm.fingerprint_dir == expected_dir
|
||||
|
||||
|
||||
def test_sync_index_missing_bad_data(monkeypatch, dummy_nostr_client):
|
||||
client, _relay = dummy_nostr_client
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
dir_path = Path(tmpdir)
|
||||
vault, _enc = create_vault(dir_path)
|
||||
|
||||
pm = PasswordManager.__new__(PasswordManager)
|
||||
pm.fingerprint_dir = dir_path
|
||||
pm.vault = vault
|
||||
pm.nostr_client = client
|
||||
pm.sync_vault = lambda *a, **k: None
|
||||
|
||||
manifest = Manifest(
|
||||
ver=1,
|
||||
algo="aes-gcm",
|
||||
chunks=[ChunkMeta(id="c0", size=1, hash="00")],
|
||||
delta_since=None,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
client,
|
||||
"fetch_latest_snapshot",
|
||||
lambda: (manifest, [gzip.compress(b"garbage")]),
|
||||
)
|
||||
monkeypatch.setattr(client, "fetch_deltas_since", lambda *_a, **_k: [])
|
||||
|
||||
pm.sync_index_from_nostr_if_missing()
|
||||
data = pm.vault.load_index()
|
||||
assert data["entries"] == {}
|
||||
|
Reference in New Issue
Block a user