Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions runner/mobiagent/workflow/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,10 @@ def _run_loop(self, step: dict[str, Any], actual_step_id: str) -> dict[str, Any]
local_aliases: dict[str, str] = {}
with self.context.push_scope(local_aliases, loop_runtime):
stopped = self._execute_steps_in_scope(body_steps, iteration_prefix, local_aliases)
should_break = self._evaluate_condition(break_if_condition) if break_if_condition is not None else False
if stopped:
should_break = False
else:
should_break = self._evaluate_condition(break_if_condition) if break_if_condition is not None else False
executed_iterations.append(
{
"iteration": iteration_index + 1,
Expand Down Expand Up @@ -751,4 +754,4 @@ def _get_device(self, device_type: str):
self._devices[device_type] = mobiagent.HarmonyDevice()
else:
raise ValueError(f"Unsupported device type: {device_type}")
return self._devices[device_type]
return self._devices[device_type]
73 changes: 73 additions & 0 deletions runner/mobiagent/workflow/test_workflow_loop_failures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from __future__ import annotations

import unittest
from pathlib import Path
from unittest.mock import patch

from runner.mobiagent.workflow.engine import WorkflowContext, WorkflowRunner


class WorkflowLoopFailureTests(unittest.TestCase):
def test_loop_body_failure_is_not_masked_by_break_condition(self) -> None:
runner = WorkflowRunner.__new__(WorkflowRunner)
runner.context = WorkflowContext(Path("workflow.json"), Path("."), {})

step = {
"id": "3",
"type": "loop",
"max_times": 1,
"break_if": {
"left": "${steps.2.output.structured_output.contains_today_chat}",
"operator": "==",
"right": False,
},
"steps": [],
}

with patch.object(runner, "_execute_steps_in_scope", return_value=True):
with self.assertRaisesRegex(RuntimeError, "Loop body stopped at iteration 1"):
runner._run_loop(step, "3")

def test_loop_break_condition_can_read_current_iteration_tool_output(self) -> None:
runner = WorkflowRunner.__new__(WorkflowRunner)
runner.context = WorkflowContext(Path("workflow.json"), Path("."), {})

step = {
"id": "3",
"type": "loop",
"max_times": 1,
"break_if": {
"left": "${steps.2.output.structured_output.contains_today_chat}",
"operator": "==",
"right": False,
},
"steps": [],
}

def execute_body(_body_steps, actual_prefix, local_aliases):
result = type(
"Result",
(),
{
"to_dict": lambda self: {
"output": {
"structured_output": {
"contains_today_chat": False,
}
}
}
},
)()
local_aliases["2"] = f"{actual_prefix}.2"
runner.context.step_results[f"{actual_prefix}.2"] = result
return False

with patch.object(runner, "_execute_steps_in_scope", side_effect=execute_body):
output = runner._run_loop(step, "3")

self.assertTrue(output["broke_early"])
self.assertEqual(output["break_iteration"], 1)


if __name__ == "__main__":
unittest.main()