"""Service layer that stores repository-to-job mappings as YAML files."""

import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
from fastapi import HTTPException

from app.services.gitea_client import GiteaClient

# Configure logging
logger = logging.getLogger(__name__)

# Default configs directory
CONFIG_DIR = os.getenv("CONFIG_DIR", "./configs")


class ConfigService:
    """Service for managing repository to job mappings.

    Each configuration is a ``<name>.yaml`` file inside ``config_dir``. Job
    templates are JSON files read from ``<config_dir>/templates``.
    """

    # Keys every stored configuration must contain.
    _REQUIRED_FIELDS = ("repository", "job_id")

    def __init__(self, config_dir: str = CONFIG_DIR):
        """Create the service and make sure the config directory exists.

        Args:
            config_dir: Directory holding the ``*.yaml`` configuration files.

        Raises:
            HTTPException: 500 if the directory cannot be created.
        """
        self.config_dir = Path(config_dir)
        self._ensure_config_dir()
        self.gitea_client = GiteaClient()

    def _ensure_config_dir(self):
        """Ensure the config directory exists.

        Raises:
            HTTPException: 500 if the directory cannot be created.
        """
        try:
            self.config_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error(f"Failed to create config directory {self.config_dir}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to create config directory: {str(e)}")

    def _config_path(self, name: str) -> Path:
        """Return the YAML path for ``name``, rejecting unsafe names.

        ``name`` is interpolated into a file path, so it must be a single
        path component; otherwise a value such as ``../x`` would address a
        file outside the config directory.

        Raises:
            HTTPException: 400 if ``name`` is empty, is ``..``, or contains a
                path separator.
        """
        if (
            not isinstance(name, str)
            or name in ("", "..")
            or Path(name).name != name
        ):
            raise HTTPException(status_code=400, detail=f"Invalid configuration name: {name}")
        return self.config_dir / f"{name}.yaml"

    @staticmethod
    def _load_yaml_mapping(file_path: Path) -> Dict[str, Any]:
        """Load a YAML file and return its top-level mapping.

        An empty file parses to ``None``; it is treated as an empty mapping so
        callers can safely add keys to the result.

        Raises:
            ValueError: If the document's top level is not a mapping.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if data is None:
            return {}
        if not isinstance(data, dict):
            raise ValueError(f"{file_path.name} does not contain a YAML mapping")
        return data

    def _validate_required_fields(self, config: Dict[str, Any]) -> None:
        """Raise a 400 ``HTTPException`` for the first missing required field."""
        for field in self._REQUIRED_FIELDS:
            if field not in config:
                raise HTTPException(status_code=400, detail=f"Missing required field: {field}")

    def list_configs(self) -> List[Dict[str, Any]]:
        """List all available configurations.

        Returns:
            One dict per ``*.yaml`` file in the config directory, each with a
            ``name`` key set to the file stem. Files are visited in sorted
            order so the result is deterministic.

        Raises:
            HTTPException: 500 if any file cannot be read or parsed.
        """
        configs = []
        try:
            for file_path in sorted(self.config_dir.glob("*.yaml")):
                config = self._load_yaml_mapping(file_path)
                config["name"] = file_path.stem
                configs.append(config)
            return configs
        except Exception as e:
            logger.error(f"Failed to list configurations: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to list configurations: {str(e)}")

    def get_config(self, name: str) -> Dict[str, Any]:
        """Get a specific configuration by name.

        When the configuration has a ``repository`` and the Gitea client
        returns information for it, a ``repository_info`` summary is added.

        Args:
            name: Configuration name (file stem, without ``.yaml``).

        Returns:
            The stored configuration with ``name`` set.

        Raises:
            HTTPException: 400 for an invalid name, 404 if the configuration
                does not exist, 500 on any other failure.
        """
        file_path = self._config_path(name)

        try:
            if not file_path.exists():
                raise HTTPException(status_code=404, detail=f"Configuration not found: {name}")

            config = self._load_yaml_mapping(file_path)
            config["name"] = name

            # Enrich with repository information if available
            if repository := config.get("repository"):
                repo_info = self.gitea_client.get_repository_info(repository)
                if repo_info:
                    # "owner" may be present but null; fall back to an empty dict.
                    owner_info = repo_info.get("owner") or {}
                    config["repository_info"] = {
                        "description": repo_info.get("description"),
                        "default_branch": repo_info.get("default_branch"),
                        "stars": repo_info.get("stars_count"),
                        "forks": repo_info.get("forks_count"),
                        "owner": owner_info.get("login"),
                        "html_url": repo_info.get("html_url"),
                    }

            return config
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to read configuration {name}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to read configuration: {str(e)}")

    def create_config(self, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new configuration.

        The passed ``config`` dict is updated in place (``name`` and, when
        absent, ``repository_alias`` are added) and then written to disk.

        Args:
            name: Configuration name (file stem, without ``.yaml``).
            config: Configuration values; must contain ``repository`` and
                ``job_id``.

        Returns:
            The stored configuration (the same dict object as ``config``).

        Raises:
            HTTPException: 400 for an invalid name, a missing required field
                or a repository the Gitea client reports as missing; 409 if
                the configuration already exists; 500 on any other failure.
        """
        file_path = self._config_path(name)

        try:
            if file_path.exists():
                raise HTTPException(status_code=409, detail=f"Configuration already exists: {name}")

            self._validate_required_fields(config)

            # Validate the repository through the Gitea client.
            # NOTE(review): the original comment says this only applies when
            # Gitea integration is configured; that depends on GiteaClient.
            if not self.gitea_client.check_repository_exists(config["repository"]):
                raise HTTPException(status_code=400, detail=f"Repository not found: {config['repository']}")

            # Add name to the config
            config["name"] = name

            # Get repository alias if not provided
            if "repository_alias" not in config:
                try:
                    _owner, repo = self.gitea_client.parse_repo_url(config["repository"])
                    config["repository_alias"] = repo
                except Exception as e:
                    # Use job_id as fallback
                    logger.warning(
                        f"Could not derive repository alias for {name}, using job_id: {str(e)}"
                    )
                    config["repository_alias"] = config["job_id"]

            # Write config to file
            with open(file_path, "w", encoding="utf-8") as f:
                yaml.dump(config, f, default_flow_style=False)

            return config
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to create configuration {name}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to create configuration: {str(e)}")

    def update_config(self, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Update an existing configuration.

        Values in ``config`` are merged over the stored configuration. If the
        update changes ``repository``, the new repository is validated through
        the Gitea client before anything is written.

        Args:
            name: Configuration name (file stem, without ``.yaml``).
            config: Keys to add or overwrite.

        Returns:
            The merged configuration as written to disk.

        Raises:
            HTTPException: 400 for an invalid name, a missing required field
                or a repository the Gitea client reports as missing; 404 if
                the configuration does not exist; 500 on any other failure.
        """
        file_path = self._config_path(name)

        try:
            if not file_path.exists():
                raise HTTPException(status_code=404, detail=f"Configuration not found: {name}")

            # Read existing config
            existing_config = self._load_yaml_mapping(file_path)

            # Detect a repository change BEFORE merging; after the merge the
            # stored and incoming values are always equal.
            repository_changed = (
                "repository" in config
                and config["repository"] != existing_config.get("repository")
            )
            if repository_changed and not self.gitea_client.check_repository_exists(config["repository"]):
                raise HTTPException(status_code=400, detail=f"Repository not found: {config['repository']}")

            # Update with new values
            existing_config.update(config)

            self._validate_required_fields(existing_config)

            # Add name to the config
            existing_config["name"] = name

            # Re-derive the alias when a repository was supplied without one
            if "repository" in config and "repository_alias" not in config:
                try:
                    _owner, repo = self.gitea_client.parse_repo_url(existing_config["repository"])
                    existing_config["repository_alias"] = repo
                except Exception as e:
                    # Best effort: keep whatever alias is already stored.
                    logger.warning(
                        f"Could not derive repository alias for {name}, keeping existing alias: {str(e)}"
                    )

            # Write config to file
            with open(file_path, "w", encoding="utf-8") as f:
                yaml.dump(existing_config, f, default_flow_style=False)

            return existing_config
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to update configuration {name}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to update configuration: {str(e)}")

    def delete_config(self, name: str) -> Dict[str, Any]:
        """Delete a configuration.

        The file is removed without being parsed, so a malformed YAML file
        can still be deleted.

        Args:
            name: Configuration name (file stem, without ``.yaml``).

        Returns:
            ``{"name": name, "status": "deleted"}``.

        Raises:
            HTTPException: 400 for an invalid name, 404 if the configuration
                does not exist, 500 on any other failure.
        """
        file_path = self._config_path(name)

        try:
            if not file_path.exists():
                raise HTTPException(status_code=404, detail=f"Configuration not found: {name}")

            # Delete the file
            file_path.unlink()

            return {"name": name, "status": "deleted"}
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to delete configuration {name}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to delete configuration: {str(e)}")

    def get_job_from_repository(self, repository: str) -> Optional[Dict[str, str]]:
        """Find job_id and namespace associated with a repository.

        Args:
            repository: Value compared against each configuration's
                ``repository`` and ``repository_alias``.

        Returns:
            ``{"job_id": ..., "namespace": ...}`` for the first match, or
            ``None`` if no configuration matches.

        Raises:
            HTTPException: 500 if the configurations cannot be listed.
        """
        try:
            for config in self.list_configs():
                if repository in (config.get("repository"), config.get("repository_alias")):
                    return {
                        "job_id": config.get("job_id"),
                        "namespace": config.get("namespace"),
                    }
            return None
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to find job for repository {repository}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to find job for repository: {str(e)}")

    def get_repository_from_job(self, job_id: str) -> Optional[str]:
        """Find repository associated with a job_id.

        Returns:
            The ``repository`` of the first configuration whose ``job_id``
            equals ``job_id``, or ``None`` if there is none.

        Raises:
            HTTPException: 500 if the configurations cannot be listed.
        """
        try:
            for config in self.list_configs():
                if config.get("job_id") == job_id:
                    return config.get("repository")
            return None
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to find repository for job {job_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to find repository for job: {str(e)}")

    def get_config_by_repository(self, repository: str) -> Optional[Dict[str, Any]]:
        """Find configuration by repository URL or alias.

        Unlike the other lookups this method is best-effort: any failure is
        logged and reported as ``None`` instead of raising.

        Returns:
            The full configuration (as returned by ``get_config``) of the
            first match, or ``None`` if nothing matches or an error occurs.
        """
        try:
            for config in self.list_configs():
                if repository in (config.get("repository"), config.get("repository_alias")):
                    return self.get_config(config.get("name"))
            return None
        except Exception as e:
            logger.error(f"Failed to find config for repository {repository}: {str(e)}")
            return None

    @staticmethod
    def _apply_job_parameter(job_spec: Dict[str, Any], param_key: Any, param_value: Any) -> bool:
        """Set one override on ``job_spec``; return whether it was applied.

        A key without dots is set directly on ``job_spec``. A dotted key
        (e.g. ``"TaskGroups.0.Tasks.0.Config.image"``) is walked segment by
        segment: decimal segments index into lists, other segments look up
        dict keys. Existing structure is never created on the way; if any
        segment cannot be resolved nothing is modified and ``False`` is
        returned.
        """
        if not isinstance(param_key, str) or "." not in param_key:
            # Direct parameter
            job_spec[param_key] = param_value
            return True

        *parents, leaf = param_key.split(".")
        current: Any = job_spec
        for part in parents:
            if isinstance(current, list):
                if not part.isdecimal() or int(part) >= len(current):
                    return False
                current = current[int(part)]
            elif isinstance(current, dict):
                if part not in current:
                    return False
                current = current[part]
            else:
                # Reached a scalar before the path was consumed.
                return False

        if isinstance(current, dict):
            current[leaf] = param_value
            return True
        if isinstance(current, list) and leaf.isdecimal() and int(leaf) < len(current):
            current[int(leaf)] = param_value
            return True
        return False

    def get_job_spec_from_repository(self, repository: str) -> Optional[Dict[str, Any]]:
        """Get job specification from repository config and template.

        Loads the JSON template named by the configuration's ``job_template``
        and applies, in order: ``job_id`` (as ``ID`` and ``Name``),
        ``environment_variables`` (merged into every task's ``Env``),
        ``metadata`` (as ``Meta``, plus a ``repository`` entry) and
        ``job_parameters`` overrides.

        Args:
            repository: Repository URL or alias used to find the config.

        Returns:
            The customised job specification, or ``None`` if the
            configuration or template is missing or any step fails (the
            reason is logged).
        """
        try:
            # Get the repository configuration
            config = self.get_config_by_repository(repository)
            if not config:
                logger.error(f"No configuration found for repository: {repository}")
                return None

            # Check if the job template is specified
            job_template = config.get("job_template")
            if not job_template:
                logger.error(f"No job template specified for repository: {repository}")
                return None

            # Read the job template file
            template_path = self.config_dir / "templates" / f"{job_template}.json"
            if not template_path.exists():
                logger.error(f"Job template not found: {job_template}")
                return None

            try:
                with open(template_path, "r", encoding="utf-8") as f:
                    job_spec = json.load(f)
            except Exception as e:
                logger.error(f"Failed to read job template {job_template}: {str(e)}")
                return None

            # Apply configuration parameters to the template
            job_spec["ID"] = config.get("job_id")
            job_spec["Name"] = config.get("job_id")

            # Merge environment variables into every task. Template fields
            # may be JSON null, so treat null like "absent".
            if env_vars := config.get("environment_variables"):
                for task_group in job_spec.get("TaskGroups") or []:
                    for task in task_group.get("Tasks") or []:
                        if not isinstance(task.get("Env"), dict):
                            task["Env"] = {}
                        task["Env"].update(env_vars)

            if meta := config.get("metadata"):
                # Copy so adding "repository" below does not alter the config.
                job_spec["Meta"] = dict(meta)

            # Add repository info to the metadata
            if not isinstance(job_spec.get("Meta"), dict):
                job_spec["Meta"] = {}
            job_spec["Meta"]["repository"] = repository

            # Override specific job parameters if specified in config
            if job_params := config.get("job_parameters"):
                for param_key, param_value in job_params.items():
                    if not self._apply_job_parameter(job_spec, param_key, param_value):
                        logger.warning(
                            f"Skipping job parameter {param_key}: path not found in template {job_template}"
                        )

            logger.info(f"Generated job specification for repository {repository} using template {job_template}")
            return job_spec

        except Exception as e:
            logger.error(f"Failed to get job specification for repository {repository}: {str(e)}")
            return None