Skip to content

Commit 20a8f46

Browse files
authored
Merge pull request #3237 from lohitkolluri/lk/constraint-enforcer-ignore-completed-jobs
constraintenforcer: ignore completed tasks in reservation accounting
2 parents 12ce349 + 4e661c8 commit 20a8f46

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

manager/orchestrator/constraintenforcer/constraint_enforcer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ loop:
118118
if t.DesiredState < api.TaskStateAssigned || t.DesiredState > api.TaskStateCompleted {
119119
continue
120120
}
121+
// Completed tasks should not consume node reservations. This is
122+
// particularly important for replicated-job services, where tasks may
123+
// remain with DesiredState=Completed after they finished.
124+
if t.Status.State >= api.TaskStateCompleted {
125+
continue
126+
}
121127

122128
// Ensure that the node still satisfies placement constraints.
123129
// NOTE: If the task is associacted with a service then we must use the

manager/orchestrator/constraintenforcer/constraint_enforcer_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,82 @@ import (
1212
"github.com/stretchr/testify/require"
1313
)
1414

15+
func TestRejectNoncompliantTasksIgnoresCompletedJobTasksInReservations(t *testing.T) {
16+
s := store.NewMemoryStore(nil)
17+
require.NotNil(t, s)
18+
t.Cleanup(func() { _ = s.Close() })
19+
20+
node := &api.Node{
21+
ID: "node1",
22+
Spec: api.NodeSpec{
23+
Availability: api.NodeAvailabilityActive,
24+
},
25+
Description: &api.NodeDescription{
26+
Resources: &api.Resources{
27+
MemoryBytes: 1024,
28+
},
29+
},
30+
}
31+
32+
// A live task that fits if completed job tasks are ignored.
33+
runningTask := &api.Task{
34+
ID: "running1",
35+
NodeID: node.ID,
36+
ServiceID: "svc1",
37+
DesiredState: api.TaskStateRunning,
38+
Status: api.TaskStatus{
39+
State: api.TaskStateRunning,
40+
},
41+
Spec: api.TaskSpec{
42+
Resources: &api.ResourceRequirements{
43+
Reservations: &api.Resources{
44+
MemoryBytes: 700,
45+
},
46+
},
47+
},
48+
}
49+
50+
// A completed replicated-job task that should not consume reservations.
51+
completedJobTask := &api.Task{
52+
ID: "job1",
53+
NodeID: node.ID,
54+
ServiceID: "jobsvc",
55+
DesiredState: api.TaskStateCompleted,
56+
Status: api.TaskStatus{
57+
State: api.TaskStateCompleted,
58+
},
59+
Spec: api.TaskSpec{
60+
Resources: &api.ResourceRequirements{
61+
Reservations: &api.Resources{
62+
MemoryBytes: 700,
63+
},
64+
},
65+
},
66+
}
67+
68+
require.NoError(t, s.Update(func(tx store.Tx) error {
69+
if err := store.CreateNode(tx, node); err != nil {
70+
return err
71+
}
72+
if err := store.CreateTask(tx, runningTask); err != nil {
73+
return err
74+
}
75+
if err := store.CreateTask(tx, completedJobTask); err != nil {
76+
return err
77+
}
78+
return nil
79+
}))
80+
81+
ce := New(s)
82+
ce.rejectNoncompliantTasks(node)
83+
84+
s.View(func(tx store.ReadTx) {
85+
got := store.GetTask(tx, runningTask.ID)
86+
require.NotNil(t, got)
87+
assert.NotEqual(t, api.TaskStateRejected, got.Status.State, "running task unexpectedly rejected; completed job tasks should not count toward reservations")
88+
})
89+
}
90+
1591
func TestConstraintEnforcer(t *testing.T) {
1692
nodes := []*api.Node{
1793
// this node starts as a worker, but then is changed to a manager.

0 commit comments

Comments
 (0)