Files
nomad_mcp/app/routers/claude.py
Nicolas Koehl 403fa50b4f Add Claude Code integration with SSE support
- Add Server-Sent Events (SSE) endpoint for Claude Code MCP integration
- Create MCP configuration for Claude Code CLI
- Update tool configuration to support modern OpenAPI format
- Add documentation for Claude Code integration options
- Create CLAUDE.md guide for AI coding agents

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-03-29 12:52:21 +07:00

460 lines
20 KiB
Python

from fastapi import APIRouter, HTTPException, Body, Query, Depends, Request
from fastapi.responses import StreamingResponse
from typing import Dict, Any, List, Optional, AsyncGenerator
import logging
import json
import asyncio
from app.services.nomad_client import NomadService
from app.schemas.claude_api import ClaudeJobRequest, ClaudeJobSpecification, ClaudeJobResponse
# Router onto which this module's endpoints are registered through the
# @router.post / @router.get decorators.
router = APIRouter()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@router.post("/jobs", response_model=ClaudeJobResponse)
async def manage_job(request: ClaudeJobRequest):
    """
    Manage an existing Nomad job on behalf of Claude.

    Supported ``request.action`` values (case-insensitive):

    * ``status``  - return the job plus its most recently created allocation.
    * ``stop``    - stop the job, optionally purging it (``request.purge``).
    * ``restart`` - fetch the current job, stop it, and start it again
      with the fetched specification.

    Returns:
        A ``ClaudeJobResponse``. Failures while talking to Nomad are logged and
        reported with ``success=False`` / ``status="error"`` instead of being
        raised.

    Raises:
        HTTPException: 400 when ``request.action`` is not one of the supported
            actions.
    """
    try:
        # Create a Nomad service instance with the specified namespace
        nomad_service = NomadService()
        if request.namespace:
            nomad_service.namespace = request.namespace

        action = request.action.lower()

        if action == "status":
            job = nomad_service.get_job(request.job_id)
            allocations = nomad_service.get_allocations(request.job_id)
            # The allocation with the greatest CreateTime is the latest one;
            # on ties max() keeps the first, like a stable descending sort.
            latest_alloc = (
                max(allocations, key=lambda a: a.get("CreateTime", 0))
                if allocations
                else None
            )
            status = job.get("Status", "unknown")
            return ClaudeJobResponse(
                success=True,
                job_id=request.job_id,
                status=status,
                message=f"Job {request.job_id} is {status}",
                details={
                    "job": job,
                    "latest_allocation": latest_alloc,
                },
            )

        if action == "stop":
            result = nomad_service.stop_job(request.job_id, purge=request.purge)
            return ClaudeJobResponse(
                success=True,
                job_id=request.job_id,
                status="stopped",
                message=f"Job {request.job_id} has been stopped"
                + (" and purged" if request.purge else ""),
                details=result,
            )

        if action == "restart":
            # Capture the specification before stopping so it can be resubmitted.
            job_spec = nomad_service.get_job(request.job_id)
            nomad_service.stop_job(request.job_id)
            result = nomad_service.start_job(job_spec)
            return ClaudeJobResponse(
                success=True,
                job_id=request.job_id,
                status="restarted",
                message=f"Job {request.job_id} has been restarted",
                details=result,
            )

        # Unknown action
        raise HTTPException(status_code=400, detail=f"Unknown action: {request.action}")
    except HTTPException:
        # HTTPException is itself an Exception; without this clause the 400
        # above would be swallowed by the generic handler and sent as a 200.
        raise
    except Exception as e:
        logger.exception("Error managing job %s: %s", request.job_id, e)
        return ClaudeJobResponse(
            success=False,
            job_id=request.job_id,
            status="error",
            message=f"Error: {str(e)}",
            details=None,
        )
@router.post("/create-job", response_model=ClaudeJobResponse)
async def create_job(job_spec: ClaudeJobSpecification):
    """
    Create and start a Nomad job from Claude's simplified specification.

    The specification is converted with ``to_nomad_job_spec()`` and submitted
    through ``NomadService.start_job``. When the specification names a
    namespace, the service is pointed at it first.

    Returns:
        A ``ClaudeJobResponse`` with ``status="started"`` on success. Any
        exception is logged and reported as ``success=False`` /
        ``status="error"`` rather than raised.
    """
    try:
        service = NomadService()
        # Only override the service's namespace when one was supplied.
        if job_spec.namespace:
            service.namespace = job_spec.namespace

        converted_spec = job_spec.to_nomad_job_spec()
        submission = service.start_job(converted_spec)
    except Exception as e:
        logger.error(f"Error creating job {job_spec.job_id}: {str(e)}")
        return ClaudeJobResponse(
            success=False,
            job_id=job_spec.job_id,
            status="error",
            message=f"Error: {str(e)}",
            details=None,
        )
    else:
        return ClaudeJobResponse(
            success=True,
            job_id=job_spec.job_id,
            status="started",
            message=f"Job {job_spec.job_id} has been created and started",
            details=submission,
        )
