参考: 完整部署样例

本教程演示了如何使用AgentScope Runtime与AgentScope框架创建和部署 “推理与行动”(ReAct) 智能体。

Note

ReAct(推理与行动)范式使智能体能够将推理轨迹与特定任务的行动交织在一起,使其在工具交互任务中特别有效。通过将AgentScope的ReActAgent与AgentScope Runtime的基础设施相结合,您可以同时获得智能决策和安全的工具执行。

前置要求

安装依赖

pip install agentscope-runtime

沙箱

Note

确保您的浏览器沙箱环境已准备好使用,详细信息请参见沙箱

确保浏览器沙箱镜像可用:

docker pull agentscope-registry.ap-southeast-1.cr.aliyuncs.com/agentscope/runtime-sandbox-browser:latest && docker tag agentscope-registry.ap-southeast-1.cr.aliyuncs.com/agentscope/runtime-sandbox-browser:latest agentscope/runtime-sandbox-browser:latest

API密钥配置

您需要为您选择的LLM提供商准备API密钥。此示例使用DashScope(Qwen),但您可以将其适配到其他提供商:

export DASHSCOPE_API_KEY="your_api_key_here"

分步实现

步骤 1:导入依赖

import asyncio
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from agentscope.agent import ReActAgent
from agentscope.model import DashScopeChatModel
from agentscope.formatter import DashScopeChatFormatter
from agentscope.tool import Toolkit, execute_python_code
from agentscope.pipeline import stream_printing_messages
from agentscope.memory import InMemoryMemory
from agentscope.session import RedisSession

from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest
from agentscope_runtime.engine.services.sandbox import SandboxService
from agentscope_runtime.sandbox import BrowserSandbox

步骤 2:准备浏览器沙箱工具

tests/sandbox/test_sandbox.py 相同,我们可以直接通过上下文管理器验证浏览器沙箱是否可用:

with BrowserSandbox() as box:
    print(box.list_tools())
    print(box.browser_navigate("https://www.example.com/"))
    print(box.browser_snapshot())

当需要在服务内长期复用沙箱时,参考 tests/sandbox/test_sandbox_service.py 使用 SandboxService 管理生命周期:

import asyncio

async def bootstrap_browser_sandbox():
    sandbox_service = SandboxService()
    await sandbox_service.start()

    session_id = "demo_session"
    user_id = "demo_user"

    sandboxes = sandbox_service.connect(
        session_id=session_id,
        user_id=user_id,
        sandbox_types=["browser"],
    )
    browser_box = sandboxes[0]
    browser_box.browser_navigate("https://www.example.com/")
    browser_box.browser_snapshot()

    await sandbox_service.stop()

asyncio.run(bootstrap_browser_sandbox())

这里的 sandbox_types=["browser"]tests/sandbox/test_sandbox_service.py 保持一致,可确保同一 session_id / user_id 复用同一个浏览器沙箱实例。

步骤 3:构建 AgentApp

Important

⚠️ 提示

此处的 Agent 构建(模型、工具、会话记忆、格式化器等)只是一个示例配置,您需要根据实际需求替换为自己的模块实现。

下面的逻辑展示了如何利用 AgentApp 的特性构建服务,包含生命周期管理、会话记忆、流式响应以及中断处理:

PORT = 8090

@asynccontextmanager
async def lifespan(app: FastAPI):
    import fakeredis

    fake_redis = fakeredis.aioredis.FakeRedis(decode_responses=True)
    # NOTE: This FakeRedis instance is for development/testing only.
    # In production, replace it with your own Redis client/connection
    # (e.g., aioredis.Redis)
    app.state.session = RedisSession(connection_pool=fake_redis.connection_pool)

    app.state.sandbox_service = SandboxService()
    await app.state.sandbox_service.start()
    try:
        yield
    finally:
        await app.state.sandbox_service.stop()

agent_app = AgentApp(
    app_name="Friday",
    app_description="A helpful assistant",
    lifespan=lifespan,
)


