Source code for agentscope_runtime.engine.app.agent_app

# -*- coding: utf-8 -*-
import asyncio
import inspect
import json
import logging
import os
import platform
import shlex
import subprocess
import types
from contextlib import asynccontextmanager, AsyncExitStack
from typing import Any, Callable, Dict, List, Optional, Type

import uvicorn
from a2a.types import A2ARequest
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.params import Depends as DependsClass
from fastapi.responses import StreamingResponse
from starlette.types import Lifespan
from pydantic import BaseModel

from agentscope_runtime.common.utils.deprecation import deprecated
from agentscope_runtime.engine.deployers.adapter.protocol_adapter import (
    ProtocolAdapter,
)
from agentscope_runtime.engine.schemas.response_api import ResponseAPI
from ...version import __version__
from ..deployers import DeployManager
from ..deployers.adapter.a2a import (
    A2AFastAPIDefaultAdapter,
    AgentCardWithRuntimeConfig,
    extract_a2a_config,
)
from ..deployers.adapter.agui import AGUIDefaultAdapter, AGUIAdaptorConfig
from ..deployers.adapter.responses.response_api_protocol_adapter import (
    ResponseAPIDefaultAdapter,
)
from ..deployers.utils.deployment_modes import DeploymentMode
from ..deployers.utils.service_utils.interrupt import (
    BaseInterruptBackend,
    InterruptMixin,
    RedisInterruptBackend,
    LocalInterruptBackend,
)
from ..deployers.utils.service_utils.routing import (
    UnifiedRoutingMixin,
)
from ..runner import Runner
from ..schemas.agent_schemas import AgentRequest

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default bind address and port for ``AgentApp.run``; each can be overridden
# through the environment variable of the same name.
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", "8080"))


class AgentApp(FastAPI, UnifiedRoutingMixin, InterruptMixin):
    """
    Agent application integrating FastAPI and Runner with support
    for distributed interrupts.

    The application's lifespan first enters the internal framework
    lifespan (runner setup, ``before_start`` / ``after_finish`` hooks,
    protocol adapter endpoints, optional embedded worker) and then the
    user-supplied ``lifespan`` context manager, if any.
    """

    # Reference template handed to pydantic so that nested model
    # definitions point at the OpenAPI ``components/schemas`` section.
    _REF_TEMPLATE = "#/components/schemas/{model}"

    def openapi(self) -> dict[str, Any]:
        """
        Generate OpenAPI schema with protocol-specific components.

        Returns:
            The schema produced by ``FastAPI.openapi`` with the
            ``A2ARequest`` / ``ResponseAPI`` component schemas added when
            the matching adapter is configured, plus ``AgentRequest``.
        """
        openapi_schema = super().openapi()

        if self.protocol_adapters:
            if any(
                isinstance(adapter, A2AFastAPIDefaultAdapter)
                for adapter in self.protocol_adapters
            ):
                self._inject_schema(
                    openapi_schema,
                    "A2ARequest",
                    A2ARequest.model_json_schema(
                        ref_template=self._REF_TEMPLATE,
                    ),
                )

            if any(
                isinstance(adapter, ResponseAPIDefaultAdapter)
                for adapter in self.protocol_adapters
            ):
                self._inject_schema(
                    openapi_schema,
                    "ResponseAPI",
                    ResponseAPI.model_json_schema(
                        ref_template=self._REF_TEMPLATE,
                    ),
                )

        # The main inference endpoint always references
        # "#/components/schemas/AgentRequest" (see _add_endpoint_router),
        # so this component is injected regardless of the adapter set.
        self._inject_schema(
            openapi_schema,
            "AgentRequest",
            AgentRequest.model_json_schema(
                ref_template=self._REF_TEMPLATE,
            ),
        )

        return openapi_schema

    @staticmethod
    def _inject_schema(
        openapi_schema: dict[str, Any],
        schema_name: str,
        schema_definition: dict[str, Any],
    ) -> None:
        """
        Insert schema definition (and nested defs) into OpenAPI.

        Args:
            openapi_schema: OpenAPI document, updated in place.
            schema_name: Key under ``components/schemas`` for the schema.
            schema_definition: JSON schema; its ``$defs`` entries are
                registered as sibling components. The argument itself is
                left untouched.
        """
        components = openapi_schema.setdefault("components", {})
        component_schemas = components.setdefault("schemas", {})

        # Nested definitions never overwrite a component that is already
        # present under the same name.
        for def_name, def_schema in schema_definition.get(
            "$defs",
            {},
        ).items():
            component_schemas.setdefault(def_name, def_schema)

        # Store the schema without "$defs"; a filtered copy is used so
        # the caller's dict is not mutated.
        component_schemas[schema_name] = {
            key: value
            for key, value in schema_definition.items()
            if key != "$defs"
        }

    def __init__(
        self,
        *,
        app_name: str = "AgentScope Runtime API",
        app_description: str = "",
        endpoint_path: str = "/process",
        response_type: str = "sse",
        stream: bool = True,
        request_model: Optional[Type[BaseModel]] = AgentRequest,
        before_start: Optional[Callable] = None,
        after_finish: Optional[Callable] = None,
        broker_url: Optional[str] = None,
        backend_url: Optional[str] = None,
        runner: Optional[Runner] = None,
        enable_embedded_worker: bool = False,
        a2a_config: Optional["AgentCardWithRuntimeConfig"] = None,
        agui_config: Optional[AGUIAdaptorConfig] = None,
        interrupt_backend: Optional[BaseInterruptBackend] = None,
        interrupt_redis_url: Optional[str] = None,
        lifespan: Optional[Lifespan[Any]] = None,
        mode: DeploymentMode = DeploymentMode.DAEMON_THREAD,
        protocol_adapters: Optional[list[ProtocolAdapter]] = None,
        custom_endpoints: Optional[List[Dict]] = None,
        **kwargs: Any,
    ):
        """
        Build the FastAPI application and wire up the agent runtime.

        Args:
            app_name: FastAPI title; also the agent name of the default
                A2A adapter.
            app_description: FastAPI description; also the agent
                description of the default A2A adapter.
            endpoint_path: Path of the main inference endpoint.
            response_type: Stored on the instance as ``response_type``.
            stream: Selects ``runner.stream_query`` (True) or
                ``runner.query`` (False) for the protocol adapters.
            request_model: Stored on the instance as ``request_model``.
            before_start: Sync or async hook called with the app on
                startup, after the runner has been entered.
            after_finish: Sync or async hook called with the app on
                shutdown.
            broker_url: Forwarded to ``init_routing_manager``.
            backend_url: Forwarded to ``init_routing_manager``.
            runner: Existing runner; when omitted a fresh ``Runner`` is
                created.
            enable_embedded_worker: When true and ``celery_app`` is set,
                ``start_embedded_celery_worker`` is called on startup.
            a2a_config: Config for the default A2A adapter.
            agui_config: Config for the default AG-UI adapter.
            interrupt_backend: Ready-made interrupt backend; takes
                precedence over ``interrupt_redis_url``.
            interrupt_redis_url: URL used to build a
                ``RedisInterruptBackend`` when no backend is given.
            lifespan: User lifespan context manager, entered after the
                internal framework lifespan.
            mode: Deployment mode reported by the built-in routes and the
                response-header middleware.
            protocol_adapters: Explicit adapter list; replaces the
                default A2A / Response API / AG-UI adapters.
            custom_endpoints: Passed to ``restore_custom_endpoints``.
            **kwargs: Extra keyword arguments for ``FastAPI.__init__``.
        """
        self._user_lifespan = lifespan

        fastapi_kwargs = {
            "title": app_name,
            "description": app_description,
            "version": __version__,
            "lifespan": self._lifespan_manager,
            **kwargs,
        }
        FastAPI.__init__(self, **fastapi_kwargs)

        self.init_routing_manager(broker_url, backend_url)

        self.endpoint_path = endpoint_path
        self.response_type = response_type
        self.stream = stream
        self.request_model = request_model
        self.before_start = before_start
        self.after_finish = after_finish
        self.broker_url = broker_url
        self.backend_url = backend_url
        self.enable_embedded_worker = enable_embedded_worker

        self._query_handler: Optional[Callable] = None
        self._init_handler: Optional[Callable] = None
        self._shutdown_handler: Optional[Callable] = None
        self._framework_type: Optional[str] = None

        # Strong references to fire-and-forget tasks (see
        # _spawn_background_task); the event loop only keeps weak ones.
        self._background_tasks: set = set()

        if runner:
            self._runner = runner
            self._add_endpoint_router()
        else:
            self._runner = Runner()

        self.deployment_mode = mode

        if protocol_adapters:
            self.protocol_adapters: List[Any] = protocol_adapters
        else:
            self.protocol_adapters = self._init_protocol_adapters(
                app_name,
                app_description,
                a2a_config,
                agui_config,
            )

        self._app_kwargs = {
            "title": "Agent Service",
            "version": __version__,
            "description": "Production-ready Agent Service API",
            **kwargs,
        }

        self._setup_interrupt_service(
            interrupt_backend,
            interrupt_redis_url,
        )

        self._setup_builtin_routes()

        if custom_endpoints:
            self.restore_custom_endpoints(custom_endpoints)

        self._add_middleware()

    def _setup_interrupt_service(
        self,
        backend: Optional[BaseInterruptBackend],
        redis_url: Optional[str],
    ) -> None:
        """
        Setup the interrupt service backend based on configuration.

        Precedence: an explicit ``backend`` instance, then a Redis backend
        built from ``redis_url``, then ``LocalInterruptBackend``.
        """
        if backend:
            logger.info(
                "Initializing interrupt service using an "
                "externally provided backend instance.",
            )
            self._init_interrupt_service(backend)
        elif redis_url:
            logger.info(
                "Initializing distributed interrupt service "
                "with Redis backend.",
            )
            self._init_interrupt_service(RedisInterruptBackend(redis_url))
        else:
            logger.info(
                "No distributed backend configuration detected. "
                "Falling back to LocalInterruptBackend for "
                "single-node execution.",
            )
            self._init_interrupt_service(LocalInterruptBackend())

    @staticmethod
    async def _invoke_hook(hook: Callable, app: FastAPI) -> None:
        """
        Call a lifecycle hook with ``app`` and await the result if needed.

        Awaiting based on the returned value (rather than inspecting the
        callable) also covers ``functools.partial`` wrappers and objects
        with an async ``__call__``.
        """
        result = hook(app)
        if inspect.isawaitable(result):
            await result

    def _spawn_background_task(self, coro) -> "asyncio.Task":
        """
        Schedule ``coro`` as a task and hold a reference until it is done.

        Without the reference the task may be garbage-collected before it
        finishes, because the event loop keeps only weak references.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @asynccontextmanager
    async def _internal_framework_lifespan(self, app: FastAPI):
        """
        Lifecycle manager for internal runner and hooks.

        Startup: bind handlers, (re-)enter the runner, run ``before_start``,
        register protocol adapter endpoints and optionally start the
        embedded worker. Shutdown: run ``after_finish``, exit the runner
        and close the interrupt service; each cleanup step logs and
        swallows its own errors so the remaining steps still run.
        """
        # pylint: disable=too-many-branches
        self._build_runner()
        try:
            # aexit any possible running instances before set up
            # runner
            await self._runner.__aexit__(None, None, None)
            await self._runner.__aenter__()

            if self.before_start:
                await self._invoke_hook(self.before_start, app)

            func = (
                self._runner.stream_query
                if self.stream
                else self._runner.query
            )
            for adapter in self.protocol_adapters:
                adapter.add_endpoint(app=self, func=func)

            if self.enable_embedded_worker and self.celery_app:
                self.start_embedded_celery_worker()

            yield
        finally:
            if self.after_finish:
                try:
                    await self._invoke_hook(self.after_finish, app)
                except Exception as e:
                    logger.error(f"Error in after_finish hook: {e}")

            if self._runner:
                try:
                    await self._runner.__aexit__(None, None, None)
                except Exception as e:
                    logger.error(f"Warning: Error during runner cleanup: {e}")

            if self._interrupt_backend:
                try:
                    await self.close_interrupt_service()
                except Exception as e:
                    logger.error(
                        "Warning: Error occurred while "
                        f"closing the interrupt service: {e}",
                    )

    @asynccontextmanager
    async def _lifespan_manager(self, app: FastAPI):
        """
        Main lifespan orchestrator combining internal and user logic.

        Yields the state returned by the user lifespan, or an empty dict
        when no user lifespan was supplied. Errors are logged and
        re-raised.
        """
        try:
            async with AsyncExitStack() as stack:
                await stack.enter_async_context(
                    self._internal_framework_lifespan(app),
                )

                user_state = {}
                if self._user_lifespan:
                    user_state = await stack.enter_async_context(
                        self._user_lifespan(app),
                    )

                yield user_state
        except Exception as e:
            logger.error(f"Application runtime error: {e}")
            raise

    def _init_protocol_adapters(
        self,
        app_name,
        app_description,
        a2a_config,
        agui_config,
    ) -> List[Any]:
        """
        Initialize supported protocol adapters for the agent.

        Returns the default adapter list: A2A, Response API and AG-UI.
        """
        a2a_config = extract_a2a_config(a2a_config=a2a_config)

        return [
            A2AFastAPIDefaultAdapter(
                agent_name=app_name,
                agent_description=app_description,
                a2a_config=a2a_config,
            ),
            ResponseAPIDefaultAdapter(),
            AGUIDefaultAdapter(config=agui_config),
        ]

    def _add_middleware(self):
        """
        Add middleware based on deployment mode.

        Installs a permissive CORS middleware and an HTTP middleware that
        tags responses with a header for the detached-process and
        standalone deployment modes.
        """
        # Common middleware
        self.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        @self.middleware("http")
        async def dynamic_deployment_middleware(request: Request, call_next):
            response = await call_next(request)
            # The mode is read per request, so later changes to
            # ``deployment_mode`` (e.g. via run()) are reflected.
            if self.deployment_mode == DeploymentMode.DETACHED_PROCESS:
                response.headers["X-Process-Mode"] = "detached"
            elif self.deployment_mode == DeploymentMode.STANDALONE:
                response.headers["X-Deployment-Mode"] = "standalone"
            return response

    def _setup_builtin_routes(self):
        """Register health check and information discovery routes."""

        @self.get("/health")
        @UnifiedRoutingMixin.internal_route
        async def health_check():
            """Health check endpoint."""
            status = {
                "status": "healthy",
                "mode": self.deployment_mode.value,
            }
            # Add service health checks
            if self._runner:
                status["runner"] = "ready"
            else:
                status["runner"] = "not_ready"
            return status

        @self.get("/")
        @UnifiedRoutingMixin.internal_route
        async def root():
            return {
                "service": "AgentScope Runtime",
                "mode": self.deployment_mode.value,
                "endpoints": {
                    "process": self.endpoint_path,
                    "stream": (
                        f"{self.endpoint_path}/stream"
                        if self.stream
                        else None
                    ),
                    "health": "/health",
                },
            }

        self._add_process_control_endpoints()

    def _add_process_control_endpoints(self):
        """Add process control endpoints for detached mode."""

        @self.post("/shutdown")
        @UnifiedRoutingMixin.internal_route
        async def shutdown_process_simple():
            """Gracefully shutdown the process (simple endpoint)."""
            import signal

            async def delayed_shutdown():
                # Give the HTTP response time to be sent before signalling.
                await asyncio.sleep(0.5)
                os.kill(os.getpid(), signal.SIGTERM)

            self._spawn_background_task(delayed_shutdown())
            return {"status": "shutting down"}

        @self.post("/admin/shutdown")
        @UnifiedRoutingMixin.internal_route
        async def shutdown_process():
            """Gracefully shutdown the process."""
            import signal

            # Schedule shutdown after response
            async def delayed_shutdown():
                await asyncio.sleep(1)
                os.kill(os.getpid(), signal.SIGTERM)

            self._spawn_background_task(delayed_shutdown())
            return {"message": "Shutdown initiated"}

        @self.get("/admin/status")
        @UnifiedRoutingMixin.internal_route
        async def get_process_status():
            """Get process status information."""
            import psutil

            proc = psutil.Process(os.getpid())
            # NOTE(review): "uptime" carries psutil's create_time(), which
            # is a creation timestamp rather than an elapsed duration —
            # confirm what clients expect before changing it.
            return {
                "pid": os.getpid(),
                "status": proc.status(),
                "memory_usage": proc.memory_info().rss,
                "cpu_percent": proc.cpu_percent(),
                "uptime": proc.create_time(),
            }

    async def _stream_generator(self, request: dict, **kwargs):
        """
        Dispatch stream generation based on interrupt backend status.

        Without an interrupt backend the runner is streamed directly;
        otherwise streaming goes through the interrupt-aware wrapper.
        Errors are reported to the client as an SSE ``error`` event.
        """
        if not self._interrupt_backend:
            try:
                if not self._runner:
                    yield f"data: {json.dumps({'error': 'No runner'})}\n\n"
                    return

                async for chunk in self._common_stream_generator(
                    request,
                    **kwargs,
                ):
                    yield chunk
            except Exception as e:
                yield f"data: {json.dumps({'error': str(e)})}\n\n"
        else:
            async for chunk in self._stream_generator_with_interrupt(
                request,
                **kwargs,
            ):
                yield chunk

    async def _stream_generator_with_interrupt(
        self,
        request: dict,
        **kwargs,
    ):
        """
        Execute stream generation wrapped with interrupt management.

        The request is parsed into an ``AgentRequest`` to obtain the
        ``user_id`` / ``session_id`` passed to ``run_and_stream``.
        """
        try:
            agent_req = AgentRequest(**request)
            async for chunk in self.run_and_stream(
                agent_req.user_id,
                agent_req.session_id,
                self._common_stream_generator,
                request,
                **kwargs,
            ):
                yield chunk
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    async def _common_stream_generator(self, request: dict, **kwargs):
        """
        Yield standard SSE formatted chunks from the runner.

        Raises:
            RuntimeError: If no runner is set.
        """
        if not self._runner:
            raise RuntimeError("Runner is not initialized.")

        async for chunk in self._runner.stream_query(request, **kwargs):
            # Prefer the chunk's own JSON serializer when it offers one;
            # otherwise wrap its string form.
            if hasattr(chunk, "model_dump_json"):
                data = chunk.model_dump_json()
            elif hasattr(chunk, "json"):
                data = chunk.json()
            else:
                data = json.dumps({"text": str(chunk)})
            yield f"data: {data}\n\n"

    @deprecated(
        reason=(
            "Manual initialization is deprecated. "
            "Lifecycle management has been unified "
            "under FastAPI's 'lifespan' parameter. "
            "Please move your startup logic to "
            "a lifespan context manager."
        ),
        alternative="the 'lifespan' argument in AgentApp constructor",
        since="1.1.0",
        removed_in="1.2.0",
    )
    def init(self, func: Callable) -> Callable:
        """Register init hook (support async and sync functions)."""
        self._init_handler = func
        self._build_runner()
        return func

    def query(self, framework: Optional[str] = "agentscope"):
        """
        Register run hook and optionally specify agent framework.
        Allowed framework values: 'agentscope', 'autogen', 'agno',
        'langgraph'.

        Returns:
            A decorator that registers the function as the query handler,
            rebuilds the runner bindings and (re-)registers the main
            endpoint; the function is returned unchanged.

        Raises:
            ValueError: If ``framework`` is not an allowed value.
        """
        allowed_frameworks = {"agentscope", "autogen", "agno", "langgraph"}
        if framework not in allowed_frameworks:
            raise ValueError(f"framework must be one of {allowed_frameworks}")

        def decorator(func: Callable):
            self._query_handler = func
            self._framework_type = framework
            self._build_runner()
            self._add_endpoint_router()
            return func

        return decorator

    @deprecated(
        reason=(
            "Manual shutdown is deprecated. "
            "Lifecycle management has been unified "
            "under FastAPI's 'lifespan' parameter. "
            "Please move your shutdown logic to "
            "a lifespan context manager."
        ),
        alternative="the 'lifespan' argument in AgentApp constructor",
        since="1.1.0",
        removed_in="1.2.0",
    )
    def shutdown(self, func: Callable) -> Callable:
        """Register shutdown hook (support async and sync functions)."""
        self._shutdown_handler = func
        self._build_runner()
        return func

    def _build_runner(self):
        """
        Bind decorated handlers to the internal Runner instance.

        Each registered handler is attached to the runner as a bound
        method, so it receives the runner as its first argument.
        """
        if self._runner is None:
            self._runner = Runner()

        if self._framework_type:
            self._runner.framework_type = self._framework_type

        handlers = [
            ("query_handler", self._query_handler),
            ("init_handler", self._init_handler),
            ("shutdown_handler", self._shutdown_handler),
        ]
        for attr, handler in handlers:
            if handler:
                setattr(
                    self._runner,
                    attr,
                    types.MethodType(handler, self._runner),
                )

    def _add_endpoint_router(self):
        """
        Dynamically construct and register the main inference endpoint.

        Any route already registered at ``endpoint_path`` is dropped
        first. The endpoint's signature is ``request: dict`` plus every
        parameter of the query handler whose default is a FastAPI
        ``Depends``, so those dependencies are resolved and passed on.
        """
        if not self._runner:
            return

        self.router.routes = [
            route
            for route in self.router.routes
            if not (
                hasattr(route, "path") and route.path == self.endpoint_path
            )
        ]

        user_func = self._runner.query_handler

        async def agent_api(request: dict, **kwargs):
            return StreamingResponse(
                self._stream_generator(request, **kwargs),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )

        full_sig = inspect.signature(user_func)
        new_params = [
            inspect.Parameter(
                "request",
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                annotation=dict,
            ),
        ]
        for _, param in full_sig.parameters.items():
            if isinstance(param.default, DependsClass):
                new_params.append(param)

        # Present the synthesized signature (and the handler's name and
        # docstring) to FastAPI instead of the generic wrapper's own.
        agent_api.__signature__ = full_sig.replace(parameters=new_params)
        agent_api.__name__ = user_func.__name__
        agent_api.__doc__ = user_func.__doc__

        self.post(
            self.endpoint_path,
            openapi_extra={
                "requestBody": {
                    "content": {
                        "application/json": {
                            "schema": {
                                "$ref": "#/components/schemas/AgentRequest",
                            },
                        },
                    },
                    "required": True,
                    "description": "Agent API Request Format. "
                    "See https://runtime.agentscope.io/en/protocol.html for "
                    "more details.",
                },
            },
            tags=["agent-api"],
        )(agent_api)

    def _apply_runtime_configs(self, kwargs: dict):
        """
        Apply runtime configuration updates and synchronize internal
        services.

        Recognized keys are popped from ``kwargs`` (which is modified in
        place): ``stream``, ``protocol_adapters``, ``embed_task_processor``,
        ``mode``, ``runner``, ``endpoint_path`` and ``custom_endpoints``.
        """
        self.stream = kwargs.pop("stream", self.stream)
        self.protocol_adapters = kwargs.pop(
            "protocol_adapters",
            self.protocol_adapters,
        )
        self.enable_embedded_worker = kwargs.pop(
            "embed_task_processor",
            self.enable_embedded_worker,
        )
        self.deployment_mode = kwargs.pop("mode", self.deployment_mode)

        if "runner" in kwargs:
            self._runner = kwargs.pop("runner")
            self._add_endpoint_router()

        if "endpoint_path" in kwargs:
            # Drop the route at the old path before switching to the new
            # one; _add_endpoint_router only clears the current path.
            self.router.routes = [
                route
                for route in self.router.routes
                if not (
                    hasattr(route, "path")
                    and route.path == self.endpoint_path
                )
            ]
            self.endpoint_path = kwargs.pop("endpoint_path")
            self._add_endpoint_router()

        if "custom_endpoints" in kwargs:
            custom_endpoints = kwargs.pop("custom_endpoints")
            self.restore_custom_endpoints(custom_endpoints)

    @staticmethod
    def _terminate_process(
        proc: "subprocess.Popen",
        timeout: float = 5.0,
    ) -> None:
        """
        Stop a child process: terminate, then kill if it does not exit.

        Best effort only; with ``shell=True`` the signal reaches the shell
        and may not reach processes the shell started.
        """
        if proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()

    def run(self, host=HOST, port=PORT, web_ui=False, **kwargs):
        """
        Launch the application server and optional Web UI.

        Args:
            host: Address for uvicorn to bind.
            port: Port for uvicorn to bind.
            web_ui: When true, also start the ``npx`` based Web UI pointed
                at the main endpoint; it is stopped when the server exits.
            **kwargs: Runtime overrides, see ``_apply_runtime_configs``.
        """
        self._apply_runtime_configs(kwargs)

        try:
            logger.info(
                "Starting AgentApp...",
            )
            logger.info(f"Starting server on {host}:{port}")

            if web_ui:
                webui_url = f"http://{host}:{port}{self.endpoint_path}"
                cmd = (
                    f"npx @agentscope-ai/chat agentscope-runtime-webui "
                    f"--url {webui_url}"
                )
                logger.info(f"WebUI started at {webui_url}")
                logger.info(
                    "Note: First WebUI launch may take extra time "
                    "as dependencies are installed.",
                )

                cmd_kwarg = {}
                if platform.system() == "Windows":
                    cmd_kwarg.update({"shell": True})
                else:
                    cmd = shlex.split(cmd)

                with subprocess.Popen(cmd, **cmd_kwarg) as webui_proc:
                    try:
                        uvicorn.run(
                            self,
                            host=host,
                            port=port,
                            log_level="info",
                            access_log=True,
                        )
                    finally:
                        # Popen.__exit__ waits for the child; stop it
                        # first so run() does not block on a Web UI that
                        # outlives the server.
                        self._terminate_process(webui_proc)
            else:
                uvicorn.run(
                    self,
                    host=host,
                    port=port,
                    log_level="info",
                    access_log=True,
                )

        except KeyboardInterrupt:
            logger.info(
                "KeyboardInterrupt received, shutting down...",
            )

    async def deploy(self, deployer: DeployManager, **kwargs):
        """
        Deploy the agent app

        Args:
            deployer: Deploy manager whose ``deploy`` coroutine is awaited.
            **kwargs: Extra deployment options; they override the defaults
                derived from this app on key collision.

        Returns:
            Whatever ``deployer.deploy`` returns.
        """
        deploy_kwargs = {
            "app": self,
            "custom_endpoints": self.custom_endpoints,
            "runner": self._runner,
            "endpoint_path": self.endpoint_path,
            "stream": self.stream,
            "protocol_adapters": self.protocol_adapters,
        }
        deploy_kwargs.update(kwargs)
        return await deployer.deploy(**deploy_kwargs)