Admin API: Service Control Router Implementation
This issue (#40) implements the Service Control Router: a set of admin-only endpoints that let administrators start, stop, restart, and check the status of services in the Fullon Master API. Priority is HIGH, with an estimated effort of about 2 hours. The work depends on Issue #38 (get_admin_user) and Issue #39 (ServiceManager). Reference examples are in examples/example_service_control.py (lines 17-22, 111-245).
Description
Create src/fullon_master_api/routers/services.py to hold the admin-only service control endpoints. These endpoints give administrators control over the service lifecycle:
- Starting services
- Stopping services
- Restarting services
- Checking service status
Endpoints to Implement:
- POST /api/v1/services/{service_name}/start - Start a service
- POST /api/v1/services/{service_name}/stop - Stop a service
- POST /api/v1/services/{service_name}/restart - Restart a service
- GET /api/v1/services/{service_name}/status - Get service status
- GET /api/v1/services - Get all services status
Authentication:
- All endpoints must require admin authentication, enforced with the get_admin_user() dependency.
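As a rough illustration of the access rule, the sketch below models the three outcomes the router is expected to produce: 401 for a request with no authenticated user, 403 for an authenticated non-admin, and normal processing for the admin. It is not the get_admin_user() implementation from Issue #38. The function name `admin_gate_status`, the example addresses, and the plain-string inputs are hypothetical stand-ins; the only rule taken from this issue is the `user.mail == ADMIN_MAIL` comparison.

```python
from typing import Optional

# Hypothetical value; the real ADMIN_MAIL comes from the application's configuration.
ADMIN_MAIL = "admin@example.com"


def admin_gate_status(user_mail: Optional[str], admin_mail: str = ADMIN_MAIL) -> int:
    """Return the HTTP status an admin-only endpoint would answer with.

    user_mail is None when the request carried no valid credentials.
    """
    if user_mail is None:
        return 401  # unauthenticated
    if user_mail != admin_mail:
        return 403  # authenticated, but not the admin
    return 200  # admin: the endpoint body runs


if __name__ == "__main__":
    for mail in (None, "trader@example.com", ADMIN_MAIL):
        print(mail, "->", admin_gate_status(mail))
```

In the real router this decision is made inside the dependency, so the endpoint bodies never run for the first two cases.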
Valid Service Names:
- The valid service names are ticker, ohlcv, and account, as defined in the ServiceName enum.
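To make the validation behaviour concrete, here is a minimal sketch of what the enum might look like and how an unknown name is rejected. The member names and the `parse_service_name` helper are assumptions for illustration; the actual ServiceName is defined alongside ServiceManager in Issue #39. When a path parameter is typed with an enum, FastAPI performs an equivalent coercion and, by default, answers values outside the enum with a 422 validation error before the endpoint runs.

```python
from enum import Enum


class ServiceName(str, Enum):
    """Assumed shape of the enum; the real one lives in services/manager.py."""

    TICKER = "ticker"
    OHLCV = "ohlcv"
    ACCOUNT = "account"


def parse_service_name(raw: str) -> ServiceName:
    """Coerce a raw path segment to ServiceName; raises ValueError if unknown."""
    return ServiceName(raw)


if __name__ == "__main__":
    print(parse_service_name("ticker").value)
    try:
        parse_service_name("orderbook")
    except ValueError as exc:
        print("rejected:", exc)
```

Note that this rejection happens at the validation layer, so it is separate from the 400 responses the endpoints raise for invalid state transitions.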
Implementation Strategy
Let's break down the implementation strategy into manageable steps.
1. Create routers/services.py
This file will contain all the necessary code for the service control endpoints. Here's a template to get you started:
"""
Admin-only service control endpoints.
Provides lifecycle management for Fullon service daemons:
- ticker: Real-time ticker data collection
- ohlcv: Historic + live OHLCV/trade collection
- account: Adaptive account monitoring
All endpoints require admin authentication (user.mail == ADMIN_MAIL).
"""
from fastapi import APIRouter, Depends, HTTPException, Request
from fullon_log import get_component_logger
from fullon_orm.models import User

from ..auth.dependencies import get_admin_user
from ..services.manager import ServiceManager, ServiceName

router = APIRouter(
    prefix="/services",
    tags=["services"],
)

logger = get_component_logger("fullon.master_api.routers.services")


def get_service_manager(request: Request) -> ServiceManager:
    """
    Dependency to get ServiceManager from app state.

    ServiceManager is initialized in gateway.py and stored in app.state.

    Args:
        request: FastAPI request object

    Returns:
        ServiceManager: Singleton service manager instance

    Raises:
        RuntimeError: If ServiceManager not initialized
    """
    if not hasattr(request.app.state, 'service_manager'):
        logger.error("ServiceManager not found in app state")
        raise RuntimeError("ServiceManager not initialized")
    return request.app.state.service_manager


@router.post("/{service_name}/start")
async def start_service(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Start a service (admin only).

    Starts the specified service as an async background task.
    Returns error if service is already running.

    Args:
        service_name: Service to start (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)

    Returns:
        dict: {"service": str, "status": "started"}

    Raises:
        HTTPException 400: If service is already running
        HTTPException 403: If user is not admin

    Example:
        POST /api/v1/services/ticker/start
        Response: {"service": "ticker", "status": "started"}
    """
    try:
        result = await manager.start_service(service_name)
        logger.info(
            "Service started by admin",
            service=service_name,
            user_id=admin_user.uid,
            admin_email=admin_user.mail
        )
        return result
    except ValueError as e:
        # ServiceManager signals an invalid state transition with ValueError
        logger.warning(
            "Service start failed",
            service=service_name,
            error=str(e),
            user_id=admin_user.uid
        )
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/{service_name}/stop")
async def stop_service(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Stop a service (admin only).

    Gracefully stops the specified service by calling daemon.stop()
    then cancelling the async task. Returns error if service is not running.

    Args:
        service_name: Service to stop (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)

    Returns:
        dict: {"service": str, "status": "stopped"}

    Raises:
        HTTPException 400: If service is not running
        HTTPException 403: If user is not admin

    Example:
        POST /api/v1/services/ticker/stop
        Response: {"service": "ticker", "status": "stopped"}
    """
    try:
        result = await manager.stop_service(service_name)
        logger.info(
            "Service stopped by admin",
            service=service_name,
            user_id=admin_user.uid,
            admin_email=admin_user.mail
        )
        return result
    except ValueError as e:
        # ServiceManager signals an invalid state transition with ValueError
        logger.warning(
            "Service stop failed",
            service=service_name,
            error=str(e),
            user_id=admin_user.uid
        )
        raise HTTPException(status_code=400, detail=str(e))


@router.post("/{service_name}/restart")
async def restart_service(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Restart a service (admin only).

    Stops the service (if running) then starts it after a brief pause.
    Always succeeds regardless of initial state.

    Args:
        service_name: Service to restart (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)

    Returns:
        dict: {"service": str, "status": "restarted"}

    Raises:
        HTTPException 403: If user is not admin

    Example:
        POST /api/v1/services/ticker/restart
        Response: {"service": "ticker", "status": "restarted"}
    """
    result = await manager.restart_service(service_name)
    logger.info(
        "Service restarted by admin",
        service=service_name,
        user_id=admin_user.uid,
        admin_email=admin_user.mail
    )
    return result


@router.get("/{service_name}/status")
async def get_service_status(
    service_name: ServiceName,
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Get service status (admin only).

    Returns current status of the specified service.

    Args:
        service_name: Service to check (ticker, ohlcv, account)
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)

    Returns:
        dict: {
            "service": str,
            "status": "running" | "stopped",
            "is_running": bool
        }

    Raises:
        HTTPException 403: If user is not admin

    Example:
        GET /api/v1/services/ticker/status
        Response: {
            "service": "ticker",
            "status": "running",
            "is_running": true
        }
    """
    return manager.get_service_status(service_name)


@router.get("")
async def get_all_services_status(
    admin_user: User = Depends(get_admin_user),
    manager: ServiceManager = Depends(get_service_manager)
):
    """
    Get status of all services (admin only).

    Returns status dict for all managed services.

    Args:
        admin_user: Authenticated admin user (from dependency)
        manager: ServiceManager instance (from dependency)

    Returns:
        dict: {
            "services": {
                "ticker": {"service": "ticker", "status": "running", ...},
                "ohlcv": {"service": "ohlcv", "status": "stopped", ...},
                "account": {"service": "account", "status": "stopped", ...}
            }
        }

    Raises:
        HTTPException 403: If user is not admin

    Example:
        GET /api/v1/services
        Response: {
            "services": {
                "ticker": {"service": "ticker", "status": "running", "is_running": true},
                "ohlcv": {"service": "ohlcv", "status": "stopped", "is_running": false},
                "account": {"service": "account", "status": "stopped", "is_running": false}
            }
        }
    """
    return manager.get_all_status()

2. Create Integration Tests
To ensure the functionality and security of the endpoints, create tests/integration/test_service_control.py with the following tests:
"""Integration tests for service control endpoints."""
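import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient

from fullon_master_api.gateway import app


# pytest_asyncio.fixture works in both strict and auto mode; a plain
# @pytest.fixture on an async generator is only handled in auto mode.
@pytest_asyncio.fixture
async def admin_client():
    """Create an async client that talks to the app in-process."""
    # Recent httpx releases no longer accept AsyncClient(app=...);
    # ASGITransport routes requests to the ASGI app directly.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

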
@pytest.mark.asyncio
async def test_start_service_admin(admin_client, admin_token):
    """Test admin can start service."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    response = await admin_client.post(
        "/api/v1/services/ticker/start",
        headers=headers
    )
    assert response.status_code in [200, 400]  # 200 = started, 400 = already running
    if response.status_code == 200:
        data = response.json()
        assert data["service"] == "ticker"
        assert data["status"] == "started"


@pytest.mark.asyncio
async def test_stop_service_admin(admin_client, admin_token):
    """Test admin can stop service."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    # Start first
    await admin_client.post("/api/v1/services/ticker/start", headers=headers)
    # Stop
    response = await admin_client.post(
        "/api/v1/services/ticker/stop",
        headers=headers
    )
    assert response.status_code in [200, 400]  # 200 = stopped, 400 = not running


@pytest.mark.asyncio
async def test_restart_service_admin(admin_client, admin_token):
    """Test admin can restart service."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    response = await admin_client.post(
        "/api/v1/services/ticker/restart",
        headers=headers
    )
    assert response.status_code == 200
    data = response.json()
    assert data["service"] == "ticker"
    assert data["status"] == "restarted"


@pytest.mark.asyncio
async def test_get_service_status_admin(admin_client, admin_token):
    """Test admin can get service status."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    response = await admin_client.get(
        "/api/v1/services/ticker/status",
        headers=headers
    )
    assert response.status_code == 200
    data = response.json()
    assert data["service"] == "ticker"
    assert "status" in data
    assert "is_running" in data


@pytest.mark.asyncio
async def test_get_all_services_status_admin(admin_client, admin_token):
    """Test admin can get all services status."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    response = await admin_client.get(
        "/api/v1/services",
        headers=headers
    )
    assert response.status_code == 200
    data = response.json()
    assert "services" in data
    assert "ticker" in data["services"]
    assert "ohlcv" in data["services"]
    assert "account" in data["services"]


@pytest.mark.asyncio
async def test_non_admin_cannot_start_service(admin_client, user_token):
    """Test non-admin user gets 403 Forbidden."""
    headers = {"Authorization": f"Bearer {user_token}"}
    response = await admin_client.post(
        "/api/v1/services/ticker/start",
        headers=headers
    )
    assert response.status_code == 403  # Forbidden


@pytest.mark.asyncio
async def test_unauthenticated_cannot_access(admin_client):
    """Test unauthenticated request gets 401."""
    response = await admin_client.post("/api/v1/services/ticker/start")
    assert response.status_code == 401  # Unauthorized

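The Notes for this issue mention running the integration tests against a mocked ServiceManager. One possible shape for such a stand-in is sketched below. `FakeServiceManager` and `SERVICES` are hypothetical names, the services are keyed by plain strings rather than ServiceName members, and the behaviour is inferred only from the endpoint docstrings above (ValueError on an invalid start or stop, restart always succeeding); the real ServiceManager from Issue #39 may differ.

```python
import asyncio
from typing import Dict, Set

SERVICES = ("ticker", "ohlcv", "account")


class FakeServiceManager:
    """In-memory stand-in mimicking the behaviour the router docstrings describe."""

    def __init__(self) -> None:
        self._running: Set[str] = set()

    async def start_service(self, name: str) -> Dict[str, str]:
        if name in self._running:
            raise ValueError(f"Service {name} is already running")
        self._running.add(name)
        return {"service": name, "status": "started"}

    async def stop_service(self, name: str) -> Dict[str, str]:
        if name not in self._running:
            raise ValueError(f"Service {name} is not running")
        self._running.remove(name)
        return {"service": name, "status": "stopped"}

    async def restart_service(self, name: str) -> Dict[str, str]:
        # Succeeds whether or not the service was running beforehand.
        self._running.add(name)
        return {"service": name, "status": "restarted"}

    def get_service_status(self, name: str) -> dict:
        running = name in self._running
        return {
            "service": name,
            "status": "running" if running else "stopped",
            "is_running": running,
        }

    def get_all_status(self) -> dict:
        return {"services": {n: self.get_service_status(n) for n in SERVICES}}


async def demo() -> dict:
    """Start one service and return the combined status document."""
    manager = FakeServiceManager()
    await manager.start_service("ticker")
    return manager.get_all_status()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a test, an instance could be supplied through FastAPI's `app.dependency_overrides`, keyed by `get_service_manager`, so the endpoints use the fake instead of the object stored in app.state. With deterministic state, assertions such as `status_code in [200, 400]` could then be tightened to a single expected code.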
Testing Requirements
Integration Tests
Run the integration tests to verify the functionality of the service control endpoints.
# Run service control integration tests
pytest tests/integration/test_service_control.py -v
# Expected tests:
# - test_start_service_admin
# - test_stop_service_admin
# - test_restart_service_admin
# - test_get_service_status_admin
# - test_get_all_services_status_admin
# - test_non_admin_cannot_start_service
# - test_unauthenticated_cannot_access
Manual Testing with Example
You can also manually test the endpoints using the provided example script.
# Start server
make run
# In another terminal, run example
python examples/example_service_control.py --action demo
# Should show:
# - All services status (GET /api/v1/services)
# - Start ticker (POST /api/v1/services/ticker/start)
# - Check ticker status (GET /api/v1/services/ticker/status)
# - Stop ticker (POST /api/v1/services/ticker/stop)
# - Restart ticker (POST /api/v1/services/ticker/restart)
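For readers without the example script at hand, the sketch below builds the same five requests in the order listed above, using only the standard library. It is not the contents of examples/example_service_control.py; the helper names, the base URL, and the token value are placeholders, and nothing is sent until a request is passed to `urllib.request.urlopen`.

```python
from typing import List, Tuple
from urllib.request import Request

API_PREFIX = "/api/v1/services"


def demo_sequence(service: str = "ticker") -> List[Tuple[str, str]]:
    """Return the (method, path) pairs of the demo walkthrough, in order."""
    return [
        ("GET", API_PREFIX),
        ("POST", f"{API_PREFIX}/{service}/start"),
        ("GET", f"{API_PREFIX}/{service}/status"),
        ("POST", f"{API_PREFIX}/{service}/stop"),
        ("POST", f"{API_PREFIX}/{service}/restart"),
    ]


def build_requests(base_url: str, token: str, service: str = "ticker") -> List[Request]:
    """Build unsent urllib requests carrying the admin bearer token."""
    headers = {"Authorization": f"Bearer {token}"}
    return [
        Request(base_url + path, headers=headers, method=method)
        for method, path in demo_sequence(service)
    ]


if __name__ == "__main__":
    # Placeholder host, port, and token; adjust to the running server.
    for req in build_requests("http://localhost:8000", "ADMIN_TOKEN"):
        print(req.get_method(), req.full_url)
```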
Acceptance Criteria
To ensure the successful completion of this issue, the following criteria must be met:
- [ ] routers/services.py created with service control endpoints
- [ ] POST /services/{service_name}/start endpoint implemented
- [ ] POST /services/{service_name}/stop endpoint implemented
- [ ] POST /services/{service_name}/restart endpoint implemented
- [ ] GET /services/{service_name}/status endpoint implemented
- [ ] GET /services endpoint implemented
- [ ] All endpoints require get_admin_user() dependency
- [ ] get_service_manager() dependency retrieves manager from app.state
- [ ] ServiceName enum used for path parameter validation
- [ ] Error handling: 400 for invalid operations, 403 for non-admin
- [ ] Structured logging for all operations
- [ ] Integration tests pass (tests/integration/test_service_control.py)
- [ ] Example script works (examples/example_service_control.py)
Git Workflow
Follow these steps to manage your Git workflow:
# Create feature branch
git checkout -b issue-40-service-control-router
# 1. Implement routers/services.py
# 2. Create and run integration tests
pytest tests/integration/test_service_control.py -v
# Test with example (needs gateway integration from Issue #41)
# python examples/example_service_control.py --action demo
# Commit changes
git add src/fullon_master_api/routers/services.py
git add tests/integration/test_service_control.py
git commit -m "Issue #40: Implement service control router (admin-only)
- Add service control endpoints at /api/v1/services/*
- POST start/stop/restart endpoints for service lifecycle
- GET status endpoints for monitoring
- Admin-only access via get_admin_user() dependency
- ServiceManager accessed via app.state dependency
- ServiceName enum for path parameter validation
- Comprehensive integration tests
- Structured logging for all operations
Endpoints:
- POST /api/v1/services/{service}/start
- POST /api/v1/services/{service}/stop
- POST /api/v1/services/{service}/restart
- GET /api/v1/services/{service}/status
- GET /api/v1/services
Enables admin service control in Phase 6.
Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>"
# Run quality checks
make lint
make test
# Push and create PR
git push -u origin issue-40-service-control-router
gh pr create --title "Issue #40: Service Control Router" \
--body "Admin-only service control endpoints for Phase 6.
Provides lifecycle management for ticker, ohlcv, account services."
References
- Example: examples/example_service_control.py - Expected endpoint behavior
- Masterplan: masterplan.md Phase 6 (lines 977-1054) - Router implementation
- Dependencies: Issue #38 (get_admin_user), Issue #39 (ServiceManager)
- Pattern: FastAPI router with dependency injection
Blockers
- Required: Issue #38 (get_admin_user) must be completed
- Required: Issue #39 (ServiceManager) must be completed
- Note: Gateway integration (Issue #41) needed for full functionality
Notes
- Admin Only: All endpoints protected by get_admin_user() dependency
- ServiceName Enum: FastAPI validates service_name against enum automatically
- Error Responses: 400 for invalid state, 403 for non-admin, 401 for unauthenticated requests
- ServiceManager: Accessed via request.app.state.service_manager
- Logging: All operations logged with admin user identification
- Testing: Integration tests use mocked ServiceManager for isolation
- Example Validation: Run example_service_control.py to verify behavior
For more information on FastAPI dependencies and routing, check out the official FastAPI documentation.