Source code for agentscope_runtime.engine.deployers.local_deployer

# -*- coding: utf-8 -*-
import asyncio
import inspect
import json
import logging
import logging.handlers
import socket
import threading
import time
import uuid
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any, Callable, Type, Tuple, Union

import uvicorn
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from .base import DeployManager
from .adapter.protocol_adapter import ProtocolAdapter
from ..schemas.agent_schemas import AgentRequest, AgentResponse, Error


class LocalDeployManager(DeployManager):
    """Deploy a processing function as a local FastAPI/uvicorn service.

    The uvicorn server runs in a background daemon thread. ``deploy`` /
    ``deploy_sync`` return once the server accepts TCP connections, and
    ``stop`` asks the server to exit and joins the thread.
    """

    # Attribute set on logging handlers created by ``_setup_logging`` so that
    # constructing several managers does not stack duplicate handlers on the
    # process-global "app" and "access" loggers.
    _HANDLER_MARKER = "_local_deploy_manager_handler"

    def __init__(self, host: str = "localhost", port: int = 8090):
        """Initialise the manager.

        Args:
            host: Host/interface the uvicorn server binds to.
            port: TCP port the uvicorn server listens on.
        """
        super().__init__()
        self.host = host
        self.port = port
        self._server = None
        self._server_task = None
        self._server_thread = None  # Background thread running uvicorn
        self._is_running = False
        self._logger = logging.getLogger(__name__)
        self._app = None
        self._startup_timeout = 30  # seconds
        self._shutdown_timeout = 10  # seconds
        self._setup_logging()

    # ------------------------------------------------------------------
    # Logging
    # ------------------------------------------------------------------

    @classmethod
    def _has_marked_handler(cls, logger: logging.Logger) -> bool:
        """Return True if ``logger`` already has a handler added by us."""
        return any(
            getattr(handler, cls._HANDLER_MARKER, False)
            for handler in logger.handlers
        )

    @classmethod
    def _add_marked_handler(
        cls,
        logger: logging.Logger,
        handler: logging.Handler,
    ) -> None:
        """Tag ``handler`` as owned by this class and attach it."""
        setattr(handler, cls._HANDLER_MARKER, True)
        logger.addHandler(handler)

    def _setup_logging(self):
        """Configure the "app" and "access" loggers used by the service.

        "app" writes to a rotating ``app.log`` (10MB, 5 backups) and to the
        console. "access" writes to a rotating ``access.log``. The file
        names are relative, so they resolve against the current working
        directory. Loggers are process-global, so the handlers are attached
        only once per process, no matter how many managers are created.
        """
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )

        app_logger = logging.getLogger("app")
        app_logger.setLevel(logging.INFO)
        if not self._has_marked_handler(app_logger):
            file_handler = logging.handlers.RotatingFileHandler(
                "app.log",
                maxBytes=10 * 1024 * 1024,  # 10MB
                backupCount=5,
            )
            file_handler.setFormatter(formatter)
            self._add_marked_handler(app_logger, file_handler)

            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self._add_marked_handler(app_logger, console_handler)

        access_logger = logging.getLogger("access")
        access_logger.setLevel(logging.INFO)
        if not self._has_marked_handler(access_logger):
            access_file_handler = logging.handlers.RotatingFileHandler(
                "access.log",
                maxBytes=10 * 1024 * 1024,
                backupCount=5,
            )
            access_file_handler.setFormatter(
                logging.Formatter("%(asctime)s - %(message)s"),
            )
            self._add_marked_handler(access_logger, access_file_handler)

        self.app_logger = app_logger
        self.access_logger = access_logger

    # ------------------------------------------------------------------
    # FastAPI application construction
    # ------------------------------------------------------------------

    def _create_fastapi_app(self) -> FastAPI:
        """Create and configure FastAPI application with lifespan management."""

        @asynccontextmanager
        async def lifespan(app: FastAPI) -> Any:
            """Run ``before_start`` on startup and ``after_finish`` on exit."""
            await self._run_lifecycle_hook(
                getattr(self, "before_start", None),
                app,
            )
            yield
            await self._run_lifecycle_hook(
                getattr(self, "after_finish", None),
                app,
            )

        app = FastAPI(
            title="Agent Service",
            version="1.0.0",
            description="Production-ready Agent Service API",
            lifespan=lifespan,
        )

        self._add_middleware(app)
        self._add_health_endpoints(app)
        if getattr(self, "func", None):
            self._add_main_endpoint(app)

        return app

    async def _run_lifecycle_hook(
        self,
        hook: Optional[Callable],
        app: FastAPI,
    ) -> None:
        """Call a sync or async lifecycle hook with the deploy-time kwargs.

        Args:
            hook: The callback to run; a falsy value is a no-op.
            app: The FastAPI application, passed as the first argument.
        """
        if not hook:
            return
        hook_kwargs = getattr(self, "kwargs", {})
        if asyncio.iscoroutinefunction(hook):
            await hook(app, **hook_kwargs)
        else:
            hook(app, **hook_kwargs)

    def _add_middleware(self, app: FastAPI) -> None:
        """Add middleware to the FastAPI application."""

        @app.middleware("http")
        async def log_requests(request: Request, call_next):
            """Log each request and its status/duration to the loggers."""
            start_time = time.perf_counter()
            self.app_logger.info(f"Request: {request.method} {request.url}")
            response = await call_next(request)
            process_time = time.perf_counter() - start_time
            # ``request.client`` is None when the peer address is unknown
            # (e.g. some test transports or unix-socket servers).
            client_host = request.client.host if request.client else "-"
            self.access_logger.info(
                f'{client_host} - "{request.method} {request.url}" '
                f"{response.status_code} - {process_time:.3f}s",
            )
            return response

        @app.middleware("http")
        async def custom_middleware(
            request: Request,
            call_next: Callable,
        ) -> Response:
            """Custom middleware for request processing."""
            response: Response = await call_next(request)
            return response

        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def _add_health_endpoints(self, app: FastAPI) -> None:
        """Add health check endpoints to the FastAPI application."""

        @app.get("/health")
        async def health_check():
            return {
                "status": "healthy",
                "timestamp": time.time(),
                "service": "agent-service",
            }

        @app.get("/readiness")
        async def readiness() -> str:
            """Check if the application is ready to serve requests."""
            if getattr(app.state, "is_ready", True):
                return "success"
            raise HTTPException(
                status_code=500,
                detail="Application is not ready",
            )

        @app.get("/liveness")
        async def liveness() -> str:
            """Check if the application is alive and healthy."""
            if getattr(app.state, "is_healthy", True):
                return "success"
            raise HTTPException(
                status_code=500,
                detail="Application is not healthy",
            )

        @app.get("/")
        async def root():
            return {"message": "Agent Service is running"}

    def _add_main_endpoint(self, app: FastAPI) -> None:
        """Add the main processing endpoint to the FastAPI application."""

        async def _get_request_info(
            request: Request,
        ) -> Tuple[Dict, Any, str]:
            """Extract request information from the HTTP request.

            Raises:
                HTTPException: 400 if the body is not valid UTF-8 JSON or
                    fails ``request_model`` validation.
            """
            body = await request.body()
            try:
                request_body = (
                    json.loads(body.decode("utf-8")) if body else {}
                )
            except ValueError as e:  # JSONDecodeError / UnicodeDecodeError
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid request format: {e}",
                ) from e

            # Only a JSON object can carry a ``user_id`` field.
            user_id = (
                request_body.get("user_id", "")
                if isinstance(request_body, dict)
                else ""
            )

            if getattr(self, "request_model", None):
                try:
                    request_body_obj = self.request_model.model_validate(
                        request_body,
                    )
                except Exception as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid request format: {e}",
                    ) from e
            else:
                request_body_obj = request_body

            query_params = dict(request.query_params)
            return query_params, request_body_obj, user_id

        def _get_request_id(request_body_obj: Any) -> str:
            """Extract or generate a request ID from the request body."""
            if hasattr(request_body_obj, "header") and hasattr(
                request_body_obj.header,
                "request_id",
            ):
                return request_body_obj.header.request_id
            if (
                isinstance(request_body_obj, dict)
                and "request_id" in request_body_obj
            ):
                return request_body_obj["request_id"]
            return str(uuid.uuid4())

        @app.post(self.endpoint_path)
        async def main_endpoint(request: Request):
            """Main endpoint handler for processing requests."""
            try:
                (
                    _,  # query_params
                    request_body_obj,
                    user_id,
                ) = await _get_request_info(request=request)
                request_id = _get_request_id(request_body_obj)

                if getattr(self, "response_type", None) == "sse":
                    return self._handle_sse_response(
                        user_id=user_id,
                        request_body_obj=request_body_obj,
                        request_id=request_id,
                    )
                return await self._handle_standard_response(
                    user_id=user_id,
                    request_body_obj=request_body_obj,
                    request_id=request_id,
                )
            except HTTPException:
                # Keep deliberate client errors (e.g. 400 on bad input)
                # instead of masking them as 500s below.
                raise
            except Exception as e:
                self._logger.exception(f"Request processing failed: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e

    # ------------------------------------------------------------------
    # Response handling
    # ------------------------------------------------------------------

    async def _iter_func_outputs(
        self,
        user_id: str,
        request_body_obj: Any,
        request_id: str,
    ):
        """Call ``self.func`` and yield every output it produces.

        Supports async generator functions, coroutine functions (awaited),
        plain functions, and functions whose (awaited) result is an async
        iterable or a sync generator. Any other result is yielded once.
        """
        result = self.func(
            user_id=user_id,
            request=request_body_obj,
            request_id=request_id,
        )
        if inspect.isawaitable(result):
            result = await result

        if hasattr(result, "__aiter__"):
            async for output in result:
                yield output
        elif inspect.isgenerator(result):
            for output in result:
                yield output
        else:
            yield result

    def _handle_sse_response(
        self,
        user_id: str,
        request_body_obj: Any,
        request_id: str,
    ) -> StreamingResponse:
        """Handle Server-Sent Events response.

        Each output becomes one ``data:`` event. If the function raises, a
        final ``data:`` event carrying an error ``AgentResponse`` is sent.
        """

        async def stream_generator():
            """Generate streaming response data."""
            try:
                async for output in self._iter_func_outputs(
                    user_id=user_id,
                    request_body_obj=request_body_obj,
                    request_id=request_id,
                ):
                    _data = self._create_success_result(output=output)
                    yield f"data: {_data}\n\n"
            except Exception as e:
                self._logger.exception(f"Streaming response failed: {e}")
                _data = self._create_error_response(
                    request_id=request_id,
                    error=e,
                )
                yield f"data: {_data}\n\n"

        return StreamingResponse(
            stream_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            },
        )

    async def _handle_standard_response(
        self,
        user_id: str,
        request_body_obj: Any,
        request_id: str,
    ):
        """Handle standard JSON response.

        ``BaseModel``/``dict`` results and errors are already serialized to
        JSON text by the helpers. They are therefore returned as a raw
        ``application/json`` ``Response`` so that FastAPI does not encode
        them a second time into a JSON string literal. Other results are
        returned unchanged for FastAPI to serialize.
        """
        try:
            result = self.func(
                user_id=user_id,
                request=request_body_obj,
                request_id=request_id,
            )
            if inspect.isawaitable(result):
                result = await result

            if isinstance(result, (BaseModel, dict)):
                return Response(
                    content=self._create_success_result(output=result),
                    media_type="application/json",
                )
            return result
        except Exception as e:
            self._logger.exception(f"Request processing failed: {e}")
            return Response(
                content=self._create_error_response(
                    request_id=request_id,
                    error=e,
                ),
                media_type="application/json",
            )

    def _create_success_result(
        self,
        output: Union[BaseModel, Dict, str],
    ) -> Any:
        """Serialize ``output`` for the response body.

        ``BaseModel`` and ``dict`` values become JSON text. Any other value
        (typically ``str``) is returned unchanged.
        """
        if isinstance(output, BaseModel):
            return output.model_dump_json()
        if isinstance(output, dict):
            return json.dumps(output)
        return output

    def _create_error_response(
        self,
        request_id: str,
        error: Exception,
    ) -> str:
        """Create a failed ``AgentResponse`` for ``error``, as JSON text."""
        response = AgentResponse(id=request_id)
        response.failed(Error(code=str(error), message=str(error)))
        return response.model_dump_json()

    # ------------------------------------------------------------------
    # Deployment lifecycle
    # ------------------------------------------------------------------

    def deploy_sync(
        self,
        func: Callable,
        endpoint_path: str = "/process",
        request_model: Optional[Type] = AgentRequest,
        response_type: str = "sse",
        before_start: Optional[Callable] = None,
        after_finish: Optional[Callable] = None,
        protocol_adapters: Optional[list[ProtocolAdapter]] = None,
        **kwargs: Any,
    ) -> Dict[str, str]:
        """
        Deploy the agent as a FastAPI service (synchronous version).

        Args:
            func: Custom processing function
            endpoint_path: API endpoint path for the processing function
            request_model: Pydantic model for request validation
            response_type: Response type - "json", "sse", or "text"
            before_start: Callback function called before server starts
            after_finish: Callback function called after server finishes
            protocol_adapters: Extra protocol adapters that register
                additional endpoints on the app
            **kwargs: Additional keyword arguments passed to callbacks

        Returns:
            Dict[str, str]: Dictionary containing deploy_id and url of the
                deployed service

        Raises:
            RuntimeError: If deployment fails
        """
        return asyncio.run(
            self._deploy_async(
                func=func,
                endpoint_path=endpoint_path,
                request_model=request_model,
                response_type=response_type,
                before_start=before_start,
                after_finish=after_finish,
                protocol_adapters=protocol_adapters,
                **kwargs,
            ),
        )

    async def deploy(
        self,
        func: Callable,
        endpoint_path: str = "/process",
        request_model: Optional[Type] = AgentRequest,
        response_type: str = "sse",
        before_start: Optional[Callable] = None,
        after_finish: Optional[Callable] = None,
        protocol_adapters: Optional[list[ProtocolAdapter]] = None,
        **kwargs: Any,
    ) -> Dict[str, str]:
        """
        Deploy the agent as a FastAPI service (asynchronous version).

        Args:
            func: Custom processing function
            endpoint_path: API endpoint path for the processing function
            request_model: Pydantic model for request validation
            response_type: Response type - "json", "sse", or "text"
            before_start: Callback function called before server starts
            after_finish: Callback function called after server finishes
            protocol_adapters: Extra protocol adapters that register
                additional endpoints on the app
            **kwargs: Additional keyword arguments passed to callbacks

        Returns:
            Dict[str, str]: Dictionary containing deploy_id and url of the
                deployed service

        Raises:
            RuntimeError: If deployment fails
        """
        return await self._deploy_async(
            func=func,
            endpoint_path=endpoint_path,
            request_model=request_model,
            response_type=response_type,
            before_start=before_start,
            after_finish=after_finish,
            protocol_adapters=protocol_adapters,
            **kwargs,
        )

    async def _deploy_async(
        self,
        func: Callable,
        endpoint_path: str = "/process",
        request_model: Optional[Type] = None,
        response_type: str = "sse",
        before_start: Optional[Callable] = None,
        after_finish: Optional[Callable] = None,
        protocol_adapters: Optional[list[ProtocolAdapter]] = None,
        **kwargs: Any,
    ) -> Dict[str, str]:
        """Build the app, start uvicorn in a thread, and wait until ready.

        Raises:
            RuntimeError: If the service is already running or deployment
                fails for any reason (the cause is chained).
        """
        if self._is_running:
            raise RuntimeError("Service is already running")

        try:
            self._logger.info("Starting FastAPI service deployment...")

            # Store callable configuration; the app factory, endpoints and
            # lifespan hooks read these attributes.
            self.func = func
            self.endpoint_path = endpoint_path
            self.request_model = request_model
            self.response_type = response_type
            self.before_start = before_start
            self.after_finish = after_finish
            self.kwargs = kwargs

            self._app = self._create_fastapi_app()

            # Support extension protocol
            for protocol_adapter in protocol_adapters or []:
                protocol_adapter.add_endpoint(app=self._app, func=func)

            config = uvicorn.Config(
                self._app,
                host=self.host,
                port=self.port,
                log_level="info",
                access_log=False,
                timeout_keep_alive=30,
            )
            self._server = uvicorn.Server(config)

            # Run the server in a separate thread; daemon so it never
            # blocks interpreter exit.
            self._server_thread = threading.Thread(target=self._server.run)
            self._server_thread.daemon = True
            self._server_thread.start()

            await self._wait_until_ready()

            self._is_running = True
            url = f"http://{self.host}:{self.port}"
            self._logger.info(
                f"FastAPI service deployed successfully at {url}",
            )
            return {
                "deploy_id": self.deploy_id,
                "url": url,
            }

        except Exception as e:
            self._logger.error(f"Deployment failed: {e}")
            # Ask a possibly half-started server to shut down before we
            # drop our references to it.
            if self._server:
                self._server.should_exit = True
            await self._cleanup_server()
            raise RuntimeError(
                f"Failed to deploy FastAPI service: {e}",
            ) from e

    async def _wait_until_ready(self) -> None:
        """Poll until the server accepts connections.

        Raises:
            RuntimeError: If the server thread exits before becoming ready,
                or the server is not ready within ``_startup_timeout``.
        """
        deadline = time.monotonic() + self._startup_timeout
        while not self._is_server_ready():
            if not self._server_thread.is_alive():
                # e.g. uvicorn failed to bind and exited its thread; fail
                # fast instead of waiting out the full timeout.
                raise RuntimeError(
                    "Server thread exited before the server became ready",
                )
            if time.monotonic() > deadline:
                # Clean up the thread if server fails to start
                if self._server:
                    self._server.should_exit = True
                self._server_thread.join(timeout=self._shutdown_timeout)
                raise RuntimeError(
                    f"Server startup timeout after "
                    f"{self._startup_timeout} seconds",
                )
            await asyncio.sleep(0.1)

    def _is_server_ready(self) -> bool:
        """Check if the server is ready to accept connections."""
        server = self._server
        # uvicorn sets ``Server.started`` once its sockets are bound. Checking
        # it first avoids mistaking another process on the same port for our
        # own server. Older uvicorn versions lack the attribute, so it
        # defaults to True.
        if server is not None and not getattr(server, "started", True):
            return False

        # Wildcard bind addresses are not connectable everywhere; probe the
        # matching loopback address instead.
        probe_host = {
            "": "127.0.0.1",
            "0.0.0.0": "127.0.0.1",
            "::": "::1",
        }.get(self.host, self.host)
        try:
            # create_connection resolves hostnames and handles IPv6.
            with socket.create_connection(
                (probe_host, self.port),
                timeout=0.1,
            ):
                return True
        except OSError:
            return False

    async def stop(self) -> None:
        """
        Stop the FastAPI service.

        Raises:
            RuntimeError: If stopping fails
        """
        if not self._is_running:
            self._logger.warning("Service is not running")
            return

        try:
            self._logger.info("Stopping FastAPI service...")

            # Stop the server gracefully
            if self._server:
                self._server.should_exit = True

            # Wait for the server thread to finish
            if self._server_thread and self._server_thread.is_alive():
                self._server_thread.join(timeout=self._shutdown_timeout)
                if self._server_thread.is_alive():
                    self._logger.warning(
                        "Server thread did not terminate, "
                        "potential resource leak",
                    )

            await self._cleanup_server()
            self._is_running = False
            self._logger.info("FastAPI service stopped successfully")

        except Exception as e:
            self._logger.error(f"Failed to stop service: {e}")
            raise RuntimeError(f"Failed to stop FastAPI service: {e}") from e

    async def _cleanup_server(self):
        """Clean up server resources (drops references only)."""
        self._server = None
        self._server_task = None
        self._server_thread = None
        self._app = None

    @property
    def is_running(self) -> bool:
        """Check if the service is currently running."""
        return self._is_running

    @property
    def service_url(self) -> Optional[str]:
        """Get the current service URL if running."""
        if self._is_running and self.port:
            return f"http://{self.host}:{self.port}"
        return None