Implement FastAPI MCP zero-config integration

- Add fastapi_mcp to provide automatic MCP tooling from API endpoints
- Create MCP request/response schema models
- Update main.py to initialize FastAPI MCP with zero config
- Add comprehensive MCP integration documentation
- Update README with zero-config MCP integration information

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-04-15 11:50:55 +07:00
parent 403fa50b4f
commit ba9201dfa6
5 changed files with 250 additions and 87 deletions

View File

@ -4,10 +4,12 @@ from fastapi.staticfiles import StaticFiles
import os
import logging
from dotenv import load_dotenv
from fastapi_mcp import FastApiMCP
from app.routers import jobs, logs, configs, repositories, claude
from app.services.nomad_client import get_nomad_client
from app.services.gitea_client import GiteaClient
from app.schemas.claude_api import McpRequest, McpResponse
# Load environment variables
load_dotenv()
@ -42,6 +44,17 @@ app.include_router(configs.router, prefix="/api/configs", tags=["configs"])
app.include_router(repositories.router, prefix="/api/repositories", tags=["repositories"])
app.include_router(claude.router, prefix="/api/claude", tags=["claude"])
# Initialize the FastAPI MCP
base_url = os.getenv("BASE_URL", "http://localhost:8000")
mcp = FastApiMCP(
app,
base_url=base_url,
name="Nomad MCP Tools",
description="Tools for managing Nomad jobs via MCP protocol",
include_tags=["jobs", "logs", "configs", "repositories"],
)
mcp.mount()
@app.get("/api/health", tags=["health"])
async def health_check():
"""Health check endpoint."""

View File

@ -1,78 +1,92 @@
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional, Union
class ClaudeJobRequest(BaseModel):
"""Request model for Claude to start or manage a job"""
job_id: str = Field(..., description="The ID of the job to manage")
action: str = Field(..., description="Action to perform: start, stop, restart, status")
namespace: Optional[str] = Field("development", description="Nomad namespace")
purge: Optional[bool] = Field(False, description="Whether to purge the job when stopping")
class ClaudeJobSpecification(BaseModel):
"""Simplified job specification for Claude to create a new job"""
job_id: str = Field(..., description="The ID for the new job")
name: Optional[str] = Field(None, description="Name of the job (defaults to job_id)")
type: str = Field("service", description="Job type: service, batch, or system")
datacenters: List[str] = Field(["jm"], description="List of datacenters")
namespace: str = Field("development", description="Nomad namespace")
docker_image: str = Field(..., description="Docker image to run")
count: int = Field(1, description="Number of instances to run")
cpu: int = Field(100, description="CPU resources in MHz")
memory: int = Field(128, description="Memory in MB")
ports: Optional[List[Dict[str, Any]]] = Field(None, description="Port mappings")
env_vars: Optional[Dict[str, str]] = Field(None, description="Environment variables")
def to_nomad_job_spec(self) -> Dict[str, Any]:
"""Convert to Nomad job specification format"""
# Create a task with the specified Docker image
task = {
"Name": "app",
"Driver": "docker",
"Config": {
"image": self.docker_image,
},
"Resources": {
"CPU": self.cpu,
"MemoryMB": self.memory
}
}
# Add environment variables if specified
if self.env_vars:
task["Env"] = self.env_vars
# Create network configuration
network = {}
if self.ports:
network["DynamicPorts"] = self.ports
task["Config"]["ports"] = [port["Label"] for port in self.ports]
# Create the full job specification
job_spec = {
"ID": self.job_id,
"Name": self.name or self.job_id,
"Type": self.type,
"Datacenters": self.datacenters,
"Namespace": self.namespace,
"TaskGroups": [
{
"Name": "app",
"Count": self.count,
"Tasks": [task],
"Networks": [network] if network else []
}
]
}
return job_spec
class ClaudeJobResponse(BaseModel):
"""Response model for Claude job operations"""
success: bool = Field(..., description="Whether the operation was successful")
job_id: str = Field(..., description="The ID of the job")
status: str = Field(..., description="Current status of the job")
message: str = Field(..., description="Human-readable message about the operation")
details: Optional[Dict[str, Any]] = Field(None, description="Additional details about the job")
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional, Union
class ClaudeJobRequest(BaseModel):
"""Request model for Claude to start or manage a job"""
job_id: str = Field(..., description="The ID of the job to manage")
action: str = Field(..., description="Action to perform: start, stop, restart, status")
namespace: Optional[str] = Field("development", description="Nomad namespace")
purge: Optional[bool] = Field(False, description="Whether to purge the job when stopping")
class ClaudeJobSpecification(BaseModel):
"""Simplified job specification for Claude to create a new job"""
job_id: str = Field(..., description="The ID for the new job")
name: Optional[str] = Field(None, description="Name of the job (defaults to job_id)")
type: str = Field("service", description="Job type: service, batch, or system")
datacenters: List[str] = Field(["jm"], description="List of datacenters")
namespace: str = Field("development", description="Nomad namespace")
docker_image: str = Field(..., description="Docker image to run")
count: int = Field(1, description="Number of instances to run")
cpu: int = Field(100, description="CPU resources in MHz")
memory: int = Field(128, description="Memory in MB")
ports: Optional[List[Dict[str, Any]]] = Field(None, description="Port mappings")
env_vars: Optional[Dict[str, str]] = Field(None, description="Environment variables")
def to_nomad_job_spec(self) -> Dict[str, Any]:
"""Convert to Nomad job specification format"""
# Create a task with the specified Docker image
task = {
"Name": "app",
"Driver": "docker",
"Config": {
"image": self.docker_image,
},
"Resources": {
"CPU": self.cpu,
"MemoryMB": self.memory
}
}
# Add environment variables if specified
if self.env_vars:
task["Env"] = self.env_vars
# Create network configuration
network = {}
if self.ports:
network["DynamicPorts"] = self.ports
task["Config"]["ports"] = [port["Label"] for port in self.ports]
# Create the full job specification
job_spec = {
"ID": self.job_id,
"Name": self.name or self.job_id,
"Type": self.type,
"Datacenters": self.datacenters,
"Namespace": self.namespace,
"TaskGroups": [
{
"Name": "app",
"Count": self.count,
"Tasks": [task],
"Networks": [network] if network else []
}
]
}
return job_spec
class ClaudeJobResponse(BaseModel):
"""Response model for Claude job operations"""
success: bool = Field(..., description="Whether the operation was successful")
job_id: str = Field(..., description="The ID of the job")
status: str = Field(..., description="Current status of the job")
message: str = Field(..., description="Human-readable message about the operation")
details: Optional[Dict[str, Any]] = Field(None, description="Additional details about the job")
class McpRequest(BaseModel):
"""Model for MCP protocol requests"""
id: str = Field(..., description="Unique identifier for the request")
type: str = Field(..., description="Type of request")
content: Dict[str, Any] = Field(..., description="Request content")
class McpResponse(BaseModel):
"""Model for MCP protocol responses"""
id: str = Field(..., description="Unique identifier matching the request")
type: str = Field(..., description="Type of response: ack, result, error, done")
content: Optional[Dict[str, Any]] = Field(None, description="Response content")