Files
nomad_mcp/app/routers/logs.py
Nicolas Koehl dc2fe4c425 Enhance log analysis with advanced filtering and search capabilities
Add comprehensive log analysis features to improve troubleshooting workflows:
- Time-based filtering (8pm-6am shifts) with HH:MM format support
- Multi-level log filtering (ERROR, WARNING, EMERGENCY, etc.)
- Full-text search across log content with case-insensitive matching
- Proper line break formatting for readable output
- Line count limiting for large log files

New REST API endpoints:
- /api/logs/errors/{job_id} - Get only error/warning logs
- /api/logs/search/{job_id} - Search logs for specific terms
- Enhanced /api/logs/job/{job_id} with filtering parameters

New MCP tools:
- get_error_logs - Streamlined error analysis
- search_job_logs - Pattern-based log searching
- Enhanced get_job_logs with all filtering options

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-02 17:42:35 +07:00

491 lines
18 KiB
Python

from fastapi import APIRouter, HTTPException, Query
from typing import List, Dict, Any, Optional
import logging
import re
from datetime import datetime, time as datetime_time
from app.services.nomad_client import NomadService
from app.services.config_service import ConfigService
# Configure logging
logger = logging.getLogger(__name__)
# Router holding every log endpoint below; the route paths are relative to
# wherever the application mounts this router.
router = APIRouter()
# Shared service instances created once at import time. nomad_service is
# constructed without arguments; endpoints that need a specific namespace
# build their own NomadService and set its namespace instead.
nomad_service = NomadService()
config_service = ConfigService()
def format_logs_with_line_breaks(logs: str) -> str:
    """Return *logs* with escaped newlines expanded and blank runs squeezed.

    Literal two-character ``\\n`` sequences become real newlines, any run of
    three or more newlines is reduced to exactly two (one blank line), and
    leading/trailing whitespace is stripped. Falsy input is returned as is.
    """
    if not logs:
        return logs
    unescaped = logs.replace('\\n', '\n')
    return re.sub(r'\n{3,}', '\n\n', unescaped).strip()
def _parse_time_bound(value):
    """Parse one ``start_time``/``end_time`` filter value.

    Accepts ``YYYY-MM-DD HH:MM[:SS]`` (an ISO ``T`` separator also works),
    returned as a ``datetime``, or ``HH:MM[:SS]``, returned as a time of day.
    Returns ``None`` for non-string, empty or unparseable input, so an
    unusable bound is ignored rather than failing the whole request.
    """
    if not isinstance(value, str) or not value.strip():
        return None
    text = value.strip().replace('T', ' ')
    for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M'):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            pass
    for fmt in ('%H:%M:%S', '%H:%M'):
        try:
            return datetime.strptime(text, fmt).time()
        except ValueError:
            pass
    return None


