diff --git a/requirements.lock b/requirements.lock index 5b74e12..7d79820 100644 --- a/requirements.lock +++ b/requirements.lock @@ -19,6 +19,7 @@ cryptography==45.0.4 ecdsa==0.19.1 ed25519-blake2b==1.4.1 execnet==2.1.1 +fastapi==0.116.0 frozenlist==1.7.0 glob2==0.7 hypothesis==6.135.20 @@ -57,6 +58,7 @@ termcolor==3.1.0 toml==0.10.2 tomli==2.2.1 urllib3==2.5.0 +uvicorn==0.35.0 varint==1.0.2 websocket-client==1.7.0 websockets==15.0.1 diff --git a/src/requirements.txt b/src/requirements.txt index 5e9e0a0..14018ab 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -25,3 +25,5 @@ freezegun pyperclip qrcode>=8.2 typer>=0.12.3 +fastapi>=0.116.0 +uvicorn>=0.35.0 diff --git a/src/seedpass/api.py b/src/seedpass/api.py new file mode 100644 index 0000000..fbe1537 --- /dev/null +++ b/src/seedpass/api.py @@ -0,0 +1,104 @@ +"""SeedPass FastAPI server.""" + +from __future__ import annotations + +import os +import secrets +from typing import Any, List, Optional + +from fastapi import FastAPI, Header, HTTPException +import asyncio +import sys +from fastapi.middleware.cors import CORSMiddleware + +from password_manager.manager import PasswordManager + + +app = FastAPI() + +_pm: Optional[PasswordManager] = None +_token: str = "" + + +def _check_token(auth: str | None) -> None: + if auth != f"Bearer {_token}": + raise HTTPException(status_code=401, detail="Unauthorized") + + +def start_server() -> str: + """Initialize global state and return the API token.""" + global _pm, _token + _pm = PasswordManager() + _token = secrets.token_urlsafe(16) + print(f"API token: {_token}") + origins = [ + o.strip() + for o in os.getenv("SEEDPASS_CORS_ORIGINS", "").split(",") + if o.strip() + ] + if origins and app.middleware_stack is None: + app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_methods=["*"], + allow_headers=["*"], + ) + return _token + + +@app.get("/api/v1/entry") +def search_entry(query: str, authorization: str | None = Header(None)) -> List[Any]: + _check_token(authorization) + assert _pm is not None + results = _pm.entry_manager.search_entries(query) + return [ + { + "id": idx, + "label": label, + "username": username, + "url": url, + "archived": archived, + } + for idx, label, username, url, archived in results + ] + + +@app.get("/api/v1/entry/{entry_id}") +def get_entry(entry_id: int, authorization: str | None = Header(None)) -> Any: + _check_token(authorization) + assert _pm is not None + entry = _pm.entry_manager.retrieve_entry(entry_id) + if entry is None: + raise HTTPException(status_code=404, detail="Not found") + return entry + + +@app.get("/api/v1/config/{key}") +def get_config(key: str, authorization: str | None = Header(None)) -> Any: + _check_token(authorization) + assert _pm is not None + value = _pm.config_manager.load_config(require_pin=False).get(key) + if value is None: + raise HTTPException(status_code=404, detail="Not found") + return {"key": key, "value": value} + + +@app.get("/api/v1/fingerprint") +def list_fingerprints(authorization: str | None = Header(None)) -> List[str]: + _check_token(authorization) + assert _pm is not None + return _pm.fingerprint_manager.list_fingerprints() + + +@app.get("/api/v1/nostr/pubkey") +def get_nostr_pubkey(authorization: str | None = Header(None)) -> Any: + _check_token(authorization) + assert _pm is not None + return {"npub": _pm.nostr_client.key_manager.get_npub()} + + +@app.post("/api/v1/shutdown") +async def shutdown_server(authorization: str | None = Header(None)) -> dict[str, str]: + _check_token(authorization) + asyncio.get_event_loop().call_soon(sys.exit, 0) + return {"status": "shutting down"} diff --git a/src/seedpass/cli.py b/src/seedpass/cli.py index 1b4a44a..b7718fc 100644 --- a/src/seedpass/cli.py +++ b/src/seedpass/cli.py @@ -5,6 +5,8 @@ import typer from password_manager.manager import PasswordManager from password_manager.entry_types import EntryType +import uvicorn +from . import api as api_module app = typer.Typer(help="SeedPass command line interface") @@ -23,6 +25,7 @@ nostr_app = typer.Typer(help="Interact with Nostr relays") config_app = typer.Typer(help="Manage configuration values") fingerprint_app = typer.Typer(help="Manage seed profiles") util_app = typer.Typer(help="Utility commands") +api_app = typer.Typer(help="Run the API server") app.add_typer(entry_app, name="entry") app.add_typer(vault_app, name="vault") @@ -30,6 +33,7 @@ app.add_typer(nostr_app, name="nostr") app.add_typer(config_app, name="config") app.add_typer(fingerprint_app, name="fingerprint") app.add_typer(util_app, name="util") +app.add_typer(api_app, name="api") def _get_pm(ctx: typer.Context) -> PasswordManager: @@ -169,3 +173,26 @@ def fingerprint_list(ctx: typer.Context) -> None: def generate_password(ctx: typer.Context, length: int = 24) -> None: """Generate a strong password.""" typer.echo(f"Generate password of length {length} for {ctx.obj.get('fingerprint')}") + + +@api_app.command("start") +def api_start(host: str = "127.0.0.1", port: int = 8000) -> None: + """Start the SeedPass API server.""" + token = api_module.start_server() + typer.echo(f"API token: {token}") + uvicorn.run(api_module.app, host=host, port=port) + + +@api_app.command("stop") +def api_stop(host: str = "127.0.0.1", port: int = 8000) -> None: + """Stop the SeedPass API server.""" + import requests + + try: + requests.post( + f"http://{host}:{port}/api/v1/shutdown", + headers={"Authorization": f"Bearer {api_module._token}"}, + timeout=2, + ) + except Exception as exc: # pragma: no cover - best effort + typer.echo(f"Failed to stop server: {exc}") diff --git a/src/tests/test_api.py b/src/tests/test_api.py new file mode 100644 index 0000000..33cf4b6 --- /dev/null +++ b/src/tests/test_api.py @@ -0,0 +1,50 @@ +from types import SimpleNamespace +from pathlib import Path +import sys + +import pytest +from fastapi.testclient import TestClient + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from seedpass import api + + +@pytest.fixture +def client(monkeypatch): + dummy = SimpleNamespace( + entry_manager=SimpleNamespace( + search_entries=lambda q: [(1, "Site", "user", "url", False)], + retrieve_entry=lambda i: {"label": "Site"}, + ), + config_manager=SimpleNamespace( + load_config=lambda require_pin=False: {"k": "v"} + ), + fingerprint_manager=SimpleNamespace(list_fingerprints=lambda: ["fp"]), + nostr_client=SimpleNamespace( + key_manager=SimpleNamespace(get_npub=lambda: "np") + ), + ) + monkeypatch.setattr(api, "PasswordManager", lambda: dummy) + monkeypatch.setenv("SEEDPASS_CORS_ORIGINS", "http://example.com") + token = api.start_server() + client = TestClient(api.app) + return client, token + + +def test_cors_and_auth(client): + cl, token = client + headers = {"Authorization": f"Bearer {token}", "Origin": "http://example.com"} + res = cl.get("/api/v1/entry", params={"query": "s"}, headers=headers) + assert res.status_code == 200 + assert res.headers.get("access-control-allow-origin") == "http://example.com" + + +def test_invalid_token(client): + cl, _token = client + res = cl.get( + "/api/v1/entry", + params={"query": "s"}, + headers={"Authorization": "Bearer bad"}, + ) + assert res.status_code == 401