"""Thin service layer around the Nomad HTTP API, the python-nomad client and the Nomad CLI."""

import logging
import os
import subprocess
import time
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlsplit

import nomad
from dotenv import load_dotenv
from fastapi import HTTPException

# Load environment variables
load_dotenv()

# Configure logging
logger = logging.getLogger(__name__)

_DEFAULT_NOMAD_ADDR = "http://localhost:4646"
_DEFAULT_NOMAD_PORT = 4646
_DEFAULT_NAMESPACE = "development"
# Seconds to wait for ordinary API calls; matches the timeout given to the client.
_REQUEST_TIMEOUT_SECONDS = 10
# Log retrieval (HTTP fallback and CLI) may move more data, so allow it longer.
_LOG_FETCH_TIMEOUT_SECONDS = 30


def _tls_verify() -> bool:
    """Return False only when NOMAD_SKIP_VERIFY is set to "true" (case-insensitive)."""
    return os.getenv("NOMAD_SKIP_VERIFY", "false").lower() != "true"


def _parse_nomad_addr(nomad_addr: str) -> Tuple[str, int, bool]:
    """
    Split a Nomad address into (host, port, secure).

    Accepts addresses with or without a scheme ("localhost:4646",
    "https://nomad.example.com", "http://[::1]:4646/"). Any path component is
    ignored and the port defaults to 4646 when absent.

    Raises:
        ValueError: If no host can be extracted or the port is not a valid number.
    """
    addr = nomad_addr.strip().rstrip("/")
    if "://" not in addr:
        # urlsplit would otherwise read "host:4646" as scheme "host".
        addr = f"http://{addr}"
    parts = urlsplit(addr)
    if not parts.hostname:
        raise ValueError(f"Invalid Nomad address: {nomad_addr!r}")
    # urlsplit strips the brackets from IPv6 literals; restore them so the host
    # can be placed back into a URL.
    host = f"[{parts.hostname}]" if ":" in parts.hostname else parts.hostname
    port = parts.port or _DEFAULT_NOMAD_PORT
    return host, port, parts.scheme.lower() == "https"


def get_nomad_client():
    """
    Create and return a Nomad client using environment variables.

    Reads NOMAD_ADDR (default "http://localhost:4646"), NOMAD_TOKEN,
    NOMAD_NAMESPACE (default "development"; a wildcard "*" is replaced by
    "development") and NOMAD_SKIP_VERIFY.

    Returns:
        A configured ``nomad.Nomad`` client.

    Raises:
        HTTPException: With status 500 if the client cannot be created.
    """
    try:
        nomad_addr = os.getenv("NOMAD_ADDR") or _DEFAULT_NOMAD_ADDR
        nomad_token = os.getenv("NOMAD_TOKEN")
        # Use "development" as the default namespace since all jobs are likely to be in this namespace
        nomad_namespace = os.getenv("NOMAD_NAMESPACE", _DEFAULT_NAMESPACE)

        # Ensure namespace is never "*" (wildcard)
        if nomad_namespace == "*":
            nomad_namespace = _DEFAULT_NAMESPACE
            logger.info("Replaced wildcard namespace '*' with 'development'")

        host, port, secure = _parse_nomad_addr(nomad_addr)

        logger.info(
            "Creating Nomad client with host=%s, port=%s, namespace=%s",
            host, port, nomad_namespace,
        )

        return nomad.Nomad(
            host=host,
            port=port,
            secure=secure,
            token=nomad_token,
            timeout=_REQUEST_TIMEOUT_SECONDS,
            namespace=nomad_namespace,  # Query across development namespace by default
            verify=_tls_verify(),
        )
    except Exception as e:
        logger.error("Failed to create Nomad client: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to connect to Nomad: {str(e)}") from e


