简单部署¶
AgentApp 是 AgentScope Runtime 中的全能型应用服务封装器。
它为你的 agent 逻辑提供 HTTP 服务框架,并可将其作为 API 暴露,支持以下功能:
流式响应(SSE),实现实时输出
内置 健康检查 接口
生命周期钩子(
@app.init/@app.shutdown),用于启动与清理逻辑可选的 Celery 异步任务队列
部署到本地或远程目标
重要说明:
在当前版本中,AgentApp 不会自动包含 /process 端点。
你必须显式地使用装饰器(例如 @app.query(...))注册一个请求处理函数,服务才能处理传入的请求。
下面的章节将通过具体示例深入介绍每项功能。
初始化与基本运行¶
功能
创建一个最小的 AgentApp 实例,并启动基于 FastAPI 的 HTTP 服务骨架。
初始状态下,服务只提供:
欢迎页
/健康检查
/health就绪探针
/readiness存活探针
/liveness
注意:
默认不会暴露
/process或其它业务处理端点。必须使用如
@app.query(...)装饰器、@app.task(...)等方法注册至少一个 handler,才能对外提供处理请求的 API。处理函数可以是普通函数或 async 函数,也可以支持流式(async generator)输出。
用法示例
from agentscope_runtime.engine import AgentApp
agent_app = AgentApp(
app_name="Friday",
app_description="A helpful assistant",
)
agent_app.run(host="127.0.0.1", port=8090)
A2A 扩展字段配置¶
功能
通过 a2a_config 参数扩展配置 Agent 的 A2A(Agent-to-Agent)协议信息和运行时相关字段。
关键参数
a2a_config:可选参数,支持AgentCardWithRuntimeConfig对象
配置内容
a2a_config 支持配置两类字段:
AgentCard 协议字段:通过
agent_card字段传递,包含技能、传输协议、输入输出模式等Runtime 运行时字段:顶层字段,包含服务注册与发现(Registry)、超时设置、服务端点等
用法示例
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.deployers.adapter.a2a import (
AgentCardWithRuntimeConfig,
)
agent_app = AgentApp(
app_name="MyAgent",
app_description="My agent description",
a2a_config=AgentCardWithRuntimeConfig(
agent_card={
"name": "MyAgent",
"description": "My agent description",
"skills": [...], # Agent 技能列表
"default_input_modes": ["text"],
"default_output_modes": ["text"],
# ... 其他协议字段
},
registry=[...], # 服务注册与发现
task_timeout=120, # 任务超时设置
# ... 其他配置字段
),
)
详细说明
完整的字段说明、配置方法和使用示例,请参考 A2A Registry - 服务注册与发现 文档。
流式输出(SSE)¶
功能 让客户端实时接收生成结果(适合聊天、代码生成等逐步输出场景)。
关键参数
response_type="sse"stream=True
用法示例(客户端)
curl -N \
-X POST "http://localhost:8090/process" \
-H "Content-Type: application/json" \
-d '{
"input": [
{ "role": "user", "content": [{ "type": "text", "text": "Hello Friday" }] }
]
}'
返回格式
data: {"sequence_number":0,"object":"response","status":"created", ... }
data: {"sequence_number":1,"object":"response","status":"in_progress", ... }
data: {"sequence_number":2,"object":"message","status":"in_progress", ... }
data: {"sequence_number":3,"object":"content","status":"in_progress","text":"Hello" }
data: {"sequence_number":4,"object":"content","status":"in_progress","text":" World!" }
data: {"sequence_number":5,"object":"message","status":"completed","text":"Hello World!" }
data: {"sequence_number":6,"object":"response","status":"completed", ... }
生命周期钩子¶
功能
在应用启动前和停止后执行自定义逻辑,例如加载模型或关闭连接。
方式1:使用参数传递¶
关键参数
before_start:在 API 服务启动之前执行after_finish:在 API 服务终止时执行
用法示例
async def init_resources(app, **kwargs):
print("🚀 服务启动中,初始化资源...")
async def cleanup_resources(app, **kwargs):
print("🛑 服务即将关闭,释放资源...")
app = AgentApp(
agent=agent,
before_start=init_resources,
after_finish=cleanup_resources
)
方式2:使用装饰器(推荐)¶
除了通过构造函数参数传递钩子函数外,还可以使用装饰器的方式来注册生命周期钩子。 这种写法有以下优点:
更灵活直观 —— 生命周期逻辑直接贴近应用定义,结构更清晰,可读性更高;
可共享成员变量 —— 装饰器定义的函数会接收
self,可以访问AgentApp实例的属性和服务(例如@app.init中启动的状态服务、会话服务等),方便在不同生命周期或请求处理逻辑中共享和复用资源;
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.services.agent_state import InMemoryStateService
from agentscope_runtime.engine.services.session_history import InMemorySessionHistoryService
app = AgentApp(
app_name="Friday",
app_description="A helpful assistant",
)
@app.init
async def init_func(self):
"""初始化服务资源"""
self.state_service = InMemoryStateService()
self.session_service = InMemorySessionHistoryService()
await self.state_service.start()
await self.session_service.start()
print("✅ 服务初始化完成")
@app.shutdown
async def shutdown_func(self):
"""清理服务资源"""
await self.state_service.stop()
await self.session_service.stop()
print("✅ 服务资源已清理")
装饰器说明
@app.init:注册初始化钩子,在服务启动前执行@app.shutdown:注册关闭钩子,在服务停止时执行装饰器函数接收
self参数,可以访问AgentApp实例支持同步和异步函数
健康检查接口¶
功能
自动提供健康探针接口,方便容器或集群部署。
接口列表
GET /health:返回状态与时间戳GET /readiness:判断是否就绪GET /liveness:判断是否存活GET /:欢迎信息
用法示例
curl http://localhost:8090/health
curl http://localhost:8090/readiness
curl http://localhost:8090/liveness
curl http://localhost:8090/
Celery 异步任务队列(可选)¶
功能
支持长耗时后台任务,不阻塞 HTTP 主线程。
关键参数
broker_url="redis://localhost:6379/0"backend_url="redis://localhost:6379/0"
用法示例
app = AgentApp(
agent=agent,
broker_url="redis://localhost:6379/0",
backend_url="redis://localhost:6379/0"
)
@app.task("/longjob", queue="celery")
def heavy_computation(data):
return {"result": data["x"] ** 2}
请求:
curl -X POST http://localhost:8090/longjob -H "Content-Type: application/json" -d '{"x": 5}'
返回任务 ID:
{"task_id": "abc123"}
查询结果:
curl http://localhost:8090/longjob/abc123
自定义查询处理¶
功能
使用 @app.query() 装饰器可以完全自定义查询处理逻辑,实现更灵活的控制,包括状态管理、会话历史管理等。
基本用法¶
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest
from agentscope.agent import ReActAgent
from agentscope.model import DashScopeChatModel
from agentscope.pipeline import stream_printing_messages
from agentscope_runtime.adapters.agentscope.memory import AgentScopeSessionHistoryMemory
app = AgentApp(
app_name="Friday",
app_description="A helpful assistant",
)
@app.query(framework="agentscope")
async def query_func(
self,
msgs,
request: AgentRequest = None,
**kwargs,
):
"""自定义查询处理函数"""
session_id = request.session_id
user_id = request.user_id
# 加载会话状态
state = await self.state_service.export_state(
session_id=session_id,
user_id=user_id,
)
# 创建 Agent 实例
agent = ReActAgent(
name="Friday",
model=DashScopeChatModel(
"qwen-turbo",
api_key=os.getenv("DASHSCOPE_API_KEY"),
stream=True,
),
sys_prompt="You're a helpful assistant named Friday.",
memory=AgentScopeSessionHistoryMemory(
service=self.session_service,
session_id=session_id,
user_id=user_id,
),
)
# 恢复状态(如果存在)
if state:
agent.load_state_dict(state)
# 流式处理消息
async for msg, last in stream_printing_messages(
agents=[agent],
coroutine_task=agent(msgs),
):
yield msg, last
# 保存状态
state = agent.state_dict()
await self.state_service.save_state(
user_id=user_id,
session_id=session_id,
state=state,
)
关键特性¶
框架支持:
framework参数支持"agentscope","autogen","agno","langgraph"等函数签名:
self:AgentApp 实例,可以访问注册的服务msgs:输入消息列表request:AgentRequest 对象,包含session_id,user_id等信息**kwargs:其他扩展参数
流式输出:函数可以是生成器,支持流式返回结果
状态管理:可以访问
self.state_service进行状态保存和恢复会话历史:可以访问
self.session_service管理会话历史
完整示例:带状态管理的 AgentApp¶
import os
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest
from agentscope.agent import ReActAgent
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit, execute_python_code
from agentscope.pipeline import stream_printing_messages
from agentscope_runtime.adapters.agentscope.memory import AgentScopeSessionHistoryMemory
from agentscope_runtime.engine.services.agent_state import InMemoryStateService
from agentscope_runtime.engine.services.session_history import InMemorySessionHistoryService
app = AgentApp(
app_name="Friday",
app_description="A helpful assistant with state management",
)
@app.init
async def init_func(self):
"""初始化状态和会话服务"""
self.state_service = InMemoryStateService()
self.session_service = InMemorySessionHistoryService()
await self.state_service.start()
await self.session_service.start()
@app.shutdown
async def shutdown_func(self):
"""清理服务"""
await self.state_service.stop()
await self.session_service.stop()
@app.query(framework="agentscope")
async def query_func(
self,
msgs,
request: AgentRequest = None,
**kwargs,
):
"""带状态管理的查询处理"""
session_id = request.session_id
user_id = request.user_id
# 加载历史状态
state = await self.state_service.export_state(
session_id=session_id,
user_id=user_id,
)
# 创建工具包
toolkit = Toolkit()
toolkit.register_tool_function(execute_python_code)
# 创建 Agent
agent = ReActAgent(
name="Friday",
model=DashScopeChatModel(
"qwen-turbo",
api_key=os.getenv("DASHSCOPE_API_KEY"),
enable_thinking=True,
stream=True,
),
sys_prompt="You're a helpful assistant named Friday.",
toolkit=toolkit,
memory=AgentScopeSessionHistoryMemory(
service=self.session_service,
session_id=session_id,
user_id=user_id,
),
)
agent.set_console_output_enabled(enabled=False)
# 恢复状态
if state:
agent.load_state_dict(state)
# 流式处理
async for msg, last in stream_printing_messages(
agents=[agent],
coroutine_task=agent(msgs),
):
yield msg, last
# 保存状态
state = agent.state_dict()
await self.state_service.save_state(
user_id=user_id,
session_id=session_id,
state=state,
)
# 运行服务
app.run(host="0.0.0.0", port=8090)
与 V0 版本 Agent 参数方式的区别¶
特性 |
标准方式(agent 参数) |
自定义查询(@app.query) |
|---|---|---|
灵活性 |
较低,使用预定义的 Agent |
高,完全自定义处理逻辑 |
状态管理 |
自动处理 |
手动管理,更灵活 |
适用场景 |
简单场景 |
复杂场景,需要精细控制 |
多框架支持 |
有限 |
支持多种框架 |
通过 @app.endpoint 自定义接口¶
AgentApp 除了可以用 @app.query(...) 定义统一的 /process 请求入口外,还支持通过 @app.endpoint(...) 装饰器为应用注册任意路径的 HTTP 接口。
主要特点:
灵活性高 —— 你可以为不同业务定义专门的 API 路径,而不是都走
/process;多种返回模式—— 支持
普通同步/异步函数返回 JSON 对象
生成器(同步或异步)返回 流式数据(SSE)
参数解析——
@app.endpoint装饰的函数可以自动解析URL 查询参数
JSON 请求体(自动映射到 Pydantic 模型)
fastapi.Request对象AgentRequest对象(方便统一 session、用户信息等)
异常处理 —— 流式生成器抛出的异常会自动封装到 SSE 错误事件中返回给客户端。
示例:
app = AgentApp()
@app.endpoint("/hello")
def hello_endpoint():
return {"msg": "Hello world"}
@app.endpoint("/stream_numbers")
async def stream_numbers():
for i in range(5):
yield f"number: {i}\n"
调用:
curl -X POST http://localhost:8090/hello
curl -X POST http://localhost:8090/stream_numbers
部署到本地或远程¶
功能
通过 deploy() 方法统一部署到不同运行环境。
用法示例
from agentscope_runtime.engine.deployers import LocalDeployManager
await app.deploy(LocalDeployManager(host="0.0.0.0", port=8091))
更多部署选项和详细说明,请参考 高级部署 文档。
AgentScope Runtime 提供了Serverless的部署方案,您可以将您的Agent部署到 ModelStudio(FC) 或 AgentRun 上。 参考 高级部署 文档,查看ModelStudio和AgentRun部署部分获取更多配置详情.