summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go43
1 files changed, 40 insertions, 3 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 76c8ac7..7ae4e2d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -26,6 +26,8 @@ type Store interface {
UpdateExecution(e *storage.Execution) error
UpdateTaskState(id string, newState task.State) error
UpdateTaskQuestion(taskID, questionJSON string) error
+ UpdateTaskSummary(taskID, summary string) error
+ AppendTaskInteraction(taskID string, interaction task.Interaction) error
}
// LogPather is an optional interface runners can implement to provide the log
@@ -149,11 +151,20 @@ func (p *Pool) Cancel(taskID string) bool {
return true
}
-// SubmitResume re-queues a blocked task using the provided resume execution.
+// resumablePoolStates are the task states that may be submitted for session resume.
+var resumablePoolStates = map[task.State]bool{
+ task.StateBlocked: true,
+ task.StateTimedOut: true,
+ task.StateCancelled: true,
+ task.StateFailed: true,
+ task.StateBudgetExceeded: true,
+}
+
+// SubmitResume re-queues a blocked or interrupted task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
- if t.State != task.StateBlocked && t.State != task.StateTimedOut {
- return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State)
+ if !resumablePoolStates[t.State] {
+ return fmt.Errorf("task %s must be in a resumable state to resume (current: %s)", t.ID, t.State)
}
if exec.ResumeSessionID == "" {
return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
@@ -331,6 +342,15 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
}
}
+ summary := exec.Summary
+ if summary == "" && exec.StdoutPath != "" {
+ summary = extractSummary(exec.StdoutPath)
+ }
+ if summary != "" {
+ if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil {
+ p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
+ }
+ }
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
@@ -566,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() {
}
}
+// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from
+// a previous server instance. Call this once on server startup, after
+// RecoverStaleRunning.
+func (p *Pool) RecoverStaleQueued(ctx context.Context) {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued})
+ if err != nil {
+ p.logger.Error("RecoverStaleQueued: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name)
+ if err := p.Submit(ctx, t); err != nil {
+ p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID)
+ }
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,