diff --git a/CLAUDE.md b/CLAUDE.md
index 8023fc2..07cc18b 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -26,6 +26,42 @@ Nomad MCP is a service that enables management of HashiCorp Nomad jobs via REST
 Always maintain backward compatibility with existing API endpoints. Follow REST principles.
+## Enhanced Log Analysis Features
+
+The logs API has been enhanced with advanced filtering and analysis capabilities:
+
+### REST API Endpoints:
+- `/api/logs/job/{job_id}` - Enhanced with time, level, and search filtering
+- `/api/logs/errors/{job_id}` - Get only error/warning logs
+- `/api/logs/search/{job_id}` - Search logs for specific terms
+- `/api/logs/repository/{repository}` - Get logs by repository name
+
+### New Query Parameters:
+- `start_time` & `end_time` - Filter by time range (HH:MM format); ranges may cross midnight
+- `log_level` - Filter by one or more levels (e.g., ERROR, WARNING, INFO), separated by `|`
+- `search` - Return only lines containing a search term
+- `lines` - Limit the number of lines returned (most recent)
+- `formatted` - Apply proper line breaks (default: true)
+
+### MCP Tools Available:
+- `get_job_logs` - Enhanced with all filtering options
+- `get_error_logs` - Convenience tool for troubleshooting
+- `search_job_logs` - Search logs for patterns
+
+### Example Usage:
+```bash
+# Get errors between 8 PM and 6 AM for plant-manager in production
+curl "https://nomad_mcp.dev.meisheng.group/api/logs/errors/plant-manager?namespace=production&start_time=20:00&end_time=06:00"
+
+# Search for pump issues
+curl "https://nomad_mcp.dev.meisheng.group/api/logs/search/plant-manager?q=pump&namespace=production"
+
+# Get the last 50 lines with proper formatting
+curl "https://nomad_mcp.dev.meisheng.group/api/logs/job/plant-manager?namespace=production&lines=50&formatted=true"
+```
+
+Always maintain backward compatibility with existing API endpoints. Follow REST principles.
+
 ## SSL Certificate Management for Internal Services
 
 When working with internal/corporate services that use custom Certificate Authorities (CAs):
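For Python clients, the same calls via `requests` — a minimal sketch mirroring the curl examples above (the base URL, job, and namespace are the illustrative values from CLAUDE.md, not fixtures):

```python
import requests

BASE_URL = "https://nomad_mcp.dev.meisheng.group"

# Errors between 8 PM and 6 AM for plant-manager in production
resp = requests.get(
    f"{BASE_URL}/api/logs/errors/plant-manager",
    params={"namespace": "production", "start_time": "20:00", "end_time": "06:00"},
    timeout=30,
)
print(resp.text)

# Search for pump issues, keeping the 50 most recent matching lines
resp = requests.get(
    f"{BASE_URL}/api/logs/search/plant-manager",
    params={"q": "pump", "namespace": "production", "lines": 50},
    timeout=30,
)
print(resp.text)
```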
diff --git a/app/routers/logs.py b/app/routers/logs.py
index e14a094..1400323 100644
--- a/app/routers/logs.py
+++ b/app/routers/logs.py
@@ -1,6 +1,8 @@
 from fastapi import APIRouter, HTTPException, Query
 from typing import List, Dict, Any, Optional
 import logging
+import re
+from datetime import datetime
 
 from app.services.nomad_client import NomadService
 from app.services.config_service import ConfigService
@@ -12,6 +14,133 @@ router = APIRouter()
 nomad_service = NomadService()
 config_service = ConfigService()
 
+def format_logs_with_line_breaks(logs: str) -> str:
+    """Format logs with proper line breaks."""
+    if not logs:
+        return logs
+
+    # Replace escaped newlines with real ones
+    formatted = logs.replace('\\n', '\n')
+
+    # Collapse runs of three or more line breaks into a single blank line
+    formatted = re.sub(r'\n{3,}', '\n\n', formatted)
+
+    return formatted.strip()
+
+def filter_logs_by_time(logs: str, start_time: str = None, end_time: str = None) -> str:
+    """Filter logs by time of day. Ranges may cross midnight (e.g., 20:00-06:00)."""
+    if not logs or (not start_time and not end_time):
+        return logs
+
+    def _parse(value: str):
+        """Parse an HH:MM filter string; return None if the format doesn't match."""
+        try:
+            return datetime.strptime(value, '%H:%M').time()
+        except (ValueError, TypeError):
+            return None
+
+    start_t = _parse(start_time) if start_time else None
+    end_t = _parse(end_time) if end_time else None
+
+    lines = logs.split('\n')
+    filtered_lines = []
+
+    for line in lines:
+        # Extract timestamp from log line (assumes format: YYYY-MM-DD HH:MM:SS)
+        timestamp_match = re.search(r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', line)
+        if not timestamp_match:
+            # If no timestamp, include the line (might be a continuation)
+            filtered_lines.append(line)
+            continue
+
+        try:
+            log_time = datetime.strptime(timestamp_match.group(1), '%Y-%m-%d %H:%M:%S').time()
+        except ValueError:
+            # If time parsing fails, include the line
+            filtered_lines.append(line)
+            continue
+
+        if start_t and end_t:
+            if start_t <= end_t:
+                # Window within a single day
+                include_line = start_t <= log_time <= end_t
+            else:
+                # Window crosses midnight (e.g., 20:00-06:00)
+                include_line = log_time >= start_t or log_time <= end_t
+        elif start_t:
+            include_line = log_time >= start_t
+        elif end_t:
+            include_line = log_time <= end_t
+        else:
+            include_line = True
+
+        if include_line:
+            filtered_lines.append(line)
+
+    return '\n'.join(filtered_lines)
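The wrap-around branch in `filter_logs_by_time` is what makes the advertised 20:00-06:00 example return anything at all; a standalone sketch of just that predicate, for quick verification outside the app:

```python
from datetime import time

def in_window(t: time, start: time, end: time) -> bool:
    """Mirrors the range check in filter_logs_by_time."""
    if start <= end:
        return start <= t <= end           # window within a single day
    return t >= start or t <= end          # window crosses midnight

night = (time(20, 0), time(6, 0))
assert in_window(time(23, 30), *night)      # late evening: kept
assert in_window(time(3, 15), *night)       # early morning: kept
assert not in_window(time(12, 0), *night)   # midday: filtered out
```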
+def filter_logs_by_level(logs: str, log_level: str) -> str:
+    """Filter logs by log level. Supports multiple levels separated by |."""
+    if not logs or not log_level:
+        return logs
+
+    lines = logs.split('\n')
+    filtered_lines = []
+
+    # Handle multiple log levels separated by |; escape each level so
+    # user-supplied values cannot break the regex
+    levels = [level.strip().upper() for level in log_level.split('|')]
+    level_patterns = [re.compile(rf'\b{re.escape(level)}\b', re.IGNORECASE) for level in levels]
+
+    for line in lines:
+        if any(pattern.search(line) for pattern in level_patterns):
+            filtered_lines.append(line)
+
+    return '\n'.join(filtered_lines)
+
+def search_logs(logs: str, search_term: str) -> str:
+    """Search logs for a specific term (case-insensitive literal match)."""
+    if not logs or not search_term:
+        return logs
+
+    lines = logs.split('\n')
+    filtered_lines = []
+    search_pattern = re.compile(re.escape(search_term), re.IGNORECASE)
+
+    for line in lines:
+        if search_pattern.search(line):
+            filtered_lines.append(line)
+
+    return '\n'.join(filtered_lines)
+
+def limit_log_lines(logs: str, lines_limit: int) -> str:
+    """Limit the number of log lines returned."""
+    if not logs or not lines_limit:
+        return logs
+
+    lines = logs.split('\n')
+    return '\n'.join(lines[-lines_limit:])  # Return the most recent lines
+
+def process_logs(logs: str, start_time: str = None, end_time: str = None,
+                 log_level: str = None, search: str = None, lines: int = None,
+                 formatted: bool = True) -> str:
+    """Process logs through all filters and formatting, in order."""
+    if not logs:
+        return logs
+
+    # Apply formatting first
+    if formatted:
+        logs = format_logs_with_line_breaks(logs)
+
+    # Apply time filtering
+    if start_time or end_time:
+        logs = filter_logs_by_time(logs, start_time, end_time)
+
+    # Apply log level filtering
+    if log_level:
+        logs = filter_logs_by_level(logs, log_level)
+
+    # Apply search filtering
+    if search:
+        logs = search_logs(logs, search)
+
+    # Limit lines if specified
+    if lines:
+        logs = limit_log_lines(logs, lines)
+
+    return logs
+
 # More specific routes first
 @router.get("/repository/{repository}")
 async def get_repository_logs(
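To make the filter order concrete (formatting, then time window, then level, then search, then line limit), a hypothetical run of `process_logs` — this assumes the helpers are importable from `app.routers.logs` (importing the router module instantiates its services, so Nomad-related env vars may need to be set):

```python
from app.routers.logs import process_logs

sample = (
    "2025-06-04 21:15:02 ERROR pump pressure out of range\n"
    "2025-06-04 22:30:45 INFO heartbeat ok\n"
    "2025-06-04 12:00:00 ERROR pump sensor offline"
)

# The overnight window keeps the 21:15 and 22:30 lines and drops the midday
# one; the ERROR filter then drops the INFO heartbeat; "pump" matches the rest.
out = process_logs(sample, start_time="20:00", end_time="06:00",
                   log_level="ERROR", search="pump")
assert out == "2025-06-04 21:15:02 ERROR pump pressure out of range"
```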
"alloc_id": alloc_id, "task": task_name, "type": log_type, "create_time": alloc.get("CreateTime"), - "logs": logs + "logs": processed_logs }) logger.info(f"Successfully retrieved logs for {task_name}") else: @@ -197,7 +344,15 @@ async def get_job_logs( if plain_text: if not result: return "No logs found for this job" - return "\n\n".join([f"=== {r.get('task')} ===\n{r.get('logs')}" for r in result]) + + # Combine all logs with task separators + combined_logs = [] + for r in result: + task_logs = r.get('logs', '') + if task_logs: + combined_logs.append(f"=== {r.get('task')} ===\n{task_logs}") + + return "\n\n".join(combined_logs) # Otherwise return as JSON return { @@ -269,6 +424,49 @@ async def get_build_logs(job_id: str, plain_text: bool = Query(False)): # This is a convenience endpoint that returns stderr logs from the latest allocation return await get_latest_allocation_logs(job_id, "stderr", plain_text) +@router.get("/errors/{job_id}") +async def get_error_logs( + job_id: str, + namespace: str = Query(None, description="Nomad namespace"), + start_time: str = Query(None, description="Start time filter (HH:MM format, e.g., 20:00)"), + end_time: str = Query(None, description="End time filter (HH:MM format, e.g., 06:00)"), + plain_text: bool = Query(True, description="Return plain text logs instead of JSON") +): + """Get error and warning logs for a job, with optional time filtering.""" + return await get_job_logs( + job_id=job_id, + namespace=namespace, + log_type="stderr", + limit=5, # Check more allocations for errors + plain_text=plain_text, + start_time=start_time, + end_time=end_time, + log_level="WARNING|ERROR|EMERGENCY|CRITICAL", # Multiple levels + formatted=True + ) + +@router.get("/search/{job_id}") +async def search_job_logs( + job_id: str, + q: str = Query(..., description="Search term"), + namespace: str = Query(None, description="Nomad namespace"), + log_type: str = Query("stderr", description="Log type: stdout or stderr"), + limit: int = Query(3, description="Number of allocations to search"), + lines: int = Query(100, description="Number of matching lines to return"), + plain_text: bool = Query(True, description="Return plain text logs instead of JSON") +): + """Search job logs for specific terms.""" + return await get_job_logs( + job_id=job_id, + namespace=namespace, + log_type=log_type, + limit=limit, + plain_text=plain_text, + search=q, + lines=lines, + formatted=True + ) + # Generic allocation logs route last @router.get("/allocation/{alloc_id}/{task}") async def get_allocation_logs( diff --git a/mcp_server.py b/mcp_server.py index eb5addf..f5aebd5 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -9,6 +9,7 @@ import json import logging import os import sys +import requests from typing import Any, Dict, List, Optional from mcp.server import NotificationOptions, Server @@ -172,7 +173,7 @@ async def handle_list_tools() -> List[types.Tool]: ), types.Tool( name="get_job_logs", - description="Get logs for a Nomad job", + description="Get logs for a Nomad job with advanced filtering options", inputSchema={ "type": "object", "properties": { @@ -184,11 +185,103 @@ async def handle_list_tools() -> List[types.Tool]: "type": "string", "description": "Nomad namespace", "default": "development" + }, + "log_type": { + "type": "string", + "description": "Type of logs: stdout or stderr", + "enum": ["stdout", "stderr"], + "default": "stderr" + }, + "start_time": { + "type": "string", + "description": "Start time filter (HH:MM format, e.g., '20:00' for 8 PM)" + }, + "end_time": { + 
"type": "string", + "description": "End time filter (HH:MM format, e.g., '06:00' for 6 AM)" + }, + "log_level": { + "type": "string", + "description": "Filter by log level: ERROR, WARNING, INFO, DEBUG, EMERGENCY, CRITICAL" + }, + "search": { + "type": "string", + "description": "Search term to filter logs" + }, + "lines": { + "type": "integer", + "description": "Number of recent log lines to return" + }, + "limit": { + "type": "integer", + "description": "Number of allocations to check", + "default": 1 } }, "required": ["job_id"] } ), + types.Tool( + name="get_error_logs", + description="Get only error and warning logs for a Nomad job, useful for troubleshooting", + inputSchema={ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "ID of the job to get error logs for" + }, + "namespace": { + "type": "string", + "description": "Nomad namespace", + "default": "development" + }, + "start_time": { + "type": "string", + "description": "Start time filter (HH:MM format, e.g., '20:00' for 8 PM)" + }, + "end_time": { + "type": "string", + "description": "End time filter (HH:MM format, e.g., '06:00' for 6 AM)" + } + }, + "required": ["job_id"] + } + ), + types.Tool( + name="search_job_logs", + description="Search Nomad job logs for specific terms or patterns", + inputSchema={ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "ID of the job to search logs for" + }, + "search_term": { + "type": "string", + "description": "Term or pattern to search for in logs" + }, + "namespace": { + "type": "string", + "description": "Nomad namespace", + "default": "development" + }, + "log_type": { + "type": "string", + "description": "Type of logs: stdout or stderr", + "enum": ["stdout", "stderr"], + "default": "stderr" + }, + "lines": { + "type": "integer", + "description": "Number of matching lines to return", + "default": 50 + } + }, + "required": ["job_id", "search_term"] + } + ), types.Tool( name="submit_job_file", description="Submit a Nomad job from HCL or JSON file content", @@ -431,56 +524,174 @@ async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.T text="Error: job_id is required" )] - # Get allocations - allocations = nomad_service.get_allocations(job_id) - if not allocations: + # Use the enhanced REST API endpoint + import requests + base_url = os.getenv("BASE_URL", "http://localhost:8000") + + # Build query parameters + params = { + "namespace": arguments.get("namespace", namespace), + "log_type": arguments.get("log_type", "stderr"), + "limit": arguments.get("limit", 1), + "plain_text": True, + "formatted": True + } + + # Add optional filters + if arguments.get("start_time"): + params["start_time"] = arguments["start_time"] + if arguments.get("end_time"): + params["end_time"] = arguments["end_time"] + if arguments.get("log_level"): + params["log_level"] = arguments["log_level"] + if arguments.get("search"): + params["search"] = arguments["search"] + if arguments.get("lines"): + params["lines"] = arguments["lines"] + + try: + response = requests.get( + f"{base_url}/api/logs/job/{job_id}", + params=params, + timeout=30 + ) + + if response.status_code == 200: + logs_text = response.text + + result = { + "success": True, + "job_id": job_id, + "namespace": params["namespace"], + "message": f"Retrieved logs for job {job_id}", + "logs": logs_text, + "filters_applied": {k: v for k, v in params.items() if k not in ["namespace", "plain_text", "formatted"]} + } + else: + result = { + "success": False, + 
"job_id": job_id, + "message": f"Failed to get logs: {response.status_code} - {response.text}", + "logs": None + } + + except Exception as e: result = { "success": False, "job_id": job_id, - "message": f"No allocations found for job {job_id}", + "message": f"Error getting logs: {str(e)}", "logs": None } - else: - # Get latest allocation - sorted_allocations = sorted( - allocations, - key=lambda a: a.get("CreateTime", 0), - reverse=True - ) - latest_alloc = sorted_allocations[0] - alloc_id = latest_alloc.get("ID") - - # Get task name - task_name = None - if "TaskStates" in latest_alloc: - task_states = latest_alloc["TaskStates"] - if task_states: - task_name = next(iter(task_states.keys())) - - if not task_name: - task_name = "app" # Default task name - - # Get logs - stdout_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stdout") - stderr_logs = nomad_service.get_allocation_logs(alloc_id, task_name, "stderr") - - result = { - "success": True, - "job_id": job_id, - "allocation_id": alloc_id, - "task_name": task_name, - "message": f"Retrieved logs for job {job_id}", - "logs": { - "stdout": stdout_logs, - "stderr": stderr_logs - } - } return [types.TextContent( type="text", - text=json.dumps(result, indent=2) + text=json.dumps(result, indent=2) if not result.get("success") else result["logs"] )] + elif name == "get_error_logs": + job_id = arguments.get("job_id") + + if not job_id: + return [types.TextContent( + type="text", + text="Error: job_id is required" + )] + + # Use the error logs endpoint + import requests + base_url = os.getenv("BASE_URL", "http://localhost:8000") + + params = { + "namespace": arguments.get("namespace", namespace), + "plain_text": True + } + + if arguments.get("start_time"): + params["start_time"] = arguments["start_time"] + if arguments.get("end_time"): + params["end_time"] = arguments["end_time"] + + try: + response = requests.get( + f"{base_url}/api/logs/errors/{job_id}", + params=params, + timeout=30 + ) + + if response.status_code == 200: + logs_text = response.text + + result = { + "success": True, + "job_id": job_id, + "message": f"Retrieved error logs for job {job_id}", + "logs": logs_text + } + + return [types.TextContent( + type="text", + text=logs_text + )] + else: + return [types.TextContent( + type="text", + text=f"Error: Failed to get error logs: {response.status_code} - {response.text}" + )] + + except Exception as e: + return [types.TextContent( + type="text", + text=f"Error getting error logs: {str(e)}" + )] + + elif name == "search_job_logs": + job_id = arguments.get("job_id") + search_term = arguments.get("search_term") + + if not job_id or not search_term: + return [types.TextContent( + type="text", + text="Error: job_id and search_term are required" + )] + + # Use the search logs endpoint + import requests + base_url = os.getenv("BASE_URL", "http://localhost:8000") + + params = { + "q": search_term, + "namespace": arguments.get("namespace", namespace), + "log_type": arguments.get("log_type", "stderr"), + "lines": arguments.get("lines", 50), + "plain_text": True + } + + try: + response = requests.get( + f"{base_url}/api/logs/search/{job_id}", + params=params, + timeout=30 + ) + + if response.status_code == 200: + logs_text = response.text + + return [types.TextContent( + type="text", + text=logs_text + )] + else: + return [types.TextContent( + type="text", + text=f"Error: Failed to search logs: {response.status_code} - {response.text}" + )] + + except Exception as e: + return [types.TextContent( + type="text", + text=f"Error 
         elif name == "submit_job_file":
             file_content = arguments.get("file_content")
             file_type = arguments.get("file_type", "json")