505 lines
22 KiB
Python
505 lines
22 KiB
Python
import os
|
|
import logging
|
|
import nomad
|
|
from fastapi import HTTPException
|
|
from typing import Dict, Any, Optional, List
|
|
from dotenv import load_dotenv
|
|
import time
|
|
|
|
# Populate os.environ from a local .env file before any NOMAD_* setting is read.
load_dotenv()

# Module-scoped logger shared by the client factory and the service class below.
logger = logging.getLogger(__name__)
|
|
|
|
def get_nomad_client():
    """
    Create and return a Nomad client using environment variables.

    Environment variables read:
        NOMAD_ADDR: Address of the Nomad API (default "http://localhost:4646").
            A missing scheme is treated as "http://", a missing port as 4646,
            and any path component is ignored.
        NOMAD_TOKEN: Optional token handed to the client.
        NOMAD_NAMESPACE: Namespace for the client (default "development").
            The wildcard "*" is replaced with "development".
        NOMAD_SKIP_VERIFY: "true" (case-insensitive) disables TLS certificate
            verification; any other value keeps it enabled.

    Returns:
        A configured ``nomad.Nomad`` client.

    Raises:
        HTTPException: Status 500 when the address cannot be parsed or the
            client cannot be constructed.
    """
    # Local import keeps this function self-contained with respect to the
    # module's import block.
    from urllib.parse import urlsplit

    try:
        nomad_addr = os.getenv("NOMAD_ADDR", "http://localhost:4646").rstrip('/')
        nomad_token = os.getenv("NOMAD_TOKEN")
        # Use "development" as the default namespace since all jobs are likely to be in this namespace
        nomad_namespace = os.getenv("NOMAD_NAMESPACE", "development")

        # Ensure namespace is never "*" (wildcard)
        if nomad_namespace == "*":
            nomad_namespace = "development"
            logger.info("Replaced wildcard namespace '*' with 'development'")

        # urlsplit only recognises the host part when a scheme is present, so
        # bare "host:port" values get an explicit "http://" prefix first.
        parsed = urlsplit(nomad_addr if "://" in nomad_addr else f"http://{nomad_addr}")
        host = parsed.hostname
        if not host:
            raise ValueError(f"Invalid NOMAD_ADDR: {nomad_addr!r}")
        if ":" in host:
            # IPv6 literal: urlsplit strips the brackets, but they are needed
            # again wherever the host is followed by ":port" in a URL.
            host = f"[{host}]"

        # parsed.port raises ValueError for a non-numeric or out-of-range port;
        # it is None when the address has no port at all.
        port = parsed.port or 4646

        logger.info(f"Creating Nomad client with host={host}, port={port}, namespace={nomad_namespace}")

        return nomad.Nomad(
            host=host,
            port=port,
            secure=parsed.scheme == "https",
            token=nomad_token,
            timeout=10,
            namespace=nomad_namespace,  # Query across development namespace by default
            verify=os.getenv("NOMAD_SKIP_VERIFY", "false").lower() != "true",
        )
    except Exception as e:
        logger.error(f"Failed to create Nomad client: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to connect to Nomad: {str(e)}") from e
