Source code for agentscope_runtime.engine.app.base_app

# -*- coding: utf-8 -*-
import inspect
import logging
from typing import Callable, Optional

from fastapi import Request

from .celery_mixin import CeleryMixin

logger = logging.getLogger(__name__)


class BaseApp(CeleryMixin):
    """
    Base application class combining Celery background tasks with
    FastAPI-like HTTP routing for task endpoints.

    The Celery side (``celery_app``, ``register_celery_task`` and
    ``get_task_status``) is used via ``CeleryMixin``. The routing
    decorators ``self.post`` / ``self.get`` used by :meth:`task` are not
    defined in this class.
    NOTE(review): presumably they come from a FastAPI application that the
    concrete subclass also inherits from -- confirm against subclasses.
    """

    def __init__(
        self,
        broker_url: Optional[str] = None,
        backend_url: Optional[str] = None,
    ):
        """
        Initialize the Celery part of the application.

        Args:
            broker_url: Forwarded unchanged to ``CeleryMixin.__init__``.
            backend_url: Forwarded unchanged to ``CeleryMixin.__init__``.
        """
        # Initialize CeleryMixin. The mixin initializer is invoked
        # explicitly (not through super()), so only CeleryMixin's
        # __init__ runs here regardless of the subclass's MRO.
        CeleryMixin.__init__(self, broker_url, backend_url)

    def task(self, path: str, queue: str = "celery"):
        """
        Register an asynchronous task endpoint.

        Decorating a function with ``@app.task(path)`` registers it as a
        Celery task and adds two HTTP routes:

        * ``POST <path>`` -> create a task and return ``{"task_id": ...}``
        * ``GET <path>/{task_id}`` -> return ``self.get_task_status(task_id)``

        If the decorated function declares at least one parameter, the JSON
        body of the POST request is passed to the task as its single
        positional argument; otherwise the task is enqueued with no
        arguments.

        Args:
            path: URL path under which the two routes are registered.
            queue: Queue name forwarded to ``register_celery_task``.

        Returns:
            A decorator that registers the function and returns it
            unchanged.

        Raises:
            RuntimeError: If ``self.celery_app`` is ``None``.
        """
        if self.celery_app is None:
            raise RuntimeError(
                f"[AgentApp] Cannot register task endpoint '{path}'.\n"
                f"Reason: The @task decorator requires a background task "
                f"queue to run asynchronous jobs.\n\n"
                "If you want to use async task queue, you must initialize "
                "AgentApp with broker_url and backend_url, e.g.: \n\n"
                " app = AgentApp(\n"
                " broker_url='redis://localhost:6379/0',\n"
                " backend_url='redis://localhost:6379/0'\n"
                " )\n",
            )

        def decorator(func: Callable):
            # The signature of ``func`` cannot change between requests, so
            # inspect it once at registration time instead of on every
            # POST. A callable that cannot be introspected now fails here,
            # at decoration time, rather than on each incoming request.
            accepts_payload = bool(inspect.signature(func).parameters)

            # Register Celery task using CeleryMixin
            celery_task = self.register_celery_task(func, queue=queue)

            # Add FastAPI HTTP routes
            @self.post(path)
            async def create_task(request: Request):
                if accepts_payload:
                    # The parsed JSON body is handed to the task as one
                    # positional argument.
                    body = await request.json()
                    task = celery_task.delay(body)
                else:
                    task = celery_task.delay()
                return {"task_id": task.id}

            @self.get(path + "/{task_id}")
            async def get_task(task_id: str):
                return self.get_task_status(task_id)

            # Return the original function so it stays directly callable.
            return func

        return decorator