diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 92 |
1 files changed, 10 insertions, 82 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f445ef3..8924830 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -246,6 +246,14 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.handleRunResult(ctx, t, exec, err, agentType) +} + +// handleRunResult applies the shared post-run error-classification and +// state-update logic used by both execute() and executeResume(). It sets +// exec.Status and exec.ErrorMsg, updates storage, and emits the result to +// resultCh. The caller must set exec.EndTime before calling. +func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) { if err != nil { if isRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() @@ -323,7 +331,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex } if updateErr := p.store.UpdateExecution(exec); updateErr != nil { - p.logger.Error("failed to update resume execution", "error", updateErr) + p.logger.Error("failed to update execution", "error", updateErr) } p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } @@ -493,87 +501,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() - if err != nil { - if isRateLimitError(err) || isQuotaExhausted(err) { - p.mu.Lock() - retryAfter := parseRetryAfter(err.Error()) - if retryAfter == 0 { - if isQuotaExhausted(err) { - retryAfter = 5 * time.Hour - } else { - retryAfter = 1 * time.Minute - } - } - p.rateLimited[agentType] = time.Now().Add(retryAfter) - p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err)) - p.mu.Unlock() - } - - var blockedErr *BlockedError - if errors.As(err, &blockedErr) { - exec.Status = "BLOCKED" - if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) - } - if err := p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON); err != nil { - p.logger.Error("failed to update task question", "taskID", t.ID, "error", err) - } - } else if ctx.Err() == context.DeadlineExceeded { - exec.Status = "TIMED_OUT" - exec.ErrorMsg = "execution timed out" - if err := p.store.UpdateTaskState(t.ID, task.StateTimedOut); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateTimedOut, "error", err) - } - } else if ctx.Err() == context.Canceled { - exec.Status = "CANCELLED" - exec.ErrorMsg = "execution cancelled" - if err := p.store.UpdateTaskState(t.ID, task.StateCancelled); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCancelled, "error", err) - } - } else if isQuotaExhausted(err) { - exec.Status = "BUDGET_EXCEEDED" - exec.ErrorMsg = err.Error() - if err := p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBudgetExceeded, "error", err) - } - } else { - exec.Status = "FAILED" - exec.ErrorMsg = err.Error() - if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) - } - } - } else { - if t.ParentTaskID == "" { - subtasks, subErr := p.store.ListSubtasks(t.ID) - if subErr != nil { - p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) - } - if subErr == nil && len(subtasks) > 0 { - exec.Status = "BLOCKED" - if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) - } - } else { - exec.Status = "READY" - if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) - } - } - } else { - exec.Status = "COMPLETED" - if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) - } - p.maybeUnblockParent(t.ParentTaskID) - } - } - - if updateErr := p.store.UpdateExecution(exec); updateErr != nil { - p.logger.Error("failed to update execution", "error", updateErr) - } - - p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} + p.handleRunResult(ctx, t, exec, err, agentType) } // RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous |
