From d4aeb496a883fa20194ae01d127535494e684812 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Sat, 4 Apr 2026 09:08:14 +0000 Subject: feat: spawn checker task on READY; auto-accept on pass; attach report on fail Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/executor.go | 120 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) (limited to 'internal/executor/executor.go') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index a1f29ed..e87d066 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -38,6 +38,8 @@ type Store interface { ListTasksByStory(storyID string) ([]*task.Task, error) UpdateStoryStatus(id string, status task.StoryState) error CreateTask(t *task.Task) error + UpdateTaskCheckerReport(id, report string) error + GetCheckerTask(checkedTaskID string) (*task.Task, error) } // LogPather is an optional interface runners can implement to provide the log @@ -75,6 +77,7 @@ type Pool struct { rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel consecutiveFailures map[string]int // agentType -> count + closed bool // set to true when Shutdown has been called resultCh chan *Result startedCh chan string // task IDs that just transitioned to RUNNING workCh chan workItem // internal bounded queue; Submit enqueues here @@ -147,6 +150,12 @@ func (p *Pool) dispatch() { // work queue is full. When the pool is at capacity the task is buffered and // dispatched as soon as a slot becomes available. func (p *Pool) Submit(ctx context.Context, t *task.Task) error { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return fmt.Errorf("executor pool is shut down") + } + p.mu.Unlock() select { case p.workCh <- workItem{ctx: ctx, task: t}: return nil @@ -171,6 +180,9 @@ func (p *Pool) Results() <-chan *Result { func (p *Pool) Shutdown(ctx context.Context) error { // Stop the dispatch goroutine. We must wait for it to exit before calling // workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait(). + p.mu.Lock() + p.closed = true + p.mu.Unlock() close(p.workCh) select { case <-p.dispatchDone: @@ -407,6 +419,13 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.mu.Lock() p.consecutiveFailures[agentType]++ p.mu.Unlock() + // If this is a checker task, attach the failure report to the checked task. + if t.CheckerForTaskID != "" { + report := exec.ErrorMsg + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, report); reportErr != nil { + p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) + } + } } if t.StoryID != "" && exec.Status == "FAILED" { storyID := t.StoryID @@ -425,7 +444,23 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.mu.Lock() p.consecutiveFailures[agentType] = 0 p.mu.Unlock() - if t.ParentTaskID == "" { + if t.CheckerForTaskID != "" { + // Checker task succeeded — auto-accept the checked task. + exec.Status = "COMPLETED" + if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { + p.logger.Error("handleRunResult: failed to complete checker task", "taskID", t.ID, "error", err) + } + checkedTask, getErr := p.store.GetTask(t.CheckerForTaskID) + if getErr == nil { + if acceptErr := p.store.UpdateTaskState(t.CheckerForTaskID, task.StateCompleted); acceptErr != nil { + p.logger.Error("handleRunResult: failed to auto-accept checked task", "taskID", t.CheckerForTaskID, "error", acceptErr) + } else if checkedTask.StoryID != "" { + go p.checkStoryCompletion(context.Background(), checkedTask.StoryID) + } + } else { + p.logger.Error("handleRunResult: failed to get checked task", "taskID", t.CheckerForTaskID, "error", getErr) + } + } else if t.ParentTaskID == "" { subtasks, subErr := p.store.ListSubtasks(t.ID) if subErr != nil { p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) @@ -440,6 +475,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) } + go p.spawnCheckerTask(context.Background(), t) } } else { exec.Status = "COMPLETED" @@ -474,6 +510,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) } } + if t.CheckerForTaskID != "" && exec.Status == "FAILED" && summary != "" { + // Overwrite the initial error-message report with the richer summary. + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil { + p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr) + } + } if exec.StdoutPath != "" { if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil { exec.Changestats = cs @@ -528,6 +570,82 @@ func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) { go p.triggerStoryDeploy(ctx, storyID) } +// spawnCheckerTask creates and submits a checker task for the given completed task. +// Guards: not called for subtasks, checker tasks, tasks without a repository URL, +// or tasks that already have a checker. +func (p *Pool) spawnCheckerTask(ctx context.Context, checked *task.Task) { + // Never spawn a checker for subtasks, checker tasks, or tasks without a repository. + if checked.ParentTaskID != "" || checked.CheckerForTaskID != "" || checked.RepositoryURL == "" { + return + } + // Idempotent: don't create a second checker if one already exists. + existing, err := p.store.GetCheckerTask(checked.ID) + if err != nil { + p.logger.Error("spawnCheckerTask: GetCheckerTask failed", "taskID", checked.ID, "error", err) + return + } + if existing != nil { + return + } + + criteria := checked.AcceptanceCriteria + if criteria == "" { + criteria = checked.Agent.Instructions + } + + instructions := fmt.Sprintf(`You are validating a completed task. Do not make any changes to the code or repository. + +Task: %s +Instructions given to the implementor: +%s + +Acceptance criteria: +%s + +Steps: +1. Clone the repository and review the changes made. +2. Verify each acceptance criterion is met. Run tests or make HTTP requests as needed. +3. If all criteria are satisfied, exit normally (success). +4. If any criterion is not met, use the Bash tool to exit with a non-zero code: + bash -c "exit 1" + Before exiting, write a brief summary of what failed.`, checked.Name, checked.Agent.Instructions, criteria) + + now := time.Now().UTC() + checker := &task.Task{ + ID: uuid.New().String(), + Name: "Check: " + checked.Name, + CheckerForTaskID: checked.ID, + RepositoryURL: checked.RepositoryURL, + Agent: task.AgentConfig{ + Type: "claude", + Instructions: instructions, + MaxBudgetUSD: 0.50, + AllowedTools: []string{"Bash", "Read", "Glob", "Grep"}, + }, + Timeout: task.Duration{Duration: 10 * time.Minute}, + Priority: task.PriorityNormal, + Tags: []string{}, + DependsOn: []string{}, + Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"}, + State: task.StatePending, + CreatedAt: now, + UpdatedAt: now, + } + + if err := p.store.CreateTask(checker); err != nil { + p.logger.Error("spawnCheckerTask: CreateTask failed", "error", err) + return + } + checker.State = task.StateQueued + if err := p.store.UpdateTaskState(checker.ID, task.StateQueued); err != nil { + p.logger.Error("spawnCheckerTask: UpdateTaskState failed", "error", err) + return + } + if err := p.Submit(ctx, checker); err != nil { + p.logger.Error("spawnCheckerTask: Submit failed", "error", err) + } +} + // triggerStoryDeploy runs the project deploy script for a SHIPPABLE story // and advances it to DEPLOYED on success. func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) { -- cgit v1.2.3