#!/usr/bin/env python3
"""
Nomad MCP Server for Claude Desktop App
Provides MCP tools for managing HashiCorp Nomad jobs
"""

import asyncio
import json
import logging
import sys
from typing import Any, Dict, List, Optional

from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types

from app.services.nomad_client import NomadService
from app.schemas.claude_api import ClaudeJobSpecification

# Configure logging. basicConfig attaches a stderr handler by default, which
# keeps stdout free for the MCP stdio transport used in main().
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nomad-mcp")

# Create the server instance
server = Server("nomad-mcp")

# Namespace used whenever a tool call does not supply one.
_DEFAULT_NAMESPACE = "development"


def _namespace_property() -> Dict[str, Any]:
    """Return a fresh JSON-schema fragment for the shared ``namespace`` argument."""
    return {
        "type": "string",
        "description": "Nomad namespace",
        "default": _DEFAULT_NAMESPACE,
    }


@server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
    """List available tools for Nomad management.

    Returns:
        One ``types.Tool`` per supported operation, each carrying the JSON
        schema of the arguments that ``handle_call_tool`` reads for it.
    """
    return [
        types.Tool(
            name="list_nomad_jobs",
            description="List all Nomad jobs in a namespace",
            inputSchema={
                "type": "object",
                "properties": {
                    "namespace": _namespace_property(),
                },
            },
        ),
        types.Tool(
            name="get_job_status",
            description="Get the status of a specific Nomad job",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "ID of the job to check",
                    },
                    "namespace": _namespace_property(),
                },
                "required": ["job_id"],
            },
        ),
        types.Tool(
            name="stop_job",
            description="Stop a running Nomad job",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "ID of the job to stop",
                    },
                    "namespace": _namespace_property(),
                    "purge": {
                        "type": "boolean",
                        "description": "Whether to purge the job",
                        "default": False,
                    },
                },
                "required": ["job_id"],
            },
        ),
        types.Tool(
            name="restart_job",
            description="Restart a Nomad job",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "ID of the job to restart",
                    },
                    "namespace": _namespace_property(),
                },
                "required": ["job_id"],
            },
        ),
        types.Tool(
            name="create_job",
            description="Create a new Nomad job",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "Unique ID for the job",
                    },
                    "name": {
                        "type": "string",
                        "description": "Display name for the job",
                    },
                    "type": {
                        "type": "string",
                        "description": "Job type (service, batch, etc.)",
                        "default": "service",
                    },
                    "datacenters": {
                        "type": "array",
                        "description": "List of datacenters to run the job in",
                        "items": {"type": "string"},
                        "default": ["jm"],
                    },
                    "namespace": _namespace_property(),
                    "docker_image": {
                        "type": "string",
                        "description": "Docker image to run",
                    },
                    "count": {
                        "type": "integer",
                        "description": "Number of instances to run",
                        "default": 1,
                    },
                    "cpu": {
                        "type": "integer",
                        "description": "CPU allocation in MHz",
                        "default": 100,
                    },
                    "memory": {
                        "type": "integer",
                        "description": "Memory allocation in MB",
                        "default": 128,
                    },
                    "ports": {
                        "type": "array",
                        "description": "Port mappings",
                        "items": {"type": "object"},
                        "default": [],
                    },
                    "env_vars": {
                        "type": "object",
                        "description": "Environment variables for the container",
                        "default": {},
                    },
                },
                "required": ["job_id", "name", "docker_image"],
            },
        ),
        types.Tool(
            name="get_job_logs",
            description="Get logs for a Nomad job",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "ID of the job to get logs for",
                    },
                    "namespace": _namespace_property(),
                },
                "required": ["job_id"],
            },
        ),
    ]


def _text_response(text: str) -> List[types.TextContent]:
    """Wrap a plain string in the single-item content list MCP tools return."""
    return [types.TextContent(type="text", text=text)]


def _json_response(payload: Any) -> List[types.TextContent]:
    """Serialize ``payload`` as indented JSON and wrap it as a text response."""
    return _text_response(json.dumps(payload, indent=2))


def _latest_allocation(allocations: Any) -> Optional[Dict[str, Any]]:
    """Return the allocation with the highest ``CreateTime``, or ``None``.

    A missing or ``None`` ``CreateTime`` is treated as 0 so such entries sort
    last instead of raising on comparison. On ties the earliest entry in
    ``allocations`` wins.
    """
    if not allocations:
        return None
    return max(allocations, key=lambda alloc: alloc.get("CreateTime") or 0)


def _list_nomad_jobs(
    nomad_service: Any, arguments: Dict[str, Any], namespace: str
) -> List[types.TextContent]:
    """List jobs, reduced to id/name/status/type plus the queried namespace."""
    simplified_jobs = [
        {
            "id": job.get("ID"),
            "name": job.get("Name"),
            "status": job.get("Status"),
            "type": job.get("Type"),
            "namespace": namespace,
        }
        for job in nomad_service.list_jobs()
    ]
    return _json_response(simplified_jobs)


def _get_job_status(
    nomad_service: Any, arguments: Dict[str, Any], namespace: str
) -> List[types.TextContent]:
    """Report a job's status together with its most recent allocation."""
    job_id = arguments.get("job_id")
    job = nomad_service.get_job(job_id)
    latest_alloc = _latest_allocation(nomad_service.get_allocations(job_id))
    status = job.get("Status", "unknown")
    return _json_response({
        "job_id": job_id,
        "status": status,
        "message": f"Job {job_id} is {status}",
        "details": {
            "job": job,
            "latest_allocation": latest_alloc,
        },
    })


