diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-09 04:24:06 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-09 04:24:06 +0000 |
| commit | 91595726d696ef8bdc8c3ef0a39a1d3a7be34408 (patch) | |
| tree | 71d885f13d5209170a10913c69f2dde9e7703b1b /internal/executor/executor.go | |
| parent | 02851c000399a0a9fa1cee3a6d6695de00661dab (diff) | |
executor: recover stale RUNNING tasks on server startup
On restart, any tasks in RUNNING state have no active goroutine.
RecoverStaleRunning() marks them FAILED (retryable) and closes
their open execution records with an appropriate error message.
Called once from serve.go after the pool is created.
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 31 |
1 file changed, 31 insertions, 0 deletions
```diff
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index f404ddf..9dd37ae 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -483,6 +483,37 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
 	p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
 }
 
+// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous
+// server crash or restart) as FAILED. It also closes any open RUNNING execution
+// records for those tasks. Call this once on server startup.
+func (p *Pool) RecoverStaleRunning() {
+	tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning})
+	if err != nil {
+		p.logger.Error("RecoverStaleRunning: list tasks", "error", err)
+		return
+	}
+	for _, t := range tasks {
+		p.logger.Warn("recovering stale RUNNING task", "taskID", t.ID, "name", t.Name)
+		// Close any open execution records.
+		execs, err := p.store.ListExecutions(t.ID)
+		if err == nil {
+			for _, e := range execs {
+				if e.Status == "RUNNING" {
+					e.Status = "FAILED"
+					e.ErrorMsg = "server restarted while task was running"
+					e.EndTime = time.Now().UTC()
+					if updateErr := p.store.UpdateExecution(e); updateErr != nil {
+						p.logger.Error("RecoverStaleRunning: update execution", "error", updateErr, "execID", e.ID)
+					}
+				}
+			}
+		}
+		if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
+			p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID)
+		}
+	}
+}
+
 // terminalFailureStates are dependency states that cause the waiting task to fail immediately.
 var terminalFailureStates = map[task.State]bool{
 	task.StateFailed: true,
```
