mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-09 07:48:57 +00:00
Merge pull request #150 from PR0M3TH3AN/codex/update-tests-to-use-new-api-methods
Update tests for snapshot API
This commit is contained in:
@@ -57,7 +57,7 @@ def test_encryption_mode_migration(monkeypatch, start_mode, new_mode):
|
|||||||
pm.current_fingerprint = "fp"
|
pm.current_fingerprint = "fp"
|
||||||
pm.parent_seed = TEST_SEED
|
pm.parent_seed = TEST_SEED
|
||||||
pm.encryption_mode = start_mode
|
pm.encryption_mode = start_mode
|
||||||
pm.nostr_client = SimpleNamespace(publish_json_to_nostr=lambda *a, **k: None)
|
pm.nostr_client = SimpleNamespace(publish_snapshot=lambda *a, **k: None)
|
||||||
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"password_manager.manager.prompt_existing_password",
|
"password_manager.manager.prompt_existing_password",
|
||||||
@@ -65,9 +65,7 @@ def test_encryption_mode_migration(monkeypatch, start_mode, new_mode):
|
|||||||
)
|
)
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"password_manager.manager.NostrClient",
|
"password_manager.manager.NostrClient",
|
||||||
lambda *a, **kw: SimpleNamespace(
|
lambda *a, **kw: SimpleNamespace(publish_snapshot=lambda *a, **k: None),
|
||||||
publish_json_to_nostr=lambda *a, **k: None
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
pm.change_encryption_mode(new_mode)
|
pm.change_encryption_mode(new_mode)
|
||||||
|
@@ -20,9 +20,8 @@ class FakeNostrClient:
|
|||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.published = []
|
self.published = []
|
||||||
|
|
||||||
def publish_json_to_nostr(self, data: bytes):
|
def publish_snapshot(self, data: bytes):
|
||||||
self.published.append(data)
|
self.published.append(data)
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def test_manager_workflow(monkeypatch):
|
def test_manager_workflow(monkeypatch):
|
||||||
|
@@ -1,7 +1,8 @@
|
|||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch, AsyncMock
|
||||||
|
import asyncio
|
||||||
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
|
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
|
||||||
|
|
||||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||||
@@ -23,7 +24,8 @@ def test_backup_and_publish_to_nostr():
|
|||||||
assert encrypted_index is not None
|
assert encrypted_index is not None
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
"nostr.client.NostrClient.publish_json_to_nostr", return_value=True
|
"nostr.client.NostrClient.publish_snapshot",
|
||||||
|
AsyncMock(return_value=None),
|
||||||
) as mock_publish, patch("nostr.client.ClientBuilder"), patch(
|
) as mock_publish, patch("nostr.client.ClientBuilder"), patch(
|
||||||
"nostr.client.KeyManager"
|
"nostr.client.KeyManager"
|
||||||
), patch.object(
|
), patch.object(
|
||||||
@@ -33,7 +35,7 @@ def test_backup_and_publish_to_nostr():
|
|||||||
):
|
):
|
||||||
nostr_client = NostrClient(enc_mgr, "fp")
|
nostr_client = NostrClient(enc_mgr, "fp")
|
||||||
entry_mgr.backup_index_file()
|
entry_mgr.backup_index_file()
|
||||||
result = nostr_client.publish_json_to_nostr(encrypted_index)
|
result = asyncio.run(nostr_client.publish_snapshot(encrypted_index))
|
||||||
|
|
||||||
mock_publish.assert_called_with(encrypted_index)
|
mock_publish.assert_awaited_with(encrypted_index)
|
||||||
assert result is True
|
assert result is None
|
||||||
|
@@ -1,12 +1,14 @@
|
|||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
import asyncio
|
||||||
|
import gzip
|
||||||
from cryptography.fernet import Fernet
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||||
|
|
||||||
from password_manager.encryption import EncryptionManager
|
from password_manager.encryption import EncryptionManager
|
||||||
from nostr.client import NostrClient
|
from nostr.client import NostrClient, Manifest
|
||||||
|
|
||||||
|
|
||||||
class MockNostrServer:
|
class MockNostrServer:
|
||||||
@@ -17,6 +19,7 @@ class MockNostrServer:
|
|||||||
class MockClient:
|
class MockClient:
|
||||||
def __init__(self, server):
|
def __init__(self, server):
|
||||||
self.server = server
|
self.server = server
|
||||||
|
self.pos = -1
|
||||||
|
|
||||||
async def add_relays(self, relays):
|
async def add_relays(self, relays):
|
||||||
pass
|
pass
|
||||||
@@ -44,14 +47,17 @@ class MockClient:
|
|||||||
return FakeOutput()
|
return FakeOutput()
|
||||||
|
|
||||||
async def fetch_events(self, filter_obj, timeout):
|
async def fetch_events(self, filter_obj, timeout):
|
||||||
|
ev = self.server.events[self.pos]
|
||||||
|
self.pos -= 1
|
||||||
|
|
||||||
class FakeEvents:
|
class FakeEvents:
|
||||||
def __init__(self, events):
|
def __init__(self, event):
|
||||||
self._events = events
|
self._event = event
|
||||||
|
|
||||||
def to_vec(self):
|
def to_vec(self):
|
||||||
return self._events
|
return [self._event]
|
||||||
|
|
||||||
return FakeEvents(self.server.events[-1:])
|
return FakeEvents(ev)
|
||||||
|
|
||||||
|
|
||||||
def setup_client(tmp_path, server):
|
def setup_client(tmp_path, server):
|
||||||
@@ -72,5 +78,6 @@ def test_publish_and_retrieve(tmp_path):
|
|||||||
server = MockNostrServer()
|
server = MockNostrServer()
|
||||||
client = setup_client(tmp_path, server)
|
client = setup_client(tmp_path, server)
|
||||||
payload = b"contract-test"
|
payload = b"contract-test"
|
||||||
assert client.publish_json_to_nostr(payload) is True
|
asyncio.run(client.publish_snapshot(payload))
|
||||||
assert client.retrieve_json_from_nostr_sync() == payload
|
manifest, chunks = asyncio.run(client.fetch_latest_snapshot())
|
||||||
|
assert gzip.decompress(b"".join(chunks)) == payload
|
||||||
|
@@ -3,6 +3,8 @@ import time
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
import asyncio
|
||||||
|
import gzip
|
||||||
import sys
|
import sys
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
@@ -58,21 +60,28 @@ def test_nostr_index_size_limits():
|
|||||||
|
|
||||||
encrypted = vault.get_encrypted_index()
|
encrypted = vault.get_encrypted_index()
|
||||||
payload_size = len(encrypted) if encrypted else 0
|
payload_size = len(encrypted) if encrypted else 0
|
||||||
published = client.publish_json_to_nostr(encrypted or b"")
|
asyncio.run(client.publish_snapshot(encrypted or b""))
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
retrieved = client.retrieve_json_from_nostr_sync()
|
result = asyncio.run(client.fetch_latest_snapshot())
|
||||||
|
retrieved = gzip.decompress(b"".join(result[1])) if result else None
|
||||||
retrieved_ok = retrieved == encrypted
|
retrieved_ok = retrieved == encrypted
|
||||||
if not retrieved_ok:
|
if not retrieved_ok:
|
||||||
print(f"Initial retrieve failed: {client.last_error}")
|
print(f"Initial retrieve failed: {client.last_error}")
|
||||||
retrieved = client.retrieve_json_from_nostr_sync(retries=1)
|
result = asyncio.run(client.fetch_latest_snapshot())
|
||||||
|
retrieved = (
|
||||||
|
gzip.decompress(b"".join(result[1])) if result else None
|
||||||
|
)
|
||||||
retrieved_ok = retrieved == encrypted
|
retrieved_ok = retrieved == encrypted
|
||||||
if not retrieved_ok:
|
if not retrieved_ok:
|
||||||
print("Trying alternate relay")
|
print("Trying alternate relay")
|
||||||
client.update_relays(["wss://relay.damus.io"])
|
client.update_relays(["wss://relay.damus.io"])
|
||||||
retrieved = client.retrieve_json_from_nostr_sync(retries=1)
|
result = asyncio.run(client.fetch_latest_snapshot())
|
||||||
|
retrieved = (
|
||||||
|
gzip.decompress(b"".join(result[1])) if result else None
|
||||||
|
)
|
||||||
retrieved_ok = retrieved == encrypted
|
retrieved_ok = retrieved == encrypted
|
||||||
results.append((entry_count, payload_size, published, retrieved_ok))
|
results.append((entry_count, payload_size, True, retrieved_ok))
|
||||||
if not published or not retrieved_ok or payload_size > max_payload:
|
if not retrieved_ok or payload_size > max_payload:
|
||||||
break
|
break
|
||||||
size *= 2
|
size *= 2
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@@ -4,6 +4,8 @@ import time
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
import asyncio
|
||||||
|
import gzip
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -31,8 +33,9 @@ def test_nostr_publish_and_retrieve():
|
|||||||
relays=["wss://relay.snort.social"],
|
relays=["wss://relay.snort.social"],
|
||||||
)
|
)
|
||||||
payload = b"seedpass"
|
payload = b"seedpass"
|
||||||
assert client.publish_json_to_nostr(payload) is True
|
asyncio.run(client.publish_snapshot(payload))
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
retrieved = client.retrieve_json_from_nostr_sync()
|
result = asyncio.run(client.fetch_latest_snapshot())
|
||||||
|
retrieved = gzip.decompress(b"".join(result[1])) if result else None
|
||||||
client.close_client_pool()
|
client.close_client_pool()
|
||||||
assert retrieved == payload
|
assert retrieved == payload
|
||||||
|
@@ -54,7 +54,7 @@ def test_password_change_and_unlock(monkeypatch):
|
|||||||
pm.fingerprint_dir = fp
|
pm.fingerprint_dir = fp
|
||||||
pm.current_fingerprint = "fp"
|
pm.current_fingerprint = "fp"
|
||||||
pm.parent_seed = SEED
|
pm.parent_seed = SEED
|
||||||
pm.nostr_client = SimpleNamespace(publish_json_to_nostr=lambda *a, **k: None)
|
pm.nostr_client = SimpleNamespace(publish_snapshot=lambda *a, **k: None)
|
||||||
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"password_manager.manager.prompt_existing_password", lambda *_: old_pw
|
"password_manager.manager.prompt_existing_password", lambda *_: old_pw
|
||||||
@@ -64,9 +64,7 @@ def test_password_change_and_unlock(monkeypatch):
|
|||||||
)
|
)
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"password_manager.manager.NostrClient",
|
"password_manager.manager.NostrClient",
|
||||||
lambda *a, **kw: SimpleNamespace(
|
lambda *a, **kw: SimpleNamespace(publish_snapshot=lambda *a, **k: None),
|
||||||
publish_json_to_nostr=lambda *a, **k: None
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
pm.change_password()
|
pm.change_password()
|
||||||
|
@@ -60,9 +60,7 @@ def test_add_and_delete_entry(monkeypatch):
|
|||||||
|
|
||||||
published = []
|
published = []
|
||||||
pm.nostr_client = SimpleNamespace(
|
pm.nostr_client = SimpleNamespace(
|
||||||
publish_json_to_nostr=lambda data, alt_summary=None: (
|
publish_snapshot=lambda data, alt_summary=None: published.append(data)
|
||||||
published.append(data) or True
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
inputs = iter([str(index)])
|
inputs = iter([str(index)])
|
||||||
|
@@ -2,12 +2,14 @@ import sys
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
import asyncio
|
||||||
|
import pytest
|
||||||
from cryptography.fernet import Fernet
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||||
|
|
||||||
from password_manager.encryption import EncryptionManager
|
from password_manager.encryption import EncryptionManager
|
||||||
from nostr.client import NostrClient
|
from nostr.client import NostrClient, Manifest
|
||||||
|
|
||||||
|
|
||||||
def setup_client(tmp_path):
|
def setup_client(tmp_path):
|
||||||
@@ -27,21 +29,34 @@ def setup_client(tmp_path):
|
|||||||
|
|
||||||
|
|
||||||
class FakeEvent:
|
class FakeEvent:
|
||||||
def __init__(self):
|
def __init__(self, content="evt"):
|
||||||
self._id = "id"
|
self._id = "id"
|
||||||
|
self._content = content
|
||||||
|
|
||||||
def id(self):
|
def id(self):
|
||||||
return self._id
|
return self._id
|
||||||
|
|
||||||
|
def content(self):
|
||||||
|
return self._content
|
||||||
|
|
||||||
|
|
||||||
class FakeUnsignedEvent:
|
class FakeUnsignedEvent:
|
||||||
|
def __init__(self, content="evt"):
|
||||||
|
self._content = content
|
||||||
|
|
||||||
def sign_with_keys(self, _):
|
def sign_with_keys(self, _):
|
||||||
return FakeEvent()
|
return FakeEvent(self._content)
|
||||||
|
|
||||||
|
|
||||||
class FakeBuilder:
|
class FakeBuilder:
|
||||||
|
def __init__(self, _kind=None, content="evt"):
|
||||||
|
self._content = content
|
||||||
|
|
||||||
|
def tags(self, _tags):
|
||||||
|
return self
|
||||||
|
|
||||||
def build(self, _):
|
def build(self, _):
|
||||||
return FakeUnsignedEvent()
|
return FakeUnsignedEvent(self._content)
|
||||||
|
|
||||||
|
|
||||||
class FakeEventId:
|
class FakeEventId:
|
||||||
@@ -54,22 +69,32 @@ class FakeSendEventOutput:
|
|||||||
self.id = FakeEventId()
|
self.id = FakeEventId()
|
||||||
|
|
||||||
|
|
||||||
def test_publish_json_success():
|
def test_publish_snapshot_success():
|
||||||
with TemporaryDirectory() as tmpdir, patch(
|
with TemporaryDirectory() as tmpdir, patch(
|
||||||
"nostr.client.EventBuilder.text_note", return_value=FakeBuilder()
|
"nostr.client.EventBuilder", FakeBuilder
|
||||||
):
|
):
|
||||||
client = setup_client(Path(tmpdir))
|
client = setup_client(Path(tmpdir))
|
||||||
|
|
||||||
|
async def fake_send(event):
|
||||||
|
return FakeSendEventOutput()
|
||||||
|
|
||||||
with patch.object(
|
with patch.object(
|
||||||
client, "publish_event", return_value=FakeSendEventOutput()
|
client.client, "send_event", side_effect=fake_send
|
||||||
) as mock_pub:
|
) as mock_send:
|
||||||
assert client.publish_json_to_nostr(b"data") is True
|
manifest = asyncio.run(client.publish_snapshot(b"data"))
|
||||||
mock_pub.assert_called()
|
assert isinstance(manifest, Manifest)
|
||||||
|
assert mock_send.await_count >= 1
|
||||||
|
|
||||||
|
|
||||||
def test_publish_json_failure():
|
def test_publish_snapshot_failure():
|
||||||
with TemporaryDirectory() as tmpdir, patch(
|
with TemporaryDirectory() as tmpdir, patch(
|
||||||
"nostr.client.EventBuilder.text_note", return_value=FakeBuilder()
|
"nostr.client.EventBuilder", FakeBuilder
|
||||||
):
|
):
|
||||||
client = setup_client(Path(tmpdir))
|
client = setup_client(Path(tmpdir))
|
||||||
with patch.object(client, "publish_event", side_effect=Exception("boom")):
|
|
||||||
assert client.publish_json_to_nostr(b"data") is False
|
async def boom(_):
|
||||||
|
raise Exception("boom")
|
||||||
|
|
||||||
|
with patch.object(client.client, "send_event", side_effect=boom):
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
asyncio.run(client.publish_snapshot(b"data"))
|
||||||
|
@@ -33,7 +33,7 @@ def setup_pm(tmp_path, monkeypatch):
|
|||||||
relays=list(DEFAULT_RELAYS),
|
relays=list(DEFAULT_RELAYS),
|
||||||
close_client_pool=lambda: None,
|
close_client_pool=lambda: None,
|
||||||
initialize_client_pool=lambda: None,
|
initialize_client_pool=lambda: None,
|
||||||
publish_json_to_nostr=lambda data, alt_summary=None: None,
|
publish_snapshot=lambda data, alt_summary=None: None,
|
||||||
key_manager=SimpleNamespace(get_npub=lambda: "npub"),
|
key_manager=SimpleNamespace(get_npub=lambda: "npub"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user