Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,13 @@ Fork("parallel_branch", func(branch1 *floxy.Builder) {
JoinStep("join", []string{"branch1_step1", "branch2_step1"}, floxy.JoinStrategyAll)
```

**SOLVED:** Avoid using `JoinStep` with `Condition`, use `Join` instead that dynamically creates waitFor list (virtual steps conception used).
**SOLVED**: Avoid using `JoinStep` with `Condition`, use `Join` instead that dynamically creates waitFor list (virtual steps conception used).

See `examples/condition/main.go` for a demonstration of this issue.

### Rollback for nested Fork/Join branches

When using nested `Fork` branches, rollback is not supported.
**SOLVED**: Nested `Fork`/`Join` branches are fully supported for rollback. The engine uses a `visited` map to prevent double rollback of steps that may be reached through multiple paths in nested fork/join structures.

## Installation

Expand Down
162 changes: 145 additions & 17 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,9 +2302,14 @@ func (engine *Engine) rollbackStepsToSavePoint(
stepMap[failedStep.StepName] = failedStep
}

return engine.rollbackStepChain(ctx, instanceID, failedStep.StepName, savePointName, def, stepMap, false, 0)
// Use visited map to prevent double rollback of steps in nested fork/join structures
visited := make(map[string]bool)

return engine.rollbackStepChainWithVisited(ctx, instanceID, failedStep.StepName, savePointName, def, stepMap, visited, false, 0)
}

// rollbackStepChain is a wrapper for backward compatibility that creates a visited map
// and delegates to rollbackStepChainWithVisited.
func (engine *Engine) rollbackStepChain(
ctx context.Context,
instanceID int64,
Expand All @@ -2313,6 +2318,24 @@ func (engine *Engine) rollbackStepChain(
stepMap map[string]*WorkflowStep,
isParallel bool,
depth int,
) error {
visited := make(map[string]bool)

return engine.rollbackStepChainWithVisited(ctx, instanceID, currentStep, savePointName, def, stepMap, visited, isParallel, depth)
}

// rollbackStepChainWithVisited performs rollback of a step chain with support for nested fork/join.
// The visited map prevents double rollback of steps that may be reached through multiple paths
// in nested fork/join structures.
func (engine *Engine) rollbackStepChainWithVisited(
ctx context.Context,
instanceID int64,
currentStep, savePointName string,
def *WorkflowDefinition,
stepMap map[string]*WorkflowStep,
visited map[string]bool,
isParallel bool,
depth int,
) error {
// PLUGIN HOOK: OnRollbackStepChain
if engine.pluginManager != nil {
Expand All @@ -2321,6 +2344,12 @@ func (engine *Engine) rollbackStepChain(
}
}

// Prevent cycles and double rollback of already processed steps
if visited[currentStep] {
return nil
}
visited[currentStep] = true

if currentStep == savePointName {
return nil // Reached save point
}
Expand All @@ -2330,61 +2359,67 @@ func (engine *Engine) rollbackStepChain(
return fmt.Errorf("step definition not found: %s", currentStep)
}

// First, traverse to the end of the chain (depth-first)
// Handle parallel steps (fork branches)
// Handle Fork/Parallel steps: traverse all parallel branches first (depth-first)
if stepDef.Type == StepTypeFork || stepDef.Type == StepTypeParallel {
for _, parallelStepName := range stepDef.Parallel {
if err := engine.rollbackStepChain(ctx, instanceID, parallelStepName, savePointName, def, stepMap, true, depth+1); err != nil {
if err := engine.rollbackStepChainWithVisited(ctx, instanceID, parallelStepName, savePointName, def, stepMap, visited, true, depth+1); err != nil {
return err
}
}
}

// For parallel branches, traverse all subsequent steps in the chain
if isParallel {
// For condition steps, we need to determine which branch was executed
if stepDef.Type == StepTypeCondition {
// Check if the condition step was executed and determine which branch was taken
// For condition steps, determine which branch was executed
if step, exists := stepMap[currentStep]; exists && step.Status == StepStatusCompleted {
// Determine which branch was executed by checking which subsequent steps exist
executedBranch := engine.determineExecutedBranch(stepDef, stepMap)

if executedBranch == "next" {
// Rollback the Next branch
for _, nextStepName := range stepDef.Next {
if err := engine.rollbackStepChain(ctx, instanceID, nextStepName, savePointName, def, stepMap, true, depth+1); err != nil {
if err := engine.rollbackStepChainWithVisited(ctx, instanceID, nextStepName, savePointName, def, stepMap, visited, true, depth+1); err != nil {
return err
}
}
} else if executedBranch == "else" && stepDef.Else != "" {
// Rollback the Else branch
if err := engine.rollbackStepChain(ctx, instanceID, stepDef.Else, savePointName, def, stepMap, true, depth+1); err != nil {
if err := engine.rollbackStepChainWithVisited(ctx, instanceID, stepDef.Else, savePointName, def, stepMap, visited, true, depth+1); err != nil {
return err
}
}
// If no branch was executed, skip rollback for this condition step
}
} else {
// For non-condition steps, traverse all next steps
for _, nextStepName := range stepDef.Next {
if err := engine.rollbackStepChain(ctx, instanceID, nextStepName, savePointName, def, stepMap, true, depth+1); err != nil {
if err := engine.rollbackStepChainWithVisited(ctx, instanceID, nextStepName, savePointName, def, stepMap, visited, true, depth+1); err != nil {
return err
}
}
}
}

// Do the actual rollback BEFORE traversing to previous steps (reverse order)
// Perform the actual rollback for completed or failed steps
if step, exists := stepMap[currentStep]; exists &&
(step.Status == StepStatusCompleted || step.Status == StepStatusFailed) {
if err := engine.rollbackStep(ctx, step, def); err != nil {
return fmt.Errorf("rollback step %s: %w", currentStep, err)
}
}

// Continue with a previous step (traverse backwards)
if stepDef.Prev != "" && !isParallel {
if err := engine.rollbackStepChain(ctx, instanceID, stepDef.Prev, savePointName, def, stepMap, isParallel, depth+1); err != nil {
// Continue traversing backwards through Prev links
if stepDef.Prev != "" && stepDef.Prev != rootStepName && !isParallel {
prevDef, prevOk := def.Definition.Steps[stepDef.Prev]
if prevOk && (prevDef.Type == StepTypeFork || prevDef.Type == StepTypeParallel) {
// Previous step is a Fork/Parallel - rollback ALL branches before continuing
// This ensures that when we traverse backwards from a nested fork,
// all sibling branches of the parent fork are also rolled back
for _, parallelStepName := range prevDef.Parallel {
if err := engine.rollbackStepChainWithVisited(ctx, instanceID, parallelStepName, savePointName, def, stepMap, visited, true, depth+1); err != nil {
return err
}
}
}
// Continue backwards traversal
if err := engine.rollbackStepChainWithVisited(ctx, instanceID, stepDef.Prev, savePointName, def, stepMap, visited, false, depth+1); err != nil {
return err
}
}
Expand Down Expand Up @@ -2590,6 +2625,99 @@ func (engine *Engine) findForkStepForParallelStep(ctx context.Context, instanceI
return ""
}

// findForkStepForStepInBranch finds the nearest Fork step that contains the given step
// anywhere in its branches. This works for any step within a fork branch, not just
// the first step of the branch.
func (engine *Engine) findForkStepForStepInBranch(stepName string, def *WorkflowDefinition) string {
if stepName == "" || stepName == rootStepName {
return ""
}

// First check if this step is directly in a Fork's Parallel array
for forkName, forkDef := range def.Definition.Steps {
if forkDef.Type == StepTypeFork || forkDef.Type == StepTypeParallel {
for _, parallelStep := range forkDef.Parallel {
if parallelStep == stepName {
return forkName
}
}
}
}

// Traverse backwards through Prev links to find a Fork
visited := make(map[string]bool)
current := stepName

for current != "" && current != rootStepName {
if visited[current] {
break
}
visited[current] = true

currentDef, ok := def.Definition.Steps[current]
if !ok {
break
}

// Check if the previous step is a Fork
if currentDef.Prev != "" && currentDef.Prev != rootStepName {
prevDef, prevOk := def.Definition.Steps[currentDef.Prev]
if prevOk && (prevDef.Type == StepTypeFork || prevDef.Type == StepTypeParallel) {
// Verify that current is in the parallel branches
for _, parallelStep := range prevDef.Parallel {
if engine.isStepReachableFrom(current, parallelStep, def, make(map[string]bool)) {
return currentDef.Prev
}
}
}
}

current = currentDef.Prev
}

return ""
}

// isStepReachableFrom checks if targetStep is reachable from startStep by traversing Next links.
func (engine *Engine) isStepReachableFrom(targetStep, startStep string, def *WorkflowDefinition, visited map[string]bool) bool {
if startStep == targetStep {
return true
}

if visited[startStep] {
return false
}
visited[startStep] = true

stepDef, ok := def.Definition.Steps[startStep]
if !ok {
return false
}

// Check Next steps
for _, nextStep := range stepDef.Next {
if engine.isStepReachableFrom(targetStep, nextStep, def, visited) {
return true
}
}

// Check Else branch for conditions
if stepDef.Else != "" {
if engine.isStepReachableFrom(targetStep, stepDef.Else, def, visited) {
return true
}
}

// Check parallel branches for Fork/Parallel
for _, parallelStep := range stepDef.Parallel {
if engine.isStepReachableFrom(targetStep, parallelStep, def, visited) {
return true
}
}

return false
}

// isStepInForkBranch checks if a step is part of any fork/parallel branch.
func (engine *Engine) isStepInForkBranch(
ctx context.Context,
Expand Down
Loading