Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7340.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Parentless run-ahead and sequential xtriggered tasks no longer spawn their next instance on manual removal.
5 changes: 1 addition & 4 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ def _remove_matched_tasks(
removed[itask.tokens.task] = fnums_to_remove
if fnums_to_remove == itask.flow_nums:
# Need to remove the task from the pool.
# Spawn next occurrence of xtrigger sequential task (otherwise
# this would not happen after removing this occurrence):
schd.pool.check_spawn_psx_task(itask)
schd.pool.remove(itask, 'request')
schd.pool.remove(itask, 'request', spawn=False)
to_kill.append(itask)
itask.removed = True
itask.flow_nums.difference_update(fnums_to_remove)
Expand Down
11 changes: 9 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,15 +883,22 @@ def spawn_if_parentless(self, tdef, point, flow_nums):
if ntask is not None and not is_in_pool:
self.add_to_pool(ntask)

def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
def remove(
self, itask: 'TaskProxy',
reason: Optional[str] = None,
spawn: Optional[bool] = True
) -> None:
"""Remove a task from the pool."""
# the held state is no longer relevant -> remove it
self.release_held_active_task(itask)

# Mark any PSX as spawned if we need to avoid spawning.
if itask.is_xtrigger_sequential and not spawn:
self.xtrigger_mgr.sequential_has_spawned_next.add(itask.identity)
# xtriggers are no longer relevant -> remove them
self.xtrigger_mgr.force_satisfy_all(itask, log=False)

if itask.state.is_runahead and itask.flow_nums:
if itask.state.is_runahead and itask.flow_nums and spawn:
# If removing a parentless runahead-limited task
# auto-spawn its next instance first.
self.spawn_if_parentless(
Expand Down
11 changes: 3 additions & 8 deletions tests/functional/xtriggers/04-sequential.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

. "$(dirname "$0")/test_header"

set_test_number 7
set_test_number 6

# Test workflow uses built-in 'echo' xtrigger.
init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__'
Expand Down Expand Up @@ -78,15 +78,10 @@ cylc remove "${WORKFLOW_NAME}//3001/b"

poll_grep_workflow_log 'Command "remove_tasks" actioned.'

cylc show "${WORKFLOW_NAME}//3002/b" | grep -E 'state: ' > 3002.b.log
cylc show "${WORKFLOW_NAME}//3003/b" 2>&1 >/dev/null | grep -E 'No matching' > 3003.b.log
cylc show "${WORKFLOW_NAME}//3002/b" 2>&1 >/dev/null | grep -E 'No matching' > 3002.b.log

# 3002/b should be only at 3002.
cmp_ok 3002.b.log - <<__END__
state: waiting
__END__
cmp_ok 3003.b.log - <<__END__
No matching active tasks found: 3003/b
No matching active tasks found: 3002/b
__END__

cylc show "${WORKFLOW_NAME}//3002/c" | grep -E 'state: ' > 3002.c.log
Expand Down
41 changes: 41 additions & 0 deletions tests/integration/test_remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,3 +545,44 @@ async def test_remove_triggered(flow, scheduler, start):
)
assert not schd.pool.get_tasks()
assert not schd.pool.tasks_to_trigger_now


async def test_remove_spawn(flow, scheduler, start):
"""Test the no-spawn removal of parentless tasks."""
schd: Scheduler = scheduler(
flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P0',
'graph': {
'R3//P1Y': '''
@wall_clock => a
b
c
''',
},
},
})
)
async with start(schd):
assert schd.pool.get_task_ids() == {
'2000/a',
'2000/b',
'2000/c',
'2001/b',
'2001/c',
}

# Removal of parentless runahead and sequential xtrigger (PSX) spawned.
await run_cmd(remove_tasks(schd, ['2000/a', '2001/c', '2001/b'], []))
assert schd.pool.get_task_ids() == {
'2000/b',
'2000/c',
}

# empty the workflow
await run_cmd(remove_tasks(schd, ['2000/b', '2000/c'], []))
assert schd.pool.get_task_ids() == set()
48 changes: 35 additions & 13 deletions tests/integration/test_sequential_xtriggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,59 @@ def sequential(flow, scheduler):


async def test_remove(sequential: Scheduler, start):
"""It should spawn the next instance when a task is removed.
"""It should not spawn the next instance when a task is removed.

Ensure that removing a task with a sequential xtrigger does not break the
chain causing future instances to be removed from the workflow.
Ensure that manually removing a task with a sequential xtrigger does not
spawn the next, while internal removal does.
"""
async with start(sequential):
# the scheduler starts with one task in the pool
assert list_cycles(sequential) == ['2000']

# it sequentially spawns out to the runahead limit
for year in range(2000, 2010):
# internal remove should spawn the next cycle
foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
sequential.pool.remove(foo)
assert list_cycles(sequential) == ['2001']

# this sequentially spawns out to the runahead limit
for year in range(2001, 2010):
foo = sequential.pool.get_task(ISO8601Point(f'{year}'), 'foo')
if foo.state(is_runahead=True):
break
sequential.xtrigger_mgr.call_xtriggers_async(foo)
assert list_cycles(sequential) == [
'2000',
'2001',
'2002',
'2003',
'2004',
]

# remove all tasks in the pool
await run_cmd(remove_tasks(sequential, ['*'], ["1"]))
# internal remove of RH PSX should spawn the next cycle
foo = sequential.pool.get_task(ISO8601Point('2004'), 'foo')
sequential.pool.remove(foo)
assert '2005' in list_cycles(sequential)

# the next cycle should be automatically spawned
assert list_cycles(sequential) == ['2004']
# remove command should not spawn next RH task
await run_cmd(remove_tasks(sequential, ['2005'], ["1"]))
assert list_cycles(sequential) == [
'2001',
'2002',
'2003',
]

# NOTE: You won't spot this issue in a functional test because the
# re-spawned tasks are detected as completed and automatically removed.
# So ATM not dangerous, but potentially inefficient.
# and internal remove of already xtrigger spawned task should
# not spawn the next either.
foo = sequential.pool.get_task(ISO8601Point('2003'), 'foo')
sequential.pool.remove(foo)
assert list_cycles(sequential) == [
'2001',
'2002',
]

# Now, let's just command/manual remove all tasks in the pool
await run_cmd(remove_tasks(sequential, ['*'], ["1"]))
# the workflow should be empty
assert not list_cycles(sequential)


async def test_trigger(sequential, start):
Expand Down
Loading