Update README.md

This commit is contained in:
2025-02-26 15:25:39 +07:00
parent d6acf632e3
commit baf1723a50
69 changed files with 5525 additions and 0 deletions

1
app/services/__init__.py Normal file
View File

@ -0,0 +1 @@
# Import services

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,299 @@
import os
import yaml
import logging
import json
from typing import Dict, Any, Optional, List
from fastapi import HTTPException
from pathlib import Path
from app.services.gitea_client import GiteaClient
# Configure logging
logger = logging.getLogger(__name__)
# Default configs directory
CONFIG_DIR = os.getenv("CONFIG_DIR", "./configs")
class ConfigService:
"""Service for managing repository to job mappings."""
def __init__(self, config_dir: str = CONFIG_DIR):
self.config_dir = Path(config_dir)
self._ensure_config_dir()
self.gitea_client = GiteaClient()
def _ensure_config_dir(self):
"""Ensure the config directory exists."""
try:
self.config_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"Failed to create config directory {self.config_dir}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to create config directory: {str(e)}")
def list_configs(self) -> List[Dict[str, Any]]:
"""List all available configurations."""
configs = []
try:
for file_path in self.config_dir.glob("*.yaml"):
with open(file_path, "r") as f:
config = yaml.safe_load(f)
config["name"] = file_path.stem
configs.append(config)
return configs
except Exception as e:
logger.error(f"Failed to list configurations: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to list configurations: {str(e)}")
def get_config(self, name: str) -> Dict[str, Any]:
"""Get a specific configuration by name."""
file_path = self.config_dir / f"{name}.yaml"
try:
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"Configuration not found: {name}")
with open(file_path, "r") as f:
config = yaml.safe_load(f)
config["name"] = name
# Enrich with repository information if available
if repository := config.get("repository"):
repo_info = self.gitea_client.get_repository_info(repository)
if repo_info:
config["repository_info"] = {
"description": repo_info.get("description"),
"default_branch": repo_info.get("default_branch"),
"stars": repo_info.get("stars_count"),
"forks": repo_info.get("forks_count"),
"owner": repo_info.get("owner", {}).get("login"),
"html_url": repo_info.get("html_url"),
}
return config
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to read configuration {name}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to read configuration: {str(e)}")
def create_config(self, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new configuration."""
file_path = self.config_dir / f"{name}.yaml"
try:
if file_path.exists():
raise HTTPException(status_code=409, detail=f"Configuration already exists: {name}")
# Validate required fields
required_fields = ["repository", "job_id"]
for field in required_fields:
if field not in config:
raise HTTPException(status_code=400, detail=f"Missing required field: {field}")
# Validate repository exists if Gitea integration is configured
if not self.gitea_client.check_repository_exists(config["repository"]):
raise HTTPException(status_code=400, detail=f"Repository not found: {config['repository']}")
# Add name to the config
config["name"] = name
# Get repository alias if not provided
if "repository_alias" not in config:
try:
owner, repo = self.gitea_client.parse_repo_url(config["repository"])
config["repository_alias"] = repo
except:
# Use job_id as fallback
config["repository_alias"] = config["job_id"]
# Write config to file
with open(file_path, "w") as f:
yaml.dump(config, f, default_flow_style=False)
return config
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to create configuration {name}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to create configuration: {str(e)}")
def update_config(self, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing configuration."""
file_path = self.config_dir / f"{name}.yaml"
try:
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"Configuration not found: {name}")
# Read existing config
with open(file_path, "r") as f:
existing_config = yaml.safe_load(f)
# Update with new values
for key, value in config.items():
existing_config[key] = value
# Validate repository exists if changed and Gitea integration is configured
if "repository" in config and config["repository"] != existing_config.get("repository"):
if not self.gitea_client.check_repository_exists(config["repository"]):
raise HTTPException(status_code=400, detail=f"Repository not found: {config['repository']}")
# Validate required fields
required_fields = ["repository", "job_id"]
for field in required_fields:
if field not in existing_config:
raise HTTPException(status_code=400, detail=f"Missing required field: {field}")
# Add name to the config
existing_config["name"] = name
# Update repository alias if repository changed
if "repository" in config and "repository_alias" not in config:
try:
owner, repo = self.gitea_client.parse_repo_url(existing_config["repository"])
existing_config["repository_alias"] = repo
except:
pass
# Write config to file
with open(file_path, "w") as f:
yaml.dump(existing_config, f, default_flow_style=False)
return existing_config
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to update configuration {name}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to update configuration: {str(e)}")
def delete_config(self, name: str) -> Dict[str, Any]:
"""Delete a configuration."""
file_path = self.config_dir / f"{name}.yaml"
try:
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"Configuration not found: {name}")
# Get the config before deleting
with open(file_path, "r") as f:
config = yaml.safe_load(f)
config["name"] = name
# Delete the file
file_path.unlink()
return {"name": name, "status": "deleted"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to delete configuration {name}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to delete configuration: {str(e)}")
def get_job_from_repository(self, repository: str) -> Optional[Dict[str, str]]:
"""Find job_id and namespace associated with a repository."""
try:
for config in self.list_configs():
if config.get("repository") == repository or config.get("repository_alias") == repository:
return {
"job_id": config.get("job_id"),
"namespace": config.get("namespace")
}
return None
except Exception as e:
logger.error(f"Failed to find job for repository {repository}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to find job for repository: {str(e)}")
def get_repository_from_job(self, job_id: str) -> Optional[str]:
"""Find repository associated with a job_id."""
try:
for config in self.list_configs():
if config.get("job_id") == job_id:
return config.get("repository")
return None
except Exception as e:
logger.error(f"Failed to find repository for job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to find repository for job: {str(e)}")
def get_config_by_repository(self, repository: str) -> Optional[Dict[str, Any]]:
"""Find configuration by repository URL or alias."""
try:
for config in self.list_configs():
if config.get("repository") == repository or config.get("repository_alias") == repository:
return self.get_config(config.get("name"))
return None
except Exception as e:
logger.error(f"Failed to find config for repository {repository}: {str(e)}")
return None
def get_job_spec_from_repository(self, repository: str) -> Optional[Dict[str, Any]]:
"""Get job specification from repository config and template."""
try:
# Get the repository configuration
config = self.get_config_by_repository(repository)
if not config:
logger.error(f"No configuration found for repository: {repository}")
return None
# Check if the job template is specified
job_template = config.get("job_template")
if not job_template:
logger.error(f"No job template specified for repository: {repository}")
return None
# Read the job template file
template_path = Path(self.config_dir) / "templates" / f"{job_template}.json"
if not template_path.exists():
logger.error(f"Job template not found: {job_template}")
return None
try:
with open(template_path, "r") as f:
job_spec = json.load(f)
except Exception as e:
logger.error(f"Failed to read job template {job_template}: {str(e)}")
return None
# Apply configuration parameters to the template
job_spec["ID"] = config.get("job_id")
job_spec["Name"] = config.get("job_id")
# Apply other customizations from config
if env_vars := config.get("environment_variables"):
for task_group in job_spec.get("TaskGroups", []):
for task in task_group.get("Tasks", []):
if "Env" not in task:
task["Env"] = {}
task["Env"].update(env_vars)
if meta := config.get("metadata"):
job_spec["Meta"] = meta
# Add repository info to the metadata
if "Meta" not in job_spec:
job_spec["Meta"] = {}
job_spec["Meta"]["repository"] = repository
# Override specific job parameters if specified in config
if job_params := config.get("job_parameters"):
for param_key, param_value in job_params.items():
# Handle nested parameters with dot notation (e.g., "TaskGroups.0.Tasks.0.Config.image")
if "." in param_key:
parts = param_key.split(".")
current = job_spec
for part in parts[:-1]:
# Handle array indices
if part.isdigit() and isinstance(current, list):
current = current[int(part)]
elif part in current:
current = current[part]
else:
break
else:
# Only set the value if we successfully navigated the path
current[parts[-1]] = param_value
else:
# Direct parameter
job_spec[param_key] = param_value
logger.info(f"Generated job specification for repository {repository} using template {job_template}")
return job_spec
except Exception as e:
logger.error(f"Failed to get job specification for repository {repository}: {str(e)}")
return None

View File

@ -0,0 +1,180 @@
import os
import logging
import requests
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse
from fastapi import HTTPException
# Configure logging
logger = logging.getLogger(__name__)
class GiteaClient:
"""Client for interacting with Gitea API."""
def __init__(self):
"""Initialize Gitea client with configuration from environment variables."""
self.api_base_url = os.getenv("GITEA_API_URL", "").rstrip("/")
self.token = os.getenv("GITEA_API_TOKEN")
self.username = os.getenv("GITEA_USERNAME")
self.verify_ssl = os.getenv("GITEA_VERIFY_SSL", "true").lower() == "true"
if not self.api_base_url:
logger.warning("GITEA_API_URL is not configured. Gitea integration will not work.")
if not self.token and (self.username and os.getenv("GITEA_PASSWORD")):
self.token = self._get_token_from_credentials()
def _get_token_from_credentials(self) -> Optional[str]:
"""Get a token using username and password if provided."""
try:
response = requests.post(
f"{self.api_base_url}/users/{self.username}/tokens",
auth=(self.username, os.getenv("GITEA_PASSWORD", "")),
json={
"name": "nomad-mcp-service",
"scopes": ["repo", "read:org"]
},
verify=self.verify_ssl
)
if response.status_code == 201:
return response.json().get("sha1")
else:
logger.error(f"Failed to get Gitea token: {response.text}")
return None
except Exception as e:
logger.error(f"Failed to get Gitea token: {str(e)}")
return None
def _get_headers(self) -> Dict[str, str]:
"""Get request headers with authentication."""
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
def parse_repo_url(self, repo_url: str) -> Tuple[str, str]:
"""
Parse a Gitea repository URL to extract owner and repo name.
Examples:
- http://gitea.internal.example.com/username/repo-name -> (username, repo-name)
- https://gitea.example.com/org/project -> (org, project)
"""
try:
# Parse the URL
parsed_url = urlparse(repo_url)
# Get the path and remove leading/trailing slashes
path = parsed_url.path.strip("/")
# Split the path
parts = path.split("/")
if len(parts) < 2:
raise ValueError(f"Invalid repository URL: {repo_url}")
# Extract owner and repo
owner = parts[0]
repo = parts[1]
return owner, repo
except Exception as e:
logger.error(f"Failed to parse repository URL: {repo_url}, error: {str(e)}")
raise ValueError(f"Invalid repository URL: {repo_url}")
def check_repository_exists(self, repo_url: str) -> bool:
"""Check if a repository exists in Gitea."""
if not self.api_base_url:
# No Gitea integration configured, assume repository exists
return True
try:
owner, repo = self.parse_repo_url(repo_url)
response = requests.get(
f"{self.api_base_url}/repos/{owner}/{repo}",
headers=self._get_headers(),
verify=self.verify_ssl
)
return response.status_code == 200
except Exception as e:
logger.error(f"Failed to check repository: {repo_url}, error: {str(e)}")
return False
def get_repository_info(self, repo_url: str) -> Optional[Dict[str, Any]]:
"""Get repository information from Gitea."""
if not self.api_base_url:
# No Gitea integration configured
return None
try:
owner, repo = self.parse_repo_url(repo_url)
response = requests.get(
f"{self.api_base_url}/repos/{owner}/{repo}",
headers=self._get_headers(),
verify=self.verify_ssl
)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to get repository info: {response.text}")
return None
except Exception as e:
logger.error(f"Failed to get repository info: {repo_url}, error: {str(e)}")
return None
def list_repositories(self, limit: int = 100) -> List[Dict[str, Any]]:
"""List available repositories from Gitea."""
if not self.api_base_url:
# No Gitea integration configured
return []
try:
response = requests.get(
f"{self.api_base_url}/user/repos",
headers=self._get_headers(),
params={"limit": limit},
verify=self.verify_ssl
)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to list repositories: {response.text}")
return []
except Exception as e:
logger.error(f"Failed to list repositories: {str(e)}")
return []
def get_repository_branches(self, repo_url: str) -> List[Dict[str, Any]]:
"""Get branches for a repository."""
if not self.api_base_url:
# No Gitea integration configured
return []
try:
owner, repo = self.parse_repo_url(repo_url)
response = requests.get(
f"{self.api_base_url}/repos/{owner}/{repo}/branches",
headers=self._get_headers(),
verify=self.verify_ssl
)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to get repository branches: {response.text}")
return []
except Exception as e:
logger.error(f"Failed to get repository branches: {repo_url}, error: {str(e)}")
return []

View File

@ -0,0 +1,505 @@
import os
import logging
import nomad
from fastapi import HTTPException
from typing import Dict, Any, Optional, List
from dotenv import load_dotenv
import time
# Load environment variables
load_dotenv()
# Configure logging
logger = logging.getLogger(__name__)
def get_nomad_client():
"""
Create and return a Nomad client using environment variables.
"""
try:
nomad_addr = os.getenv("NOMAD_ADDR", "http://localhost:4646").rstrip('/')
nomad_token = os.getenv("NOMAD_TOKEN")
# Use "development" as the default namespace since all jobs are likely to be in this namespace
nomad_namespace = os.getenv("NOMAD_NAMESPACE", "development")
# Ensure namespace is never "*" (wildcard)
if nomad_namespace == "*":
nomad_namespace = "development"
logger.info("Replaced wildcard namespace '*' with 'development'")
# Extract host and port from the address
host_with_port = nomad_addr.replace("http://", "").replace("https://", "")
host = host_with_port.split(":")[0]
# Safely extract port
port_part = host_with_port.split(":")[-1] if ":" in host_with_port else "4646"
port = int(port_part.split('/')[0]) # Remove any path components
logger.info(f"Creating Nomad client with host={host}, port={port}, namespace={nomad_namespace}")
return nomad.Nomad(
host=host,
port=port,
secure=nomad_addr.startswith("https"),
token=nomad_token,
timeout=10,
namespace=nomad_namespace, # Query across development namespace by default
verify=False if os.getenv("NOMAD_SKIP_VERIFY", "false").lower() == "true" else True
)
except Exception as e:
logger.error(f"Failed to create Nomad client: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to connect to Nomad: {str(e)}")
class NomadService:
"""Service for interacting with Nomad API."""
def __init__(self):
self.client = get_nomad_client()
self.namespace = os.getenv("NOMAD_NAMESPACE", "development") # Use "development" namespace as default
def get_job(self, job_id: str, max_retries: int = 3, retry_delay: int = 2) -> Dict[str, Any]:
"""
Get a job by ID with retry logic.
Args:
job_id: The ID of the job to retrieve
max_retries: Maximum number of retry attempts (default: 3)
retry_delay: Delay between retries in seconds (default: 2)
Returns:
Dict containing job details
"""
last_exception = None
# Try multiple times to get the job
for attempt in range(max_retries):
try:
# Get the Nomad address from the client
nomad_addr = f"http://{self.client.host}:{self.client.port}"
# Build the URL for the job endpoint
url = f"{nomad_addr}/v1/job/{job_id}"
# Set up headers
headers = {}
if hasattr(self.client, 'token') and self.client.token:
headers["X-Nomad-Token"] = self.client.token
# Set up params with the correct namespace
params = {"namespace": self.namespace}
# Make the request directly
import requests
response = requests.get(
url=url,
headers=headers,
params=params,
verify=False if os.getenv("NOMAD_SKIP_VERIFY", "false").lower() == "true" else True
)
# Check if the request was successful
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
# If not the last attempt, log and retry
if attempt < max_retries - 1:
logger.warning(f"Job {job_id} not found on attempt {attempt+1}/{max_retries}, retrying in {retry_delay}s...")
time.sleep(retry_delay)
continue
else:
raise ValueError(f"Job not found after {max_retries} attempts: {job_id}")
else:
raise ValueError(f"Failed to get job: {response.text}")
except Exception as e:
last_exception = e
# If not the last attempt, log and retry
if attempt < max_retries - 1:
logger.warning(f"Error getting job {job_id} on attempt {attempt+1}/{max_retries}: {str(e)}, retrying in {retry_delay}s...")
time.sleep(retry_delay)
continue
else:
logger.error(f"Failed to get job {job_id} after {max_retries} attempts: {str(e)}")
raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")
# If we get here, all retries failed
logger.error(f"Failed to get job {job_id} after {max_retries} attempts")
raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")
def list_jobs(self) -> List[Dict[str, Any]]:
"""List all jobs."""
try:
# Get the Nomad address from the client
nomad_addr = f"http://{self.client.host}:{self.client.port}"
# Build the URL for the jobs endpoint
url = f"{nomad_addr}/v1/jobs"
# Set up headers
headers = {}
if hasattr(self.client, 'token') and self.client.token:
headers["X-Nomad-Token"] = self.client.token
# Set up params with the correct namespace
params = {"namespace": self.namespace}
# Make the request directly
import requests
response = requests.get(
url=url,
headers=headers,
params=params,
verify=False if os.getenv("NOMAD_SKIP_VERIFY", "false").lower() == "true" else True
)
# Check if the request was successful
if response.status_code == 200:
return response.json()
else:
raise ValueError(f"Failed to list jobs: {response.text}")
except Exception as e:
logger.error(f"Failed to list jobs: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to list jobs: {str(e)}")
def start_job(self, job_spec: Dict[str, Any]) -> Dict[str, Any]:
"""
Start a job using the provided specification.
Args:
job_spec: The job specification to submit. Can be a raw job spec or wrapped in a "Job" key.
Returns:
Dict containing job_id, eval_id, status, and any warnings
"""
try:
# Extract job ID from specification
job_id = None
if "Job" in job_spec:
job_id = job_spec["Job"].get("ID") or job_spec["Job"].get("id")
else:
job_id = job_spec.get("ID") or job_spec.get("id")
if not job_id:
raise ValueError("Job ID is required in the job specification")
logger.info(f"Processing job start request for job ID: {job_id}")
# Determine the namespace to use, with clear priorities:
# 1. Explicitly provided in the job spec (highest priority)
# 2. Service instance namespace
# 3. Fallback to "development"
namespace = self.namespace
# Normalize the job structure to ensure it has a "Job" wrapper
normalized_job_spec = {}
if "Job" in job_spec:
normalized_job_spec = job_spec
# Check if namespace is specified in the job spec
if "Namespace" in job_spec["Job"]:
namespace = job_spec["Job"]["Namespace"]
logger.info(f"Using namespace from job spec: {namespace}")
else:
# Check if namespace is specified in the job spec
if "Namespace" in job_spec:
namespace = job_spec["Namespace"]
logger.info(f"Using namespace from job spec: {namespace}")
# Wrap the job spec in a "Job" key
normalized_job_spec = {"Job": job_spec}
# Replace wildcard namespaces with the default
if namespace == "*":
namespace = "development"
logger.info(f"Replaced wildcard namespace with default: {namespace}")
# Always explicitly set the namespace in the job spec
normalized_job_spec["Job"]["Namespace"] = namespace
logger.info(f"Submitting job {job_id} to namespace {namespace}")
logger.info(f"Job specification structure: {list(normalized_job_spec.keys())}")
logger.info(f"Job keys: {list(normalized_job_spec['Job'].keys())}")
# Submit the job - pass the job_id and job spec directly
# The namespace is already set in the job spec
response = self.client.job.register_job(job_id, normalized_job_spec)
logger.info(f"Job registration response: {response}")
return {
"job_id": job_id,
"eval_id": response.get("EvalID"),
"status": "started",
"warnings": response.get("Warnings"),
"namespace": namespace
}
except Exception as e:
logger.error(f"Failed to start job: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to start job: {str(e)}")
def stop_job(self, job_id: str, purge: bool = False) -> Dict[str, Any]:
"""
Stop a job by ID.
Args:
job_id: The ID of the job to stop
purge: If true, the job will be purged from Nomad's state entirely
Returns:
Dict containing job_id, eval_id, and status
"""
try:
logger.info(f"Stopping job {job_id} in namespace {self.namespace} (purge={purge})")
# Get the Nomad address from the client
nomad_addr = f"http://{self.client.host}:{self.client.port}"
# Build the URL for the job endpoint
url = f"{nomad_addr}/v1/job/{job_id}"
# Set up headers
headers = {}
if hasattr(self.client, 'token') and self.client.token:
headers["X-Nomad-Token"] = self.client.token
# Set up params with the correct namespace and purge option
params = {
"namespace": self.namespace,
"purge": str(purge).lower()
}
# Make the request directly
import requests
response = requests.delete(
url=url,
headers=headers,
params=params,
verify=False if os.getenv("NOMAD_SKIP_VERIFY", "false").lower() == "true" else True
)
# Check if the request was successful
if response.status_code == 200:
response_data = response.json()
logger.info(f"Job stop response: {response_data}")
return {
"job_id": job_id,
"eval_id": response_data.get("EvalID"),
"status": "stopped",
"namespace": self.namespace
}
else:
raise ValueError(f"Failed to stop job: {response.text}")
except Exception as e:
logger.error(f"Failed to stop job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to stop job: {str(e)}")
def get_allocations(self, job_id: str) -> List[Dict[str, Any]]:
"""Get all allocations for a job."""
try:
# Get the Nomad address from the client
nomad_addr = f"http://{self.client.host}:{self.client.port}"
# Build the URL for the job allocations endpoint
url = f"{nomad_addr}/v1/job/{job_id}/allocations"
# Set up headers
headers = {}
if hasattr(self.client, 'token') and self.client.token:
headers["X-Nomad-Token"] = self.client.token
# Set up params with the correct namespace
params = {"namespace": self.namespace}
# Make the request directly
import requests
response = requests.get(
url=url,
headers=headers,
params=params,
verify=False if os.getenv("NOMAD_SKIP_VERIFY", "false").lower() == "true" else True
)
# Check if the request was successful
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
logger.warning(f"No allocations found for job {job_id}")
return []
else:
raise ValueError(f"Failed to get allocations: {response.text}")
except Exception as e:
logger.error(f"Failed to get allocations for job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get allocations: {str(e)}")
def get_allocation_logs(self, alloc_id: str, task: str, log_type: str = "stderr") -> str:
"""Get logs for a specific allocation and task."""
try:
# More detailed debugging to understand what's happening
logger.info(f"Getting logs for allocation {alloc_id}, task {task}, type {log_type}")
if alloc_id == "repository":
logger.error("Invalid allocation ID 'repository' detected")
return f"Error: Invalid allocation ID 'repository'"
# Verify the allocation ID is a valid UUID (must be 36 characters)
if not alloc_id or len(alloc_id) != 36:
logger.error(f"Invalid allocation ID format: {alloc_id} (length: {len(alloc_id) if alloc_id else 0})")
return f"Error: Invalid allocation ID format - must be 36 character UUID"
# Get allocation info to verify it exists
try:
allocation = self.client.allocation.get_allocation(alloc_id)
if not allocation:
logger.warning(f"Allocation {alloc_id} not found")
return f"Allocation {alloc_id} not found"
except Exception as e:
logger.error(f"Error checking allocation: {str(e)}")
return f"Error checking allocation: {str(e)}"
# Try multiple approaches to get logs
log_content = None
error_messages = []
# Approach 1: Standard API
try:
logger.info(f"Attempting to get logs using standard API")
logs = self.client.allocation.logs.get_logs(
alloc_id,
task,
log_type,
plain=True
)
if logs:
if isinstance(logs, dict) and logs.get("Data"):
log_content = logs.get("Data")
logger.info(f"Successfully retrieved logs using standard API")
elif isinstance(logs, str):
log_content = logs
logger.info(f"Successfully retrieved logs as string")
else:
error_messages.append(f"Unexpected log format: {type(logs)}")
logger.warning(f"Unexpected log format: {type(logs)}")
else:
error_messages.append("No logs returned from standard API")
logger.warning("No logs returned from standard API")
except Exception as e:
error_str = str(e)
error_messages.append(f"Standard API error: {error_str}")
logger.warning(f"Standard API failed: {error_str}")
# Approach 2: Try raw HTTP if the standard API didn't work
if not log_content:
try:
import requests
# Get the Nomad address from environment or use default
nomad_addr = os.getenv("NOMAD_ADDR", "http://localhost:4646").rstrip('/')
nomad_token = os.getenv("NOMAD_TOKEN")
# Construct the URL for logs
logs_url = f"{nomad_addr}/v1/client/fs/logs/{alloc_id}"
# Setup headers
headers = {}
if nomad_token:
headers["X-Nomad-Token"] = nomad_token
# Setup query parameters
params = {
"task": task,
"type": log_type,
"plain": "true"
}
if self.namespace and self.namespace != "*":
params["namespace"] = self.namespace
logger.info(f"Attempting to get logs using direct HTTP request to: {logs_url}")
response = requests.get(logs_url, headers=headers, params=params, verify=False)
if response.status_code == 200:
log_content = response.text
logger.info(f"Successfully retrieved logs using direct HTTP request")
else:
error_messages.append(f"HTTP request failed with status {response.status_code}: {response.text}")
logger.warning(f"HTTP request failed: {response.status_code} - {response.text}")
except ImportError:
error_messages.append("Requests library not available for fallback HTTP request")
logger.warning("Requests library not available for fallback HTTP request")
except Exception as e:
error_str = str(e)
error_messages.append(f"HTTP request error: {error_str}")
logger.warning(f"HTTP request failed: {error_str}")
# Approach 3: Direct system call as a last resort
if not log_content:
try:
import subprocess
# Get the Nomad command-line client path
nomad_cmd = "nomad" # Default, assumes nomad is in PATH
# Build the command
cmd_parts = [
nomad_cmd,
"alloc", "logs",
"-verbose",
]
# Add namespace if specified
if self.namespace and self.namespace != "*":
cmd_parts.extend(["-namespace", self.namespace])
# Add allocation and task info
cmd_parts.extend(["-job", alloc_id, task])
# Use stderr or stdout
if log_type == "stderr":
cmd_parts.append("-stderr")
else:
cmd_parts.append("-stdout")
logger.info(f"Attempting to get logs using command: {' '.join(cmd_parts)}")
process = subprocess.run(cmd_parts, capture_output=True, text=True)
if process.returncode == 0:
log_content = process.stdout
logger.info(f"Successfully retrieved logs using command-line client")
else:
error_messages.append(f"Command-line client failed: {process.stderr}")
logger.warning(f"Command-line client failed: {process.stderr}")
except Exception as e:
error_str = str(e)
error_messages.append(f"Command-line client error: {error_str}")
logger.warning(f"Command-line client failed: {error_str}")
# Return the logs if we got them, otherwise return error
if log_content:
return log_content
else:
error_msg = "; ".join(error_messages)
logger.error(f"Failed to get logs after multiple attempts: {error_msg}")
return f"Error retrieving {log_type} logs: {error_msg}"
except Exception as e:
error_str = str(e)
logger.error(f"Failed to get logs for allocation {alloc_id}, task {task}: {error_str}")
raise HTTPException(status_code=500, detail=f"Failed to get logs: {error_str}")
def get_deployment_status(self, job_id: str) -> Dict[str, Any]:
"""Get the deployment status for a job."""
try:
return self.client.job.get_deployment(job_id, namespace=self.namespace)
except Exception as e:
logger.error(f"Failed to get deployment status for job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get deployment status: {str(e)}")
def get_job_evaluations(self, job_id: str) -> List[Dict[str, Any]]:
"""Get evaluations for a job."""
try:
return self.client.job.get_evaluations(job_id, namespace=self.namespace)
except Exception as e:
logger.error(f"Failed to get evaluations for job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get evaluations: {str(e)}")