def filter_logs_by_time(logs: str, start_time: Optional[str] = None,
                        end_time: Optional[str] = None) -> str:
    """Keep only log lines whose timestamp falls inside a time window.

    Each line is scanned for a ``YYYY-MM-DD HH:MM:SS`` timestamp (an ISO 8601
    ``T`` separator is accepted too; timezone suffixes are ignored and the
    naive wall-clock values are compared). A line without a timestamp is
    treated as a continuation of the preceding entry (e.g. a stack-trace
    line) and follows that entry's keep/drop decision. Lines before the
    first timestamp, and lines whose timestamp cannot be parsed, are kept.

    Args:
        logs: Newline-separated log text.
        start_time: Inclusive lower bound, either ``HH:MM[:SS]`` (time of
            day) or ``YYYY-MM-DD HH:MM[:SS]`` (absolute).
        end_time: Inclusive upper bound in the same formats, compared with
            second precision (``06:00`` keeps ``06:00:00``, not ``06:00:30``).
            When both bounds are times of day and start is later than end
            (e.g. ``20:00`` to ``06:00``) the window wraps past midnight.

    Returns:
        The kept lines joined with ``\\n``. *logs* is returned unchanged when
        it is empty or when neither bound is given or parseable.
    """
    if not logs or (not start_time and not end_time):
        return logs

    # Parse the bounds once, not once per line.
    start = _parse_time_bound(start_time)
    end = _parse_time_bound(end_time)
    if start is None and end is None:
        return logs

    # A time-of-day window whose start is after its end (a night shift such
    # as 20:00-06:00) wraps past midnight; "start <= t <= end" would match
    # nothing at all.
    wraps_midnight = (
        isinstance(start, datetime_time)
        and isinstance(end, datetime_time)
        and start > end
    )

    def in_window(stamp: datetime) -> bool:
        clock = stamp.time()
        if wraps_midnight:
            return clock >= start or clock <= end
        if start is not None:
            # Absolute bounds compare full datetimes, time-of-day bounds
            # compare only the clock part.
            value = stamp if isinstance(start, datetime) else clock
            if value < start:
                return False
        if end is not None:
            value = stamp if isinstance(end, datetime) else clock
            if value > end:
                return False
        return True

    timestamp_re = re.compile(r'(\d{4}-\d{2}-\d{2})(?:T|\s+)(\d{2}:\d{2}:\d{2})')
    filtered_lines = []
    keep = True  # lines before the first timestamp are kept
    for line in logs.split('\n'):
        match = timestamp_re.search(line)
        if match:
            try:
                stamp = datetime.strptime(
                    f'{match.group(1)} {match.group(2)}', '%Y-%m-%d %H:%M:%S'
                )
            except ValueError:
                # Timestamp-shaped but invalid (e.g. month 13): keep it.
                keep = True
            else:
                keep = in_window(stamp)
        if keep:
            filtered_lines.append(line)
    return '\n'.join(filtered_lines)
def filter_logs_by_level(logs: str, log_level: str) -> str:
    """Keep only the lines that mention one of the requested log levels.

    Args:
        logs: Newline-separated log text.
        log_level: One level (``"ERROR"``) or several separated by ``|``
            (``"WARNING|ERROR"``). Matching is case-insensitive and literal,
            and the level must not be glued to other word characters
            (``ERROR`` does not match ``ERRORS``).

    Returns:
        The matching lines joined with ``\\n``. *logs* is returned unchanged
        when either argument is empty or no non-blank level is given.
    """
    if not logs or not log_level:
        return logs
    # Blank entries (e.g. from a trailing "|") are dropped: an empty
    # alternative would match every line and silently disable the filter.
    levels = [level.strip() for level in log_level.split('|') if level.strip()]
    if not levels:
        return logs
    # Escape the user-supplied levels so characters like "(" or "[" match
    # literally instead of raising re.error. The lookarounds behave like \b
    # for plain words but also work for levels that start or end with
    # punctuation, such as "[ERROR]".
    alternatives = '|'.join(re.escape(level) for level in levels)
    level_pattern = re.compile(rf'(?<!\w)(?:{alternatives})(?!\w)', re.IGNORECASE)
    return '\n'.join(line for line in logs.split('\n') if level_pattern.search(line))
def search_logs(logs: str, search_term: str) -> str:
    """Return only the lines of *logs* that contain *search_term*.

    The term is matched literally (regex metacharacters are escaped) and
    case-insensitively. If either argument is empty, *logs* is returned as is.
    """
    if not (logs and search_term):
        return logs
    matcher = re.compile(re.escape(search_term), re.IGNORECASE)
    return '\n'.join(line for line in logs.split('\n') if matcher.search(line))
def limit_log_lines(logs: str, lines_limit: int) -> str:
    """Return only the last *lines_limit* lines of *logs* (the most recent).

    A missing, zero or negative limit means "no limit", and *logs* is
    returned unchanged, as it is when *logs* is empty.
    """
    # The <= 0 guard matters: for -3, lines[-(-3):] would be lines[3:], which
    # drops the *oldest* lines instead of limiting the output.
    if not logs or not lines_limit or lines_limit <= 0:
        return logs
    return '\n'.join(logs.split('\n')[-lines_limit:])
