"""Simplified Nomad job-management endpoints for Claude.

Exposes REST endpoints (job actions, creation, listing, logs) and a
Server-Sent Events endpoint that multiplexes the same operations for the
MCP SSE transport.
"""
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional, AsyncGenerator

from fastapi import APIRouter, HTTPException, Body, Query, Depends, Request
from fastapi.responses import StreamingResponse

from app.services.nomad_client import NomadService
from app.schemas.claude_api import ClaudeJobRequest, ClaudeJobSpecification, ClaudeJobResponse

router = APIRouter()
logger = logging.getLogger(__name__)

# Namespace used when a request does not name one.
_DEFAULT_NAMESPACE = "development"
# Task name used for log retrieval when the allocation reports no task states.
_DEFAULT_TASK_NAME = "app"


def _latest_allocation(allocations: Optional[List[Dict[str, Any]]]) -> Optional[Dict[str, Any]]:
    """Return the allocation with the greatest ``CreateTime``, or ``None`` if there are none."""
    if not allocations:
        return None
    # max() keeps the first of equal keys, matching a stable descending sort.
    return max(allocations, key=lambda alloc: alloc.get("CreateTime", 0))


def _first_task_name(allocation: Dict[str, Any]) -> str:
    """Return the first task name listed in ``TaskStates``, or the default task name."""
    task_states = allocation.get("TaskStates") or {}
    return next(iter(task_states), None) or _DEFAULT_TASK_NAME


def _simplify_jobs(jobs: List[Dict[str, Any]], namespace: str) -> List[Dict[str, Any]]:
    """Reduce Nomad job records to id/name/status/type plus the queried namespace."""
    return [
        {
            "id": job.get("ID"),
            "name": job.get("Name"),
            "status": job.get("Status"),
            "type": job.get("Type"),
            "namespace": namespace,
        }
        for job in jobs
    ]


def _restart_job(nomad_service: NomadService, job_id: str) -> Any:
    """Stop ``job_id`` and start it again from the specification fetched beforehand."""
    # The spec must be read before the stop so the job is resubmitted as it was.
    job_spec = nomad_service.get_job(job_id)
    nomad_service.stop_job(job_id)
    return nomad_service.start_job(job_spec)


def _collect_job_logs(nomad_service: NomadService, job_id: str) -> Dict[str, Any]:
    """Fetch stdout/stderr of the latest allocation of ``job_id`` as a response dict.

    Returns a ``success: False`` dict (with ``logs: None``) when the job has
    no allocations. Exceptions from the Nomad service propagate to the caller.
    """
    latest_alloc = _latest_allocation(nomad_service.get_allocations(job_id))
    if latest_alloc is None:
        return {
            "success": False,
            "job_id": job_id,
            "message": f"No allocations found for job {job_id}",
            "logs": None,
        }

    alloc_id = latest_alloc.get("ID")
    task_name = _first_task_name(latest_alloc)
    stdout_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stdout")
    stderr_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stderr")
    return {
        "success": True,
        "job_id": job_id,
        "allocation_id": alloc_id,
        "task_name": task_name,
        "message": f"Retrieved logs for job {job_id}",
        "logs": {
            "stdout": stdout_logs,
            "stderr": stderr_logs,
        },
    }


def _sse_event(request_id: Any, event_type: str, **extra: Any) -> str:
    """Serialize one Server-Sent Event line carrying ``id``, ``type`` and any extra keys."""
    payload = {"id": request_id, "type": event_type, **extra}
    return f"data: {json.dumps(payload)}\n\n"


@router.post("/jobs", response_model=ClaudeJobResponse)
async def manage_job(request: ClaudeJobRequest):
    """
    Endpoint for Claude to manage Nomad jobs with a simplified interface.

    Supported actions (case-insensitive) are ``status``, ``stop`` and
    ``restart``. Nomad failures are reported in the body as
    ``success=False`` / ``status="error"``.

    Raises:
        HTTPException: 400 when ``request.action`` is not a supported action.
    """
    try:
        # Create a Nomad service instance with the specified namespace
        nomad_service = NomadService()
        if request.namespace:
            nomad_service.namespace = request.namespace

        action = request.action.lower()

        if action == "status":
            job = nomad_service.get_job(request.job_id)
            # The newest allocation gives a more detailed view of the job state.
            latest_alloc = _latest_allocation(nomad_service.get_allocations(request.job_id))
            status = job.get("Status", "unknown")
            return ClaudeJobResponse(
                success=True,
                job_id=request.job_id,
                status=status,
                message=f"Job {request.job_id} is {status}",
                details={
                    "job": job,
                    "latest_allocation": latest_alloc,
                },
            )

        if action == "stop":
            result = nomad_service.stop_job(request.job_id, purge=request.purge)
            return ClaudeJobResponse(
                success=True,
                job_id=request.job_id,
                status="stopped",
                message=f"Job {request.job_id} has been stopped" + (" and purged" if request.purge else ""),
                details=result,
            )

        if action == "restart":
            result = _restart_job(nomad_service, request.job_id)
            return ClaudeJobResponse(
                success=True,
                job_id=request.job_id,
                status="restarted",
                message=f"Job {request.job_id} has been restarted",
                details=result,
            )

        raise HTTPException(status_code=400, detail=f"Unknown action: {request.action}")

    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the 400
        # above would be swallowed below and returned as a 200 error body.
        raise
    except Exception as e:
        logger.exception("Error managing job %s: %s", request.job_id, e)
        return ClaudeJobResponse(
            success=False,
            job_id=request.job_id,
            status="error",
            message=f"Error: {str(e)}",
            details=None,
        )


