Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion backend/domain/workflow/internal/compose/test/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,22 @@ func TestBatch(t *testing.T) {
type mockRepo struct {
workflow.Repository
mu sync.Mutex
nextID int64
events []*entity.InterruptEvent
cp map[string][]byte
}

func (m *mockRepo) GenID(ctx context.Context) (int64, error) {
return 10001, nil
m.mu.Lock()
defer m.mu.Unlock()
m.nextID++
return m.nextID, nil
}

func (m *mockRepo) getNextID() int64 {
m.mu.Lock()
defer m.mu.Unlock()
return m.nextID
}

func (m *mockRepo) ListInterruptEvents(ctx context.Context, wfExeID int64) ([]*entity.InterruptEvent, error) {
Expand Down
239 changes: 239 additions & 0 deletions backend/domain/workflow/internal/compose/test/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,26 @@ package test
import (
"context"
"fmt"
"sync/atomic"
"testing"

"github.com/bytedance/mockey"
"github.com/cloudwego/eino/compose"
"github.com/stretchr/testify/assert"

model "github.com/coze-dev/coze-studio/backend/crossdomain/workflow/model"
"github.com/coze-dev/coze-studio/backend/domain/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
compose2 "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/compose"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/execute"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/entry"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/exit"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/loop"
_break "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/loop/break"
_continue "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/loop/continue"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/subworkflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/variableassigner"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/schema"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
Expand Down Expand Up @@ -729,3 +734,237 @@ func TestLoop_Interrupt(t *testing.T) {
// 1 from running index 1 fresh
assert.Equal(t, 3, callCount)
}

func TestLoop_SubWorkflow_Nested_Interrupt(t *testing.T) {
ctx := context.Background()

var callCount atomic.Int64

lambdaNode := &schema.NodeSchema{
Key: "lambda",
Type: entity.NodeTypeLambda,
Configs: &interruptibleConfig{},
Lambda: compose.InvokableLambda(func(ctx context.Context, in map[string]any) (map[string]any, error) {
n := callCount.Add(1)
t.Logf("lambda invoked (call #%d)", n)

if n == 1 {
interruptEvent := &entity.InterruptEvent{
ID: n,
NodeKey: "lambda",
EventType: entity.InterruptEventInput,
InterruptData: "{}",
}
return nil, compose.NewInterruptAndRerunErr(interruptEvent)
}
return map[string]any{"output": "done"}, nil
}, compose.WithLambdaType(string(entity.NodeTypeLambda))),
}

innerSubWfSchema := &schema.WorkflowSchema{
Nodes: []*schema.NodeSchema{
{Key: entity.EntryNodeKey, Type: entity.NodeTypeEntry, Configs: &entry.Config{}},
lambdaNode,
{Key: entity.ExitNodeKey, Type: entity.NodeTypeExit, Configs: &exit.Config{TerminatePlan: vo.ReturnVariables},
InputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{"output"},
Source: vo.FieldSource{
Ref: &vo.Reference{FromNodeKey: "lambda", FromPath: compose.FieldPath{"output"}},
},
},
},
},
},
Connections: []*schema.Connection{
{FromNode: entity.EntryNodeKey, ToNode: "lambda"},
{FromNode: "lambda", ToNode: entity.ExitNodeKey},
},
}
innerSubWfSchema.Init()

innerSubWfNode := &schema.NodeSchema{
Key: "inner_sub_wf",
Type: entity.NodeTypeSubWorkflow,
Configs: &subworkflow.Config{WorkflowID: 200},
SubWorkflowSchema: innerSubWfSchema,
SubWorkflowBasic: &entity.WorkflowBasic{ID: 200, Version: "1"},
OutputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{"output"},
Source: vo.FieldSource{
Ref: &vo.Reference{FromNodeKey: entity.ExitNodeKey, FromPath: compose.FieldPath{"output"}},
},
},
},
}

outerSubWfSchema := &schema.WorkflowSchema{
Nodes: []*schema.NodeSchema{
{Key: entity.EntryNodeKey, Type: entity.NodeTypeEntry, Configs: &entry.Config{}},
innerSubWfNode,
{Key: entity.ExitNodeKey, Type: entity.NodeTypeExit, Configs: &exit.Config{TerminatePlan: vo.ReturnVariables},
InputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{"output"},
Source: vo.FieldSource{
Ref: &vo.Reference{FromNodeKey: "inner_sub_wf", FromPath: compose.FieldPath{"output"}},
},
},
},
},
},
Connections: []*schema.Connection{
{FromNode: entity.EntryNodeKey, ToNode: "inner_sub_wf"},
{FromNode: "inner_sub_wf", ToNode: entity.ExitNodeKey},
},
}
outerSubWfSchema.Init()

outerSubWfNode := &schema.NodeSchema{
Key: "outer_sub_wf",
Type: entity.NodeTypeSubWorkflow,
Configs: &subworkflow.Config{WorkflowID: 100},
SubWorkflowSchema: outerSubWfSchema,
SubWorkflowBasic: &entity.WorkflowBasic{ID: 100, Version: "1"},
OutputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{"output"},
Source: vo.FieldSource{
Ref: &vo.Reference{FromNodeKey: entity.ExitNodeKey, FromPath: compose.FieldPath{"output"}},
},
},
},
}

continueNode := &schema.NodeSchema{
Key: "continueNode",
Type: entity.NodeTypeContinue,
Configs: &_continue.Config{},
}

