diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-04-04 09:18:52 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-04-04 09:18:52 +0000 |
| commit | 2917c580ae3eab093e9e655ccdf210030b7b9d1f (patch) | |
| tree | 2d1d89af605a19a3b70c3eee5972921c083be364 /internal/executor/executor.go | |
| parent | d4aeb496a883fa20194ae01d127535494e684812 (diff) | |
fix: executor checker — close race, test flakiness, checker report for all failure states
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index e87d066..a60a771 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -155,11 +155,15 @@ func (p *Pool) Submit(ctx context.Context, t *task.Task) error { p.mu.Unlock() return fmt.Errorf("executor pool is shut down") } - p.mu.Unlock() + // Send while holding the lock so that Shutdown cannot close workCh between + // the closed-check above and the send below. The dispatch goroutine never + // holds p.mu while receiving from workCh, so this cannot deadlock. select { case p.workCh <- workItem{ctx: ctx, task: t}: + p.mu.Unlock() return nil default: + p.mu.Unlock() return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh)) } } @@ -419,12 +423,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.mu.Lock() p.consecutiveFailures[agentType]++ p.mu.Unlock() - // If this is a checker task, attach the failure report to the checked task. - if t.CheckerForTaskID != "" { - report := exec.ErrorMsg - if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, report); reportErr != nil { - p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) - } + } + // If this is a checker task, attach the failure report for any terminal + // failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED). + if t.CheckerForTaskID != "" && exec.ErrorMsg != "" { + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, exec.ErrorMsg); reportErr != nil { + p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) } } if t.StoryID != "" && exec.Status == "FAILED" { @@ -510,7 +514,8 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) } } - if t.CheckerForTaskID != "" && exec.Status == "FAILED" && summary != "" { + terminalFailure := exec.Status == "FAILED" || exec.Status == "TIMED_OUT" || exec.Status == "CANCELLED" || exec.Status == "BUDGET_EXCEEDED" + if t.CheckerForTaskID != "" && terminalFailure && summary != "" { // Overwrite the initial error-message report with the richer summary. if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil { p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr) |
