#!/usr/bin/env python """ Test script to directly use the Nomad client library. """ import os import sys import uuid import nomad from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() def get_test_job_spec(job_id): """Create a simple test job specification.""" return { "Job": { "ID": job_id, "Name": job_id, "Type": "service", "Datacenters": ["jm"], "Namespace": "development", "Priority": 50, "TaskGroups": [ { "Name": "app", "Count": 1, "Tasks": [ { "Name": "nginx", "Driver": "docker", "Config": { "image": "nginx:latest", "ports": ["http"], }, "Resources": { "CPU": 100, "MemoryMB": 128 } } ], "Networks": [ { "DynamicPorts": [ { "Label": "http", "Value": 0, "To": 80 } ] } ] } ] } } def main(): print("Testing direct Nomad client...") # Check if NOMAD_ADDR is configured nomad_addr = os.getenv("NOMAD_ADDR") if not nomad_addr: print("Error: NOMAD_ADDR is not configured in .env file.") sys.exit(1) print(f"Connecting to Nomad at: {nomad_addr}") try: # Extract host and port from the address host_with_port = nomad_addr.replace("http://", "").replace("https://", "") host = host_with_port.split(":")[0] # Safely extract port port_part = host_with_port.split(":")[-1] if ":" in host_with_port else "4646" port = int(port_part.split('/')[0]) # Remove any path components # Initialize the Nomad client client = nomad.Nomad( host=host, port=port, secure=nomad_addr.startswith("https"), timeout=10, namespace="development", # Set namespace explicitly verify=False ) # Create a unique job ID for testing job_id = f"test-job-{uuid.uuid4().hex[:8]}" print(f"Created test job ID: {job_id}") # Create job specification job_spec = get_test_job_spec(job_id) print("Created job specification with explicit namespace: development") # Start the job print(f"Attempting to start job {job_id}...") # Print the job spec for debugging print(f"Job spec structure: {list(job_spec.keys())}") print(f"Job keys: {list(job_spec['Job'].keys())}") # Register the job response = client.job.register_job(job_id, job_spec) print(f"Job registration response: {response}") print(f"Job {job_id} started successfully!") # Clean up - stop the job print(f"Stopping job {job_id}...") stop_response = client.job.deregister_job(job_id, purge=True) print(f"Job stop response: {stop_response}") print(f"Job {job_id} stopped and purged successfully!") print("\nDirect Nomad client test completed successfully.") except Exception as e: print(f"Error during direct Nomad client test: {str(e)}") sys.exit(1) if __name__ == "__main__": main()