# -*- coding: utf-8 -*-
import logging
import os
import types
import platform
import subprocess
import shlex
from typing import Optional, Callable, List
import uvicorn
from pydantic import BaseModel
from .base_app import BaseApp
from ..deployers import DeployManager
from ..deployers.adapter.a2a import A2AFastAPIDefaultAdapter
from ..deployers.adapter.responses.response_api_protocol_adapter import (
ResponseAPIDefaultAdapter,
)
from ..deployers.utils.deployment_modes import DeploymentMode
from ..deployers.utils.service_utils.fastapi_factory import FastAPIAppFactory
from ..runner import Runner
from ..schemas.agent_schemas import AgentRequest
from ...version import __version__
logger = logging.getLogger(__name__)
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "8080"))
[docs]
class AgentApp(BaseApp):
"""
The AgentApp class represents an application that runs as an agent.
"""
[docs]
def __init__(
self,
*,
app_name: str = "",
app_description: str = "",
endpoint_path: str = "/process",
response_type: str = "sse",
stream: bool = True,
request_model: Optional[type[BaseModel]] = AgentRequest,
before_start: Optional[Callable] = None,
after_finish: Optional[Callable] = None,
broker_url: Optional[str] = None,
backend_url: Optional[str] = None,
runner: Optional[Runner] = None,
enable_embedded_worker: bool = False,
**kwargs,
):
"""
Initialize the AgentApp.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
self.endpoint_path = endpoint_path
self.response_type = response_type
self.stream = stream
self.request_model = request_model
self.before_start = before_start
self.after_finish = after_finish
self.broker_url = broker_url
self.backend_url = backend_url
self.enable_embedded_worker = enable_embedded_worker
self._runner = runner
self.custom_endpoints = [] # Store custom endpoints
# Custom Handlers
self._query_handler: Optional[Callable] = None
self._init_handler: Optional[Callable] = None
self._shutdown_handler: Optional[Callable] = None
self._framework_type: Optional[str] = None
a2a_protocol = A2AFastAPIDefaultAdapter(
agent_name=app_name,
agent_description=app_description,
)
response_protocol = ResponseAPIDefaultAdapter()
self.protocol_adapters = [a2a_protocol, response_protocol]
self._app_kwargs = {
"title": "Agent Service",
"version": __version__,
"description": "Production-ready Agent Service API",
**kwargs,
}
super().__init__(
broker_url=broker_url,
backend_url=backend_url,
)
# Store custom endpoints and tasks for deployment
# but don't add them to FastAPI here - let FastAPIAppFactory handle it
[docs]
def init(self, func):
"""Register init hook (support async and sync functions)."""
self._init_handler = func
return func
[docs]
def query(self, framework: Optional[str] = "agentscope"):
"""
Register run hook and optionally specify agent framework.
Allowed framework values: 'agentscope', 'autogen', 'agno', 'langgraph'.
"""
allowed_frameworks = {"agentscope", "autogen", "agno", "langgraph"}
if framework not in allowed_frameworks:
raise ValueError(f"framework must be one of {allowed_frameworks}")
def decorator(func):
self._query_handler = func
self._framework_type = framework
return func
return decorator
[docs]
def shutdown(self, func):
"""Register shutdown hook (support async and sync functions)."""
self._shutdown_handler = func
return func
def _build_runner(self):
if self._runner is None:
self._runner = Runner()
if self._framework_type is not None:
self._runner.framework_type = self._framework_type
if self._query_handler is not None:
self._runner.query_handler = types.MethodType(
self._query_handler,
self._runner,
)
if self._init_handler is not None:
self._runner.init_handler = types.MethodType(
self._init_handler,
self._runner,
)
if self._shutdown_handler is not None:
self._runner.shutdown_handler = types.MethodType(
self._shutdown_handler,
self._runner,
)
[docs]
def run(
self,
host=HOST,
port=PORT,
web_ui=False,
**kwargs,
):
"""
Launch the AgentApp HTTP API server.
This method starts a FastAPI server for the agent service.
Optionally, it can also launch a browser-based Web UI for
interacting with the agent.
Note:
If `web_ui=True` and this is the **first time** launching the Web UI,
additional time may be required to initialize. This happens because the
underlying Node.js command (`npx @agentscope-ai/chat
agentscope-runtime-webui`) might install dependencies and set up
the runtime environment.
Args:
host (str): Host address to bind to. Default "0.0.0.0".
port (int): Port number to serve the application on. Default 8080.
web_ui (bool): If True, launches the Agentscope Web UI in a
separate process, pointing it to the API endpoint. This
allows interactive use via browser. Default False.
**kwargs: Additional keyword arguments passed to FastAPIAppFactory
when creating the FastAPI application.
Example:
>>> app = AgentApp(app_name="MyAgent")
>>> app.chat(host="127.0.0.1", port=8080, web_ui=True)
"""
# Build runner
self._build_runner()
try:
logger.info(
"[AgentApp] Starting AgentApp with FastAPIAppFactory...",
)
fastapi_app = self.get_fastapi_app(**kwargs)
logger.info(f"[AgentApp] Starting server on {host}:{port}")
if web_ui:
webui_url = f"http://{host}:{port}{self.endpoint_path}"
cmd = (
f"npx @agentscope-ai/chat agentscope-runtime-webui "
f"--url {webui_url}"
)
logger.info(f"[AgentApp] WebUI started at {webui_url}")
logger.info(
"[AgentApp] Note: First WebUI launch may take extra time "
"as dependencies are installed.",
)
cmd_kwarg = {}
if platform.system() == "Windows":
cmd_kwarg.update({"shell": True})
else:
cmd = shlex.split(cmd)
with subprocess.Popen(cmd, **cmd_kwarg):
uvicorn.run(
fastapi_app,
host=host,
port=port,
log_level="info",
access_log=True,
)
else:
uvicorn.run(
fastapi_app,
host=host,
port=port,
log_level="info",
access_log=True,
)
except KeyboardInterrupt:
logger.info(
"[AgentApp] KeyboardInterrupt received, shutting down...",
)
[docs]
def get_fastapi_app(self, **kwargs):
"""Get the FastAPI application"""
self._build_runner()
mode = kwargs.pop("mode", DeploymentMode.DAEMON_THREAD)
return FastAPIAppFactory.create_app(
runner=self._runner,
endpoint_path=self.endpoint_path,
request_model=self.request_model,
response_type=self.response_type,
stream=self.stream,
before_start=self.before_start,
after_finish=self.after_finish,
mode=mode,
protocol_adapters=self.protocol_adapters,
custom_endpoints=self.custom_endpoints,
broker_url=self.broker_url,
backend_url=self.backend_url,
enable_embedded_worker=self.enable_embedded_worker,
app_kwargs=self._app_kwargs,
**kwargs,
)
[docs]
async def deploy(self, deployer: DeployManager, **kwargs):
"""Deploy the agent app with custom endpoints support"""
# Pass custom endpoints and tasks to the deployer
# Build runner
self._build_runner()
deploy_kwargs = {
"app": self,
"custom_endpoints": self.custom_endpoints,
"runner": self._runner,
"endpoint_path": self.endpoint_path,
"stream": self.stream,
"protocol_adapters": self.protocol_adapters,
}
deploy_kwargs.update(kwargs)
return await deployer.deploy(**deploy_kwargs)
[docs]
def endpoint(self, path: str, methods: Optional[List[str]] = None):
"""Decorator to register custom endpoints"""
if methods is None:
methods = ["POST"]
def decorator(func: Callable):
endpoint_info = {
"path": path,
"handler": func,
"methods": methods,
"module": getattr(func, "__module__", None),
"function_name": getattr(func, "__name__", None),
}
self.custom_endpoints.append(endpoint_info)
return func
return decorator
[docs]
def task(self, path: str, queue: str = "default"):
"""Decorator to register custom task endpoints"""
def decorator(func: Callable):
# Store task configuration for FastAPIAppFactory to handle
task_info = {
"path": path,
"handler": func, # Store original function
"methods": ["POST"],
"module": getattr(func, "__module__", None),
"function_name": getattr(func, "__name__", None),
"queue": queue,
"task_type": True, # Mark as task endpoint
"original_func": func,
}
self.custom_endpoints.append(
task_info,
) # Add to endpoints for deployment
return func
return decorator