@router.get("/list-jobs", response_model=List[Dict[str, Any]])
async def list_jobs(namespace: str = Query("development")):
    """
    Return a condensed listing of the jobs in ``namespace``.

    Each entry carries the job's ``id``, ``name``, ``status`` and ``type`` as
    reported by Nomad, plus the namespace that was queried.

    Raises:
        HTTPException: 500 when the jobs cannot be listed.
    """
    try:
        service = NomadService()
        service.namespace = namespace
        return [
            {
                "id": entry.get("ID"),
                "name": entry.get("Name"),
                "status": entry.get("Status"),
                "type": entry.get("Type"),
                "namespace": namespace,
            }
            for entry in service.list_jobs()
        ]
    except Exception as e:
        logger.error(f"Error listing jobs: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error listing jobs: {str(e)}")
@router.get("/job-logs/{job_id}", response_model=Dict[str, Any])
async def get_job_logs(job_id: str, namespace: str = Query("development")):
    """
    Fetch stdout and stderr logs from a job's most recent allocation.

    The allocation with the greatest ``CreateTime`` is selected. Logs are read
    for the first task listed in its ``TaskStates``, falling back to the task
    name ``"app"`` when no task name is available.

    Returns:
        A dict with ``success``, ``job_id``, ``message`` and ``logs``; on
        success it also carries ``allocation_id`` and ``task_name``. Errors
        are logged and reported with ``success=False`` rather than raised.
    """
    try:
        service = NomadService()
        service.namespace = namespace

        job_allocations = service.get_allocations(job_id)
        if job_allocations:
            # Newest allocation first.
            newest = sorted(
                job_allocations,
                key=lambda alloc: alloc.get("CreateTime", 0),
                reverse=True,
            )[0]
            alloc_id = newest.get("ID")

            # First task of the allocation, or the default task name.
            task_states = newest.get("TaskStates")
            task_name = (
                next(iter(task_states.keys())) if task_states else None
            ) or "app"

            captured = {
                stream: service.get_allocation_logs(alloc_id, task_name, stream)
                for stream in ("stdout", "stderr")
            }
            return {
                "success": True,
                "job_id": job_id,
                "allocation_id": alloc_id,
                "task_name": task_name,
                "message": f"Retrieved logs for job {job_id}",
                "logs": captured,
            }

        return {
            "success": False,
            "job_id": job_id,
            "message": f"No allocations found for job {job_id}",
            "logs": None,
        }
    except Exception as e:
        logger.error(f"Error getting logs for job {job_id}: {str(e)}")
        return {
            "success": False,
            "job_id": job_id,
            "message": f"Error getting logs: {str(e)}",
            "logs": None,
        }
