diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-04-04 09:18:52 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-04-04 09:18:52 +0000 |
| commit | 2917c580ae3eab093e9e655ccdf210030b7b9d1f (patch) | |
| tree | 2d1d89af605a19a3b70c3eee5972921c083be364 /internal/executor | |
| parent | d4aeb496a883fa20194ae01d127535494e684812 (diff) | |
fix: executor checker — close race, test flakiness, checker report for all failure states
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/executor.go | 21 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 23 |
2 files changed, 26 insertions, 18 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index e87d066..a60a771 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -155,11 +155,15 @@ func (p *Pool) Submit(ctx context.Context, t *task.Task) error { p.mu.Unlock() return fmt.Errorf("executor pool is shut down") } - p.mu.Unlock() + // Send while holding the lock so that Shutdown cannot close workCh between + // the closed-check above and the send below. The dispatch goroutine never + // holds p.mu while receiving from workCh, so this cannot deadlock. select { case p.workCh <- workItem{ctx: ctx, task: t}: + p.mu.Unlock() return nil default: + p.mu.Unlock() return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh)) } } @@ -419,12 +423,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.mu.Lock() p.consecutiveFailures[agentType]++ p.mu.Unlock() - // If this is a checker task, attach the failure report to the checked task. - if t.CheckerForTaskID != "" { - report := exec.ErrorMsg - if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, report); reportErr != nil { - p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) - } + } + // If this is a checker task, attach the failure report for any terminal + // failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED). + if t.CheckerForTaskID != "" && exec.ErrorMsg != "" { + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, exec.ErrorMsg); reportErr != nil { + p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) } } if t.StoryID != "" && exec.Status == "FAILED" { @@ -510,7 +514,8 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) } } - if t.CheckerForTaskID != "" && exec.Status == "FAILED" && summary != "" { + terminalFailure := exec.Status == "FAILED" || exec.Status == "TIMED_OUT" || exec.Status == "CANCELLED" || exec.Status == "BUDGET_EXCEEDED" + if t.CheckerForTaskID != "" && terminalFailure && summary != "" { // Overwrite the initial error-message report with the richer summary. if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil { p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr) diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index f5ac8b8..e947606 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -2097,12 +2097,19 @@ func TestPool_CheckerSpawned_OnReady(t *testing.T) { pool.Submit(context.Background(), tk) <-pool.Results() // wait for original task to finish - // Give the async spawnCheckerTask goroutine a moment to run. - time.Sleep(200 * time.Millisecond) - - checker, err := store.GetCheckerTask("checker-spawn-1") - if err != nil { - t.Fatalf("GetCheckerTask: %v", err) + // Poll until the async spawnCheckerTask goroutine has written the checker task. + var checker *task.Task + var err error + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + checker, err = store.GetCheckerTask("checker-spawn-1") + if err != nil { + t.Fatalf("GetCheckerTask: %v", err) + } + if checker != nil { + break + } + time.Sleep(50 * time.Millisecond) } if checker == nil { t.Fatal("expected a checker task to be created, got nil") @@ -2143,10 +2150,8 @@ func TestPool_CheckerNotSpawned_ForSubtask(t *testing.T) { func TestPool_CheckerPass_AutoAcceptsTask(t *testing.T) { store := testStore(t) // Two-phase: first runner succeeds (original task), second also succeeds (checker). - callCount := 0 runner := &mockRunner{ onRun: func(t *task.Task, e *storage.Execution) error { - callCount++ return nil // both original and checker succeed }, } @@ -2179,10 +2184,8 @@ func TestPool_CheckerPass_AutoAcceptsTask(t *testing.T) { func TestPool_CheckerFail_AttachesReport(t *testing.T) { store := testStore(t) - callCount := 0 runner := &mockRunner{ onRun: func(t *task.Task, e *storage.Execution) error { - callCount++ if t.CheckerForTaskID != "" { return fmt.Errorf("test suite failed: 3 failures") } |