def process_logs(logs: str, start_time: str = None, end_time: str = None,
                 log_level: str = None, search: str = None, lines: int = None,
                 formatted: bool = True) -> str:
    """Run *logs* through the formatting and filtering pipeline.

    The stages run in a fixed order, each only when its controlling argument
    is truthy:

    1. newline formatting (``formatted``)
    2. time window (``start_time`` / ``end_time``)
    3. log level (``log_level``)
    4. search term (``search``)
    5. cap on the most recent lines (``lines``)

    Because the line cap runs last, it counts lines that survived the
    filters. Falsy *logs* is returned as is.
    """
    if not logs:
        return logs

    # (enabled?, stage) pairs, applied in order; a stage is skipped when its
    # controlling argument is falsy.
    pipeline = (
        (formatted, format_logs_with_line_breaks),
        (start_time or end_time, lambda text: filter_logs_by_time(text, start_time, end_time)),
        (log_level, lambda text: filter_logs_by_level(text, log_level)),
        (search, lambda text: search_logs(text, search)),
        (lines, lambda text: limit_log_lines(text, lines)),
    )
    for enabled, stage in pipeline:
        if enabled:
            logs = stage(logs)
    return logs
# More specific routes first
@router.get("/repository/{repository}")
async def get_repository_logs(
    repository: str,
    log_type: str = Query("stderr", description="Log type: stdout or stderr"),
    limit: int = Query(1, description="Number of allocations to return logs for"),
    plain_text: bool = Query(False, description="Return plain text logs instead of JSON")
):
    """Get logs for the job associated with a repository.

    The job ID and namespace are looked up through ``config_service``. Logs
    are then fetched for every task of the ``limit`` most recently created
    allocations. Per-task failures do not abort the request: they are
    collected and reported alongside whatever logs were retrieved.

    Raises:
        HTTPException: 404 if no job is mapped to *repository* or the job
            has no allocations.

    Returns:
        With ``plain_text``, one string with an ``=== task ===`` section per
        task, or a "No logs found" message listing the collected errors.
        Otherwise a dict with ``job_id``, ``repository``, ``namespace``,
        ``allocation_logs`` and ``errors`` (``None`` when there were none).
    """
    job_info = config_service.get_job_from_repository(repository)
    if not job_info:
        raise HTTPException(status_code=404, detail=f"No job found for repository: {repository}")

    job_id = job_info.get("job_id")
    namespace = job_info.get("namespace")
    logger.info(f"Getting logs for job {job_id} in namespace {namespace}")

    # Use a dedicated client so the repository's namespace is not set on the
    # shared module-level service.
    custom_nomad = NomadService()
    if namespace:
        custom_nomad.namespace = namespace

    allocations = custom_nomad.get_allocations(job_id)
    if not allocations:
        raise HTTPException(status_code=404, detail=f"No allocations found for job {job_id}")
    logger.info(f"Found {len(allocations)} allocations for job {job_id}")

    # Newest allocations first, then keep only the requested number. Task
    # names come from each allocation's TaskStates, so the job spec itself
    # is never fetched.
    sorted_allocations = sorted(
        allocations,
        key=lambda a: a.get("CreateTime", 0),
        reverse=True
    )
    allocations_to_check = sorted_allocations[:limit]

    result = []
    error_messages = []
    for alloc in allocations_to_check:
        # Use the full UUID of the allocation
        alloc_id = alloc.get("ID")
        if not alloc_id:
            logger.warning("Allocation ID not found in allocation data")
            error_messages.append("Allocation ID not found in allocation data")
            continue
        logger.info(f"Processing allocation {alloc_id} for job {job_id}")

        # TaskStates may be present but null (e.g. an allocation whose tasks
        # have not started), so fall back to an empty mapping.
        task_states = alloc.get("TaskStates") or {}
        if not task_states:
            logger.warning(f"No task states found in allocation {alloc_id}")
            error_messages.append(f"No task states found in allocation {alloc_id}")

        for task_name in task_states:
            try:
                logger.info(f"Retrieving logs for allocation {alloc_id}, task {task_name}")
                logs = custom_nomad.get_allocation_logs(alloc_id, task_name, log_type)
                # get_allocation_logs appears to report failures as strings
                # beginning with "Error:" or "No "; treat those as errors.
                if isinstance(logs, str) and logs.startswith(("Error:", "No ")):
                    logger.warning(f"Error retrieving logs for {task_name}: {logs}")
                    error_messages.append(logs)
                    continue
                if logs:
                    result.append({
                        "alloc_id": alloc_id,
                        "task": task_name,
                        "type": log_type,
                        "create_time": alloc.get("CreateTime"),
                        "logs": logs
                    })
                    logger.info(f"Successfully retrieved logs for {task_name}")
                else:
                    error_msg = f"No logs found for {task_name}"
                    logger.warning(error_msg)
                    error_messages.append(error_msg)
            except Exception as e:
                # Log but continue to try other tasks
                error_msg = f"Failed to get logs for {alloc_id}/{task_name}: {str(e)}"
                logger.error(error_msg)
                error_messages.append(error_msg)

    if plain_text:
        if not result:
            if error_messages:
                return f"No logs found for this job. Errors: {'; '.join(error_messages)}"
            return "No logs found for this job"
        return "\n\n".join([f"=== {r.get('task')} ===\n{r.get('logs')}" for r in result])

    return {
        "job_id": job_id,
        "repository": repository,
        "namespace": namespace,
        "allocation_logs": result,
        "errors": error_messages if error_messages else None
    }