@router.post("/create-job", response_model=ClaudeJobResponse)
async def create_job(job_spec: ClaudeJobSpecification):
    """
    Endpoint for Claude to create a new Nomad job with a simplified interface.

    The simplified specification is converted with ``to_nomad_job_spec()``
    and submitted. Failures are reported in the body as ``success=False``.
    """
    try:
        # Create a Nomad service instance with the specified namespace
        nomad_service = NomadService()
        if job_spec.namespace:
            nomad_service.namespace = job_spec.namespace

        result = nomad_service.start_job(job_spec.to_nomad_job_spec())

        return ClaudeJobResponse(
            success=True,
            job_id=job_spec.job_id,
            status="started",
            message=f"Job {job_spec.job_id} has been created and started",
            details=result,
        )

    except Exception as e:
        logger.exception("Error creating job %s: %s", job_spec.job_id, e)
        return ClaudeJobResponse(
            success=False,
            job_id=job_spec.job_id,
            status="error",
            message=f"Error: {str(e)}",
            details=None,
        )


@router.get("/list-jobs", response_model=List[Dict[str, Any]])
async def list_jobs(namespace: str = Query(_DEFAULT_NAMESPACE)):
    """
    List all jobs in the specified namespace.

    Returns a simplified list of jobs with their IDs and statuses.

    Raises:
        HTTPException: 500 when the Nomad service call fails.
    """
    try:
        nomad_service = NomadService()
        nomad_service.namespace = namespace
        return _simplify_jobs(nomad_service.list_jobs(), namespace)

    except Exception as e:
        logger.exception("Error listing jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Error listing jobs: {str(e)}") from e


@router.get("/job-logs/{job_id}", response_model=Dict[str, Any])
async def get_job_logs(job_id: str, namespace: str = Query(_DEFAULT_NAMESPACE)):
    """
    Get logs for a job.

    Returns stdout and stderr from the latest allocation of the job. A job
    without allocations, or a Nomad failure, yields ``success: False`` with
    ``logs: None`` rather than an HTTP error.
    """
    try:
        nomad_service = NomadService()
        nomad_service.namespace = namespace
        return _collect_job_logs(nomad_service, job_id)

    except Exception as e:
        logger.exception("Error getting logs for job %s: %s", job_id, e)
        return {
            "success": False,
            "job_id": job_id,
            "message": f"Error getting logs: {str(e)}",
            "logs": None,
        }


def _mcp_list_jobs(request_id: Any, content: Dict[str, Any]) -> str:
    """Handle ``nomad_list_jobs``: return a result event with the simplified job list."""
    namespace = content.get("namespace", _DEFAULT_NAMESPACE)
    nomad_service = NomadService()
    nomad_service.namespace = namespace
    return _sse_event(request_id, "result", content=_simplify_jobs(nomad_service.list_jobs(), namespace))


def _mcp_job_status(request_id: Any, content: Dict[str, Any]) -> str:
    """Handle ``nomad_job_status``: return the job status and its latest allocation."""
    job_id = content.get("job_id")
    namespace = content.get("namespace", _DEFAULT_NAMESPACE)
    if not job_id:
        return _sse_event(request_id, "error", content="Job ID is required")

    nomad_service = NomadService()
    nomad_service.namespace = namespace
    job = nomad_service.get_job(job_id)
    latest_alloc = _latest_allocation(nomad_service.get_allocations(job_id))
    status = job.get("Status", "unknown")
    return _sse_event(
        request_id,
        "result",
        content={
            "job_id": job_id,
            "status": status,
            "message": f"Job {job_id} is {status}",
            "details": {
                "job": job,
                "latest_allocation": latest_alloc,
            },
        },
    )


