LangGraph Integration Guide
This guide shows how to integrate LangGraph with AgentScope Runtime to build complex agent workflows. Python 3.11 or later is recommended for streaming mode; see the LangGraph documentation for details.
📦 Example Descriptions
1. Basic LLM Interaction
A simple example demonstrating basic LLM interaction using LangGraph within AgentScope Runtime:
Uses Qwen-Plus model from DashScope
Implements a basic state graph workflow with a single node
Shows how to stream responses from the LLM
Includes memory management for conversation history
Demonstrates the use of StateGraph with START and call_model nodes
Here is the core code:
# -*- coding: utf-8 -*-
import os
import uuid

from langchain.agents import AgentState, create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest

# Module-level handles so the memory endpoints below can reach the stores
global_short_term_memory: BaseCheckpointSaver = None
global_long_term_memory: BaseStore = None


@tool
def get_weather(location: str, date: str) -> str:
    """Get the weather for a location and date."""
    print(f"Getting weather for {location} on {date}...")
    return f"The weather in {location} is sunny with a temperature of 25°C."


# Create the AgentApp instance
agent_app = AgentApp(
    app_name="LangGraphAgent",
    app_description="A LangGraph-based research assistant",
)


class CustomAgentState(AgentState):
    user_id: str
    # The session ID is passed as a string (it is also used as the thread_id)
    session_id: str


# Initialize services as instance variables
@agent_app.init
async def init_func(self):
    global global_short_term_memory
    global global_long_term_memory
    self.short_term_mem = InMemorySaver()
    self.long_term_mem = InMemoryStore()
    global_short_term_memory = self.short_term_mem
    global_long_term_memory = self.long_term_mem


# Shutdown services, in this case,
# we don't use any resources, so we don't need to do anything here
@agent_app.shutdown
async def shutdown_func(self):
    pass


@agent_app.query(framework="langgraph")
async def query_func(
    self,
    msgs,
    request: AgentRequest = None,
    **kwargs,
):
    # Extract session information
    session_id = request.session_id
    user_id = request.user_id
    print(f"Received query from user {user_id} with session {session_id}")

    tools = [get_weather]

    # Choose the LLM that will drive the agent
    llm = ChatOpenAI(
        model="qwen-plus",
        api_key=os.environ.get("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    )

    # Long-term memories are grouped per user
    namespace_for_long_term_memory = (user_id, "memories")
    prompt = """You are a proactive research assistant. """

    agent = create_agent(
        llm,
        tools,
        system_prompt=prompt,
        checkpointer=self.short_term_mem,
        store=self.long_term_mem,
        state_schema=CustomAgentState,
        name="LangGraphAgent",
    )

    # Stream message chunks; the session ID doubles as the checkpoint thread ID
    async for chunk, meta_data in agent.astream(
        input={"messages": msgs, "session_id": session_id, "user_id": user_id},
        stream_mode="messages",
        config={"configurable": {"thread_id": session_id}},
    ):
        is_last_chunk = getattr(chunk, "chunk_position", "") == "last"
        # Record each tool result in long-term memory
        if meta_data["langgraph_node"] == "tools":
            memory_id = str(uuid.uuid4())
            memory = {"latest_tool_call": chunk.name}
            global_long_term_memory.put(
                namespace_for_long_term_memory,
                memory_id,
                memory,
            )
        yield chunk, is_last_chunk


@agent_app.endpoint("/api/memory/short-term/{session_id}", methods=["GET"])
async def get_short_term_memory(session_id: str):
    if global_short_term_memory is None:
        return {"error": "Short-term memory not initialized yet."}
    config = {"configurable": {"thread_id": session_id}}
    value = await global_short_term_memory.aget_tuple(config)
    if value is None:
        return {"error": "No memory found for session_id"}
    return {
        "session_id": session_id,
        "messages": value.checkpoint["channel_values"]["messages"],
        "metadata": value.metadata,
    }


@agent_app.endpoint("/api/memory/short-term", methods=["GET"])
async def list_short_term_memory():
    if global_short_term_memory is None:
        return {"error": "Short-term memory not initialized yet."}
    result = []
    short_mems = list(global_short_term_memory.list(None))
    for short_mem in short_mems:
        ch_vals = short_mem.checkpoint["channel_values"]
        # Ignore the __pregel_tasks field, which is not serializable
        safe_dict = {
            key: value
            for key, value in ch_vals.items()
            if key != "__pregel_tasks"
        }
        result.append(safe_dict)
    return result


@agent_app.endpoint("/api/memory/long-term/{user_id}", methods=["GET"])
async def get_long_term_memory(user_id: str):
    # Check the long-term store, since that is what this endpoint reads
    if global_long_term_memory is None:
        return {"error": "Long-term memory not initialized yet."}
    namespace_for_long_term_memory = (user_id, "memories")
    long_term_mem = global_long_term_memory.search(
        namespace_for_long_term_memory,
    )

    def serialize_search_item(item):
        return {
            "namespace": item.namespace,
            "key": item.key,
            "value": item.value,
            "created_at": item.created_at,
            "updated_at": item.updated_at,
            "score": item.score,
        }

    serialized = [serialize_search_item(item) for item in long_term_mem]
    return serialized


if __name__ == "__main__":
    agent_app.run(host="127.0.0.1", port=8090)
⚙️ Prerequisites
Note
Before you begin, make sure you have installed the full version of AgentScope Runtime and configured the required API keys.
Install dependencies:
pip install "agentscope-runtime[ext]"
Set environment variables:
# Required: DashScope API key for Qwen model
export DASHSCOPE_API_KEY="your-dashscope-api-key"
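The core code reads the key with os.environ.get inside the query handler, so a missing variable would likely go unnoticed until the first query is handled. A minimal sketch of a fail-fast check that could run before the server starts; the helper name require_env is hypothetical and not part of AgentScope Runtime:

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Hypothetical usage, e.g. just before agent_app.run(...):
#     require_env("DASHSCOPE_API_KEY")
```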
▶️ Running Examples
Tip
Make sure you have set all the required environment variables in the Prerequisites section before running these examples.
After starting the server, you can interact with the agent through the query interface and inspect memory states through the provided API endpoints.
Interacting with the Agent
Once the server is running, you can send queries to the agent using the /process endpoint:
curl -N \
  -X POST "http://localhost:8090/process" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [
      {
        "role": "user",
        "content": [
          { "type": "text", "text": "what is the capital of France?" }
        ]
      }
    ],
    "session_id": "sessionid123",
    "user_id": "userid123"
  }'
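The same request can be sent from Python. The following is a minimal sketch using only the standard library; the function names build_query_payload and stream_query are hypothetical, and because this guide does not specify the format of the streamed lines, the sketch yields them unparsed:

```python
import json
import urllib.request


def build_query_payload(text: str, session_id: str, user_id: str) -> dict:
    """Build the same JSON body that the curl command sends."""
    return {
        "input": [
            {"role": "user", "content": [{"type": "text", "text": text}]},
        ],
        "session_id": session_id,
        "user_id": user_id,
    }


def stream_query(payload: dict, url: str = "http://localhost:8090/process"):
    """POST the payload and yield the streamed response line by line."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        for raw_line in response:
            line = raw_line.decode("utf-8").rstrip("\r\n")
            if line:  # skip blank separator lines
                yield line
```

With the server running, iterating over stream_query(build_query_payload("what is the capital of France?", "sessionid123", "userid123")) and printing each item shows the response as it arrives.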
✨ Key Features Demonstration
LangGraph Integration
State management with AgentState
Workflow definition using StateGraph
Checkpointing for persistent state
Streaming responses for real-time interaction
Memory Management
Short-term memory for conversation history
Long-term memory for persistent storage
API endpoints to inspect memory states
Session-based memory isolation
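In the core code, isolation comes from two keys: short-term memory is looked up by a thread_id equal to the session ID, and long-term memory is stored under a namespace that begins with the user ID. A minimal sketch that factors those two expressions into helpers; the function names are hypothetical, while the returned values mirror the core code:

```python
def thread_config(session_id: str) -> dict:
    """Config passed to the agent; checkpoints are keyed by thread_id."""
    return {"configurable": {"thread_id": session_id}}


def memory_namespace(user_id: str) -> tuple:
    """Namespace under which one user's long-term memories are stored."""
    return (user_id, "memories")
```

Two sessions therefore never share conversation history, while all sessions of one user share the same long-term namespace.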
Tool Integration
Custom tool definition using LangChain’s @tool decorator
Tool calling and result handling
🌐 API Endpoints
Important
The following API endpoints are only available when running the advanced agent example.
The available endpoints are:
POST /process - Send queries to the agent
GET /api/memory/short-term/{session_id} - Get short-term memory for a session
GET /api/memory/short-term - List all short-term memories
GET /api/memory/long-term/{user_id} - Get long-term memory for a user
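The memory endpoints can also be queried from Python. This is a minimal sketch that assumes the server is running on the host and port from the core code and that the endpoints return JSON; the helper names are hypothetical:

```python
import json
import urllib.request
from typing import Optional
from urllib.parse import quote

BASE_URL = "http://127.0.0.1:8090"  # host and port used in agent_app.run(...)


def short_term_url(session_id: Optional[str] = None) -> str:
    """URL for one session's short-term memory, or for the full list."""
    if session_id is None:
        return f"{BASE_URL}/api/memory/short-term"
    return f"{BASE_URL}/api/memory/short-term/{quote(session_id, safe='')}"


def long_term_url(user_id: str) -> str:
    """URL for one user's long-term memory."""
    return f"{BASE_URL}/api/memory/long-term/{quote(user_id, safe='')}"


def fetch_json(url: str):
    """Send a GET request and decode the JSON response body."""
    with urllib.request.urlopen(url) as response:
        return json.load(response)
```

For example, fetch_json(long_term_url("userid123")) would return the stored memories for the user from the curl example, once at least one tool call has been recorded.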
🔧 Customization
You can customize these examples by:
Adding new tools: Define custom tools using the @tool decorator
Changing the LLM: Modify the ChatOpenAI initialization to use different models
Extending the workflow: Add new nodes and edges to the state graph
Customizing memory: Implement different memory storage backends