diff options
| -rw-r--r-- | internal/executor/executor.go | 120 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 138 |
2 files changed, 257 insertions, 1 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index a1f29ed..e87d066 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -38,6 +38,8 @@ type Store interface { ListTasksByStory(storyID string) ([]*task.Task, error) UpdateStoryStatus(id string, status task.StoryState) error CreateTask(t *task.Task) error + UpdateTaskCheckerReport(id, report string) error + GetCheckerTask(checkedTaskID string) (*task.Task, error) } // LogPather is an optional interface runners can implement to provide the log @@ -75,6 +77,7 @@ type Pool struct { rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel consecutiveFailures map[string]int // agentType -> count + closed bool // set to true when Shutdown has been called resultCh chan *Result startedCh chan string // task IDs that just transitioned to RUNNING workCh chan workItem // internal bounded queue; Submit enqueues here @@ -147,6 +150,12 @@ func (p *Pool) dispatch() { // work queue is full. When the pool is at capacity the task is buffered and // dispatched as soon as a slot becomes available. func (p *Pool) Submit(ctx context.Context, t *task.Task) error { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return fmt.Errorf("executor pool is shut down") + } + p.mu.Unlock() select { case p.workCh <- workItem{ctx: ctx, task: t}: return nil @@ -171,6 +180,9 @@ func (p *Pool) Results() <-chan *Result { func (p *Pool) Shutdown(ctx context.Context) error { // Stop the dispatch goroutine. We must wait for it to exit before calling // workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait(). + p.mu.Lock() + p.closed = true + p.mu.Unlock() close(p.workCh) select { case <-p.dispatchDone: @@ -407,6 +419,13 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.mu.Lock() p.consecutiveFailures[agentType]++ p.mu.Unlock() + // If this is a checker task, attach the failure report to the checked task. + if t.CheckerForTaskID != "" { + report := exec.ErrorMsg + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, report); reportErr != nil { + p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) + } + } } if t.StoryID != "" && exec.Status == "FAILED" { storyID := t.StoryID @@ -425,7 +444,23 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.mu.Lock() p.consecutiveFailures[agentType] = 0 p.mu.Unlock() - if t.ParentTaskID == "" { + if t.CheckerForTaskID != "" { + // Checker task succeeded — auto-accept the checked task. + exec.Status = "COMPLETED" + if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { + p.logger.Error("handleRunResult: failed to complete checker task", "taskID", t.ID, "error", err) + } + checkedTask, getErr := p.store.GetTask(t.CheckerForTaskID) + if getErr == nil { + if acceptErr := p.store.UpdateTaskState(t.CheckerForTaskID, task.StateCompleted); acceptErr != nil { + p.logger.Error("handleRunResult: failed to auto-accept checked task", "taskID", t.CheckerForTaskID, "error", acceptErr) + } else if checkedTask.StoryID != "" { + go p.checkStoryCompletion(context.Background(), checkedTask.StoryID) + } + } else { + p.logger.Error("handleRunResult: failed to get checked task", "taskID", t.CheckerForTaskID, "error", getErr) + } + } else if t.ParentTaskID == "" { subtasks, subErr := p.store.ListSubtasks(t.ID) if subErr != nil { p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) @@ -440,6 +475,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) } + go p.spawnCheckerTask(context.Background(), t) } } else { exec.Status = "COMPLETED" @@ -474,6 +510,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) } } + if t.CheckerForTaskID != "" && exec.Status == "FAILED" && summary != "" { + // Overwrite the initial error-message report with the richer summary. + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil { + p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr) + } + } if exec.StdoutPath != "" { if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil { exec.Changestats = cs @@ -528,6 +570,82 @@ func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) { go p.triggerStoryDeploy(ctx, storyID) } +// spawnCheckerTask creates and submits a checker task for the given completed task. +// Guards: not called for subtasks, checker tasks, tasks without a repository URL, +// or tasks that already have a checker. +func (p *Pool) spawnCheckerTask(ctx context.Context, checked *task.Task) { + // Never spawn a checker for subtasks, checker tasks, or tasks without a repository. + if checked.ParentTaskID != "" || checked.CheckerForTaskID != "" || checked.RepositoryURL == "" { + return + } + // Idempotent: don't create a second checker if one already exists. + existing, err := p.store.GetCheckerTask(checked.ID) + if err != nil { + p.logger.Error("spawnCheckerTask: GetCheckerTask failed", "taskID", checked.ID, "error", err) + return + } + if existing != nil { + return + } + + criteria := checked.AcceptanceCriteria + if criteria == "" { + criteria = checked.Agent.Instructions + } + + instructions := fmt.Sprintf(`You are validating a completed task. Do not make any changes to the code or repository. + +Task: %s +Instructions given to the implementor: +%s + +Acceptance criteria: +%s + +Steps: +1. Clone the repository and review the changes made. +2. Verify each acceptance criterion is met. Run tests or make HTTP requests as needed. +3. If all criteria are satisfied, exit normally (success). +4. If any criterion is not met, use the Bash tool to exit with a non-zero code: + bash -c "exit 1" + Before exiting, write a brief summary of what failed.`, checked.Name, checked.Agent.Instructions, criteria) + + now := time.Now().UTC() + checker := &task.Task{ + ID: uuid.New().String(), + Name: "Check: " + checked.Name, + CheckerForTaskID: checked.ID, + RepositoryURL: checked.RepositoryURL, + Agent: task.AgentConfig{ + Type: "claude", + Instructions: instructions, + MaxBudgetUSD: 0.50, + AllowedTools: []string{"Bash", "Read", "Glob", "Grep"}, + }, + Timeout: task.Duration{Duration: 10 * time.Minute}, + Priority: task.PriorityNormal, + Tags: []string{}, + DependsOn: []string{}, + Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"}, + State: task.StatePending, + CreatedAt: now, + UpdatedAt: now, + } + + if err := p.store.CreateTask(checker); err != nil { + p.logger.Error("spawnCheckerTask: CreateTask failed", "error", err) + return + } + checker.State = task.StateQueued + if err := p.store.UpdateTaskState(checker.ID, task.StateQueued); err != nil { + p.logger.Error("spawnCheckerTask: UpdateTaskState failed", "error", err) + return + } + if err := p.Submit(ctx, checker); err != nil { + p.logger.Error("spawnCheckerTask: Submit failed", "error", err) + } +} + // triggerStoryDeploy runs the project deploy script for a SHIPPABLE story // and advances it to DEPLOYED on success. func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) { diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 267d9ca..f5ac8b8 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1109,6 +1109,8 @@ func (m *minimalMockStore) GetStory(_ string) (*task.Story, error) func (m *minimalMockStore) ListTasksByStory(_ string) ([]*task.Task, error) { return nil, nil } func (m *minimalMockStore) UpdateStoryStatus(_ string, _ task.StoryState) error { return nil } func (m *minimalMockStore) CreateTask(_ *task.Task) error { return nil } +func (m *minimalMockStore) UpdateTaskCheckerReport(_ string, _ string) error { return nil } +func (m *minimalMockStore) GetCheckerTask(_ string) (*task.Task, error) { return nil, nil } func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() @@ -2083,3 +2085,139 @@ func TestPool_Shutdown_TimesOut(t *testing.T) { } close(unblock) // cleanup } + +func TestPool_CheckerSpawned_OnReady(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} // succeeds instantly + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("checker-spawn-1") + tk.RepositoryURL = "https://github.com/x/y" + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() // wait for original task to finish + + // Give the async spawnCheckerTask goroutine a moment to run. + time.Sleep(200 * time.Millisecond) + + checker, err := store.GetCheckerTask("checker-spawn-1") + if err != nil { + t.Fatalf("GetCheckerTask: %v", err) + } + if checker == nil { + t.Fatal("expected a checker task to be created, got nil") + } + if checker.CheckerForTaskID != "checker-spawn-1" { + t.Errorf("expected CheckerForTaskID=checker-spawn-1, got %q", checker.CheckerForTaskID) + } +} + +func TestPool_CheckerNotSpawned_ForSubtask(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + parent := makeTask("no-checker-parent") + parent.RepositoryURL = "https://github.com/x/y" + store.CreateTask(parent) + + sub := makeTask("no-checker-sub") + sub.ParentTaskID = "no-checker-parent" + sub.RepositoryURL = "https://github.com/x/y" + store.CreateTask(sub) + + pool.Submit(context.Background(), sub) + <-pool.Results() + + time.Sleep(100 * time.Millisecond) + + checker, err := store.GetCheckerTask("no-checker-sub") + if err != nil { + t.Fatalf("GetCheckerTask: %v", err) + } + if checker != nil { + t.Error("expected no checker for subtask, but one was created") + } +} + +func TestPool_CheckerPass_AutoAcceptsTask(t *testing.T) { + store := testStore(t) + // Two-phase: first runner succeeds (original task), second also succeeds (checker). + callCount := 0 + runner := &mockRunner{ + onRun: func(t *task.Task, e *storage.Execution) error { + callCount++ + return nil // both original and checker succeed + }, + } + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("autoaccept-1") + tk.RepositoryURL = "https://github.com/x/y" + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() // original finishes → READY + checker spawned + + // Wait for checker to run and complete. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + got, _ := store.GetTask("autoaccept-1") + if got != nil && got.State == task.StateCompleted { + break + } + <-pool.Results() + } + + got, err := store.GetTask("autoaccept-1") + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.State != task.StateCompleted { + t.Errorf("expected COMPLETED after checker pass, got %s", got.State) + } +} + +func TestPool_CheckerFail_AttachesReport(t *testing.T) { + store := testStore(t) + callCount := 0 + runner := &mockRunner{ + onRun: func(t *task.Task, e *storage.Execution) error { + callCount++ + if t.CheckerForTaskID != "" { + return fmt.Errorf("test suite failed: 3 failures") + } + return nil // original task succeeds + }, + } + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("fail-checker-1") + tk.RepositoryURL = "https://github.com/x/y" + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() // original → READY + + // Wait for checker to fail. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + got, _ := store.GetTask("fail-checker-1") + if got != nil && got.CheckerReport != "" { + break + } + select { + case <-pool.Results(): + case <-time.After(100 * time.Millisecond): + } + } + + got, err := store.GetTask("fail-checker-1") + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.State != task.StateReady { + t.Errorf("expected task to stay READY after checker fail, got %s", got.State) + } + if got.CheckerReport == "" { + t.Error("expected checker_report to be set after checker failure") + } +} |