@router.get("/job/{job_id}")
async def get_job_logs(
    job_id: str,
    namespace: str = Query(None, description="Nomad namespace"),
    log_type: str = Query("stderr", description="Log type: stdout or stderr"),
    limit: int = Query(1, description="Number of allocations to return logs for"),
    plain_text: bool = Query(False, description="Return plain text logs instead of JSON"),
    # New filtering parameters
    start_time: str = Query(None, description="Start time filter (YYYY-MM-DD HH:MM or HH:MM)"),
    end_time: str = Query(None, description="End time filter (YYYY-MM-DD HH:MM or HH:MM)"),
    log_level: str = Query(None, description="Filter by log level: ERROR, WARNING, INFO, DEBUG"),
    search: str = Query(None, description="Search term to filter logs"),
    lines: int = Query(None, description="Number of log lines to return (most recent)"),
    formatted: bool = Query(True, description="Return formatted logs with proper line breaks")
):
    """Get filtered logs for the most recent allocations of a job.

    Logs are fetched for every task of the ``limit`` newest allocations and
    passed through ``process_logs`` with the filtering parameters. Tasks
    whose logs cannot be retrieved are logged and skipped.

    Raises:
        HTTPException: 404 if the job has no allocations.

    Returns:
        With ``plain_text``, one string with an ``=== task ===`` section per
        task that still has lines after filtering (or a short message when
        there is nothing to show). Otherwise a dict with ``job_id``,
        ``namespace`` and ``allocation_logs``.
    """
    # Use a dedicated client so a per-request namespace is not set on the
    # shared module-level service.
    custom_nomad = NomadService()
    if namespace:
        custom_nomad.namespace = namespace
        logger.info(f"Getting logs for job {job_id} in namespace {namespace}")
    else:
        logger.info(f"Getting logs for job {job_id} in default namespace")

    allocations = custom_nomad.get_allocations(job_id)
    if not allocations:
        raise HTTPException(status_code=404, detail=f"No allocations found for job {job_id}")
    logger.info(f"Found {len(allocations)} allocations for job {job_id}")

    # Newest allocations first, then keep only the requested number.
    sorted_allocations = sorted(
        allocations,
        key=lambda a: a.get("CreateTime", 0),
        reverse=True
    )
    allocations_to_check = sorted_allocations[:limit]

    result = []
    for alloc in allocations_to_check:
        alloc_id = alloc.get("ID")
        if not alloc_id:
            logger.warning("Allocation ID not found in allocation data")
            continue
        logger.info(f"Processing allocation {alloc_id} for job {job_id}")

        # TaskStates may be present but null (e.g. an allocation whose tasks
        # have not started); without this fallback .items() would raise
        # outside the per-task try block and fail the whole request.
        task_states = alloc.get("TaskStates") or {}
        for task_name in task_states:
            try:
                logger.info(f"Retrieving logs for allocation {alloc_id}, task {task_name}")
                logs = custom_nomad.get_allocation_logs(alloc_id, task_name, log_type)
                # get_allocation_logs appears to report failures as strings
                # beginning with "Error:" or "No " (the same prefixes
                # get_repository_logs checks). Matching the bare words "No"/
                # "Error" would also discard real output such as "Now listening".
                is_error = isinstance(logs, str) and logs.startswith(("Error:", "No "))
                if logs and not is_error:
                    processed_logs = process_logs(
                        logs,
                        start_time=start_time,
                        end_time=end_time,
                        log_level=log_level,
                        search=search,
                        lines=lines,
                        formatted=formatted
                    )
                    result.append({
                        "alloc_id": alloc_id,
                        "task": task_name,
                        "type": log_type,
                        "create_time": alloc.get("CreateTime"),
                        "logs": processed_logs
                    })
                    logger.info(f"Successfully retrieved logs for {task_name}")
                else:
                    logger.warning(f"No logs found for {task_name}: {logs}")
            except Exception as e:
                # Log but continue to try other tasks
                logger.error(f"Failed to get logs for {alloc_id}/{task_name}: {str(e)}")

    if plain_text:
        if not result:
            return "No logs found for this job"
        # Combine all logs with task separators, skipping tasks whose logs
        # were emptied by the filters.
        combined_logs = [
            f"=== {r.get('task')} ===\n{r.get('logs')}"
            for r in result
            if r.get('logs')
        ]
        if not combined_logs:
            # Logs were retrieved but nothing survived the filters; say so
            # rather than returning an empty string.
            return "No log lines matched the given filters"
        return "\n\n".join(combined_logs)

    return {
        "job_id": job_id,
        "namespace": namespace,
        "allocation_logs": result
    }
