Skip to content

Commit 74fdbe9

Browse files
committed
fix(tasks): improve stop all
1 parent c71328e commit 74fdbe9

File tree

4 files changed

+67
-9
lines changed

4 files changed

+67
-9
lines changed

db/Store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ type TokenManager interface {
396396
type TaskManager interface {
397397
CreateTask(task Task, maxTasks int) (Task, error)
398398
UpdateTask(task Task) error
399+
SetWaitingTasksToStopped(projectID int, templateID int) error
399400
GetTemplateTasks(projectID int, templateID int, params RetrieveQueryParams) ([]TaskWithTpl, error)
400401
GetProjectTasks(projectID int, params RetrieveQueryParams) ([]TaskWithTpl, error)
401402
GetTask(projectID int, taskID int) (Task, error)

db/bolt/task.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"time"
55

66
"github.com/semaphoreui/semaphore/db"
7+
"github.com/semaphoreui/semaphore/pkg/task_logger"
8+
"github.com/semaphoreui/semaphore/pkg/tz"
79
"go.etcd.io/bbolt"
810
)
911

@@ -130,6 +132,29 @@ func (d *BoltDb) UpdateTask(task db.Task) error {
130132
return d.updateObject(0, db.TaskProps, task)
131133
}
132134

135+
func (d *BoltDb) SetWaitingTasksToStopped(projectID int, templateID int) error {
136+
var tasks []db.Task
137+
err := d.getObjects(0, db.TaskProps, db.RetrieveQueryParams{}, func(tsk any) bool {
138+
task := tsk.(db.Task)
139+
return task.ProjectID == projectID &&
140+
task.TemplateID == templateID &&
141+
task.Status == task_logger.TaskWaitingStatus
142+
}, &tasks)
143+
if err != nil {
144+
return err
145+
}
146+
147+
now := tz.Now()
148+
for _, task := range tasks {
149+
task.Status = task_logger.TaskStoppedStatus
150+
task.End = &now
151+
if err := d.updateObject(0, db.TaskProps, task); err != nil {
152+
return err
153+
}
154+
}
155+
return nil
156+
}
157+
133158
func (d *BoltDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) {
134159
newOutput, err := d.createObject(output.TaskID, db.TaskOutputProps, output)
135160
if err != nil {

db/sql/task.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/Masterminds/squirrel"
99
"github.com/semaphoreui/semaphore/db"
10+
"github.com/semaphoreui/semaphore/pkg/task_logger"
1011
)
1112

1213
func (d *SqlDb) CreateTaskStage(stage db.TaskStage) (res db.TaskStage, err error) {
@@ -217,6 +218,17 @@ func (d *SqlDb) UpdateTask(task db.Task) error {
217218
return err
218219
}
219220

221+
func (d *SqlDb) SetWaitingTasksToStopped(projectID int, templateID int) error {
222+
_, err := d.exec(
223+
"update task set status=?, `end`=? where template_id=? and project_id=? and status=?",
224+
task_logger.TaskStoppedStatus,
225+
time.Now().UTC(),
226+
templateID,
227+
projectID,
228+
task_logger.TaskWaitingStatus)
229+
return err
230+
}
231+
220232
func (d *SqlDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) {
221233
insertID, err := d.insert(
222234
"id",

services/tasks/TaskPool.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -557,32 +557,53 @@ func (p *TaskPool) StopTask(targetTask db.Task, forceStop bool) error {
557557
// the specified project and template. If forceStop is true, tasks are marked as
558558
// stopped immediately and running tasks are killed; otherwise tasks are marked
559559
// as stopping and will gracefully transition to stopped.
560+
//
561+
// Waiting tasks (which have no running process) are dequeued and bulk-updated in
562+
// the database in a single query, avoiding expensive per-task hydration.
563+
// Non-waiting tasks go through the regular per-task SetStatus path.
560564
func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop bool) {
561565

562566
stoppedTasks := map[int]struct{}{}
563567

564-
// Handle queued tasks
565-
for _, t := range p.state.QueueRange() {
568+
// Bulk-update all waiting tasks in DB in a single query.
569+
// This is the fast path -- waiting tasks have no running process.
570+
if err := p.store.SetWaitingTasksToStopped(projectID, templateID); err != nil {
571+
log.Error(err)
572+
}
573+
574+
// Dequeue waiting tasks from the in-memory queue.
575+
i := 0
576+
for i < p.state.QueueLen() {
577+
t := p.state.QueueGet(i)
566578
if t == nil {
579+
i++
567580
continue
568581
}
569582
if t.Task.ProjectID != projectID || t.Task.TemplateID != templateID {
583+
i++
570584
continue
571585
}
572586
if t.Task.Status.IsFinished() {
587+
i++
588+
continue
589+
}
590+
591+
if t.Task.Status == task_logger.TaskWaitingStatus {
592+
stoppedTasks[t.Task.ID] = struct{}{}
593+
_ = p.state.DequeueAt(i)
573594
continue
574595
}
596+
575597
if forceStop {
576598
t.SetStatus(task_logger.TaskStoppedStatus)
577599
} else {
578600
t.SetStatus(task_logger.TaskStoppingStatus)
579601
}
580-
581602
stoppedTasks[t.Task.ID] = struct{}{}
582-
// Queued tasks will be dequeued and immediately finalize to Stopped in run()
603+
i++
583604
}
584605

585-
// Handle running tasks
606+
// Handle running tasks -- these need per-task SetStatus and kill.
586607
for _, t := range p.state.RunningRange() {
587608
if t == nil {
588609
continue
@@ -606,9 +627,8 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop
606627
stoppedTasks[t.Task.ID] = struct{}{}
607628
}
608629

609-
// Update tasks in DB that are neither queued nor running but still active
610-
// (e.g., created but not present in this instance's memory state).
611-
630+
// Handle non-waiting tasks in DB that are neither queued nor running locally
631+
// (e.g., HA mode or tasks created but not present in this instance's memory).
612632
tasks, err := p.store.GetTemplateTasks(projectID, templateID, db.RetrieveQueryParams{
613633
TaskFilter: &db.TaskFilter{
614634
Status: task_logger.UnfinishedTaskStatuses(),
@@ -622,7 +642,7 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop
622642

623643
for _, twt := range tasks {
624644

625-
if _, ok := stoppedTasks[twt.ID]; ok { // already stopped
645+
if _, ok := stoppedTasks[twt.ID]; ok {
626646
continue
627647
}
628648

0 commit comments

Comments
 (0)