Files
nomad_mcp/test_direct_nomad.py
2025-02-26 15:25:39 +07:00

123 lines
3.8 KiB
Python

#!/usr/bin/env python
"""
Test script to directly use the Nomad client library.
"""
import os
import sys
import uuid
import nomad
from dotenv import load_dotenv
# Load environment variables from a .env file when the module is loaded, so
# that the os.getenv("NOMAD_ADDR") lookup in main() can see values defined there.
load_dotenv()
def get_test_job_spec(job_id, *, namespace="development", datacenters=("jm",)):
    """Create a simple test job specification.

    The result is a dict with a single top-level ``"Job"`` key describing a
    ``service`` job with one task group (``app``, count 1). The group holds
    one ``docker`` task named ``nginx`` running ``nginx:latest``, plus a
    dynamic port labelled ``http`` whose ``To`` value is 80.

    Args:
        job_id: Used for both the job ``ID`` and its ``Name``.
        namespace: Value stored under ``Namespace``.
        datacenters: Iterable of names stored under ``Datacenters``. It is
            copied into a new list, so the caller's object is never shared.

    Returns:
        A freshly built, JSON-serialisable dict; mutating it does not affect
        later calls.
    """
    nginx_task = {
        "Name": "nginx",
        "Driver": "docker",
        "Config": {
            "image": "nginx:latest",
            # Refers to the port label declared in the group's "Networks".
            "ports": ["http"],
        },
        "Resources": {
            "CPU": 100,
            "MemoryMB": 128,
        },
    }
    http_port = {
        "Label": "http",
        "Value": 0,
        "To": 80,
    }
    app_group = {
        "Name": "app",
        "Count": 1,
        "Tasks": [nginx_task],
        "Networks": [{"DynamicPorts": [http_port]}],
    }
    # Key order is kept stable because main() prints the key list for debugging.
    return {
        "Job": {
            "ID": job_id,
            "Name": job_id,
            "Type": "service",
            "Datacenters": list(datacenters),
            "Namespace": namespace,
            "Priority": 50,
            "TaskGroups": [app_group],
        }
    }
def _parse_nomad_addr(nomad_addr):
    """Split a Nomad address string into ``(host, port, secure)``.

    Accepts values such as ``http://host:4646``, ``https://host/``,
    ``host:4646`` or ``host``. The port falls back to 4646 when the address
    does not carry one, and ``secure`` is True only for the ``https`` scheme.

    Raises:
        ValueError: If no host can be extracted or the port is not a valid
            integer port number.
    """
    from urllib.parse import urlsplit

    candidate = nomad_addr.strip()
    # urlsplit only recognises the host part when it follows "//", so a
    # scheme-less value like "host:4646" gets a default scheme prepended.
    if "://" not in candidate:
        candidate = f"http://{candidate}"

    parts = urlsplit(candidate)
    host = parts.hostname
    if not host:
        raise ValueError(f"Cannot determine Nomad host from address: {nomad_addr!r}")
    if ":" in host:
        # NOTE(review): urlsplit strips the brackets from IPv6 literals; they
        # are re-added on the assumption that the client embeds the host in a
        # URL - confirm against the client library.
        host = f"[{host}]"

    # Accessing .port raises ValueError for a malformed or out-of-range port.
    port = parts.port if parts.port is not None else 4646
    return host, port, parts.scheme == "https"


def main():
    """Register a throwaway job through the Nomad client, then purge it.

    Reads the server address from the ``NOMAD_ADDR`` environment variable,
    builds a client for the ``development`` namespace, registers a job with a
    random ``test-job-<8 hex chars>`` ID and immediately deregisters it with
    ``purge=True``. Progress is printed as it goes.

    Exits the process with status 1 when ``NOMAD_ADDR`` is unset or empty, or
    when any step raises.
    """
    print("Testing direct Nomad client...")

    # Check if NOMAD_ADDR is configured
    nomad_addr = os.getenv("NOMAD_ADDR")
    if not nomad_addr:
        print("Error: NOMAD_ADDR is not configured in .env file.")
        sys.exit(1)

    print(f"Connecting to Nomad at: {nomad_addr}")

    try:
        host, port, secure = _parse_nomad_addr(nomad_addr)

        # Initialize the Nomad client
        client = nomad.Nomad(
            host=host,
            port=port,
            secure=secure,
            timeout=10,
            namespace="development",  # Set namespace explicitly
            # NOTE(review): TLS certificate verification is switched off here;
            # acceptable for a local test script only - confirm before reuse.
            verify=False,
        )

        # Create a unique job ID for testing
        job_id = f"test-job-{uuid.uuid4().hex[:8]}"
        print(f"Created test job ID: {job_id}")

        # Create job specification
        job_spec = get_test_job_spec(job_id)
        print("Created job specification with explicit namespace: development")

        # Start the job
        print(f"Attempting to start job {job_id}...")

        # Print the job spec for debugging
        print(f"Job spec structure: {list(job_spec.keys())}")
        print(f"Job keys: {list(job_spec['Job'].keys())}")

        # Register the job
        response = client.job.register_job(job_id, job_spec)
        print(f"Job registration response: {response}")
        print(f"Job {job_id} started successfully!")

        # Clean up - stop the job
        print(f"Stopping job {job_id}...")
        stop_response = client.job.deregister_job(job_id, purge=True)
        print(f"Job stop response: {stop_response}")
        print(f"Job {job_id} stopped and purged successfully!")

        print("\nDirect Nomad client test completed successfully.")
    except Exception as e:
        # Top-level boundary of a test script: report whatever went wrong and
        # signal failure through the exit status.
        print(f"Error during direct Nomad client test: {e}")
        sys.exit(1)
# Run the test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()