@router.get("/latest/{job_id}")
async def get_latest_allocation_logs(
    job_id: str,
    log_type: str = Query("stderr", description="Log type: stdout or stderr"),
    plain_text: bool = Query(False, description="Return plain text logs instead of JSON")
):
    """Get logs from the newest allocation of a job.

    Uses the shared module-level ``nomad_service``, so no namespace can be
    selected here. Task names come from the job spec's ``TaskGroups``,
    narrowed to the allocation's own task group when it can be matched.
    Tasks whose logs cannot be fetched are logged and skipped.

    Raises:
        HTTPException: 404 if the job has no allocations.

    Returns:
        With ``plain_text``, one string with an ``=== task ===`` section per
        task. Otherwise a dict with ``job_id``, ``latest_allocation`` and
        ``task_logs``.
    """
    allocations = nomad_service.get_allocations(job_id)
    if not allocations:
        raise HTTPException(status_code=404, detail=f"No allocations found for job {job_id}")

    # max() returns the first allocation with the greatest CreateTime, the
    # same one a stable descending sort would put first.
    latest_alloc = max(allocations, key=lambda a: a.get("CreateTime", 0))
    alloc_id = latest_alloc.get("ID")

    # Tolerate a missing job or a job without task groups instead of failing
    # with AttributeError.
    job = nomad_service.get_job(job_id) or {}
    task_groups = job.get("TaskGroups") or []

    # An allocation runs a single task group, so tasks from other groups have
    # no logs in it. Narrow to the allocation's group when it matches one in
    # the spec; otherwise fall back to all groups.
    alloc_group = latest_alloc.get("TaskGroup")
    if alloc_group:
        matching_groups = [g for g in task_groups if g.get("Name") == alloc_group]
        task_groups = matching_groups or task_groups

    result = []
    for task_group in task_groups:
        for task in task_group.get("Tasks") or []:
            task_name = task.get("Name")
            try:
                logs = nomad_service.get_allocation_logs(alloc_id, task_name, log_type)
            except Exception as e:
                # Best effort: skip this task, but leave a trace instead of
                # swallowing the failure silently.
                logger.warning(f"Failed to get logs for {alloc_id}/{task_name}: {str(e)}")
                continue
            result.append({
                "alloc_id": alloc_id,
                "task": task_name,
                "type": log_type,
                "create_time": latest_alloc.get("CreateTime"),
                "logs": logs
            })

    if plain_text:
        return "\n\n".join([f"=== {r['task']} ===\n{r['logs']}" for r in result])

    return {
        "job_id": job_id,
        "latest_allocation": alloc_id,
        "task_logs": result
    }
