summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go17
1 files changed, 14 insertions, 3 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 475d150..7674fe6 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -567,9 +567,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous
-// server crash or restart) as FAILED. It also closes any open RUNNING execution
-// records for those tasks. Call this once on server startup.
-func (p *Pool) RecoverStaleRunning() {
+// server crash or restart) as FAILED, then immediately re-queues them for
+// retry. It also closes any open RUNNING execution records for those tasks.
+// Call this once on server startup.
+func (p *Pool) RecoverStaleRunning(ctx context.Context) {
tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning})
if err != nil {
p.logger.Error("RecoverStaleRunning: list tasks", "error", err)
@@ -593,6 +594,16 @@ func (p *Pool) RecoverStaleRunning() {
}
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID)
+ continue
+ }
+ // Re-queue so the task retries automatically. Submit expects QUEUED state.
+ if err := p.store.UpdateTaskState(t.ID, task.StateQueued); err != nil {
+ p.logger.Error("RecoverStaleRunning: set queued", "error", err, "taskID", t.ID)
+ continue
+ }
+ t.State = task.StateQueued
+ if err := p.Submit(ctx, t); err != nil {
+ p.logger.Error("RecoverStaleRunning: re-queue", "error", err, "taskID", t.ID)
}
}
}