Source code for agentscope_runtime.engine.deployers.utils.service_utils.routing.unified_routing_mixin
# -*- coding: utf-8 -*-
import inspect
import asyncio
import logging
import uuid
import time
from typing import Callable, Optional, List, Any, Dict
from fastapi.routing import APIRoute
from .task_engine_mixin import TaskEngineMixin
from .custom_endpoint_mixin import CustomEndpointMixin
logger = logging.getLogger(__name__)
[docs]
class UnifiedRoutingMixin(TaskEngineMixin, CustomEndpointMixin):
[docs]
def init_routing_manager(
self,
broker_url: Optional[str] = None,
backend_url: Optional[str] = None,
):
self.init_task_engine(broker_url, backend_url)
self._custom_endpoints: List[Dict[str, Any]] = []
[docs]
def task(self, path: str, queue: str = "celery"):
def decorator(func: Callable):
meta = {
"queue": queue,
"task_type": True,
"original_func": func,
}
if self.celery_app and not hasattr(func, "celery_task"):
func.celery_task = self.register_celery_task(func, queue)
@self.post(path, tags=["custom"])
async def task_endpoint(request: dict):
try:
task_id = str(uuid.uuid4())
if self.celery_app:
if len(inspect.signature(func).parameters) > 0:
result = self.submit_celery_task(func, request)
else:
result = self.submit_celery_task(func)
return {
"task_id": result.id,
"status": "submitted",
"queue": queue,
"message": f"Task {result.id} submitted to Celery "
f"queue {queue}",
}
else:
self.active_tasks[task_id] = {
"task_id": task_id,
"status": "submitted",
"queue": queue,
"submitted_at": time.time(),
"request": request,
}
asyncio.create_task(
self.execute_background_task(
task_id,
func,
request,
queue,
),
)
return {
"task_id": task_id,
"status": "submitted",
"queue": queue,
"message": f"Task {task_id} submitted to queue "
f"{queue}",
}
except Exception as e:
logger.exception("Task submission failed")
return {
"error": str(e),
"type": "task",
"queue": queue,
"status": "failed",
}
# Attach metadata to the actual FastAPI endpoint function
setattr(task_endpoint, "_task_meta", meta)
# Register GET route for task status polling
@self.get(f"{path}/{{task_id}}", tags=["custom"])
@UnifiedRoutingMixin.internal_route
async def task_status_endpoint(task_id: str):
if not task_id:
return {"error": "task_id required"}
return self.get_task_status(task_id)
return func
return decorator
[docs]
def endpoint(self, path: str, methods: Optional[List[str]] = None):
"""Decorator to register custom endpoints"""
if methods is None:
methods = ["POST"]
def decorator(func: Callable):
self.register_single_custom_endpoint(path, func, methods)
return func
return decorator
@property
def custom_endpoints(self) -> List[Dict[str, Any]]:
self.sync_routing_metadata()
return self._custom_endpoints
[docs]
def sync_routing_metadata(self):
"""
Synchronize and update routing metadata for discovery.
"""
# Define a blacklist of internal system paths
INTERNAL_PATHS = []
endpoint_path = getattr(self, "endpoint_path", None)
if endpoint_path:
INTERNAL_PATHS.append(endpoint_path)
# Clear existing metadata to ensure idempotency
self._custom_endpoints = []
for route in self.routes:
if not isinstance(route, APIRoute):
continue
handler = route.endpoint
if (
route.path in INTERNAL_PATHS
or UnifiedRoutingMixin.is_internal_route(handler)
):
continue
# Check if the route is an async task
task_meta = getattr(handler, "_task_meta", None)
current_tags = list(route.tags or [])
if task_meta:
# Extract task metadata
info = {
"path": route.path,
"handler": handler,
"methods": list(route.methods),
"module": getattr(
task_meta["original_func"],
"__module__",
None,
),
"function_name": getattr(
task_meta["original_func"],
"__name__",
None,
),
"queue": task_meta["queue"],
"task_type": True,
"original_func": task_meta["original_func"],
"tags": current_tags,
}
else:
# Extract endpoint metadata
info = {
"path": route.path,
"handler": handler,
"methods": list(route.methods),
"module": getattr(handler, "__module__", None),
"function_name": getattr(handler, "__name__", None),
"tags": current_tags,
}
if info not in self._custom_endpoints:
self._custom_endpoints.append(info)
[docs]
def restore_custom_endpoints(self, custom_endpoints: List[Dict[str, Any]]):
"""
Re-register all custom routes and tasks based on the provided metadata.
"""
self.sync_routing_metadata()
paths_to_delete = set()
for old_info in self._custom_endpoints:
path = old_info["path"]
paths_to_delete.add(path)
if old_info.get("task_type") is True:
status_path = f"{path}/{{task_id}}"
paths_to_delete.add(status_path)
if hasattr(self, "router") and self.router.routes:
self.router.routes = [
route
for route in self.router.routes
if not (
isinstance(route, APIRoute)
and route.path in paths_to_delete
)
]
self._custom_endpoints = []
for info in custom_endpoints:
path = info["path"]
methods = info["methods"]
tags = info.get("tags", [])
if isinstance(methods, (list, set)):
methods = [m for m in methods if m.upper() != "OPTIONS"]
if info.get("task_type") is True:
original_func = info["original_func"]
queue = info.get("queue", "celery")
self.task(path=path, queue=queue)(original_func)
else:
handler = info["handler"]
self.add_api_route(
path=path,
endpoint=handler,
methods=methods,
tags=tags,
)
self.sync_routing_metadata()
[docs]
@staticmethod
def internal_route(func: Callable) -> Callable:
"""
Decorator: mark a route function as an internal system route.
"""
setattr(func, "_is_system_route", True)
return func
[docs]
@staticmethod
def is_internal_route(handler: Callable) -> bool:
"""
Determine if a handler is marked as an internal system route.
"""
return getattr(handler, "_is_system_route", False)