Skip to content

Feature: Native AsyncTaskEngine for async workflow execution #228

@FernandoCelmer

Description

@FernandoCelmer

Summary

Add an AsyncTaskEngine that runs within an event loop, enabling native async workflow execution without the sync-to-async bridge overhead.

Motivation

After the fix in #211, async tasks work correctly but still create/destroy event loops per task via asyncio.run(). This prevents sharing async resources (connections, sessions, semaphores) across tasks and adds overhead.

Current architecture

TaskEngine (sync)
  └── execute_with_retry()
       └── Action._call_func()
            └── asyncio.run() or ThreadPoolExecutor  ← bridge overhead
                 └── user async function

Proposed architecture

AsyncTaskEngine (async)
  └── async execute_with_retry()
       └── await task.step()  ← native, no bridge
            └── user async function

Design

class AsyncTaskEngine:
    """Async counterpart of TaskEngine for native async workflows."""

    def __init__(self, task, workflow_id, previous_context=None):
        self.task = task
        self.workflow_id = workflow_id
        self.previous_context = previous_context

    @asynccontextmanager
    async def start(self):
        self.task.status = TypeStatus.IN_PROGRESS
        self._start_time = datetime.now()
        try:
            yield self
        except Exception as err:
            self.task.errors = err
            self.task.status = TypeStatus.FAILED
        else:
            self.task.status = TypeStatus.COMPLETED
        finally:
            self.task.duration = (datetime.now() - self._start_time).total_seconds()
            self.task.config.tracer.end_task(task=self.task)

    async def execute(self):
        result = await self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )
        self.task.current_context = result
        return result

Async strategy

class AsyncSequential(Flow):
    async def run(self):
        for task in self.tasks:
            engine = AsyncTaskEngine(task=task, ...)
            async with engine.start():
                await engine.execute()

Scope

  • Create dotflow/core/async_engine.py with AsyncTaskEngine
  • Create async execution strategies (AsyncSequential, etc.)
  • Opt-in: users choose async mode via mode="async_sequential"
  • Shared event loop across all tasks in a workflow
  • Async retry with asyncio.sleep instead of time.sleep
  • Tests for async engine
  • Documentation

Notes

  • Opt-in feature — does not replace the sync TaskEngine
  • The @action decorator continues to work for both sync and async functions
  • The sync TaskEngine remains the default

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or requestfuturePlanned for future releases

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions