LangGraph 集成指南¶
本指南介绍了如何在 AgentScope Runtime 中集成和使用 LangGraph 来构建复杂的智能体工作流。 推荐使用 python 3.11 或更高版本以支持流模式,详情请参考 LangGraph。
📦 示例说明¶
1. 基础 LLM 交互¶
一个简单的示例,演示在 AgentScope Runtime 中使用 LangGraph 进行基础的 LLM 交互:
使用来自 DashScope 的 Qwen-Plus 模型
实现了一个包含单个节点的基础状态图工作流
展示了如何从 LLM 流式传输响应
包含对话历史的记忆管理
演示了使用
StateGraph与START和call_model节点
以下是核心代码:
# -*- coding: utf-8 -*-
import os
import uuid
from langchain.agents import AgentState, create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest
global_short_term_memory: BaseCheckpointSaver = None
global_long_term_memory: BaseStore = None
@tool
def get_weather(location: str, date: str) -> str:
"""Get the weather for a location and date."""
print(f"Getting weather for {location} on {date}...")
return f"The weather in {location} is sunny with a temperature of 25°C."
# Create the AgentApp instance
agent_app = AgentApp(
app_name="LangGraphAgent",
app_description="A LangGraph-based research assistant",
)
class CustomAgentState(AgentState):
user_id: str
session_id: dict
# Initialize services as instance variables
@agent_app.init
async def init_func(self):
global global_short_term_memory
global global_long_term_memory
self.short_term_mem = InMemorySaver()
self.long_term_mem = InMemoryStore()
global_short_term_memory = self.short_term_mem
global_long_term_memory = self.long_term_mem
# Shutdown services, in this case,
# we don't use any resources, so we don't need to do anything here
@agent_app.shutdown
async def shutdown_func(self):
pass
@agent_app.query(framework="langgraph")
async def query_func(
self,
msgs,
request: AgentRequest = None,
**kwargs,
):
# Extract session information
session_id = request.session_id
user_id = request.user_id
print(f"Received query from user {user_id} with session {session_id}")
tools = [get_weather]
# Choose the LLM that will drive the agent
llm = ChatOpenAI(
model="qwen-plus",
api_key=os.environ.get("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
namespace_for_long_term_memory = (user_id, "memories")
prompt = """You are a proactive research assistant. """
agent = create_agent(
llm,
tools,
system_prompt=prompt,
checkpointer=self.short_term_mem,
store=self.long_term_mem,
state_schema=CustomAgentState,
name="LangGraphAgent",
)
async for chunk, meta_data in agent.astream(
input={"messages": msgs, "session_id": session_id, "user_id": user_id},
stream_mode="messages",
config={"configurable": {"thread_id": session_id}},
):
is_last_chunk = (
True if getattr(chunk, "chunk_position", "") == "last" else False
)
if meta_data["langgraph_node"] == "tools":
memory_id = str(uuid.uuid4())
memory = {"lastest_tool_call": chunk.name}
global_long_term_memory.put(
namespace_for_long_term_memory,
memory_id,
memory,
)
yield chunk, is_last_chunk
@agent_app.endpoint("/api/memory/short-term/{session_id}", methods=["GET"])
async def get_short_term_memory(session_id: str):
if global_short_term_memory is None:
return {"error": "Short-term memory not initialized yet."}
config = {"configurable": {"thread_id": session_id}}
value = await global_short_term_memory.aget_tuple(config)
if value is None:
return {"error": "No memory found for session_id"}
return {
"session_id": session_id,
"messages": value.checkpoint["channel_values"]["messages"],
"metadata": value.metadata,
}
@agent_app.endpoint("/api/memory/short-term", methods=["GET"])
async def list_short_term_memory():
if global_short_term_memory is None:
return {"error": "Short-term memory not initialized yet."}
result = []
short_mems = list(global_short_term_memory.list(None))
for short_mem in short_mems:
ch_vals = short_mem.checkpoint["channel_values"]
# Ignore the __pregel_tasks field, which is not serializable
safe_dict = {
key: value
for key, value in ch_vals.items()
if key != "__pregel_tasks"
}
result.append(safe_dict)
return result
@agent_app.endpoint("/api/memory/long-term/{user_id}", methods=["GET"])
async def get_long_term_memory(user_id: str):
if global_short_term_memory is None:
return {"error": "Short-term memory not initialized yet."}
namespace_for_long_term_memory = (user_id, "memories")
long_term_mem = global_long_term_memory.search(
namespace_for_long_term_memory,
)
def serialize_search_item(item):
return {
"namespace": item.namespace,
"key": item.key,
"value": item.value,
"created_at": item.created_at,
"updated_at": item.updated_at,
"score": item.score,
}
serialized = [serialize_search_item(item) for item in long_term_mem]
return serialized
if __name__ == "__main__":
agent_app.run(host="127.0.0.1", port=8090)
2. 具有工具调用能力的高级智能体¶
一个更复杂的示例,演示具有工具调用能力的智能体:
实现了短期(对话)和长期(持久化)记忆
使用检查点机制跨会话保持状态
提供了用于检查内存状态的 API 端点
包含一个用于演示目的的自定义天气工具
使用带有 user_id 和 session_id 字段的自定义
AgentState扩展实现了工具调用结果的长期记忆存储
以下核心代码:
# -*- coding: utf-8 -*-
import os
import uuid
from langchain.agents import AgentState, create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest
global_short_term_memory = None
global_long_term_memory = None
@tool
def get_weather(location: str, date: str) -> str:
"""Get the weather for a location and date."""
print(f"Getting weather for {location} on {date}...")
return f"The weather in {location} is sunny with a temperature of 25°C."
# Create the AgentApp instance
agent_app = AgentApp(
app_name="LangGraphAgent",
app_description="A LangGraph-based research assistant",
)
class CustomAgentState(AgentState):
user_id: str
session_id: dict
# Initialize services as instance variables
@agent_app.init
async def init_func(self):
global global_short_term_memory
global global_long_term_memory
self.short_term_mem = InMemorySaver()
self.long_term_mem = InMemoryStore()
global_short_term_memory = self.short_term_mem
global_long_term_memory = self.long_term_mem
# Query endpoint for LangGraph integration with tools
@agent_app.query(framework="langgraph")
async def query_func(
self,
msgs,
request: AgentRequest = None,
**kwargs,
):
# Extract session information
session_id = request.session_id
user_id = request.user_id
print(f"Received query from user {user_id} with session {session_id}")
tools = [get_weather]
# Choose the LLM that will drive the agent
llm = ChatOpenAI(
model="qwen-plus",
api_key=os.environ.get("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
namespace_for_long_term_memory = (user_id, "memories")
prompt = """You are a proactive research assistant. """
agent = create_agent(
llm,
tools,
system_prompt=prompt,
checkpointer=self.short_term_mem,
store=self.long_term_mem,
state_schema=CustomAgentState,
name="LangGraphAgent",
)
async for chunk, meta_data in agent.astream(
input={"messages": msgs, "session_id": session_id, "user_id": user_id},
stream_mode="messages",
config={"configurable": {"thread_id": session_id}},
):
is_last_chunk = (
True if getattr(chunk, "chunk_position", "") == "last" else False
)
if meta_data["langgraph_node"] == "tools":
memory_id = str(uuid.uuid4())
memory = {"lastest_tool_call": chunk.name}
global_long_term_memory.put(
namespace_for_long_term_memory,
memory_id,
memory,
)
yield chunk, is_last_chunk
if __name__ == "__main__":
agent_app.run(host="127.0.0.1", port=8090)
⚙️ 先决条件¶
Note
在开始之前,请确保您已经安装了 AgentScope Runtime 完整版并配置了必要的 API 密钥。
安装依赖:
pip install "agentscope-runtime[ext]"
设置环境变量:
# 必需:DashScope API 密钥用于 Qwen 模型 export DASHSCOPE_API_KEY="your-dashscope-api-key"
▶️ 运行示例¶
Tip
确保您已经在先决条件部分设置了所有必需的环境变量,然后再运行这些示例。
启动服务器后,您可以通过查询界面与智能体交互,并通过提供的 API 端点检查内存状态。
与智能体交互¶
服务器运行后,您可以使用 /query 端点向智能体发送查询:
curl -N \
-X POST "http://localhost:8090/process" \
-H "Content-Type: application/json" \
-d '{
"input": [
{
"role": "user",
"content": [
{ "type": "text", "text": "上海天气如何?" }
]
}
],
"session_id": "session_id_123",
"user_id": "user_id_123"
}'
✨ 关键特性展示¶
LangGraph 集成¶
使用
AgentState进行状态管理使用
StateGraph定义工作流检查点机制实现持久化状态
流式响应实现实时交互
内存管理¶
短期记忆用于对话历史
长期记忆用于持久化存储
API 端点检查内存状态
基于会话的内存隔离
工具集成¶
使用 LangChain 的
@tool装饰器定义自定义工具工具调用和结果处理
🌐 API 端点¶
Important
以下 API 端点仅在运行高级智能体示例时可用。
运行高级智能体示例时,以下 API 端点可用:
POST /process- 向智能体发送查询GET /api/memory/short-term/{session_id}- 获取会话的短期记忆GET /api/memory/short-term- 列出所有短期记忆GET /api/memory/long-term/{user_id}- 获取用户的长期记忆
🔧 自定义¶
您可以通过以下方式自定义这些示例:
添加新工具:使用
@tool装饰器定义自定义工具更改 LLM:修改
ChatOpenAI初始化以使用不同的模型扩展工作流:向状态图添加新节点和边
自定义内存:实现不同的内存存储后端