|
|
|
|
class NomadService:
    """Service for interacting with Nomad API.

    Job, allocation-list and log lookups are sent as direct HTTP requests to
    the host/port of the configured client; job registration, allocation
    checks, deployments and evaluations go through the client object itself.
    Unless a method documents otherwise, failures are raised as
    ``HTTPException``.
    """

    # Namespace used when NOMAD_NAMESPACE is unset or is the "*" wildcard.
    DEFAULT_NAMESPACE = "development"

    # Seconds before a direct HTTP request or the CLI fallback is abandoned,
    # so an unreachable Nomad server cannot block a caller indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Create the Nomad client and resolve the namespace from NOMAD_NAMESPACE."""
        self.client = get_nomad_client()
        configured = os.getenv("NOMAD_NAMESPACE", self.DEFAULT_NAMESPACE)
        # get_nomad_client() never hands the "*" wildcard to the client; apply
        # the same rule here so direct requests target the same namespace.
        self.namespace = self.DEFAULT_NAMESPACE if configured == "*" else configured

    # ------------------------------------------------------------------
    # Private helpers shared by the direct-HTTP methods
    # ------------------------------------------------------------------

    def _base_url(self) -> str:
        """Return ``scheme://host:port`` for the API the client points at."""
        # NOTE(review): relies on the client exposing `secure`, `host` and
        # `port` attributes; a missing `secure` is treated as plain HTTP.
        scheme = "https" if getattr(self.client, "secure", False) else "http"
        return f"{scheme}://{self.client.host}:{self.client.port}"

    def _auth_headers(self) -> Dict[str, str]:
        """Return the X-Nomad-Token header when the client carries a token."""
        token = getattr(self.client, "token", None)
        return {"X-Nomad-Token": token} if token else {}

    @staticmethod
    def _verify_tls() -> bool:
        """Return False only when NOMAD_SKIP_VERIFY is "true" (case-insensitive)."""
        return os.getenv("NOMAD_SKIP_VERIFY", "false").lower() != "true"

    @staticmethod
    def _quote_segment(value: str) -> str:
        """Percent-encode one URL path segment, including any "/" it contains."""
        from urllib.parse import quote

        return quote(value, safe="")

    def _request(self, method: str, path: str, params: Optional[Dict[str, Any]] = None):
        """Send one HTTP request to the Nomad API and return the response object."""
        # Imported lazily, as the original methods did, so the module can be
        # imported even where the requests package is not installed.
        import requests

        return requests.request(
            method=method,
            url=f"{self._base_url()}{path}",
            headers=self._auth_headers(),
            params=params,
            verify=self._verify_tls(),
            timeout=self.REQUEST_TIMEOUT,
        )

    # ------------------------------------------------------------------
    # Jobs
    # ------------------------------------------------------------------

    def get_job(self, job_id: str, max_retries: int = 3, retry_delay: int = 2) -> Dict[str, Any]:
        """
        Get a job by ID with retry logic.

        Every failed attempt (404, other HTTP status, or exception) is retried
        until ``max_retries`` attempts have been made.

        Args:
            job_id: The ID of the job to retrieve
            max_retries: Maximum number of attempts (default: 3)
            retry_delay: Delay between attempts in seconds (default: 2)

        Returns:
            Dict containing job details

        Raises:
            HTTPException: Status 404 when the last attempt was answered with
                404 (or no attempt was made because ``max_retries`` < 1);
                status 500 when the last attempt failed for any other reason.
        """
        params = {"namespace": self.namespace}
        # None means "the last attempt got a 404"; a string describes any other failure.
        last_error: Optional[str] = None

        for attempt in range(1, max_retries + 1):
            try:
                path = f"/v1/job/{self._quote_segment(job_id)}"
                response = self._request("GET", path, params=params)
                if response.status_code == 200:
                    return response.json()
                if response.status_code == 404:
                    last_error = None
                else:
                    last_error = f"HTTP {response.status_code}: {response.text}"
            except Exception as e:
                # Connection problems, timeouts, invalid JSON, missing requests package...
                last_error = str(e)

            if attempt < max_retries:
                if last_error is None:
                    logger.warning(f"Job {job_id} not found on attempt {attempt}/{max_retries}, retrying in {retry_delay}s...")
                else:
                    logger.warning(f"Error getting job {job_id} on attempt {attempt}/{max_retries}: {last_error}, retrying in {retry_delay}s...")
                time.sleep(retry_delay)

        if last_error is not None:
            # Nomad was unreachable or answered with an error: that is not "not found".
            logger.error(f"Failed to get job {job_id} after {max_retries} attempts: {last_error}")
            raise HTTPException(status_code=500, detail=f"Failed to get job: {last_error}")

        logger.error(f"Failed to get job {job_id} after {max_retries} attempts")
        raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")

    def list_jobs(self) -> List[Dict[str, Any]]:
        """
        List all jobs in the service namespace.

        Returns:
            The decoded JSON body of ``GET /v1/jobs``.

        Raises:
            HTTPException: Status 500 on any non-200 response or request error.
        """
        try:
            response = self._request("GET", "/v1/jobs", params={"namespace": self.namespace})
            if response.status_code != 200:
                raise ValueError(f"Failed to list jobs: {response.text}")
            return response.json()
        except Exception as e:
            logger.error(f"Failed to list jobs: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to list jobs: {str(e)}") from e

    def start_job(self, job_spec: Dict[str, Any]) -> Dict[str, Any]:
        """
        Start a job using the provided specification.

        The caller's ``job_spec`` is not modified; the namespace is written
        into a copy that is then registered.

        Args:
            job_spec: The job specification to submit. Can be a raw job spec or wrapped in a "Job" key.

        Returns:
            Dict containing job_id, eval_id, status, warnings and namespace

        Raises:
            HTTPException: Status 500 when the job ID is missing or
                registration fails.
        """
        try:
            # Normalize to {"Job": {...}} using shallow copies so the namespace
            # assignment below never leaks into the caller's dictionaries.
            if "Job" in job_spec:
                job_body = dict(job_spec["Job"])
                normalized_job_spec = {**job_spec, "Job": job_body}
            else:
                job_body = dict(job_spec)
                normalized_job_spec = {"Job": job_body}

            job_id = job_body.get("ID") or job_body.get("id")
            if not job_id:
                raise ValueError("Job ID is required in the job specification")

            logger.info(f"Processing job start request for job ID: {job_id}")

            # Namespace priority:
            # 1. A non-empty "Namespace" in the job spec
            # 2. The service instance namespace
            namespace = self.namespace
            if job_body.get("Namespace"):
                namespace = job_body["Namespace"]
                logger.info(f"Using namespace from job spec: {namespace}")

            # Replace wildcard namespaces with the default
            if namespace == "*":
                namespace = self.DEFAULT_NAMESPACE
                logger.info(f"Replaced wildcard namespace with default: {namespace}")

            # Always explicitly set the namespace in the job spec
            job_body["Namespace"] = namespace

            logger.info(f"Submitting job {job_id} to namespace {namespace}")
            logger.info(f"Job specification structure: {list(normalized_job_spec.keys())}")
            logger.info(f"Job keys: {list(job_body.keys())}")

            # The namespace travels inside the job spec itself.
            response = self.client.job.register_job(job_id, normalized_job_spec)

            logger.info(f"Job registration response: {response}")

            return {
                "job_id": job_id,
                "eval_id": response.get("EvalID"),
                "status": "started",
                "warnings": response.get("Warnings"),
                "namespace": namespace,
            }
        except Exception as e:
            logger.error(f"Failed to start job: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to start job: {str(e)}") from e

    def stop_job(self, job_id: str, purge: bool = False) -> Dict[str, Any]:
        """
        Stop a job by ID.

        Args:
            job_id: The ID of the job to stop
            purge: If true, the job will be purged from Nomad's state entirely

        Returns:
            Dict containing job_id, eval_id, status and namespace

        Raises:
            HTTPException: Status 500 on any non-200 response or request error.
        """
        try:
            logger.info(f"Stopping job {job_id} in namespace {self.namespace} (purge={purge})")

            response = self._request(
                "DELETE",
                f"/v1/job/{self._quote_segment(job_id)}",
                params={"namespace": self.namespace, "purge": str(purge).lower()},
            )
            if response.status_code != 200:
                raise ValueError(f"Failed to stop job: {response.text}")

            response_data = response.json()
            logger.info(f"Job stop response: {response_data}")

            return {
                "job_id": job_id,
                "eval_id": response_data.get("EvalID"),
                "status": "stopped",
                "namespace": self.namespace,
            }
        except Exception as e:
            logger.error(f"Failed to stop job {job_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to stop job: {str(e)}") from e

    # ------------------------------------------------------------------
    # Allocations and logs
    # ------------------------------------------------------------------

    def get_allocations(self, job_id: str) -> List[Dict[str, Any]]:
        """
        Get all allocations for a job.

        Returns:
            The decoded JSON body on 200, or an empty list on 404.

        Raises:
            HTTPException: Status 500 on any other response or request error.
        """
        try:
            response = self._request(
                "GET",
                f"/v1/job/{self._quote_segment(job_id)}/allocations",
                params={"namespace": self.namespace},
            )
            if response.status_code == 200:
                return response.json()
            if response.status_code == 404:
                logger.warning(f"No allocations found for job {job_id}")
                return []
            raise ValueError(f"Failed to get allocations: {response.text}")
        except Exception as e:
            logger.error(f"Failed to get allocations for job {job_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to get allocations: {str(e)}") from e

    def _logs_via_client(self, alloc_id: str, task: str, log_type: str, errors: List[str]) -> Optional[str]:
        """Try the client object's log call; return the text or None, recording why in ``errors``."""
        try:
            logger.info("Attempting to get logs using standard API")
            # NOTE(review): whether the installed client exposes
            # `allocation.logs.get_logs` could not be confirmed from this
            # file; any failure simply falls through to the next approach.
            logs = self.client.allocation.logs.get_logs(alloc_id, task, log_type, plain=True)
        except Exception as e:
            errors.append(f"Standard API error: {str(e)}")
            logger.warning(f"Standard API failed: {str(e)}")
            return None

        if not logs:
            errors.append("No logs returned from standard API")
            logger.warning("No logs returned from standard API")
            return None
        if isinstance(logs, dict) and logs.get("Data"):
            logger.info("Successfully retrieved logs using standard API")
            return logs.get("Data")
        if isinstance(logs, str):
            logger.info("Successfully retrieved logs as string")
            return logs

        errors.append(f"Unexpected log format: {type(logs)}")
        logger.warning(f"Unexpected log format: {type(logs)}")
        return None

    def _logs_via_http(self, alloc_id: str, task: str, log_type: str, errors: List[str]) -> Optional[str]:
        """Fetch logs from /v1/client/fs/logs; return the text (possibly empty) or None on failure."""
        try:
            path = f"/v1/client/fs/logs/{self._quote_segment(alloc_id)}"
            params = {"task": task, "type": log_type, "plain": "true"}
            if self.namespace and self.namespace != "*":
                params["namespace"] = self.namespace

            logger.info(f"Attempting to get logs using direct HTTP request to: {self._base_url()}{path}")
            # _request honours NOMAD_SKIP_VERIFY instead of always disabling
            # certificate verification.
            response = self._request("GET", path, params=params)
        except ImportError:
            errors.append("Requests library not available for fallback HTTP request")
            logger.warning("Requests library not available for fallback HTTP request")
            return None
        except Exception as e:
            errors.append(f"HTTP request error: {str(e)}")
            logger.warning(f"HTTP request failed: {str(e)}")
            return None

        if response.status_code == 200:
            logger.info("Successfully retrieved logs using direct HTTP request")
            return response.text

        errors.append(f"HTTP request failed with status {response.status_code}: {response.text}")
        logger.warning(f"HTTP request failed: {response.status_code} - {response.text}")
        return None

    def _logs_via_cli(self, alloc_id: str, task: str, log_type: str, errors: List[str]) -> Optional[str]:
        """Run ``nomad alloc logs`` as a last resort; return its stdout or None on failure."""
        try:
            import subprocess

            # Assumes the nomad binary is on PATH.
            command = ["nomad", "alloc", "logs", "-verbose"]
            if self.namespace and self.namespace != "*":
                command.extend(["-namespace", self.namespace])
            # Flags must precede the positional arguments. No flag is passed
            # for stdout; it is presumed to be the CLI's default stream.
            if log_type == "stderr":
                command.append("-stderr")
            # Positional arguments are <allocation> <task>; "-job" is not
            # used because the identifier here is an allocation ID.
            command.extend([alloc_id, task])

            logger.info(f"Attempting to get logs using command: {' '.join(command)}")
            process = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self.REQUEST_TIMEOUT,
            )
        except Exception as e:
            errors.append(f"Command-line client error: {str(e)}")
            logger.warning(f"Command-line client failed: {str(e)}")
            return None

        if process.returncode == 0:
            logger.info("Successfully retrieved logs using command-line client")
            return process.stdout

        errors.append(f"Command-line client failed: {process.stderr}")
        logger.warning(f"Command-line client failed: {process.stderr}")
        return None

    def get_allocation_logs(self, alloc_id: str, task: str, log_type: str = "stderr") -> str:
        """
        Get logs for a specific allocation and task.

        Three approaches are tried in order until one succeeds: the client
        object, a direct HTTP request, and the ``nomad`` command-line client.
        A log that was retrieved successfully but is empty is returned as an
        empty string rather than being reported as a failure.

        Args:
            alloc_id: Allocation ID; must be a 36-character string
            task: Name of the task whose logs are wanted
            log_type: Log stream to read (default: "stderr")

        Returns:
            The log text, or a human-readable error message when the
            allocation ID is invalid, the allocation cannot be checked, or
            every approach fails.

        Raises:
            HTTPException: Status 500 on an unexpected error outside the
                individual approaches.
        """
        try:
            logger.info(f"Getting logs for allocation {alloc_id}, task {task}, type {log_type}")

            if alloc_id == "repository":
                logger.error("Invalid allocation ID 'repository' detected")
                return "Error: Invalid allocation ID 'repository'"

            # Verify the allocation ID has the length of a UUID (36 characters)
            if not alloc_id or len(alloc_id) != 36:
                logger.error(f"Invalid allocation ID format: {alloc_id} (length: {len(alloc_id) if alloc_id else 0})")
                return "Error: Invalid allocation ID format - must be 36 character UUID"

            # Get allocation info to verify it exists
            try:
                allocation = self.client.allocation.get_allocation(alloc_id)
            except Exception as e:
                logger.error(f"Error checking allocation: {str(e)}")
                return f"Error checking allocation: {str(e)}"
            if not allocation:
                logger.warning(f"Allocation {alloc_id} not found")
                return f"Allocation {alloc_id} not found"

            error_messages: List[str] = []

            # None means "this approach failed"; an empty string is a valid,
            # empty log and stops the fallback chain.
            log_content = self._logs_via_client(alloc_id, task, log_type, error_messages)
            if log_content is None:
                log_content = self._logs_via_http(alloc_id, task, log_type, error_messages)
            if log_content is None:
                log_content = self._logs_via_cli(alloc_id, task, log_type, error_messages)

            if log_content is not None:
                return log_content

            error_msg = "; ".join(error_messages)
            logger.error(f"Failed to get logs after multiple attempts: {error_msg}")
            return f"Error retrieving {log_type} logs: {error_msg}"

        except Exception as e:
            error_str = str(e)
            logger.error(f"Failed to get logs for allocation {alloc_id}, task {task}: {error_str}")
            raise HTTPException(status_code=500, detail=f"Failed to get logs: {error_str}") from e

    # ------------------------------------------------------------------
    # Deployments and evaluations
    # ------------------------------------------------------------------

    def get_deployment_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get the deployment status for a job.

        Raises:
            HTTPException: Status 500 when the client call fails.
        """
        try:
            return self.client.job.get_deployment(job_id, namespace=self.namespace)
        except Exception as e:
            logger.error(f"Failed to get deployment status for job {job_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to get deployment status: {str(e)}") from e

    def get_job_evaluations(self, job_id: str) -> List[Dict[str, Any]]:
        """
        Get evaluations for a job.

        Raises:
            HTTPException: Status 500 when the client call fails.
        """
        try:
            return self.client.job.get_evaluations(job_id, namespace=self.namespace)
        except Exception as e:
            logger.error(f"Failed to get evaluations for job {job_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to get evaluations: {str(e)}") from e