@agent_app.query(framework="agentscope")
async def query_func(self, msgs, request: AgentRequest = None, **kwargs):
    session_id = request.session_id
    user_id = request.user_id

    sandboxes = agent_app.state.sandbox_service.connect(
        session_id=session_id,
        user_id=user_id,
        sandbox_types=["browser"],
    )
    browser_box = sandboxes[0]

    toolkit = Toolkit()
    for tool in (
        browser_box.browser_navigate,
        browser_box.browser_snapshot,
        browser_box.browser_take_screenshot,
        browser_box.browser_click,
        browser_box.browser_type,
    ):
        toolkit.register_tool_function(tool)
    toolkit.register_tool_function(execute_python_code)

    agent = ReActAgent(
        name="Friday",
        model=DashScopeChatModel(
            "qwen-turbo",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            enable_thinking=True,
            stream=True,
        ),
        sys_prompt="You're a helpful assistant named Friday.",
        toolkit=toolkit,
        memory=InMemoryMemory(),
        formatter=DashScopeChatFormatter(),
    )
    agent.set_console_output_enabled(enabled=False)

    await agent_app.state.session.load_session_state(
        session_id=session_id,
        user_id=user_id,
        agent=agent,
    )

    try:
        async for msg, last in stream_printing_messages(
            agents=[agent],
            coroutine_task=agent(msgs),
        ):
            yield msg, last

    except asyncio.CancelledError:
        await agent.interrupt()
        raise

    finally:
        await agent_app.state.session.save_session_state(
            session_id=session_id,
            user_id=user_id,
            agent=agent,
        )

@agent_app.post("/stop")
async def stop_task(request: AgentRequest):
    await agent_app.stop_chat(
        user_id=request.user_id,
        session_id=request.session_id,
    )
    return {
        "status": "success",
        "message": "Interrupt signal broadcasted.",
    }

上述 query_func 会将 Agent 的输出通过 SSE 逐条返回,同时把最新 state 写回内存服务,实现多轮记忆。

借助 SandboxServicesandbox_types=["browser"]) ,浏览器沙箱会根据同一个 session_iduser_id 在多轮对话中复用,避免重复启动容器。

步骤 4:启动服务

if __name__ == "__main__":
    agent_app.run(host="127.0.0.1", port=PORT)

运行脚本后即可在 http://127.0.0.1:8090/process 收到流式响应。

步骤 5:测试 SSE 输出

curl -N \
  -X POST "http://127.0.0.1:8090/process" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [
      {
        "role": "user",
        "content": [
          { "type": "text", "text": "What is the capital of France?" }
        ]
      }
    ]
  }'

你将看到多条 data: {...} 事件以及最终的 data: [DONE]。如果消息体中包含 “Paris” 即表示回答正确。

步骤 6:多轮记忆验证

要验证原生 Session 持久化是否生效,可以复用测试中「两轮对话」的交互流程:第一次提交 “My name is Alice.” 并携带固定 session_id,第二次询问 “What is my name?”,若返回文本包含 “Alice” 即表示记忆成功。

步骤 7:OpenAI 兼容模式

AgentApp 同时暴露了 compatible-mode/v1 路径,可使用官方 openai SDK 验证:

from openai import OpenAI

client = OpenAI(base_url="http://127.0.0.1:8090/compatible-mode/v1")
resp = client.responses.create(
    model="any_name",
    input="Who are you?",
)

print(resp.response["output"][0]["content"][0]["text"])

正常情况下你会得到 “I’m Friday …” 之类的回答。

步骤 8:测试中断功能

在长任务处理过程中,您可能需要手动干预并停止推理。

1. 启动一个耗时任务 在终端 1 中,要求 Agent 执行一个复杂的任务(如写长文或进行复杂的网页浏览):

curl -N -X POST "http://127.0.0.1:8090/process" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Please browse the news today and write a 1000-word detailed report."
          }
        ]
      }
    ],
    "session_id": "ss-123",
    "user_id": "uu-123"
  }'

2. 发送中断信号 在终端 1 正在流式输出时,打开终端 2 发送中断请求(注意 session_iduser_id 须保持一致):

curl -X POST "http://localhost:8090/stop" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [],
    "session_id": "ss-123",
    "user_id": "uu-123"
  }'

3. 预期现象

  • 终端 2 将返回 {"status": "success", "message": "Interrupt signal broadcasted."}

  • 终端 1 的 SSE 数据流将立即停止。

总结

通过本章节的内容,你可以快速获得一个带有流式响应、会话记忆、中断处理以及 OpenAI 兼容接口的 ReAct 智能体服务。若需部署到远端或扩展更多工具,只需替换 DashScopeChatModel、状态服务或工具注册逻辑即可。