def _mcp_job_action(request_id: Any, content: Dict[str, Any]) -> str:
    """Handle ``nomad_job_action``: stop or restart a job."""
    job_id = content.get("job_id")
    action = content.get("action")
    namespace = content.get("namespace", _DEFAULT_NAMESPACE)
    purge = content.get("purge", False)
    if not job_id or not action:
        return _sse_event(request_id, "error", content="Job ID and action are required")

    nomad_service = NomadService()
    nomad_service.namespace = namespace
    normalized_action = action.lower()

    if normalized_action == "stop":
        result = nomad_service.stop_job(job_id, purge=purge)
        return _sse_event(
            request_id,
            "result",
            content={
                "job_id": job_id,
                "status": "stopped",
                "message": f"Job {job_id} has been stopped" + (" and purged" if purge else ""),
                "details": result,
            },
        )

    if normalized_action == "restart":
        result = _restart_job(nomad_service, job_id)
        return _sse_event(
            request_id,
            "result",
            content={
                "job_id": job_id,
                "status": "restarted",
                "message": f"Job {job_id} has been restarted",
                "details": result,
            },
        )

    return _sse_event(request_id, "error", content=f"Unknown action: {action}")


def _mcp_create_job(request_id: Any, content: Dict[str, Any]) -> str:
    """Handle ``nomad_create_job``: validate the spec, submit it, and report the outcome."""
    try:
        job_spec = ClaudeJobSpecification(**content)

        nomad_service = NomadService()
        if job_spec.namespace:
            nomad_service.namespace = job_spec.namespace

        result = nomad_service.start_job(job_spec.to_nomad_job_spec())
        return _sse_event(
            request_id,
            "result",
            content={
                "job_id": job_spec.job_id,
                "status": "started",
                "message": f"Job {job_spec.job_id} has been created and started",
                "details": result,
            },
        )
    except Exception as e:
        logger.exception("Error creating job: %s", e)
        return _sse_event(request_id, "error", content=f"Error creating job: {str(e)}")


def _mcp_job_logs(request_id: Any, content: Dict[str, Any]) -> str:
    """Handle ``nomad_job_logs``: return logs of the job's latest allocation."""
    job_id = content.get("job_id")
    namespace = content.get("namespace", _DEFAULT_NAMESPACE)
    if not job_id:
        return _sse_event(request_id, "error", content="Job ID is required")

    try:
        nomad_service = NomadService()
        nomad_service.namespace = namespace
        return _sse_event(request_id, "result", content=_collect_job_logs(nomad_service, job_id))
    except Exception as e:
        logger.exception("Error getting logs for job %s: %s", job_id, e)
        return _sse_event(request_id, "error", content=f"Error getting logs: {str(e)}")


# Maps the MCP request "type" field to the handler producing its SSE event.
_MCP_HANDLERS = {
    "nomad_list_jobs": _mcp_list_jobs,
    "nomad_job_status": _mcp_job_status,
    "nomad_job_action": _mcp_job_action,
    "nomad_create_job": _mcp_create_job,
    "nomad_job_logs": _mcp_job_logs,
}


@router.post("/mcp/stream")
async def mcp_stream(request: Request):
    """
    MCP streaming endpoint for Claude Code using Server-Sent Events (SSE).

    The JSON body carries ``id``, ``type`` and ``content``. The response
    stream is an ``ack`` event, one ``result`` or ``error`` event, and a
    final ``done`` event.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    # Parse the body before the stream starts so a malformed request gets a
    # proper 400 instead of breaking a response that is already under way.
    try:
        request_data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {exc}") from exc
    if not isinstance(request_data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    logger.info("Received MCP request: %s", request_data)

    request_type = request_data.get("type", "unknown")
    # An explicit null content is treated like a missing one.
    request_content = request_data.get("content") or {}
    request_id = request_data.get("id", "unknown")

    async def event_generator() -> AsyncGenerator[str, None]:
        yield _sse_event(request_id, "ack", content="Processing request")

        try:
            handler = _MCP_HANDLERS.get(request_type) if isinstance(request_type, str) else None
            if handler is None:
                event = _sse_event(request_id, "error", content=f"Unknown request type: {request_type}")
            else:
                # NOTE(review): handlers call NomadService without awaiting, so
                # this presumably blocks the event loop for the call's duration.
                event = handler(request_id, request_content)
        except Exception as e:
            logger.exception("Error processing MCP request: %s", e)
            event = _sse_event(request_id, "error", content=f"Internal server error: {str(e)}")

        yield event
        yield _sse_event(request_id, "done")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Needed for Nginx
        },
    )