def _sse_event(payload: Dict[str, Any]) -> str:
    """Serialize ``payload`` as a single Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _nomad_service_for(namespace: str) -> NomadService:
    """Create a ``NomadService`` whose ``namespace`` is set to ``namespace``."""
    nomad_service = NomadService()
    nomad_service.namespace = namespace
    return nomad_service


def _newest_allocation(allocations: Optional[List[Dict[str, Any]]]) -> Optional[Dict[str, Any]]:
    """Return the allocation with the greatest ``CreateTime``, or ``None`` if there are none."""
    if not allocations:
        return None
    return max(allocations, key=lambda alloc: alloc.get("CreateTime", 0))


def _mcp_list_jobs(content: Dict[str, Any]):
    """Handle ``nomad_list_jobs``; return ``(event_type, content)``."""
    namespace = content.get("namespace", "development")
    jobs = _nomad_service_for(namespace).list_jobs()
    return "result", [
        {
            "id": job.get("ID"),
            "name": job.get("Name"),
            "status": job.get("Status"),
            "type": job.get("Type"),
            "namespace": namespace,
        }
        for job in jobs
    ]


def _mcp_job_status(content: Dict[str, Any]):
    """Handle ``nomad_job_status``; return ``(event_type, content)``."""
    job_id = content.get("job_id")
    namespace = content.get("namespace", "development")
    if not job_id:
        return "error", "Job ID is required"

    nomad_service = _nomad_service_for(namespace)
    job = nomad_service.get_job(job_id)
    latest_alloc = _newest_allocation(nomad_service.get_allocations(job_id))
    status = job.get("Status", "unknown")
    return "result", {
        "job_id": job_id,
        "status": status,
        "message": f"Job {job_id} is {status}",
        "details": {
            "job": job,
            "latest_allocation": latest_alloc,
        },
    }


def _mcp_job_action(content: Dict[str, Any]):
    """Handle ``nomad_job_action`` (stop / restart); return ``(event_type, content)``."""
    job_id = content.get("job_id")
    action = content.get("action")
    namespace = content.get("namespace", "development")
    purge = content.get("purge", False)
    if not job_id or not action:
        return "error", "Job ID and action are required"

    nomad_service = _nomad_service_for(namespace)
    requested = action.lower()

    if requested == "stop":
        result = nomad_service.stop_job(job_id, purge=purge)
        return "result", {
            "job_id": job_id,
            "status": "stopped",
            "message": f"Job {job_id} has been stopped" + (" and purged" if purge else ""),
            "details": result,
        }

    if requested == "restart":
        # Capture the specification before stopping so it can be resubmitted.
        job_spec = nomad_service.get_job(job_id)
        nomad_service.stop_job(job_id)
        result = nomad_service.start_job(job_spec)
        return "result", {
            "job_id": job_id,
            "status": "restarted",
            "message": f"Job {job_id} has been restarted",
            "details": result,
        }

    return "error", f"Unknown action: {action}"


def _mcp_create_job(content: Dict[str, Any]):
    """Handle ``nomad_create_job``; return ``(event_type, content)``."""
    try:
        job_spec = ClaudeJobSpecification(**content)
        nomad_service = NomadService()
        if job_spec.namespace:
            nomad_service.namespace = job_spec.namespace
        result = nomad_service.start_job(job_spec.to_nomad_job_spec())
    except Exception as e:
        logger.exception("Error creating job: %s", e)
        return "error", f"Error creating job: {str(e)}"

    return "result", {
        "job_id": job_spec.job_id,
        "status": "started",
        "message": f"Job {job_spec.job_id} has been created and started",
        "details": result,
    }


def _mcp_job_logs(content: Dict[str, Any]):
    """Handle ``nomad_job_logs``; return ``(event_type, content)``."""
    job_id = content.get("job_id")
    namespace = content.get("namespace", "development")
    if not job_id:
        return "error", "Job ID is required"

    try:
        nomad_service = _nomad_service_for(namespace)
        allocations = nomad_service.get_allocations(job_id)
        if not allocations:
            return "result", {
                "success": False,
                "job_id": job_id,
                "message": f"No allocations found for job {job_id}",
                "logs": None,
            }

        latest_alloc = _newest_allocation(allocations)
        alloc_id = latest_alloc.get("ID")

        # First task of the allocation, or the default task name.
        task_states = latest_alloc.get("TaskStates")
        task_name = (next(iter(task_states)) if task_states else None) or "app"

        stdout_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stdout")
        stderr_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stderr")
        return "result", {
            "success": True,
            "job_id": job_id,
            "allocation_id": alloc_id,
            "task_name": task_name,
            "message": f"Retrieved logs for job {job_id}",
            "logs": {
                "stdout": stdout_logs,
                "stderr": stderr_logs,
            },
        }
    except Exception as e:
        logger.exception("Error getting logs for job %s: %s", job_id, e)
        return "error", f"Error getting logs: {str(e)}"


# Maps the ``type`` field of an MCP request to the helper that serves it.
_MCP_HANDLERS: Dict[str, Any] = {
    "nomad_list_jobs": _mcp_list_jobs,
    "nomad_job_status": _mcp_job_status,
    "nomad_job_action": _mcp_job_action,
    "nomad_create_job": _mcp_create_job,
    "nomad_job_logs": _mcp_job_logs,
}


@router.post("/mcp/stream")
async def mcp_stream(request: Request):
    """
    MCP streaming endpoint for Claude Code using Server-Sent Events (SSE).

    The JSON request body is expected to be an object with ``type``,
    ``content`` and ``id`` keys. The response stream consists of an ``ack``
    frame, one ``result`` or ``error`` frame, and a closing ``done`` frame,
    each echoing the request ``id``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    # Read and validate the body *before* the response starts: once streaming
    # has begun a parse failure can no longer be reported cleanly, and the
    # streaming response may itself read from the request's receive channel
    # to detect client disconnects.
    try:
        request_data = await request.json()
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {exc}") from exc
    if not isinstance(request_data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    logger.info("Received MCP request: %s", request_data)

    # Extract the request type and content
    request_type = request_data.get("type", "unknown")
    request_content = request_data.get("content", {})
    request_id = request_data.get("id", "unknown")
    # Guard the lookup: a non-string (possibly unhashable) type is simply unknown.
    handler = _MCP_HANDLERS.get(request_type) if isinstance(request_type, str) else None

    async def event_generator() -> AsyncGenerator[str, None]:
        # Send initial acknowledgment
        yield _sse_event({"id": request_id, "type": "ack", "content": "Processing request"})

        # NOTE(review): the NomadService calls are invoked synchronously here,
        # so they presumably block the event loop while running - confirm.
        try:
            if handler is None:
                event_type, content = "error", f"Unknown request type: {request_type}"
            else:
                event_type, content = handler(request_content)
            event = _sse_event({"id": request_id, "type": event_type, "content": content})
        except Exception as e:
            logger.exception("Error processing MCP request: %s", e)
            event = _sse_event(
                {"id": request_id, "type": "error", "content": f"Internal server error: {str(e)}"}
            )

        yield event
        # Send completion message
        yield _sse_event({"id": request_id, "type": "done"})

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Needed for Nginx
        },
    )