Simple Deployment¶
AgentApp is the all-in-one application service wrapper in AgentScope Runtime. It provides an HTTP service framework for your agent logic and exposes it as an API.
In the current version, AgentApp directly inherits from FastAPI, allowing it to maintain high flexibility while deeply integrating advanced features specific to Agent business logic. Its core features include:
- Full FastAPI ecosystem compatibility: native route registration (GET/POST, etc.), middleware extensions, and standard lifecycle management.
- Streaming responses (SSE) for real-time output.
- Task interrupt management: a task-interruption mechanism based on distributed backends (e.g., Redis), supporting precise control over long-running tasks.
- Built-in health-check endpoints.
- Optional Celery asynchronous task queues.
- Deployment to local or remote targets.
Important:
In the current version, AgentApp does not automatically include a `/process` endpoint.
You must explicitly register a request handler using a decorator (e.g., `@app.query(...)`) before your service can process incoming requests.
The sections below dive into each capability with concrete examples.
Initialization and Basic Run¶
What it does
Creates a minimal AgentApp instance and starts a FastAPI-based HTTP service skeleton.
In its initial state, the service only provides:
- Welcome page: `/`
- Health check: `/health`
- Readiness probe: `/readiness`
- Liveness probe: `/liveness`
Note:
By default, no `/process` or other business endpoints are exposed. You must register at least one handler using decorators such as `@app.query(...)` or `@app.task(...)` before the service can process requests. Handlers can be regular or async functions, and may support streaming output via async generators.
Example
```python
from agentscope_runtime.engine import AgentApp

agent_app = AgentApp(
    app_name="Friday",
    app_description="A helpful assistant",
)

agent_app.run(host="127.0.0.1", port=8090)
```
A2A Extension Field Configuration¶
What it does
Configures the agent's A2A (Agent-to-Agent) protocol information and runtime-related fields through the `a2a_config` parameter.
Key parameter
`a2a_config`: optional parameter; accepts an `AgentCardWithRuntimeConfig` object.
Configuration content
`a2a_config` supports two kinds of fields:
- AgentCard protocol fields: passed through the `agent_card` field; contain skills, transport protocols, input/output modes, etc.
- Runtime fields: top-level fields; contain service registration and discovery (Registry), timeout settings, service endpoints, etc.
Example
```python
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.deployers.adapter.a2a import (
    AgentCardWithRuntimeConfig,
)

agent_app = AgentApp(
    app_name="MyAgent",
    app_description="My agent description",
    a2a_config=AgentCardWithRuntimeConfig(
        agent_card={
            "name": "MyAgent",
            "description": "My agent description",
            "skills": [...],  # Agent skills list
            "default_input_modes": ["text"],
            "default_output_modes": ["text"],
            # ... other protocol fields
        },
        registry=[...],  # Service registration and discovery
        task_timeout=120,  # Task timeout settings
        # ... other configuration fields
    ),
)
```
Detailed documentation
For complete field descriptions, configuration methods, and usage examples, please refer to the A2A Registry - Service Registration and Discovery documentation.
Streaming Output (SSE)¶
Purpose
Stream partial outputs to clients in real time—perfect for chat, coding, or any incremental generation scenario.
Key Parameters
- `response_type="sse"`
- `stream=True`
Client Example
```shell
curl -N \
  -X POST "http://localhost:8090/process" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [
      { "role": "user", "content": [{ "type": "text", "text": "Hello Friday" }] }
    ]
  }'
```
Response Format
```text
data: {"sequence_number":0,"object":"response","status":"created", ... }
data: {"sequence_number":1,"object":"response","status":"in_progress", ... }
data: {"sequence_number":2,"object":"message","status":"in_progress", ... }
data: {"sequence_number":3,"object":"content","status":"in_progress","text":"Hello" }
data: {"sequence_number":4,"object":"content","status":"in_progress","text":" World!" }
data: {"sequence_number":5,"object":"message","status":"completed","text":"Hello World!" }
data: {"sequence_number":6,"object":"response","status":"completed", ... }
```
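A client reassembles the final text by concatenating the `text` deltas of the `content` events. The stdlib sketch below illustrates that parsing logic; it uses only the two fully-formed `content` events from the sample stream (the elided `...` fields are omitted), and the helper names are hypothetical:

```python
import json

def parse_sse_events(raw_stream: str):
    """Parse an SSE body into a list of JSON event payloads."""
    events = []
    for line in raw_stream.splitlines():
        line = line.strip()
        if line.startswith("data:"):
            events.append(json.loads(line[len("data:"):].strip()))
    return events

def assemble_text(events):
    """Concatenate the incremental 'content' deltas into the final text."""
    return "".join(
        event.get("text", "")
        for event in events
        if event.get("object") == "content"
    )

raw = (
    'data: {"sequence_number": 3, "object": "content", '
    '"status": "in_progress", "text": "Hello"}\n'
    'data: {"sequence_number": 4, "object": "content", '
    '"status": "in_progress", "text": " World!"}\n'
)
print(assemble_text(parse_sse_events(raw)))  # Hello World!
```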
Lifecycle Management (Lifespan)¶
Purpose
Loading models or initializing database connections before the app starts, and releasing those resources on shutdown, are common requirements in production environments.
Method 1: Pass Callables as Parameters (Simple Logic)¶
Key Parameters
- `before_start`: invoked before the API server starts
- `after_finish`: invoked when the server stops
```python
from agentscope_runtime.engine import AgentApp

async def init_resources(app, **kwargs):
    print("🚀 Service launching, initializing resources...")

async def cleanup_resources(app, **kwargs):
    print("🛑 Service stopping, cleaning up resources...")

app = AgentApp(
    agent=agent,  # a previously constructed agent instance
    before_start=init_resources,
    after_finish=cleanup_resources,
)
```
Method 2: Use Lifespan Functions (Recommended)¶
This is the modern approach recommended by AgentScope Runtime. Thanks to its inheritance from FastAPI, AgentApp supports standard lifespan management, which offers the following advantages:
- Native FastAPI Experience: identical to the standard FastAPI implementation. If you are familiar with FastAPI development, you can apply native programming patterns seamlessly, significantly reducing the learning curve.
- Structured Management: startup and cleanup logic live in a single function, separated by `yield`, keeping the logic compact.
- State Sharing: resources can be attached to `app.state` during the startup phase and accessed via `app.state` throughout the application lifecycle (including request handlers).
- Built-in Compatibility: even with a custom `lifespan`, AgentApp internally coordinates the preparation of the Runner, the mounting of protocol adapters, and the lifecycle of interruption services.
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from agentscope.session import RedisSession
from agentscope_runtime.engine import AgentApp

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup phase
    import fakeredis

    fake_redis = fakeredis.aioredis.FakeRedis(
        decode_responses=True,
    )
    # NOTE: This FakeRedis instance is for development/testing only.
    # In production, replace it with your own Redis client/connection
    # (e.g., aioredis.Redis).
    app.state.session = RedisSession(
        connection_pool=fake_redis.connection_pool,
    )
    print("✅ Service initialized")
    try:
        # yield transfers control to AgentApp
        yield
    finally:
        # Cleanup phase
        print("✅ Resources released")

# Pass the defined lifespan to AgentApp
app = AgentApp(
    app_name="Friday",
    app_description="A helpful assistant",
    lifespan=lifespan,
)
```
Key Notes
- Function Signature: the `lifespan` function must accept a `FastAPI` instance as a parameter and be decorated with `@asynccontextmanager`.
- Execution Order: AgentApp handles scheduling internally. It first executes internal framework logic, then your `lifespan` startup logic, and reverses that order for cleanup when the service stops.
- Deprecation Notice: the `@app.init` and `@app.shutdown` decorators from older versions are deprecated. Migrate to the `lifespan` pattern for better stability.
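The startup/cleanup nesting described above can be modeled with plain `asynccontextmanager` nesting. This is a toy illustration of the ordering, not AgentApp's actual internals; the function names are hypothetical:

```python
import asyncio
from contextlib import asynccontextmanager

order = []

@asynccontextmanager
async def user_lifespan(app):
    order.append("user startup")
    try:
        yield
    finally:
        order.append("user cleanup")

@asynccontextmanager
async def framework_lifespan(app):
    # Internal framework logic runs first...
    order.append("framework startup")
    async with user_lifespan(app):
        yield  # ...then the service handles requests here...
    # ...and framework cleanup runs last, after user cleanup.
    order.append("framework cleanup")

async def main():
    async with framework_lifespan(app=None):
        order.append("serving")

asyncio.run(main())
print(order)
# ['framework startup', 'user startup', 'serving',
#  'user cleanup', 'framework cleanup']
```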
Health Check Endpoints¶
Purpose
Expose health, readiness, and liveness probes automatically for containers or orchestrators.
Endpoints
- `GET /health`: returns status and timestamp
- `GET /readiness`: readiness probe
- `GET /liveness`: liveness probe
- `GET /`: welcome message
```shell
curl http://localhost:8090/health
curl http://localhost:8090/readiness
curl http://localhost:8090/liveness
curl http://localhost:8090/
```
Celery Asynchronous Task Queue (Optional)¶
Purpose
Offload long-running background tasks so HTTP handlers return immediately.
Key Parameters
- `broker_url="redis://localhost:6379/0"`
- `backend_url="redis://localhost:6379/0"`
```python
app = AgentApp(
    agent=agent,
    broker_url="redis://localhost:6379/0",
    backend_url="redis://localhost:6379/0",
)

@app.task("/longjob", queue="celery")
def heavy_computation(data):
    return {"result": data["x"] ** 2}
```
Submit a task:

```shell
curl -X POST http://localhost:8090/longjob -H "Content-Type: application/json" -d '{"x": 5}'
```

Response:

```json
{"task_id": "abc123"}
```

Fetch the result:

```shell
curl http://localhost:8090/longjob/abc123
```
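The submit-then-poll flow can be sketched without Celery at all. The following stdlib-only toy uses a thread and an in-memory dict as stand-ins for the broker and result backend; it illustrates the pattern only and is not the Celery API:

```python
import threading
import time
import uuid

# In-memory stand-in for the Celery result backend (illustration only).
results = {}

def heavy_computation(data):
    return {"result": data["x"] ** 2}

def submit(data):
    """Enqueue the job and return a task_id immediately, like POST /longjob."""
    task_id = uuid.uuid4().hex
    results[task_id] = {"status": "PENDING"}

    def worker():
        # A real worker process would pick this up from the broker.
        results[task_id] = {"status": "SUCCESS", "value": heavy_computation(data)}

    threading.Thread(target=worker).start()
    return task_id

def fetch(task_id):
    """Poll the result backend, like GET /longjob/<task_id>."""
    return results[task_id]

task_id = submit({"x": 5})
while fetch(task_id)["status"] != "SUCCESS":
    time.sleep(0.01)
print(fetch(task_id)["value"])  # {'result': 25}
```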
Custom Query Handling¶
Purpose
Use @app.query() to fully control request handling—ideal when you need custom state, multi-turn logic, or different frameworks.
Basic Usage¶
```python
import os

from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.pipeline import stream_printing_messages
from agentscope.tool import Toolkit, execute_python_code
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest

app = AgentApp(
    app_name="Friday",
    app_description="A helpful assistant",
    lifespan=lifespan,  # as defined in the Lifecycle Management section
)

@app.query(framework="agentscope")
async def query_func(
    self,
    msgs,
    request: AgentRequest = None,
    **kwargs,
):
    session_id = request.session_id
    user_id = request.user_id

    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)

    agent = ReActAgent(
        name="Friday",
        model=DashScopeChatModel(
            "qwen-turbo",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            stream=True,
        ),
        sys_prompt="You're a helpful assistant named Friday.",
        toolkit=toolkit,
        memory=InMemoryMemory(),
        formatter=DashScopeChatFormatter(),
    )
    agent.set_console_output_enabled(enabled=False)

    # Access the session via app.state
    await app.state.session.load_session_state(
        session_id=session_id,
        user_id=user_id,
        agent=agent,
    )

    async for msg, last in stream_printing_messages(
        agents=[agent],
        coroutine_task=agent(msgs),
    ):
        yield msg, last

    await app.state.session.save_session_state(
        session_id=session_id,
        user_id=user_id,
        agent=agent,
    )
```
Key Characteristics¶
- Framework Flexibility: `framework` accepts `"agentscope"`, `"autogen"`, `"agno"`, `"langgraph"`, etc.
- Function Signature:
  - `self`: the Runner instance bound to the AgentApp
  - `msgs`: incoming messages
  - `request`: `AgentRequest` with `session_id`, `user_id`, etc.
  - `**kwargs`: extend as needed
- Streaming Friendly: handlers can be async generators that yield `(msg, last)` pairs.
- Stateful: access `app.state.state_service` to load/store custom state.
- Session Memory: use `app.state.session_service` to keep chat history per user/session.
Comparison with the V0 `agent` Parameter Approach¶
| Feature | Pre-built | Custom |
|---|---|---|
| Flexibility | Lower; uses a provided agent implementation | Full control over every step |
| State Management | Automatic | Manual but far more customizable |
| Suitable Scenarios | Simple, quick setups | Complex workflows needing fine-grained control |
| Multi-framework Support | Limited | Plug in any supported framework |
Custom Endpoint Definition¶
You can extend the functional interfaces of AgentApp in two ways. Since AgentApp directly inherits from FastAPI, it not only retains the native flexibility of a Web framework but also provides enhanced tools optimized for Agent business scenarios (such as streaming output and object serialization).
1. Native FastAPI Routes¶
This is the most flexible approach. You can use standard FastAPI decorators (such as @app.get, @app.post, etc.) to define any business interface.
Use Cases:
- When you need full control over the `Response` object, status codes, or headers.
- Defining simple web console or monitoring interfaces beyond the built-in health checks.
Example:
```python
app = AgentApp()

@app.get("/info")
async def get_info():
    """Interface defined using native FastAPI."""
    return {
        "app name": app.app_name,
        "app description": app.app_description,
        "custom_metadata": "v1.0.0",
    }

@app.post("/update_config")
async def update_config(data: dict):
    """Standard POST request handling."""
    # Your business logic
    return {"status": "updated"}
```
Client calls:
```shell
curl -X GET http://localhost:8090/info
curl -X POST http://localhost:8090/update_config \
  -H "Content-Type: application/json" \
  -d '{"config_key": "max_tokens", "value": 512}'
```
2. @app.endpoint Convenience Decorator¶
AgentApp provides a specialized @app.endpoint decorator. It wraps FastAPI’s route registration under the hood, specifically optimized for Agent response scenarios.
Core Advantages:
- Multiple return modes:
  - Regular sync/async functions returning JSON
  - Generators (sync or async) returning streaming data over SSE
- Automatic parameter parsing; endpoints can accept:
  - URL query parameters
  - JSON bodies mapped to Pydantic models
  - `fastapi.Request` objects
  - `AgentRequest` objects (convenient for accessing unified session/user info)
- Error handling: exceptions raised in streaming generators are automatically wrapped into SSE error events and sent to the client.
Example:
```python
app = AgentApp()

@app.endpoint("/hello")
def hello_endpoint():
    return {"msg": "Hello world"}

@app.endpoint("/stream_numbers")
async def stream_numbers():
    for i in range(5):
        yield f"number: {i}\n"
```
Client calls:
```shell
curl -X POST http://localhost:8090/hello
curl -N -X POST http://localhost:8090/stream_numbers
```
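The generator-to-SSE conversion that `@app.endpoint` performs is internal to AgentApp; the pure-Python sketch below only illustrates the general shape of such a transformation, including the documented behavior of wrapping generator errors into SSE events. The `to_sse_frames` helper is hypothetical:

```python
import json

def to_sse_frames(gen):
    """Wrap each generator item in an SSE 'data:' frame (illustration only)."""
    try:
        for item in gen:
            payload = item if isinstance(item, str) else json.dumps(item)
            yield f"data: {payload}\n\n"
    except Exception as exc:
        # Mirrors the documented behavior: stream errors become SSE error events.
        yield f"data: {json.dumps({'error': str(exc)})}\n\n"

def numbers():
    for i in range(3):
        yield f"number: {i}"

frames = list(to_sse_frames(numbers()))
print(frames[0])  # data: number: 0
```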
Differences and Selection¶
| Feature | Native FastAPI (`@app.get`, `@app.post`, ...) | Convenience Decorator (`@app.endpoint`) |
|---|---|---|
| Streaming | Requires manual streaming-response handling | Automatically detects generators and converts to SSE |
| Serialization | Relies on FastAPI's built-in serialization | Enhanced deep serialization (supports complex types) |
| Error Handling | Requires manual middleware or exception handlers | Automatic error encapsulation for streaming |
| Flexibility | Very high; supports all native configurations | High; focuses on Agent response standards |
Recommendation:
- If your interface needs to return Agent reasoning processes or streaming data, prioritize `@app.endpoint`.
- If your interface follows standard web business logic (e.g., config management, status queries), use native FastAPI.
Task Interrupt Management¶
In long-chain reasoning or complex Agent interaction scenarios, users may need to stop a running task mid-way. Currently, AgentApp leverages interruption backends (e.g., Redis) to provide precise control over task status, including:
- Distributed Support: via a Redis backend, interrupt signals can be sent from any node in a cluster to stop a task running on another node.
- Status Synchronization: automatically manages task states (RUNNING, STOPPED, FINISHED, ERROR) to prevent concurrency conflicts within the same session.
- Graceful Cancellation: uses Python's asyncio cancellation mechanism, allowing developers to run cleanup logic (such as saving the Agent's current state) after catching `CancelledError`.
Configuring the Interrupt Backend¶
AgentApp supports three backend configurations:
- Local Mode (default): if no config is provided, `LocalInterruptBackend` is used; suitable for single-machine runs.
- Redis Mode (recommended): configured via `interrupt_redis_url`; suitable for distributed deployments.
- Custom Backend: pass an instance of a `BaseInterruptBackend` subclass via the `interrupt_backend` parameter.
Example:
```python
app = AgentApp(
    app_name="InterruptibleAgent",
    # Enable distributed interrupt support
    interrupt_redis_url="redis://localhost",
)
```
Writing Interrupt-Aware Handlers¶
In a handler decorated with `@app.query`, an external interrupt causes the executing coroutine to raise `asyncio.CancelledError`. Catch this exception to implement state saving and similar features.
Note: when your handler catches the interrupt, it must also call `agent.interrupt()` manually. AgentApp cancels the outer asynchronous task flow, but the underlying Agent may still be performing a blocking call or a complex loop; calling `agent.interrupt()` ensures the reasoning chain is gracefully stopped and accurate state data is produced for subsequent recovery.
Example Usage
```python
@agent_app.query(framework="agentscope")
async def query_func(
    self,
    msgs,
    request: AgentRequest = None,
    **kwargs,
):
    # Prepare the agent
    agent = ReActAgent(name="Friday", ...)

    # Load historical state (useful for recovery after interruption)
    await agent_app.state.session.load_session_state(
        session_id=request.session_id,
        user_id=request.user_id,
        agent=agent,
    )

    try:
        # AgentApp injects interrupt-listening logic outside this generator.
        # When agent_app.stop_chat is called, a CancelledError is raised here.
        async for msg, last in stream_printing_messages(...):
            yield msg, last
    except asyncio.CancelledError:
        # Core logic: respond to the interrupt signal
        print(f"Detected task {request.session_id} manually interrupted")
        # Important: manually stop the underlying Agent task
        await agent.interrupt()
        # Re-raise the exception so the system marks the status as STOPPED
        raise
    finally:
        # Save agent state whether the task was interrupted or finished normally
        await agent_app.state.session.save_session_state(
            session_id=request.session_id,
            user_id=request.user_id,
            agent=agent,
        )
```
Triggering Interrupt Signals¶
You can define a custom route and call the agent_app.stop_chat method within it to trigger an interrupt.
Example:
```python
@agent_app.post("/stop")
async def stop_task(request: AgentRequest):
    # Send an interrupt signal to a specific user_id and session_id
    await agent_app.stop_chat(
        user_id=request.user_id,
        session_id=request.session_id,
    )
    return {"status": "ok"}
```
Execution:
Send a request containing the `user_id` and `session_id` to the `/stop` endpoint to cancel the corresponding running task:
```shell
curl -X POST "http://localhost:8090/stop" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [],
    "session_id": "Target Session ID",
    "user_id": "Target User ID"
  }'
```
Full Example: AgentApp with State Management and Interruption¶
The following example demonstrates how to integrate the above features to build an Agent service with state recovery and task interruption capabilities.
Complete Code¶
```python
import asyncio
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI

from agentscope.agent import ReActAgent
from agentscope.model import DashScopeChatModel
from agentscope.formatter import DashScopeChatFormatter
from agentscope.tool import Toolkit, execute_python_code
from agentscope.pipeline import stream_printing_messages
from agentscope.memory import InMemoryMemory
from agentscope.session import RedisSession
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest


# 1. Define the lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    import fakeredis

    fake_redis = fakeredis.aioredis.FakeRedis(
        decode_responses=True,
    )
    # NOTE: This FakeRedis instance is for development/testing only.
    # In production, replace it with your own Redis client/connection
    # (e.g., aioredis.Redis).
    app.state.session = RedisSession(
        connection_pool=fake_redis.connection_pool,
    )
    try:
        yield
    finally:
        print("AgentApp is shutting down...")


# 2. Create the AgentApp
agent_app = AgentApp(
    app_name="Friday",
    app_description="A helpful assistant",
    lifespan=lifespan,
    # Note: the local interrupt backend is used because no Redis URL is given.
    # To support distributed interrupts, add:
    # interrupt_redis_url="redis://localhost",
)


# 3. Define the request-handling logic
@agent_app.query(framework="agentscope")
async def query_func(
    self,
    msgs,
    request: AgentRequest = None,
    **kwargs,
):
    session_id = request.session_id
    user_id = request.user_id

    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)

    agent = ReActAgent(
        name="Friday",
        model=DashScopeChatModel(
            "qwen-turbo",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            stream=True,
        ),
        sys_prompt="You're a helpful assistant named Friday.",
        toolkit=toolkit,
        memory=InMemoryMemory(),
        formatter=DashScopeChatFormatter(),
    )
    agent.set_console_output_enabled(enabled=True)

    # Load agent state
    await agent_app.state.session.load_session_state(
        session_id=session_id,
        user_id=user_id,
        agent=agent,
    )

    try:
        async for msg, last in stream_printing_messages(
            agents=[agent],
            coroutine_task=agent(msgs),
        ):
            yield msg, last
    except asyncio.CancelledError:
        # Interruption logic
        print(f"Task {session_id} was manually interrupted.")
        # Manually interrupt the agent to fully stop the underlying execution
        await agent.interrupt()
        # Re-raise to mark the status as STOPPED
        raise
    finally:
        # Save agent state
        await agent_app.state.session.save_session_state(
            session_id=session_id,
            user_id=user_id,
            agent=agent,
        )


# 4. Register the interrupt route
@agent_app.post("/stop")
async def stop_task(request: AgentRequest):
    await agent_app.stop_chat(
        user_id=request.user_id,
        session_id=request.session_id,
    )
    return {
        "status": "success",
        "message": "Interrupt signal broadcasted.",
    }


# 5. Run the application
agent_app.run(host="127.0.0.1", port=8090)
```
Interruption Feature Test Example¶
To test the interruption feature, you can use two terminal windows: one to start a long-running task and another to send the interrupt signal.
1. Start a long-running task
In the first terminal, send a complex request (e.g., asking the Agent to write a long story) and specify `session_id` and `user_id`. The `-N` flag lets you see the streaming output in real time.
```shell
# Terminal 1: initiate the inference request
curl -N \
  -X POST "http://localhost:8090/process" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Please write a very long and detailed story about a robot named Friday exploring a distant galaxy."
          }
        ]
      }
    ],
    "session_id": "ss-123",
    "user_id": "uu-123"
  }'
```
2. Send an interrupt signal
While the task is running (Terminal 1 is still printing), open a second terminal and send the interrupt request. Note: the session_id and user_id must match the previous request.
```shell
# Terminal 2: trigger the interrupt
curl -X POST "http://localhost:8090/stop" \
  -H "Content-Type: application/json" \
  -d '{
    "input": [],
    "session_id": "ss-123",
    "user_id": "uu-123"
  }'
```
Expected Result
- Terminal 2 will immediately return `{"status": "success", "message": "Interrupt signal broadcasted."}`.
- Terminal 1's streaming output will stop immediately and the HTTP connection will close. If you caught `asyncio.CancelledError`, you will see your custom log (e.g., "Task ss-123 was manually interrupted.") in the server logs.
Deploy Locally or Remotely¶
Use the unified `deploy()` method to ship the same app to different environments:
```python
from agentscope_runtime.engine.deployers import LocalDeployManager

await app.deploy(LocalDeployManager(host="0.0.0.0", port=8091))
```
See the Advanced Deployment Guide for additional deployers (Kubernetes, ModelStudio, AgentRun, etc.) and more configuration tips. AgentScope Runtime also provides serverless deployment options, including deploying agents to ModelStudio (FC) and AgentRun; the same guide covers their configuration in detail.