summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go17
1 files changed, 17 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index bafacd2..7ae4e2d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -586,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() {
}
}
+// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from
+// a previous server instance. Call this once on server startup, after
+// RecoverStaleRunning.
+func (p *Pool) RecoverStaleQueued(ctx context.Context) {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued})
+ if err != nil {
+ p.logger.Error("RecoverStaleQueued: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name)
+ if err := p.Submit(ctx, t); err != nil {
+ p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID)
+ }
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,