diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index bafacd2..7ae4e2d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -586,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() { } } +// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from +// a previous server instance. Call this once on server startup, after +// RecoverStaleRunning. +func (p *Pool) RecoverStaleQueued(ctx context.Context) { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued}) + if err != nil { + p.logger.Error("RecoverStaleQueued: list tasks", "error", err) + return + } + for _, t := range tasks { + p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name) + if err := p.Submit(ctx, t); err != nil { + p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID) + } + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, |
