Source code for agentscope_runtime.engine.tracing.asyncio_util
# -*- coding: utf-8 -*-
from typing import AsyncIterable, AsyncIterator, Tuple, TypeVar
T_co = TypeVar("T_co", covariant=True)
[docs]
async def aenumerate(
asequence: AsyncIterable[T_co],
start: int = 0,
) -> AsyncIterator[Tuple[int, T_co]]:
"""Asynchronously enumerate an async iterator from a given start value.
Args:
asequence (AsyncIterable[T_co]): The async iterable to enumerate.
start (int): The starting value for enumeration. Defaults to 0.
Yields:
Tuple[int, T_co]: A tuple containing the index and the item from the
async iterable.
"""
n = start
async for elem in asequence:
yield n, elem
n += 1