def _stop_job(
    nomad_service: Any, arguments: Dict[str, Any], namespace: str
) -> List[types.TextContent]:
    """Stop a job, passing the optional ``purge`` flag through to the service."""
    job_id = arguments.get("job_id")
    purge = arguments.get("purge", False)
    result = nomad_service.stop_job(job_id, purge=purge)
    return _json_response({
        "success": True,
        "job_id": job_id,
        "status": "stopped",
        "message": f"Job {job_id} has been stopped" + (" and purged" if purge else ""),
        "details": result,
    })


def _restart_job(
    nomad_service: Any, arguments: Dict[str, Any], namespace: str
) -> List[types.TextContent]:
    """Restart a job by stopping it and resubmitting its current spec."""
    job_id = arguments.get("job_id")
    # Fetch the spec before stopping so the same definition is resubmitted.
    job_spec = nomad_service.get_job(job_id)
    nomad_service.stop_job(job_id)
    result = nomad_service.start_job(job_spec)
    return _json_response({
        "success": True,
        "job_id": job_id,
        "status": "restarted",
        "message": f"Job {job_id} has been restarted",
        "details": result,
    })


def _create_job(
    nomad_service: Any, arguments: Dict[str, Any], namespace: str
) -> List[types.TextContent]:
    """Build a job spec from the tool arguments and submit it to Nomad."""
    job_spec = ClaudeJobSpecification(**arguments)

    # The validated spec's namespace, when set, overrides the raw argument.
    if job_spec.namespace:
        nomad_service.namespace = job_spec.namespace

    nomad_job_spec = job_spec.to_nomad_job_spec()
    result = nomad_service.start_job(nomad_job_spec)
    return _json_response({
        "success": True,
        "job_id": job_spec.job_id,
        "status": "started",
        "message": f"Job {job_spec.job_id} has been created and started",
        "details": result,
    })


def _get_job_logs(
    nomad_service: Any, arguments: Dict[str, Any], namespace: str
) -> List[types.TextContent]:
    """Fetch stdout/stderr for one task of the job's most recent allocation."""
    job_id = arguments.get("job_id")
    latest_alloc = _latest_allocation(nomad_service.get_allocations(job_id))
    if latest_alloc is None:
        return _json_response({
            "success": False,
            "job_id": job_id,
            "message": f"No allocations found for job {job_id}",
            "logs": None,
        })

    alloc_id = latest_alloc.get("ID")

    # Use the first task listed in TaskStates; fall back to "app" when the
    # allocation reports no task states.
    task_states = latest_alloc.get("TaskStates") or {}
    task_name = next(iter(task_states), None) or "app"

    stdout_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stdout")
    stderr_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stderr")
    return _json_response({
        "success": True,
        "job_id": job_id,
        "allocation_id": alloc_id,
        "task_name": task_name,
        "message": f"Retrieved logs for job {job_id}",
        "logs": {
            "stdout": stdout_logs,
            "stderr": stderr_logs,
        },
    })


# Tool name -> implementation. Every handler takes
# (nomad_service, arguments, namespace) and returns the MCP content list.
_TOOL_HANDLERS = {
    "list_nomad_jobs": _list_nomad_jobs,
    "get_job_status": _get_job_status,
    "stop_job": _stop_job,
    "restart_job": _restart_job,
    "create_job": _create_job,
    "get_job_logs": _get_job_logs,
}

# Tool name -> arguments that must be present and truthy, checked in order.
_REQUIRED_ARGUMENTS = {
    "get_job_status": ("job_id",),
    "stop_job": ("job_id",),
    "restart_job": ("job_id",),
    "create_job": ("job_id", "name", "docker_image"),
    "get_job_logs": ("job_id",),
}


@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
    """Handle tool calls from Claude.

    Args:
        name: Name of the tool to run; must be one of the tools advertised by
            ``handle_list_tools``.
        arguments: Tool arguments. ``None`` is treated as an empty mapping.

    Returns:
        A single-item list of ``types.TextContent``. On success the text is a
        JSON document; on any failure (unknown tool, missing argument, or an
        exception from the Nomad service) it is a string starting with
        ``"Error: "``. Exceptions are logged and never propagate to the caller.
    """
    arguments = arguments or {}
    try:
        handler = _TOOL_HANDLERS.get(name)
        if handler is None:
            return _text_response(f"Error: Unknown tool '{name}'")

        # Reject incomplete calls before building a Nomad client.
        for arg in _REQUIRED_ARGUMENTS.get(name, ()):
            if not arguments.get(arg):
                return _text_response(f"Error: {arg} is required")

        # Create Nomad service instance
        nomad_service = NomadService()
        namespace = arguments.get("namespace", _DEFAULT_NAMESPACE)
        nomad_service.namespace = namespace

        # NOTE(review): the NomadService methods are called without await, so
        # a slow Nomad request holds up the event loop for its duration.
        # Consider asyncio.to_thread if that becomes a problem.
        return handler(nomad_service, arguments, namespace)
    except Exception as e:
        # Top-level boundary: log with traceback, report the message to Claude.
        logger.exception("Error in tool '%s': %s", name, e)
        return _text_response(f"Error: {str(e)}")


async def main():
    """Main entry point for the MCP server."""
    # Run the server using stdio transport
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="nomad-mcp",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())