class NomadService:
    """Service for interacting with Nomad API."""

    def __init__(self):
        self.client = get_nomad_client()
        # Use "development" namespace as default
        self.namespace = os.getenv("NOMAD_NAMESPACE", _DEFAULT_NAMESPACE)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _base_url(self) -> str:
        """Return the scheme://host:port prefix, using https when the client is secure."""
        scheme = "https" if getattr(self.client, "secure", False) else "http"
        return f"{scheme}://{self.client.host}:{self.client.port}"

    def _auth_headers(self) -> Dict[str, str]:
        """Return the X-Nomad-Token header when the client carries a token."""
        token = getattr(self.client, "token", None)
        return {"X-Nomad-Token": token} if token else {}

    def _request(
        self,
        method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        timeout: float = _REQUEST_TIMEOUT_SECONDS,
    ):
        """Send one HTTP request to the Nomad API and return the raw response."""
        # Imported lazily, as the original code did, so that a missing
        # `requests` package surfaces as ImportError at call time.
        import requests

        return requests.request(
            method,
            f"{self._base_url()}{path}",
            headers=self._auth_headers(),
            params=params,
            verify=_tls_verify(),
            timeout=timeout,  # never block a worker forever on an unresponsive server
        )

    # ------------------------------------------------------------------
    # Jobs
    # ------------------------------------------------------------------

    def get_job(self, job_id: str, max_retries: int = 3, retry_delay: int = 2) -> Dict[str, Any]:
        """
        Get a job by ID with retry logic.

        Args:
            job_id: The ID of the job to retrieve
            max_retries: Maximum number of retry attempts (default: 3)
            retry_delay: Delay between retries in seconds (default: 2)

        Returns:
            Dict containing job details

        Raises:
            HTTPException: With status 404 when the job could not be fetched
                after all attempts.
        """
        last_failure: Optional[str] = None

        for attempt in range(1, max_retries + 1):
            try:
                response = self._request(
                    "GET", f"/v1/job/{job_id}", params={"namespace": self.namespace}
                )
                if response.status_code == 200:
                    return response.json()
                if response.status_code == 404:
                    last_failure = f"Job not found: {job_id}"
                else:
                    last_failure = f"Failed to get job: {response.text}"
            except Exception as e:
                # Network errors and malformed JSON are retried like HTTP failures.
                last_failure = str(e)

            if attempt < max_retries:
                logger.warning(
                    "Error getting job %s on attempt %d/%d: %s, retrying in %ss...",
                    job_id, attempt, max_retries, last_failure, retry_delay,
                )
                time.sleep(retry_delay)

        logger.error(
            "Failed to get job %s after %d attempts: %s", job_id, max_retries, last_failure
        )
        # NOTE(review): every failure (including connection errors) is reported
        # as 404 to keep the status code existing callers see; consider
        # distinguishing upstream failures with a 5xx.
        raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")

    def list_jobs(self) -> List[Dict[str, Any]]:
        """
        List all jobs in the service namespace.

        Raises:
            HTTPException: With status 500 if the request fails.
        """
        try:
            response = self._request("GET", "/v1/jobs", params={"namespace": self.namespace})
            if response.status_code != 200:
                raise ValueError(f"Failed to list jobs: {response.text}")
            return response.json()
        except Exception as e:
            logger.error("Failed to list jobs: %s", e)
            raise HTTPException(status_code=500, detail=f"Failed to list jobs: {str(e)}") from e

    def start_job(self, job_spec: Dict[str, Any]) -> Dict[str, Any]:
        """
        Start a job using the provided specification.

        The caller's ``job_spec`` is not modified; the namespace is written
        into a copy before submission.

        Args:
            job_spec: The job specification to submit. Can be a raw job spec
                or wrapped in a "Job" key.

        Returns:
            Dict containing job_id, eval_id, status, namespace, and any warnings

        Raises:
            HTTPException: With status 500 if the job ID is missing or
                registration fails.
        """
        try:
            # Normalize to a {"Job": {...}} wrapper, copying so the caller's
            # dictionaries are left untouched.
            if "Job" in job_spec:
                job_body = dict(job_spec["Job"])
                normalized_job_spec = {**job_spec, "Job": job_body}
            else:
                job_body = dict(job_spec)
                normalized_job_spec = {"Job": job_body}

            job_id = job_body.get("ID") or job_body.get("id")
            if not job_id:
                raise ValueError("Job ID is required in the job specification")

            logger.info("Processing job start request for job ID: %s", job_id)

            # Namespace priority:
            # 1. Explicitly provided (non-empty) in the job spec
            # 2. Service instance namespace
            # 3. "development" when the result is the wildcard
            namespace = self.namespace
            if job_body.get("Namespace"):
                namespace = job_body["Namespace"]
                logger.info("Using namespace from job spec: %s", namespace)

            # Replace wildcard namespaces with the default
            if namespace == "*":
                namespace = _DEFAULT_NAMESPACE
                logger.info("Replaced wildcard namespace with default: %s", namespace)

            # Always explicitly set the namespace in the job spec
            job_body["Namespace"] = namespace

            logger.info("Submitting job %s to namespace %s", job_id, namespace)
            logger.info("Job specification structure: %s", list(normalized_job_spec))
            logger.info("Job keys: %s", list(job_body))

            # The namespace travels inside the job spec itself.
            response = self.client.job.register_job(job_id, normalized_job_spec)

            logger.info("Job registration response: %s", response)

            return {
                "job_id": job_id,
                "eval_id": response.get("EvalID"),
                "status": "started",
                "warnings": response.get("Warnings"),
                "namespace": namespace,
            }
        except Exception as e:
            logger.error("Failed to start job: %s", e)
            raise HTTPException(status_code=500, detail=f"Failed to start job: {str(e)}") from e

    def stop_job(self, job_id: str, purge: bool = False) -> Dict[str, Any]:
        """
        Stop a job by ID.

        Args:
            job_id: The ID of the job to stop
            purge: If true, the job will be purged from Nomad's state entirely

        Returns:
            Dict containing job_id, eval_id, status, and namespace

        Raises:
            HTTPException: With status 500 if the request fails.
        """
        try:
            logger.info(
                "Stopping job %s in namespace %s (purge=%s)", job_id, self.namespace, purge
            )

            response = self._request(
                "DELETE",
                f"/v1/job/{job_id}",
                params={"namespace": self.namespace, "purge": str(purge).lower()},
            )
            if response.status_code != 200:
                raise ValueError(f"Failed to stop job: {response.text}")

            response_data = response.json()
            logger.info("Job stop response: %s", response_data)

            return {
                "job_id": job_id,
                "eval_id": response_data.get("EvalID"),
                "status": "stopped",
                "namespace": self.namespace,
            }
        except Exception as e:
            logger.error("Failed to stop job %s: %s", job_id, e)
            raise HTTPException(status_code=500, detail=f"Failed to stop job: {str(e)}") from e

    # ------------------------------------------------------------------
    # Allocations and logs
    # ------------------------------------------------------------------

    def get_allocations(self, job_id: str) -> List[Dict[str, Any]]:
        """
        Get all allocations for a job.

        Returns:
            The list of allocations, or an empty list when Nomad answers 404.

        Raises:
            HTTPException: With status 500 if the request fails.
        """
        try:
            response = self._request(
                "GET", f"/v1/job/{job_id}/allocations", params={"namespace": self.namespace}
            )
            if response.status_code == 200:
                return response.json()
            if response.status_code == 404:
                logger.warning("No allocations found for job %s", job_id)
                return []
            raise ValueError(f"Failed to get allocations: {response.text}")
        except Exception as e:
            logger.error("Failed to get allocations for job %s: %s", job_id, e)
            raise HTTPException(
                status_code=500, detail=f"Failed to get allocations: {str(e)}"
            ) from e

    def _logs_via_client(
        self, alloc_id: str, task: str, log_type: str, errors: List[str]
    ) -> Optional[str]:
        """Fetch logs through the python-nomad client; record failures in ``errors``."""
        try:
            logger.info("Attempting to get logs using standard API")
            logs = self.client.allocation.logs.get_logs(alloc_id, task, log_type, plain=True)
            if not logs:
                errors.append("No logs returned from standard API")
                logger.warning("No logs returned from standard API")
                return None
            if isinstance(logs, dict) and logs.get("Data"):
                logger.info("Successfully retrieved logs using standard API")
                return logs.get("Data")
            if isinstance(logs, str):
                logger.info("Successfully retrieved logs as string")
                return logs
            errors.append(f"Unexpected log format: {type(logs)}")
            logger.warning("Unexpected log format: %s", type(logs))
        except Exception as e:
            errors.append(f"Standard API error: {e}")
            logger.warning("Standard API failed: %s", e)
        return None

    def _logs_via_http(
        self, alloc_id: str, task: str, log_type: str, errors: List[str]
    ) -> Optional[str]:
        """Fetch logs with a direct HTTP request; record failures in ``errors``."""
        try:
            params = {"task": task, "type": log_type, "plain": "true"}
            if self.namespace and self.namespace != "*":
                params["namespace"] = self.namespace

            path = f"/v1/client/fs/logs/{alloc_id}"
            logger.info(
                "Attempting to get logs using direct HTTP request to: %s%s",
                self._base_url(), path,
            )
            response = self._request(
                "GET", path, params=params, timeout=_LOG_FETCH_TIMEOUT_SECONDS
            )
            if response.status_code == 200:
                logger.info("Successfully retrieved logs using direct HTTP request")
                return response.text
            errors.append(
                f"HTTP request failed with status {response.status_code}: {response.text}"
            )
            logger.warning("HTTP request failed: %s - %s", response.status_code, response.text)
        except ImportError:
            errors.append("Requests library not available for fallback HTTP request")
            logger.warning("Requests library not available for fallback HTTP request")
        except Exception as e:
            errors.append(f"HTTP request error: {e}")
            logger.warning("HTTP request failed: %s", e)
        return None

    def _logs_via_cli(
        self, alloc_id: str, task: str, log_type: str, errors: List[str]
    ) -> Optional[str]:
        """Fetch logs by running the ``nomad`` CLI (assumed on PATH); record failures."""
        try:
            cmd_parts = ["nomad", "alloc", "logs", "-verbose"]
            if self.namespace and self.namespace != "*":
                cmd_parts.extend(["-namespace", self.namespace])
            # stdout is the CLI default, so only stderr needs a flag.
            if log_type == "stderr":
                cmd_parts.append("-stderr")
            # Flags must precede the positional <allocation> <task> arguments,
            # and -job must not be used because alloc_id is an allocation ID.
            cmd_parts.extend([alloc_id, task])

            logger.info("Attempting to get logs using command: %s", " ".join(cmd_parts))
            process = subprocess.run(
                cmd_parts,
                capture_output=True,
                text=True,
                timeout=_LOG_FETCH_TIMEOUT_SECONDS,
            )
            if process.returncode == 0:
                logger.info("Successfully retrieved logs using command-line client")
                return process.stdout
            errors.append(f"Command-line client failed: {process.stderr}")
            logger.warning("Command-line client failed: %s", process.stderr)
        except Exception as e:
            errors.append(f"Command-line client error: {e}")
            logger.warning("Command-line client failed: %s", e)
        return None

    def get_allocation_logs(self, alloc_id: str, task: str, log_type: str = "stderr") -> str:
        """
        Get logs for a specific allocation and task.

        Tries, in order, the python-nomad client, a direct HTTP request and the
        ``nomad`` command-line client, returning the first non-empty result.

        Args:
            alloc_id: Allocation ID; must be a 36-character UUID string
            task: Name of the task within the allocation
            log_type: "stderr" (default) or "stdout"

        Returns:
            The log text, or a human-readable error string when the
            allocation ID is invalid, the allocation cannot be found, or all
            retrieval approaches fail.

        Raises:
            HTTPException: With status 500 on an unexpected error.
        """
        try:
            logger.info(
                "Getting logs for allocation %s, task %s, type %s", alloc_id, task, log_type
            )

            if alloc_id == "repository":
                logger.error("Invalid allocation ID 'repository' detected")
                return "Error: Invalid allocation ID 'repository'"

            # Verify the allocation ID is a valid UUID (must be 36 characters)
            if not alloc_id or len(alloc_id) != 36:
                logger.error(
                    "Invalid allocation ID format: %s (length: %s)",
                    alloc_id, len(alloc_id) if alloc_id else 0,
                )
                return "Error: Invalid allocation ID format - must be 36 character UUID"

            # Get allocation info to verify it exists
            try:
                allocation = self.client.allocation.get_allocation(alloc_id)
            except Exception as e:
                logger.error("Error checking allocation: %s", e)
                return f"Error checking allocation: {str(e)}"
            if not allocation:
                logger.warning("Allocation %s not found", alloc_id)
                return f"Allocation {alloc_id} not found"

            error_messages: List[str] = []
            for fetch in (self._logs_via_client, self._logs_via_http, self._logs_via_cli):
                log_content = fetch(alloc_id, task, log_type, error_messages)
                if log_content:
                    return log_content

            error_msg = "; ".join(error_messages)
            logger.error("Failed to get logs after multiple attempts: %s", error_msg)
            return f"Error retrieving {log_type} logs: {error_msg}"

        except Exception as e:
            error_str = str(e)
            logger.error(
                "Failed to get logs for allocation %s, task %s: %s", alloc_id, task, error_str
            )
            raise HTTPException(
                status_code=500, detail=f"Failed to get logs: {error_str}"
            ) from e

    # ------------------------------------------------------------------
    # Deployments and evaluations
    # ------------------------------------------------------------------

    def get_deployment_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get the deployment status for a job.

        Raises:
            HTTPException: With status 500 if the client call fails.
        """
        try:
            return self.client.job.get_deployment(job_id, namespace=self.namespace)
        except Exception as e:
            logger.error("Failed to get deployment status for job %s: %s", job_id, e)
            raise HTTPException(
                status_code=500, detail=f"Failed to get deployment status: {str(e)}"
            ) from e

    def get_job_evaluations(self, job_id: str) -> List[Dict[str, Any]]:
        """
        Get evaluations for a job.

        Raises:
            HTTPException: With status 500 if the client call fails.
        """
        try:
            return self.client.job.get_evaluations(job_id, namespace=self.namespace)
        except Exception as e:
            logger.error("Failed to get evaluations for job %s: %s", job_id, e)
            raise HTTPException(
                status_code=500, detail=f"Failed to get evaluations: {str(e)}"
            ) from e