diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 76c8ac7..7ae4e2d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -26,6 +26,8 @@ type Store interface { UpdateExecution(e *storage.Execution) error UpdateTaskState(id string, newState task.State) error UpdateTaskQuestion(taskID, questionJSON string) error + UpdateTaskSummary(taskID, summary string) error + AppendTaskInteraction(taskID string, interaction task.Interaction) error } // LogPather is an optional interface runners can implement to provide the log @@ -149,11 +151,20 @@ func (p *Pool) Cancel(taskID string) bool { return true } -// SubmitResume re-queues a blocked task using the provided resume execution. +// resumablePoolStates are the task states that may be submitted for session resume. +var resumablePoolStates = map[task.State]bool{ + task.StateBlocked: true, + task.StateTimedOut: true, + task.StateCancelled: true, + task.StateFailed: true, + task.StateBudgetExceeded: true, +} + +// SubmitResume re-queues a blocked or interrupted task using the provided resume execution. // The execution must have ResumeSessionID and ResumeAnswer set. func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { - if t.State != task.StateBlocked && t.State != task.StateTimedOut { - return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State) + if !resumablePoolStates[t.State] { + return fmt.Errorf("task %s must be in a resumable state to resume (current: %s)", t.ID, t.State) } if exec.ResumeSessionID == "" { return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID) @@ -331,6 +342,15 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. } } + summary := exec.Summary + if summary == "" && exec.StdoutPath != "" { + summary = extractSummary(exec.StdoutPath) + } + if summary != "" { + if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil { + p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) + } + } if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update execution", "error", updateErr) } @@ -566,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() { } } +// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from +// a previous server instance. Call this once on server startup, after +// RecoverStaleRunning. +func (p *Pool) RecoverStaleQueued(ctx context.Context) { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued}) + if err != nil { + p.logger.Error("RecoverStaleQueued: list tasks", "error", err) + return + } + for _, t := range tasks { + p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name) + if err := p.Submit(ctx, t); err != nil { + p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID) + } + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, |
