summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-04-04 09:08:14 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-04-04 09:08:14 +0000
commitd4aeb496a883fa20194ae01d127535494e684812 (patch)
treea2b709e48d54b31df49d7b3a162c2e41eb09444e /internal/executor/executor.go
parent4b20fb7cabd46b193fbbe15f0481dd3e1929274f (diff)
feat: spawn checker task on READY; auto-accept on pass; attach report on fail
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go120
1 files changed, 119 insertions, 1 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index a1f29ed..e87d066 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -38,6 +38,8 @@ type Store interface {
ListTasksByStory(storyID string) ([]*task.Task, error)
UpdateStoryStatus(id string, status task.StoryState) error
CreateTask(t *task.Task) error
+ UpdateTaskCheckerReport(id, report string) error
+ GetCheckerTask(checkedTaskID string) (*task.Task, error)
}
// LogPather is an optional interface runners can implement to provide the log
@@ -75,6 +77,7 @@ type Pool struct {
rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
consecutiveFailures map[string]int // agentType -> count
+ closed bool // set to true when Shutdown has been called
resultCh chan *Result
startedCh chan string // task IDs that just transitioned to RUNNING
workCh chan workItem // internal bounded queue; Submit enqueues here
@@ -147,6 +150,12 @@ func (p *Pool) dispatch() {
// work queue is full. When the pool is at capacity the task is buffered and
// dispatched as soon as a slot becomes available.
func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+ p.mu.Lock()
+ if p.closed {
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool is shut down")
+ }
+ p.mu.Unlock()
select {
case p.workCh <- workItem{ctx: ctx, task: t}:
return nil
@@ -171,6 +180,9 @@ func (p *Pool) Results() <-chan *Result {
func (p *Pool) Shutdown(ctx context.Context) error {
// Stop the dispatch goroutine. We must wait for it to exit before calling
// workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait().
+ p.mu.Lock()
+ p.closed = true
+ p.mu.Unlock()
close(p.workCh)
select {
case <-p.dispatchDone:
@@ -407,6 +419,13 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.mu.Lock()
p.consecutiveFailures[agentType]++
p.mu.Unlock()
+ // If this is a checker task, attach the failure report to the checked task.
+ if t.CheckerForTaskID != "" {
+ report := exec.ErrorMsg
+ if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, report); reportErr != nil {
+ p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr)
+ }
+ }
}
if t.StoryID != "" && exec.Status == "FAILED" {
storyID := t.StoryID
@@ -425,7 +444,23 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.mu.Lock()
p.consecutiveFailures[agentType] = 0
p.mu.Unlock()
- if t.ParentTaskID == "" {
+ if t.CheckerForTaskID != "" {
+ // Checker task succeeded — auto-accept the checked task.
+ exec.Status = "COMPLETED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
+ p.logger.Error("handleRunResult: failed to complete checker task", "taskID", t.ID, "error", err)
+ }
+ checkedTask, getErr := p.store.GetTask(t.CheckerForTaskID)
+ if getErr == nil {
+ if acceptErr := p.store.UpdateTaskState(t.CheckerForTaskID, task.StateCompleted); acceptErr != nil {
+ p.logger.Error("handleRunResult: failed to auto-accept checked task", "taskID", t.CheckerForTaskID, "error", acceptErr)
+ } else if checkedTask.StoryID != "" {
+ go p.checkStoryCompletion(context.Background(), checkedTask.StoryID)
+ }
+ } else {
+ p.logger.Error("handleRunResult: failed to get checked task", "taskID", t.CheckerForTaskID, "error", getErr)
+ }
+ } else if t.ParentTaskID == "" {
subtasks, subErr := p.store.ListSubtasks(t.ID)
if subErr != nil {
p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
@@ -440,6 +475,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
}
+ go p.spawnCheckerTask(context.Background(), t)
}
} else {
exec.Status = "COMPLETED"
@@ -474,6 +510,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
}
}
+ if t.CheckerForTaskID != "" && exec.Status == "FAILED" && summary != "" {
+ // Overwrite the initial error-message report with the richer summary.
+ if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil {
+ p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr)
+ }
+ }
if exec.StdoutPath != "" {
if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil {
exec.Changestats = cs
@@ -528,6 +570,82 @@ func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) {
go p.triggerStoryDeploy(ctx, storyID)
}
+// spawnCheckerTask creates and submits a checker task for the given completed task.
+// Guards: not called for subtasks, checker tasks, tasks without a repository URL,
+// or tasks that already have a checker.
+func (p *Pool) spawnCheckerTask(ctx context.Context, checked *task.Task) {
+ // Never spawn a checker for subtasks, checker tasks, or tasks without a repository.
+ if checked.ParentTaskID != "" || checked.CheckerForTaskID != "" || checked.RepositoryURL == "" {
+ return
+ }
+ // Idempotent: don't create a second checker if one already exists.
+ existing, err := p.store.GetCheckerTask(checked.ID)
+ if err != nil {
+ p.logger.Error("spawnCheckerTask: GetCheckerTask failed", "taskID", checked.ID, "error", err)
+ return
+ }
+ if existing != nil {
+ return
+ }
+
+ criteria := checked.AcceptanceCriteria
+ if criteria == "" {
+ criteria = checked.Agent.Instructions
+ }
+
+ instructions := fmt.Sprintf(`You are validating a completed task. Do not make any changes to the code or repository.
+
+Task: %s
+Instructions given to the implementor:
+%s
+
+Acceptance criteria:
+%s
+
+Steps:
+1. Clone the repository and review the changes made.
+2. Verify each acceptance criterion is met. Run tests or make HTTP requests as needed.
+3. If all criteria are satisfied, exit normally (success).
+4. If any criterion is not met, use the Bash tool to exit with a non-zero code:
+ bash -c "exit 1"
+ Before exiting, write a brief summary of what failed.`, checked.Name, checked.Agent.Instructions, criteria)
+
+ now := time.Now().UTC()
+ checker := &task.Task{
+ ID: uuid.New().String(),
+ Name: "Check: " + checked.Name,
+ CheckerForTaskID: checked.ID,
+ RepositoryURL: checked.RepositoryURL,
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: instructions,
+ MaxBudgetUSD: 0.50,
+ AllowedTools: []string{"Bash", "Read", "Glob", "Grep"},
+ },
+ Timeout: task.Duration{Duration: 10 * time.Minute},
+ Priority: task.PriorityNormal,
+ Tags: []string{},
+ DependsOn: []string{},
+ Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
+ State: task.StatePending,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+
+ if err := p.store.CreateTask(checker); err != nil {
+ p.logger.Error("spawnCheckerTask: CreateTask failed", "error", err)
+ return
+ }
+ checker.State = task.StateQueued
+ if err := p.store.UpdateTaskState(checker.ID, task.StateQueued); err != nil {
+ p.logger.Error("spawnCheckerTask: UpdateTaskState failed", "error", err)
+ return
+ }
+ if err := p.Submit(ctx, checker); err != nil {
+ p.logger.Error("spawnCheckerTask: Submit failed", "error", err)
+ }
+}
+
// triggerStoryDeploy runs the project deploy script for a SHIPPABLE story
// and advances it to DEPLOYED on success.
func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) {