Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
TASK_STATUS_SUCCEEDED,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE,
TASK_STATUSES_FINAL,
)
from cylc.flow.wallclock import (
get_current_time_string,
Expand Down Expand Up @@ -786,9 +787,7 @@ def process_message(
)

if message == self.EVENT_STARTED:
if flag == self.FLAG_RECEIVED and itask.state.is_gt(
TASK_STATUS_RUNNING
):
if itask.state.is_gt(TASK_STATUS_RUNNING):
# Already running.
return True
self._process_message_started(itask, event_time, forced)
Expand All @@ -803,10 +802,8 @@ def process_message(
self.spawn_children(itask, TASK_OUTPUT_EXPIRED, forced)

elif task_output == self.EVENT_FAILED:
if flag == self.FLAG_RECEIVED and itask.state.is_gt(
TASK_STATUS_FAILED
):
# Already failed.
if itask.state(*TASK_STATUSES_FINAL):
# Already in a final state
return True
msg = self.JOB_FAILED
if run_signal is not None:
Expand All @@ -829,20 +826,16 @@ def process_message(
self.spawn_children(itask, TASK_OUTPUT_FAILED, forced)

elif message == self.EVENT_SUBMIT_FAILED:
if flag == self.FLAG_RECEIVED and itask.state.is_gt(
TASK_STATUS_SUBMIT_FAILED
):
# Already submit-failed
if itask.state(*TASK_STATUSES_FINAL):
# Already in a final state
return True
if forced or self._process_message_submit_failed(
itask, event_time
):
self.spawn_children(itask, TASK_OUTPUT_SUBMIT_FAILED, forced)

elif message == self.EVENT_SUBMITTED:
if flag == self.FLAG_RECEIVED and itask.state.is_gte(
TASK_STATUS_SUBMITTED
):
if itask.state.is_gte(TASK_STATUS_SUBMITTED):
# Already submitted.
return True
if not forced:
Expand Down
42 changes: 42 additions & 0 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_events_mgr import (
EventKey,
TaskEventsManager,
TaskJobLogsRetrieveContext,
)
from cylc.flow.task_outputs import TASK_OUTPUT_STARTED
from cylc.flow.task_state import (
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUCCEEDED,
)

from cylc.flow.network.resolvers import TaskMsg
Expand Down Expand Up @@ -368,3 +371,42 @@ async def test_event_email_body(
assert f'host: {mod_one.host}' in email_body
assert f'port: {mod_one.server.port}' in email_body
assert f'owner: {mod_one.owner}' in email_body


@pytest.mark.parametrize(
'message_flag',
(
TaskEventsManager.FLAG_POLLED,
TaskEventsManager.FLAG_INTERNAL,
TaskEventsManager.FLAG_RECEIVED,
),
)
async def test_delayed_event_notification(
message_flag,
one_conf,
flow,
scheduler,
run,
complete,
):
"""Delated event notification should not cause task state to rewind.

One a task reaches a state, it should not be possible for that state to
re-wind as the result of a subsequent event.

In this test, a task succeeds naturally, a "started" event is then sent
using one of the configured flags. The task state should not re-wind as a
result.

See https://github.com/cylc/cylc-flow/issues/7269
"""
id_ = flow(one_conf)
schd = scheduler(id_, paused_start=False)
async with run(schd):
itask = schd.pool.get_tasks()[0]
await complete(schd, itask.tokens.relative_id)
assert itask.state.status == TASK_STATUS_SUCCEEDED
schd.task_events_mgr.process_message(
itask, 'INFO', TASK_OUTPUT_STARTED, flag=message_flag
)
assert itask.state.status == TASK_STATUS_SUCCEEDED
Loading