Source code for agentscope_runtime.engine.deployers.kubernetes_deployer

# -*- coding: utf-8 -*-
import logging
import os
from datetime import datetime
from typing import Optional, Dict, List, Union, Any

from pydantic import BaseModel, Field

from agentscope_runtime.engine.deployers.state import Deployment
from .adapter.protocol_adapter import ProtocolAdapter
from .base import DeployManager
from .utils.docker_image_utils import (
    ImageFactory,
    RegistryConfig,
)
from .utils.k8s_utils import isLocalK8sEnvironment
from ...common.container_clients.kubernetes_client import (
    KubernetesClient,
)

logger = logging.getLogger(__name__)


[docs] class K8sConfig(BaseModel): # Kubernetes settings k8s_namespace: Optional[str] = Field( "agentscope-runtime", description="Kubernetes namespace to deploy pods. Required if " "container_deployment is 'k8s'.", ) kubeconfig_path: Optional[str] = Field( None, description="Path to kubeconfig file. If not set, will try " "in-cluster config or default kubeconfig.", )
[docs] class BuildConfig(BaseModel): """Build configuration""" build_context_dir: Optional[str] = None # None allows caching dockerfile_template: str = None build_timeout: int = 600 # 10 minutes push_timeout: int = 300 # 5 minutes cleanup_after_build: bool = True
[docs] class KubernetesDeployManager(DeployManager): """Kubernetes deployer for agent services"""
[docs] def __init__( self, kube_config: K8sConfig = None, registry_config: RegistryConfig = RegistryConfig(), use_deployment: bool = True, build_context_dir: Optional[str] = None, state_manager=None, ): super().__init__(state_manager=state_manager) self.kubeconfig = kube_config self.registry_config = registry_config self.image_factory = ImageFactory() self.use_deployment = use_deployment self.build_context_dir = build_context_dir self._built_images = {} self.k8s_client = KubernetesClient( config=self.kubeconfig, image_registry=self.registry_config.get_full_url(), )
[docs] @staticmethod def get_service_endpoint( service_external_ip: Optional[str], service_port: Optional[Union[int, list]], fallback_host: str = "127.0.0.1", ) -> str: """ Auto-select appropriate service endpoint based on detected environment. Solves the common issue where Kubernetes LoadBalancer/ExternalIP is not reachable from localhost in local clusters (e.g., Minikube/Kind). Args: service_external_ip: ExternalIP or LoadBalancer IP from Service service_port: Target port fallback_host: Host to use in local environments (default: 127.0.0.1) Returns: str: Full HTTP endpoint URL: http://<host>:<port> Example: >>> endpoint = get_service_endpoint('192.168.5.1', 8080) >>> # In local env → 'http://127.0.0.1:8080' >>> # In cloud env → 'http://192.168.5.1:8080' """ if not service_external_ip: service_external_ip = "127.0.0.1" if not service_port: service_port = 8080 if isinstance(service_port, list): service_port = service_port[0] if isLocalK8sEnvironment(): host = fallback_host logger.info( f"Local K8s environment detected; using {host} instead of " f"{service_external_ip}", ) else: host = service_external_ip logger.info( f"Cloud/remote environment detected; using External IP: " f"{host}", ) return f"http://{host}:{service_port}"
[docs] @staticmethod def get_resource_name(deploy_id: str) -> str: return f"agent-{deploy_id[:8]}"
[docs] async def deploy( self, app=None, runner=None, entrypoint: Optional[str] = None, endpoint_path: str = "/process", stream: bool = True, custom_endpoints: Optional[List[Dict]] = None, protocol_adapters: Optional[list[ProtocolAdapter]] = None, requirements: Optional[Union[str, List[str]]] = None, extra_packages: Optional[List[str]] = None, base_image: str = "python:3.9-slim", environment: Dict = None, runtime_config: Dict = None, port: int = 8090, replicas: int = 1, mount_dir: str = None, image_name: str = "agent_llm", image_tag: str = "latest", push_to_registry: bool = False, use_cache: bool = True, **kwargs, ) -> Dict[str, Any]: """ Deploy runner to Kubernetes. All temporary files are created in cwd/.agentscope_runtime/ by default. Args: app: Agent app to be deployed runner: Complete Runner object with agent, environment_manager, context_manager entrypoint: Entrypoint spec (e.g., "app.py" or "app.py:handler") endpoint_path: API endpoint path stream: Enable streaming responses custom_endpoints: Custom endpoints from agent app protocol_adapters: protocol adapters requirements: PyPI dependencies (following _agent_engines.py pattern) extra_packages: User code directory/file path base_image: Docker base image port: Container port replicas: Number of replicas environment: Environment variables dict mount_dir: Mount directory runtime_config: K8s runtime configuration use_cache: Enable build cache (default: True) # Backward compatibility image_name: Image name image_tag: Image tag push_to_registry: Push to registry **kwargs: Additional arguments Returns: Dict containing deploy_id, url, resource_name, replicas Raises: RuntimeError: If deployment fails """ created_resources = [] deploy_id = self.deploy_id try: logger.info(f"Starting deployment {deploy_id}") # Step 1: Build image with proper error handling logger.info("Building runner image...") try: built_image_name = self.image_factory.build_image( app=app, runner=runner, entrypoint=entrypoint, requirements=requirements, extra_packages=extra_packages or [], base_image=base_image, stream=stream, endpoint_path=endpoint_path, build_context_dir=self.build_context_dir, registry_config=self.registry_config, image_name=image_name, image_tag=image_tag, push_to_registry=push_to_registry, port=port, protocol_adapters=protocol_adapters, custom_endpoints=custom_endpoints, use_cache=use_cache, **kwargs, ) if not built_image_name: raise RuntimeError( "Image build failed - no image name returned", ) created_resources.append(f"image:{built_image_name}") self._built_images[deploy_id] = built_image_name logger.info(f"Image built successfully: {built_image_name}") except Exception as e: logger.error(f"Image build failed: {e}") raise RuntimeError(f"Failed to build image: {e}") from e if mount_dir: if not os.path.isabs(mount_dir): mount_dir = os.path.abspath(mount_dir) if mount_dir: volume_bindings = { mount_dir: { "bind": mount_dir, "mode": "rw", }, } else: volume_bindings = {} resource_name = self.get_resource_name(deploy_id) logger.info(f"Building kubernetes deployment for {deploy_id}") # Create Deployment _id, ports, ip = self.k8s_client.create_deployment( image=built_image_name, name=resource_name, ports=[port], volumes=volume_bindings, environment=environment, runtime_config=runtime_config or {}, replicas=replicas, create_service=True, ) if not _id: import traceback raise RuntimeError( f"Failed to create resource: " f"{resource_name}, {traceback.format_exc()}", ) if ports: url = self.get_service_endpoint(ip, ports) else: url = self.get_service_endpoint(ip, port) logger.info(f"Deployment {deploy_id} successful: {url}") deployment = Deployment( id=deploy_id, platform="k8s", url=url, status="running", created_at=datetime.now().isoformat(), agent_source=kwargs.get("agent_source"), config={ "service_name": _id, "image": built_image_name, "replicas": replicas if self.use_deployment else 1, "runner": runner.__class__.__name__ if runner else None, "extra_packages": extra_packages, "requirements": requirements, "base_image": base_image, "port": port, "environment": environment, "runtime_config": runtime_config, "endpoint_path": endpoint_path, "stream": stream, }, ) self.state_manager.save(deployment) return { "deploy_id": deploy_id, "url": url, "resource_name": resource_name, "replicas": replicas, } except Exception as e: import traceback logger.error(f"Deployment {deploy_id} failed: {e}") # Enhanced rollback with better error handling raise RuntimeError( f"Deployment failed: {e}, {traceback.format_exc()}", ) from e
[docs] async def stop( self, deploy_id: str, **kwargs, ) -> Dict[str, Any]: """Stop Kubernetes deployment. Args: deploy_id: Deployment identifier **kwargs: Additional parameters Returns: Dict with success status, message, and details """ # Derive resource name from deploy_id resource_name = self.get_resource_name(deploy_id) try: # Try to remove the deployment success = self.k8s_client.remove_deployment(resource_name) if success: # Remove from state manager try: self.state_manager.update_status(deploy_id, "stopped") except KeyError: logger.debug( f"Deployment {deploy_id} not found " f"in state (already removed)", ) return { "success": True, "message": f"Kubernetes deployment {resource_name} " f"removed", "details": { "deploy_id": deploy_id, "resource_name": resource_name, }, } else: return { "success": False, "message": f"Kubernetes deployment {resource_name} not " f"found (may already be deleted), Please check the " f"detail in cluster", "details": { "deploy_id": deploy_id, "resource_name": resource_name, }, } except Exception as e: logger.error( f"Failed to remove K8s deployment {resource_name}: {e}", ) return { "success": False, "message": f"Failed to remove K8s deployment: {e}", "details": { "deploy_id": deploy_id, "resource_name": resource_name, "error": str(e), }, }
[docs] def get_status(self) -> str: """Get deployment status""" deployment = self.state_manager.get(self.deploy_id) if not deployment: return "not_found" # Get service_name from config config = getattr(deployment, "config", {}) service_name = config.get("service_name") if not service_name: return "unknown" return self.k8s_client.get_deployment_status(service_name)