loopNode := &schema.NodeSchema{
Key: "loop_node",
Type: entity.NodeTypeLoop,
Configs: &loop.Config{
LoopType: loop.ByIteration,
},
InputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{loop.Count},
Source: vo.FieldSource{Ref: &vo.Reference{FromNodeKey: entity.EntryNodeKey, FromPath: compose.FieldPath{"count"}}},
},
},
OutputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{"loop_output"},
Source: vo.FieldSource{
Ref: &vo.Reference{FromNodeKey: "outer_sub_wf", FromPath: compose.FieldPath{"output"}},
},
},
},
}

ws := &schema.WorkflowSchema{
Nodes: []*schema.NodeSchema{
{Key: entity.EntryNodeKey, Type: entity.NodeTypeEntry, Configs: &entry.Config{}},
loopNode,
outerSubWfNode,
continueNode,
{Key: entity.ExitNodeKey, Type: entity.NodeTypeExit, Configs: &exit.Config{TerminatePlan: vo.ReturnVariables},
InputSources: []*vo.FieldInfo{
{
Path: compose.FieldPath{"loop_output"},
Source: vo.FieldSource{
Ref: &vo.Reference{FromNodeKey: "loop_node", FromPath: compose.FieldPath{"loop_output"}},
},
},
},
},
},
Hierarchy: map[vo.NodeKey]vo.NodeKey{
"outer_sub_wf": "loop_node",
"continueNode": "loop_node",
},
Connections: []*schema.Connection{
{FromNode: entity.EntryNodeKey, ToNode: "loop_node"},
{FromNode: "loop_node", ToNode: "outer_sub_wf"},
{FromNode: "outer_sub_wf", ToNode: "continueNode"},
{FromNode: "continueNode", ToNode: "loop_node"},
{FromNode: "loop_node", ToNode: entity.ExitNodeKey},
},
}
ws.Init()

basic := &entity.WorkflowBasic{ID: 1, Version: "1"}

myRepo := &mockRepo{}
mockPatch := mockey.Mock(workflow.GetRepository).To(func() workflow.Repository {
return myRepo
}).Build()
defer mockPatch.UnPatch()

initialRunner := compose2.NewWorkflowRunner(basic, ws, model.ExecuteConfig{})
initialCtx, executeID, opts, _, err := initialRunner.Prepare(ctx)
assert.NoError(t, err)

wf, err := compose2.NewWorkflow(initialCtx, ws, compose2.WithIDAsName(basic.ID))
assert.NoError(t, err)

_, err = wf.Runner.Invoke(initialCtx, map[string]any{
"count": int64(1),
}, opts...)

assert.Error(t, err)
info, existed := compose.ExtractInterruptInfo(err)
assert.True(t, existed)
assert.NotNil(t, info)
assert.Equal(t, int64(1), callCount.Load(),
"Lambda should have been called exactly once (loop has 1 item, interrupted on first call)")

repo := workflow.GetRepository()
event0, found, _ := repo.GetFirstInterruptEvent(ctx, executeID)
assert.True(t, found)
assert.NotNil(t, event0)
if event0 == nil {
t.Fatal("interrupt event is nil, cannot proceed with resume")
}

t.Logf("Interrupt event NodePath: %v", event0.NodePath)

resumeRunner := compose2.NewWorkflowRunner(basic, ws, model.ExecuteConfig{},
compose2.WithResumeReq(&entity.ResumeRequest{
ExecuteID: executeID,
EventID: event0.ID,
ResumeData: "resumed",
}))

resumeCtx, _, resumeOpts, _, err := resumeRunner.Prepare(ctx)
assert.NoError(t, err)

var wrongPrepareSubExeCtxCalled atomic.Bool
var prepareSubExePatch *mockey.Mocker
prepareSubExePatch = mockey.Mock(execute.PrepareSubExeCtx).To(
func(ctx context.Context, wb *entity.WorkflowBasic, requireCheckpoint bool) (context.Context, error) {
if wb != nil && wb.ID == 200 {
wrongPrepareSubExeCtxCalled.Store(true)
t.Logf("BUG: PrepareSubExeCtx called for inner_sub_wf (ID=200) during resume — this generates a new sub-execute-ID")
}
prepareSubExePatch.UnPatch()
defer prepareSubExePatch.Patch()
return execute.PrepareSubExeCtx(ctx, wb, requireCheckpoint)
}).Build()
defer prepareSubExePatch.UnPatch()

_, err = wf.Runner.Invoke(resumeCtx, map[string]any{
"count": int64(1),
}, resumeOpts...)

assert.NoError(t, err)

assert.Equal(t, int64(2), callCount.Load(),
"Lambda should have been called exactly twice: once for initial interrupt, once for resume")

assert.False(t, wrongPrepareSubExeCtxCalled.Load(),
"PrepareSubExeCtx should NOT be called for inner_sub_wf (ID=200) during resume — it should use restoreWorkflowCtx instead")
}
6 changes: 3 additions & 3 deletions backend/domain/workflow/internal/execute/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ func PrepareNodeExeCtx(ctx context.Context, nodeKey vo.NodeKey, nodeName string,
if c.NodeCtx == nil { // node within top level workflow, also not under composite node
newC.NodeCtx.NodePath = []string{string(nodeKey)}
} else {
if c.BatchInfo == nil {
newC.NodeCtx.NodePath = append(c.NodeCtx.NodePath, string(nodeKey))
} else {
if c.BatchInfo != nil && c.BatchInfo.CompositeNodeKey == c.NodeCtx.NodeKey {
newC.NodeCtx.NodePath = append(c.NodeCtx.NodePath, InterruptEventIndexPrefix+strconv.Itoa(c.BatchInfo.Index), string(nodeKey))
} else {
newC.NodeCtx.NodePath = append(c.NodeCtx.NodePath, string(nodeKey))
}
}

Expand Down
Loading