Files
nomad_mcp/tests/test_nomad_service.py
2025-02-26 15:25:39 +07:00

193 lines
5.8 KiB
Python

import os
import pytest
import time
import uuid
from dotenv import load_dotenv
from app.services.nomad_client import NomadService
# Load environment variables
load_dotenv()
# Skip tests if Nomad server is not configured
nomad_addr = os.getenv("NOMAD_ADDR")
if not nomad_addr:
pytest.skip("NOMAD_ADDR not configured", allow_module_level=True)
# Test job ID prefix - each test will append a unique suffix
TEST_JOB_ID_PREFIX = "test-job-"
# Simple nginx job specification template for testing
def get_test_job_spec(job_id):
return {
"ID": job_id,
"Name": job_id,
"Type": "service",
"Datacenters": ["jm"], # Adjust to match your Nomad cluster
"Namespace": "development",
"Priority": 50,
"TaskGroups": [
{
"Name": "app",
"Count": 1,
"Tasks": [
{
"Name": "nginx",
"Driver": "docker",
"Config": {
"image": "nginx:latest",
"ports": ["http"],
},
"Resources": {
"CPU": 100,
"MemoryMB": 128
}
}
],
"Networks": [
{
"DynamicPorts": [
{
"Label": "http",
"Value": 0,
"To": 80
}
]
}
]
}
]
}
@pytest.fixture
def nomad_service():
"""Fixture to provide a NomadService instance."""
return NomadService()
@pytest.fixture
def test_job_id():
"""Fixture to provide a unique job ID for each test."""
job_id = f"{TEST_JOB_ID_PREFIX}{uuid.uuid4().hex[:8]}"
yield job_id
# Cleanup: ensure job is stopped after the test
try:
service = NomadService()
service.stop_job(job_id, purge=True)
print(f"Cleaned up job {job_id}")
except Exception as e:
print(f"Error cleaning up job {job_id}: {str(e)}")
def test_job_start_and_stop(nomad_service, test_job_id):
"""Test starting and stopping a job."""
# Create job specification
job_spec = get_test_job_spec(test_job_id)
# Start the job
start_response = nomad_service.start_job(job_spec)
assert start_response["job_id"] == test_job_id
assert start_response["status"] == "started"
assert "eval_id" in start_response
# Wait longer for job to be registered (increased from 2 to 10 seconds)
time.sleep(10)
# Verify job exists
job = nomad_service.get_job(test_job_id)
assert job["ID"] == test_job_id
# Stop the job
stop_response = nomad_service.stop_job(test_job_id)
assert stop_response["job_id"] == test_job_id
assert stop_response["status"] == "stopped"
# Wait for job to be stopped
time.sleep(5)
# Verify job is stopped
job = nomad_service.get_job(test_job_id)
assert job["Stop"] is True
def test_job_with_namespace(nomad_service, test_job_id):
"""Test job with explicit namespace."""
# Create job specification with explicit namespace
job_spec = get_test_job_spec(test_job_id)
job_spec["Namespace"] = "development"
# Start the job
start_response = nomad_service.start_job(job_spec)
assert start_response["job_id"] == test_job_id
assert start_response["namespace"] == "development"
# Wait longer for job to be registered (increased from 2 to 10 seconds)
time.sleep(10)
# Verify job exists in the correct namespace
job = nomad_service.get_job(test_job_id)
assert job["Namespace"] == "development"
# Clean up
nomad_service.stop_job(test_job_id)
def test_job_with_job_wrapper(nomad_service, test_job_id):
"""Test job specification already wrapped in 'Job' key."""
# Create job specification with Job wrapper
job_spec = {
"Job": get_test_job_spec(test_job_id)
}
# Start the job
start_response = nomad_service.start_job(job_spec)
assert start_response["job_id"] == test_job_id
# Wait longer for job to be registered (increased from 2 to 10 seconds)
time.sleep(10)
# Verify job exists
job = nomad_service.get_job(test_job_id)
assert job["ID"] == test_job_id
# Clean up
nomad_service.stop_job(test_job_id)
def test_list_jobs(nomad_service):
"""Test listing jobs."""
jobs = nomad_service.list_jobs()
assert isinstance(jobs, list)
# List should contain job details
if jobs:
assert "ID" in jobs[0]
assert "Status" in jobs[0]
def test_job_lifecycle(nomad_service, test_job_id):
"""Test the full job lifecycle - start, check status, get allocations, stop."""
# Start the job
job_spec = get_test_job_spec(test_job_id)
start_response = nomad_service.start_job(job_spec)
assert start_response["status"] == "started"
# Wait longer for job to be scheduled (increased from 5 to 15 seconds)
time.sleep(15)
# Check job status
job = nomad_service.get_job(test_job_id)
assert job["ID"] == test_job_id
# Get allocations
try:
allocations = nomad_service.get_allocations(test_job_id)
assert isinstance(allocations, list)
except Exception:
# It's possible allocations aren't available yet, which is okay for the test
pass
# Stop the job
stop_response = nomad_service.stop_job(test_job_id)
assert stop_response["status"] == "stopped"
# Wait longer for job to be stopped (increased from 2 to 5 seconds)
time.sleep(5)
# Verify job is stopped
job = nomad_service.get_job(test_job_id)
assert job["Stop"] is True