
Enhancement: Pass previous_context between task groups #248

@FernandoCelmer

Description

When tasks are organized into different groups, the previous_context is not passed between groups. Each group starts execution with previous_context = None, which breaks workflows that depend on data flowing from one stage to the next.

This is a design limitation that forces users to either abandon group separation or manually wire context between groups.

Current Behavior

from dotflow import DotFlow, action

@action
def fetch_data(initial_context):
    return {"users": [{"id": 1}, {"id": 2}]}

@action
def process_data(previous_context):
    # previous_context.storage is None here!
    data = previous_context.storage
    return {"total": len(data["users"])}  # TypeError: 'NoneType' object is not subscriptable

workflow = DotFlow()
workflow.task.add(step=fetch_data, group_name="extract")
workflow.task.add(step=process_data, group_name="transform")
workflow.start()
# process_data fails because previous_context is None

What the user expects

process_data should receive the output of fetch_data via previous_context.storage.

What actually happens

Each group resets previous_context = None. The data from fetch_data is lost when process_data runs.

Workarounds

Option 1: Single pipeline (no groups)

workflow.task.add(step=[fetch_data, process_data], initial_context=data)
workflow.start()

Works but loses the logical separation of groups.

Option 2: Manual wiring with workflow.result()

workflow.task.add(step=fetch_data, initial_context=data, group_name="extract")
workflow.start()

results = workflow.result()
workflow.task.clear()

workflow.task.add(step=process_data, initial_context=results, group_name="transform")
workflow.start()

Works but requires boilerplate and multiple start() calls.
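The Option 2 pattern can be generalized into a small helper. This is a library-independent sketch of the chaining semantics we want — run_groups_chained is a hypothetical name, and the steps here are plain callables rather than dotflow actions:

```python
def run_groups_chained(groups, initial_context=None):
    """Run each group's steps in order, feeding the last result of
    group N in as the starting context of group N+1."""
    context = initial_context
    for steps in groups:          # groups: list of lists of callables
        for step in steps:
            context = step(context)
    return context


# Mirrors the fetch_data -> process_data example above:
groups = [
    [lambda ctx: {"users": [{"id": 1}, {"id": 2}]}],   # "extract" group
    [lambda ctx: {"total": len(ctx["users"])}],        # "transform" group
]
result = run_groups_chained(groups)  # {"total": 2}
```

With a helper like this the boilerplate lives in one place, but it still sidesteps the library instead of using its group machinery.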

Proposal

Add an option to chain groups so the last task's current_context from group N automatically becomes the previous_context for the first task in group N+1.

Possible API

# Option A: workflow-level flag
workflow.start(chain_groups=True)

# Option B: explicit group ordering
workflow.task.add(step=fetch_data, group_name="extract", group_order=1)
workflow.task.add(step=process_data, group_name="transform", group_order=2)

Real-world use case

In the server_flow example, we have:

  1. fetch_data group — async HTTP calls to multiple endpoints
  2. result group — aggregate responses and generate a report

These are logically separate stages but need data to flow from fetch → aggregate → report. Today this forces either a single pipeline or manual result() + clear() + add() wiring.
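For illustration, the two stages can be sketched without dotflow at all. The endpoint URLs and function names below are made up, and the HTTP calls are simulated with asyncio.sleep; the point is only that the report stage cannot run without the fetch stage's output:

```python
import asyncio

async def fetch_all(endpoints):
    # fetch_data stage: concurrent "HTTP calls" (simulated, no real network)
    async def fetch(url):
        await asyncio.sleep(0)
        return {"url": url, "status": 200}
    return await asyncio.gather(*(fetch(u) for u in endpoints))

def make_report(responses):
    # result stage: aggregate the responses into a report
    ok = [r for r in responses if r["status"] == 200]
    return {"fetched": len(responses), "ok": len(ok)}

responses = asyncio.run(fetch_all(["https://a.example", "https://b.example"]))
report = make_report(responses)
```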

Impact

This affects any workflow that:

  • Uses groups for logical organization (ETL stages, microservice orchestration, etc.)
  • Needs data to flow between stages
  • Wants clean separation of concerns without boilerplate

Metadata

Labels

enhancement (New feature or request), future (Planned for future releases), significant (Significant severity)