@router.get("/build/{job_id}")
async def get_build_logs(job_id: str, plain_text: bool = Query(False)):
    """Return build logs for a job.

    Convenience shortcut for the ``/latest/{job_id}`` handler with the log
    type pinned to ``"stderr"``, i.e. the stderr output of the tasks in the
    job's newest allocation.
    """
    return await get_latest_allocation_logs(
        job_id=job_id,
        log_type="stderr",
        plain_text=plain_text,
    )
@router.get("/errors/{job_id}")
async def get_error_logs(
    job_id: str,
    namespace: str = Query(None, description="Nomad namespace"),
    start_time: str = Query(None, description="Start time filter (HH:MM format, e.g., 20:00)"),
    end_time: str = Query(None, description="End time filter (HH:MM format, e.g., 06:00)"),
    plain_text: bool = Query(True, description="Return plain text logs instead of JSON")
):
    """Get warning-and-above stderr logs for a job, optionally time-filtered.

    Delegates to ``get_job_logs`` over the five newest allocations with a
    ``WARNING|ERROR|EMERGENCY|CRITICAL`` level filter and formatting enabled.
    """
    # get_job_logs is awaited as a plain coroutine here, so FastAPI does not
    # resolve its parameters. Any argument left out would receive its
    # Query(...) default object, which is truthy and would be handed to the
    # filters as a real value (making them raise). Pass every filter explicitly.
    return await get_job_logs(
        job_id=job_id,
        namespace=namespace,
        log_type="stderr",
        limit=5,  # Check more allocations for errors
        plain_text=plain_text,
        start_time=start_time,
        end_time=end_time,
        log_level="WARNING|ERROR|EMERGENCY|CRITICAL",  # Multiple levels
        search=None,
        lines=None,
        formatted=True
    )
@router.get("/search/{job_id}")
async def search_job_logs(
    job_id: str,
    q: str = Query(..., description="Search term"),
    namespace: str = Query(None, description="Nomad namespace"),
    log_type: str = Query("stderr", description="Log type: stdout or stderr"),
    limit: int = Query(3, description="Number of allocations to search"),
    lines: int = Query(100, description="Number of matching lines to return"),
    plain_text: bool = Query(True, description="Return plain text logs instead of JSON")
):
    """Search job logs for a term (case-insensitive, literal match).

    Delegates to ``get_job_logs``. The line cap is applied after the search,
    so ``lines`` bounds the number of matching lines per task.
    """
    # get_job_logs is awaited as a plain coroutine here, so FastAPI does not
    # resolve its parameters. Any argument left out would receive its
    # Query(...) default object, which is truthy and would be handed to the
    # time/level filters as a real value (making them raise). Pass every
    # filter explicitly.
    return await get_job_logs(
        job_id=job_id,
        namespace=namespace,
        log_type=log_type,
        limit=limit,
        plain_text=plain_text,
        start_time=None,
        end_time=None,
        log_level=None,
        search=q,
        lines=lines,
        formatted=True
    )
# Generic allocation logs route last
@router.get("/allocation/{alloc_id}/{task}")
async def get_allocation_logs(
    alloc_id: str,
    task: str,
    log_type: str = Query("stderr", description="Log type: stdout or stderr"),
    plain_text: bool = Query(False, description="Return plain text logs instead of JSON")
):
    """Return the logs of one task in one allocation.

    Raises:
        HTTPException: 400 when ``log_type`` is neither ``stdout`` nor
            ``stderr``.

    Returns:
        The raw value from ``nomad_service.get_allocation_logs`` when
        ``plain_text`` is set; otherwise a dict with ``alloc_id``, ``task``,
        ``type`` and ``logs``.
    """
    if log_type not in ("stdout", "stderr"):
        raise HTTPException(status_code=400, detail="Log type must be stdout or stderr")

    logs = nomad_service.get_allocation_logs(alloc_id, task, log_type)
    if not plain_text:
        return {"alloc_id": alloc_id, "task": task, "type": log_type, "logs": logs}
    return logs