Admin API: Service Control Router Implementation

Alex Johnson

This walkthrough covers issue #40: implementing the Service Control Router, a set of admin-only endpoints in the Fullon Master API. The router lets administrators start, stop, restart, and check the status of services. Priority is HIGH, with an estimated effort of about 2 hours. The work depends on Issue #38 (get_admin_user) and Issue #39 (ServiceManager). For the expected behavior, see examples/example_service_control.py (lines 17-22, 111-245).

Description

The goal is to create src/fullon_master_api/routers/services.py, which houses the admin-only service control endpoints. These endpoints give administrators control over the service lifecycle:

  • Starting services
  • Stopping services
  • Restarting services
  • Checking service status

Endpoints to Implement:

  • POST /api/v1/services/{service_name}/start - Start a service
  • POST /api/v1/services/{service_name}/stop - Stop a service
  • POST /api/v1/services/{service_name}/restart - Restart a service
  • GET /api/v1/services/{service_name}/status - Get service status
  • GET /api/v1/services - Get all services status
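
All five endpoints follow one URL pattern, so the list above can be captured in a small lookup. The sketch below is only a quick reference: endpoint_for and the "list" action name are hypothetical and not part of the project; the paths and methods are taken from the list above.

```python
from typing import Optional, Tuple

API_PREFIX = "/api/v1/services"  # prefix taken from the endpoint list above

# HTTP method for each per-service action in the list above
_ACTION_METHODS = {
    "start": "POST",
    "stop": "POST",
    "restart": "POST",
    "status": "GET",
}


def endpoint_for(action: str, service_name: Optional[str] = None) -> Tuple[str, str]:
    """Return (method, path) for a service-control action (hypothetical helper)."""
    if action == "list":
        # "list" is an illustrative name for the all-services status endpoint
        return "GET", API_PREFIX
    if action not in _ACTION_METHODS:
        raise ValueError(f"Unknown action: {action}")
    if not service_name:
        raise ValueError(f"Action '{action}' requires a service name")
    return _ACTION_METHODS[action], f"{API_PREFIX}/{service_name}/{action}"
```

A client script could call endpoint_for("restart", "ticker") to get the method and path before sending the request.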

Authentication:

  • All endpoints must require admin authentication, which is to be enforced using the get_admin_user() dependency.
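
The real get_admin_user() comes from Issue #38 and is not shown in this issue. As a rough, framework-free sketch of the check it is expected to perform (user.mail == ADMIN_MAIL, per the router docstring), the example below uses stand-in names: User here is a simplified placeholder for fullon_orm.models.User, and AuthError, require_admin, and the ADMIN_MAIL value are assumptions made for illustration. In the actual router this logic would live in a FastAPI dependency that raises HTTPException.

```python
from dataclasses import dataclass
from http import HTTPStatus
from typing import Optional

ADMIN_MAIL = "admin@example.com"  # assumption: the real value comes from configuration


@dataclass
class User:
    """Simplified stand-in for fullon_orm.models.User (only the fields used here)."""
    uid: int
    mail: str


class AuthError(Exception):
    """Hypothetical error carrying the HTTP status the API would respond with."""

    def __init__(self, status: HTTPStatus, detail: str) -> None:
        super().__init__(detail)
        self.status = status


def require_admin(user: Optional[User]) -> User:
    """Return the user if they are the admin, otherwise raise AuthError."""
    if user is None:
        # No authenticated user at all: maps to 401 Unauthorized
        raise AuthError(HTTPStatus.UNAUTHORIZED, "Not authenticated")
    if user.mail != ADMIN_MAIL:
        # Authenticated but not the admin: maps to 403 Forbidden
        raise AuthError(HTTPStatus.FORBIDDEN, "Admin access required")
    return user
```

The split between 401 and 403 mirrors the integration tests for unauthenticated and non-admin requests.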

Valid Service Names:

  • The valid service names are ticker, ohlcv, and account, as defined in the ServiceName enum.
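
The ServiceName enum itself is defined as part of Issue #39. A plausible shape, assuming a string-valued enum with upper-case member names (the member names are a guess; only the three values come from this issue), would be:

```python
from enum import Enum


class ServiceName(str, Enum):
    """Assumed shape of the enum; the values match the valid service names above."""
    TICKER = "ticker"
    OHLCV = "ohlcv"
    ACCOUNT = "account"


def parse_service_name(raw: str) -> ServiceName:
    """Convert a raw path segment to a ServiceName, raising ValueError if unknown."""
    return ServiceName(raw)
```

When an enum like this is used as a path parameter type, FastAPI performs the equivalent conversion itself and rejects unknown names with a 422 validation error before the handler runs.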

Implementation Strategy

Let's break down the implementation strategy into manageable steps.

1. Create routers/services.py

This file will contain all the necessary code for the service control endpoints. Here's a template to get you started:

"""
Admin-only service control endpoints.

Provides lifecycle management for Fullon service daemons:
- ticker: Real-time ticker data collection
- ohlcv: Historic + live OHLCV/trade collection
- account: Adaptive account monitoring

All endpoints require admin authentication (user.mail == ADMIN_MAIL).
"""
from fastapi import APIRouter, Depends, HTTPException, Request
from fullon_log import get_component_logger
from fullon_orm.models import User

from ..auth.dependencies import get_admin_user
from ..services.manager import ServiceManager, ServiceName

router = APIRouter(
    prefix="/services",
    tags=["services"],
)

logger = get_component_logger("fullon.master_api.routers.services")


def get_service_manager(request: Request) -> ServiceManager:
    """
    Dependency to get ServiceManager from app state.
    
    ServiceManager is initialized in gateway.py and stored in app.state.
    
    Args:
        request: FastAPI request object
    
    Returns:
        ServiceManager: Singleton service manager instance
    
    Raises:
        RuntimeError: If ServiceManager not initialized
    """
    if not hasattr(request.app.state, 'service_manager'):
        logger.error("ServiceManager not found in app state")
        raise RuntimeError("ServiceManager not initialized")
    
    return request.app.state.service_manager


@router.post("/{service_name}/start")
async def start_service(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Start a service (admin only).
    
    Starts the specified service as an async background task.
    Returns error if service is already running.
    
    Args:
        service_name: Service to start (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)
    
    Returns:
        dict: {"service": str, "status": "started"}
    
    Raises:
        HTTPException 400: If service is already running
        HTTPException 403: If user is not admin
    
    Example:
        POST /api/v1/services/ticker/start
        Response: {"service": "ticker", "status": "started"}
    """
    try:
        result = await manager.start_service(service_name)
        logger.info(
            "Service started by admin",
            service=service_name,
            user_id=admin_user.uid,
            admin_email=admin_user.mail
        )
        return result
    except ValueError as e:
        logger.warning(
            "Service start failed",
            service=service_name,
            error=str(e),
            user_id=admin_user.uid
        )
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/{service_name}/stop")
async def stop_service(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Stop a service (admin only).
    
    Gracefully stops the specified service by calling daemon.stop()
    then cancelling the async task. Returns error if service is not running.
    
    Args:
        service_name: Service to stop (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)
    
    Returns:
        dict: {"service": str, "status": "stopped"}
    
    Raises:
        HTTPException 400: If service is not running
        HTTPException 403: If user is not admin
    
    Example:
        POST /api/v1/services/ticker/stop
        Response: {"service": "ticker", "status": "stopped"}
    """
    try:
        result = await manager.stop_service(service_name)
        logger.info(
            "Service stopped by admin",
            service=service_name,
            user_id=admin_user.uid,
            admin_email=admin_user.mail
        )
        return result
    except ValueError as e:
        logger.warning(
            "Service stop failed",
            service=service_name,
            error=str(e),
            user_id=admin_user.uid
        )
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/{service_name}/restart")
async def restart_service(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Restart a service (admin only).
    
    Stops the service (if running) then starts it after a brief pause.
    Always succeeds regardless of initial state.
    
    Args:
        service_name: Service to restart (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)
    
    Returns:
        dict: {"service": str, "status": "restarted"}
    
    Raises:
        HTTPException 403: If user is not admin
    
    Example:
        POST /api/v1/services/ticker/restart
        Response: {"service": "ticker", "status": "restarted"}
    """
    result = await manager.restart_service(service_name)
    logger.info(
        "Service restarted by admin",
        service=service_name,
        user_id=admin_user.uid,
        admin_email=admin_user.mail
    )
    return result


@router.get("/{service_name}/status")
async def get_service_status(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Get service status (admin only).
    
    Returns current status of the specified service.
    
    Args:
        service_name: Service to check (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)
    
    Returns:
        dict: {
            "service": str,
            "status": "running" | "stopped",
            "is_running": bool
        }
    
    Raises:
        HTTPException 403: If user is not admin
    
    Example:
        GET /api/v1/services/ticker/status
        Response: {
            "service": "ticker",
            "status": "running",
            "is_running": true
        }
    """
    return manager.get_service_status(service_name)


@router.get("")
async def get_all_services_status(
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Get status of all services (admin only).
    
    Returns status dict for all managed services.
    
    Args:
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)
    
    Returns:
        dict: {
            "services": {
                "ticker": {"service": "ticker", "status": "running", ...},
                "ohlcv": {"service": "ohlcv", "status": "stopped", ...},
                "account": {"service": "account", "status": "stopped", ...}
            }
        }
    
    Raises:
        HTTPException 403: If user is not admin
    
    Example:
        GET /api/v1/services
        Response: {
            "services": {
                "ticker": {"service": "ticker", "status": "running", "is_running": true},
                "ohlcv": {"service": "ohlcv", "status": "stopped", "is_running": false},
                "account": {"service": "account", "status": "stopped", "is_running": false}
            }
        }
    """
    return manager.get_all_status()
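
The router above relies on a specific ServiceManager contract: start_service and stop_service raise ValueError on an invalid state, restart_service always succeeds, and the two status methods are synchronous. The real class belongs to Issue #39; the sketch below is a hypothetical in-memory stand-in (InMemoryServiceManager is not a project class) that only tracks state, which can help when reasoning about the expected responses.

```python
import asyncio
from enum import Enum
from typing import Any, Dict, Set


class ServiceName(str, Enum):
    """Assumed enum shape; the values come from this issue."""
    TICKER = "ticker"
    OHLCV = "ohlcv"
    ACCOUNT = "account"


class InMemoryServiceManager:
    """Hypothetical stand-in: records which services are 'running', nothing more."""

    def __init__(self) -> None:
        self._running: Set[ServiceName] = set()

    async def start_service(self, name: ServiceName) -> Dict[str, str]:
        if name in self._running:
            raise ValueError(f"Service {name.value} is already running")
        self._running.add(name)
        return {"service": name.value, "status": "started"}

    async def stop_service(self, name: ServiceName) -> Dict[str, str]:
        if name not in self._running:
            raise ValueError(f"Service {name.value} is not running")
        self._running.discard(name)
        return {"service": name.value, "status": "stopped"}

    async def restart_service(self, name: ServiceName) -> Dict[str, str]:
        # Stop if running (no error otherwise), then mark as running again
        self._running.discard(name)
        self._running.add(name)
        return {"service": name.value, "status": "restarted"}

    def get_service_status(self, name: ServiceName) -> Dict[str, Any]:
        running = name in self._running
        return {
            "service": name.value,
            "status": "running" if running else "stopped",
            "is_running": running,
        }

    def get_all_status(self) -> Dict[str, Any]:
        return {"services": {s.value: self.get_service_status(s) for s in ServiceName}}
```

With this contract, the router's try/except blocks turn each ValueError into a 400 response, while restart needs no error handling.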

2. Create Integration Tests

To ensure the functionality and security of the endpoints, create tests/integration/test_service_control.py with the following tests:

"""Integration tests for service control endpoints."""
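import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from fullon_master_api.gateway import app

# admin_token and user_token are assumed to be fixtures provided elsewhere
# (for example in conftest.py); they are not defined in this file.

@pytest_asyncio.fixture
async def admin_client():
    """Create async client bound to the ASGI app (no network involved)."""
    # Recent httpx releases no longer accept AsyncClient(app=...);
    # ASGITransport is the supported way to call the app in-process.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client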

@pytest.mark.asyncio
async def test_start_service_admin(admin_client, admin_token):
    """Test admin can start service."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    
    response = await admin_client.post(
        "/api/v1/services/ticker/start",
        headers=headers
    )
    
    assert response.status_code in [200, 400]  # 200 = started, 400 = already running
    if response.status_code == 200:
        data = response.json()
        assert data["service"] == "ticker"
        assert data["status"] == "started"

@pytest.mark.asyncio
async def test_stop_service_admin(admin_client, admin_token):
    """Test admin can stop service."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    
    # Start first
    await admin_client.post("/api/v1/services/ticker/start", headers=headers)
    
    # Stop
    response = await admin_client.post(
        "/api/v1/services/ticker/stop",
        headers=headers
    )
    
    assert response.status_code in [200, 400]  # 200 = stopped, 400 = not running

@pytest.mark.asyncio
async def test_restart_service_admin(admin_client, admin_token):
    """Test admin can restart service."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    
    response = await admin_client.post(
        "/api/v1/services/ticker/restart",
        headers=headers
    )
    
    assert response.status_code == 200
    data = response.json()
    assert data["service"] == "ticker"
    assert data["status"] == "restarted"

@pytest.mark.asyncio
async def test_get_service_status_admin(admin_client, admin_token):
    """Test admin can get service status."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    
    response = await admin_client.get(
        "/api/v1/services/ticker/status",
        headers=headers
    )
    
    assert response.status_code == 200
    data = response.json()
    assert data["service"] == "ticker"
    assert "status" in data
    assert "is_running" in data

@pytest.mark.asyncio
async def test_get_all_services_status_admin(admin_client, admin_token):
    """Test admin can get all services status."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    
    response = await admin_client.get(
        "/api/v1/services",
        headers=headers
    )
    
    assert response.status_code == 200
    data = response.json()
    assert "services" in data
    assert "ticker" in data["services"]
    assert "ohlcv" in data["services"]
    assert "account" in data["services"]

@pytest.mark.asyncio
async def test_non_admin_cannot_start_service(admin_client, user_token):
    """Test non-admin user gets 403 Forbidden."""
    headers = {"Authorization": f"Bearer {user_token}"}
    
    response = await admin_client.post(
        "/api/v1/services/ticker/start",
        headers=headers
    )
    
    assert response.status_code == 403  # Forbidden

@pytest.mark.asyncio
async def test_unauthenticated_cannot_access(admin_client):
    """Test unauthenticated request gets 401."""
    response = await admin_client.post("/api/v1/services/ticker/start")
    
    assert response.status_code == 401  # Unauthorized
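
The tests above run against whatever ServiceManager the app holds, which is why some of them accept either 200 or 400. To make outcomes deterministic, the manager can be replaced with a mock. The sketch below uses only the standard library; make_mock_manager is a hypothetical helper, and the return values are copied from the router docstrings.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_manager() -> MagicMock:
    """Build a mock exposing the ServiceManager methods the router calls."""
    manager = MagicMock()
    # Async methods: awaited by the start/stop/restart handlers
    manager.start_service = AsyncMock(
        return_value={"service": "ticker", "status": "started"}
    )
    manager.stop_service = AsyncMock(
        side_effect=ValueError("Service ticker is not running")
    )
    manager.restart_service = AsyncMock(
        return_value={"service": "ticker", "status": "restarted"}
    )
    # Sync method: called directly by the status handler
    manager.get_service_status.return_value = {
        "service": "ticker",
        "status": "stopped",
        "is_running": False,
    }
    return manager


# In a test, the mock could be installed through FastAPI's override hook:
#   app.dependency_overrides[get_service_manager] = lambda: make_mock_manager()
# and removed afterwards with app.dependency_overrides.clear().
```

With the stop_service mock configured to raise ValueError, a stop request would be expected to return exactly 400, so the assertion no longer needs to accept two status codes.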

Testing Requirements

Integration Tests

Run the integration tests to verify the functionality of the service control endpoints.

# Run service control integration tests
pytest tests/integration/test_service_control.py -v

# Expected tests:
# - test_start_service_admin
# - test_stop_service_admin
# - test_restart_service_admin
# - test_get_service_status_admin
# - test_get_all_services_status_admin
# - test_non_admin_cannot_start_service
# - test_unauthenticated_cannot_access

Manual Testing with Example

You can also manually test the endpoints using the provided example script.

# Start server
make run

# In another terminal, run example
python examples/example_service_control.py --action demo

# Should show:
# - All services status (GET /api/v1/services)
# - Start ticker (POST /api/v1/services/ticker/start)
# - Check ticker status (GET /api/v1/services/ticker/status)
# - Stop ticker (POST /api/v1/services/ticker/stop)
# - Restart ticker (POST /api/v1/services/ticker/restart)

Acceptance Criteria

This issue is complete when all of the following criteria are met:

  • [ ] routers/services.py created with service control endpoints
  • [ ] POST /services/{service_name}/start endpoint implemented
  • [ ] POST /services/{service_name}/stop endpoint implemented
  • [ ] POST /services/{service_name}/restart endpoint implemented
  • [ ] GET /services/{service_name}/status endpoint implemented
  • [ ] GET /services endpoint implemented
  • [ ] All endpoints require get_admin_user() dependency
  • [ ] get_service_manager() dependency retrieves manager from app.state
  • [ ] ServiceName enum used for path parameter validation
  • [ ] Error handling: 400 for invalid operations, 403 for non-admin
  • [ ] Structured logging for all operations
  • [ ] Integration tests pass (tests/integration/test_service_control.py)
  • [ ] Example script works (examples/example_service_control.py)

Git Workflow

Follow these steps to manage your Git workflow:

# Create feature branch
git checkout -b issue-40-service-control-router

# 1. Implement routers/services.py
# 2. Create and run integration tests
pytest tests/integration/test_service_control.py -v

# Test with example (needs gateway integration from Issue #41)
# python examples/example_service_control.py --action demo

# Commit changes
git add src/fullon_master_api/routers/services.py
git add tests/integration/test_service_control.py
git commit -m "Issue #40: Implement service control router (admin-only)

- Add service control endpoints at /api/v1/services/*
- POST start/stop/restart endpoints for service lifecycle
- GET status endpoints for monitoring
- Admin-only access via get_admin_user() dependency
- ServiceManager accessed via app.state dependency
- ServiceName enum for path parameter validation
- Comprehensive integration tests
- Structured logging for all operations

Endpoints:
- POST /api/v1/services/{service}/start
- POST /api/v1/services/{service}/stop
- POST /api/v1/services/{service}/restart
- GET /api/v1/services/{service}/status
- GET /api/v1/services

Enables admin service control in Phase 6.

Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>"

# Run quality checks
make lint
make test

# Push and create PR
git push -u origin issue-40-service-control-router
gh pr create --title "Issue #40: Service Control Router" \
  --body "Admin-only service control endpoints for Phase 6.

Provides lifecycle management for ticker, ohlcv, account services."

References

  • Example: examples/example_service_control.py - Expected endpoint behavior
  • Masterplan: masterplan.md Phase 6 (lines 977-1054) - Router implementation
  • Dependencies: Issue #38 (get_admin_user), Issue #39 (ServiceManager)
  • Pattern: FastAPI router with dependency injection

Blockers

  • Required: Issue #38 (get_admin_user) must be completed
  • Required: Issue #39 (ServiceManager) must be completed
  • Note: Gateway integration (Issue #41) needed for full functionality

Notes

  • Admin Only: All endpoints protected by get_admin_user() dependency
  • ServiceName Enum: FastAPI validates service_name against the enum automatically and returns 422 for unknown names
  • Error Responses: 400 for invalid state, 403 for non-admin, 401 for unauth
  • ServiceManager: Accessed via request.app.state.service_manager
  • Logging: All operations logged with admin user identification
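  • Testing: Integration tests should use a mocked ServiceManager for isolation; the tests shown above run against the real app, which is why some accept either 200 or 400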
  • Example Validation: Run example_service_control.py to verify behavior

For more information on FastAPI dependencies and routing, check out the official FastAPI documentation.
