Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes.d/7237.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed a bug that could cause premature shutdown or stall if a parented instance
of a sometimes parentless task ended up at the runahead limit.
Comment thread
hjoliver marked this conversation as resolved.
Outdated
4 changes: 2 additions & 2 deletions cylc/flow/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ def remote_clean(
f"Remote clean failed for {id_} - could not clean on these "
"install target(s):"
)
for target, exc in failed_targets.items():
msg += f"\n[{target}]\n{exc}"
for target, excep in failed_targets.items():
msg += f"\n[{target}]\n{excep}"
raise CylcError(msg)


Expand Down
4 changes: 0 additions & 4 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ def _remove_matched_tasks(
continue
removed[itask.tokens.task] = fnums_to_remove
if fnums_to_remove == itask.flow_nums:
# Need to remove the task from the pool.
# Spawn next occurrence of xtrigger sequential task (otherwise
# this would not happen after removing this occurrence):
schd.pool.check_spawn_psx_task(itask)
schd.pool.remove(itask, 'request')
to_kill.append(itask)
itask.removed = True
Expand Down
34 changes: 18 additions & 16 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,12 @@ def _load_pool_from_tasks(self):
flow=[FLOW_NEW],
flow_descr=f"original flow from {self.options.starttask}"
)
# Spawning to the runahead limit immediately is not strictly necessary
# as it would occur over several scheduler main loop iterations; we do
# it mainly for compatibility with integration tests pre PR #7237.
self.pool.spawn_to_runahead_limit()
for itask in self.pool.get_tasks():
self.pool.queue_if_ready(itask)
Comment thread
hjoliver marked this conversation as resolved.
Outdated

def _load_pool_from_point(self):
"""Load task pool for a cycle point, for a new run.
Expand Down Expand Up @@ -881,13 +887,6 @@ def _load_pool_from_db(self):
self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart(
self.pool.load_abs_outputs_for_restart)

# Compute and release runahead tasks once after loading all tasks from
# the DB. This also causes spawning of parentless tasks out to the
# runahead limit, which may be necessary here if the stop point or
# runahead limit was changed for the restart.
self.pool.compute_runahead()
self.pool.release_runahead_tasks()

self.pool.load_db_tasks_to_hold()
self.pool.update_flow_mgr()

Expand Down Expand Up @@ -1622,8 +1621,13 @@ def update_profiler_logs(self, tinit):

async def _main_loop(self) -> None:
"""A single iteration of the main loop."""

tinit = time()

self.pool.compute_runahead()
self.pool.release_runahead_tasks()
await self.workflow_shutdown()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps take this opportunity to rename workflow_shutdown to e.g. set_stop_mode

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we punt that as off-topic? From a quick look it doesn't just set the stop mode, it might also shut the scheduler down.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little confusing that we would attempt shutdown so soon after the start of main loop... Could you explain why this has been moved here?

At the least, I think a comment would be useful:

Suggested change
await self.workflow_shutdown()
# If applicable, set stop mode or shutdown on task failure:
await self.workflow_shutdown()


# Useful for debugging core scheduler issues:
# import logging
# self.pool.log_task_pool(logging.CRITICAL)
Expand Down Expand Up @@ -1656,11 +1660,11 @@ async def _main_loop(self) -> None:
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if itask.is_ready_to_run() and not itask.is_manual_submit:
self.pool.queue_task(itask)
self.pool.queue_if_ready(itask)

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.clock_expire_tasks()
self.release_tasks_to_run()

Expand Down Expand Up @@ -1701,6 +1705,8 @@ async def _main_loop(self) -> None:
# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)

self.pool.release_runahead_tasks()
Comment thread
hjoliver marked this conversation as resolved.
Outdated

# List of task whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
Expand All @@ -1717,11 +1723,10 @@ async def _main_loop(self) -> None:
await self.update_data_structure()

if has_updated:
if not self.is_reloaded:
if not self.is_reloaded and self.is_stalled:
# (A reload cannot un-stall workflow by itself)
if self.is_stalled:
self.is_stalled = False
self.update_data_store()
self.is_stalled = False
self.update_data_store()
self.is_reloaded = False

# Reset workflow and task updated flags.
Expand All @@ -1743,9 +1748,6 @@ async def _main_loop(self) -> None:
# Shutdown workflow if timeouts have occurred
self.timeout_check()

# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

if self.options.profile_mode:
self.update_profiler_logs(tinit)

Expand Down
Loading
Loading