summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go92
1 files changed, 10 insertions, 82 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index f445ef3..8924830 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -246,6 +246,14 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.handleRunResult(ctx, t, exec, err, agentType)
+}
+
+// handleRunResult applies the shared post-run error-classification and
+// state-update logic used by both execute() and executeResume(). It sets
+// exec.Status and exec.ErrorMsg, updates storage, and emits the result to
+// resultCh. The caller must set exec.EndTime before calling.
+func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) {
if err != nil {
if isRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
@@ -323,7 +331,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
}
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
- p.logger.Error("failed to update resume execution", "error", updateErr)
+ p.logger.Error("failed to update execution", "error", updateErr)
}
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
@@ -493,87 +501,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
- if err != nil {
- if isRateLimitError(err) || isQuotaExhausted(err) {
- p.mu.Lock()
- retryAfter := parseRetryAfter(err.Error())
- if retryAfter == 0 {
- if isQuotaExhausted(err) {
- retryAfter = 5 * time.Hour
- } else {
- retryAfter = 1 * time.Minute
- }
- }
- p.rateLimited[agentType] = time.Now().Add(retryAfter)
- p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err))
- p.mu.Unlock()
- }
-
- var blockedErr *BlockedError
- if errors.As(err, &blockedErr) {
- exec.Status = "BLOCKED"
- if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
- }
- if err := p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON); err != nil {
- p.logger.Error("failed to update task question", "taskID", t.ID, "error", err)
- }
- } else if ctx.Err() == context.DeadlineExceeded {
- exec.Status = "TIMED_OUT"
- exec.ErrorMsg = "execution timed out"
- if err := p.store.UpdateTaskState(t.ID, task.StateTimedOut); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateTimedOut, "error", err)
- }
- } else if ctx.Err() == context.Canceled {
- exec.Status = "CANCELLED"
- exec.ErrorMsg = "execution cancelled"
- if err := p.store.UpdateTaskState(t.ID, task.StateCancelled); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCancelled, "error", err)
- }
- } else if isQuotaExhausted(err) {
- exec.Status = "BUDGET_EXCEEDED"
- exec.ErrorMsg = err.Error()
- if err := p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBudgetExceeded, "error", err)
- }
- } else {
- exec.Status = "FAILED"
- exec.ErrorMsg = err.Error()
- if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
- }
- }
- } else {
- if t.ParentTaskID == "" {
- subtasks, subErr := p.store.ListSubtasks(t.ID)
- if subErr != nil {
- p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
- }
- if subErr == nil && len(subtasks) > 0 {
- exec.Status = "BLOCKED"
- if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
- }
- } else {
- exec.Status = "READY"
- if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
- }
- }
- } else {
- exec.Status = "COMPLETED"
- if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
- }
- p.maybeUnblockParent(t.ParentTaskID)
- }
- }
-
- if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
- p.logger.Error("failed to update execution", "error", updateErr)
- }
-
- p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+ p.handleRunResult(ctx, t, exec, err, agentType)
}
// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous