summary refs log tree commit diff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r-- internal/executor/executor.go 120
-rw-r--r-- internal/executor/executor_test.go 138
2 files changed, 257 insertions, 1 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index a1f29ed..e87d066 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -38,6 +38,8 @@ type Store interface {
ListTasksByStory(storyID string) ([]*task.Task, error)
UpdateStoryStatus(id string, status task.StoryState) error
CreateTask(t *task.Task) error
+ UpdateTaskCheckerReport(id, report string) error
+ GetCheckerTask(checkedTaskID string) (*task.Task, error)
}
// LogPather is an optional interface runners can implement to provide the log
@@ -75,6 +77,7 @@ type Pool struct {
rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
consecutiveFailures map[string]int // agentType -> count
+ closed bool // set to true when Shutdown has been called
resultCh chan *Result
startedCh chan string // task IDs that just transitioned to RUNNING
workCh chan workItem // internal bounded queue; Submit enqueues here
@@ -147,6 +150,12 @@ func (p *Pool) dispatch() {
// work queue is full. When the pool is at capacity the task is buffered and
// dispatched as soon as a slot becomes available.
func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+ p.mu.Lock()
+ if p.closed {
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool is shut down")
+ }
+ p.mu.Unlock()
select {
case p.workCh <- workItem{ctx: ctx, task: t}:
return nil
@@ -171,6 +180,9 @@ func (p *Pool) Results() <-chan *Result {
func (p *Pool) Shutdown(ctx context.Context) error {
// Stop the dispatch goroutine. We must wait for it to exit before calling
// workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait().
+ p.mu.Lock()
+ p.closed = true
+ p.mu.Unlock()
close(p.workCh)
select {
case <-p.dispatchDone:
@@ -407,6 +419,13 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.mu.Lock()
p.consecutiveFailures[agentType]++
p.mu.Unlock()
+ // If this is a checker task, attach the failure report to the checked task.
+ if t.CheckerForTaskID != "" {
+ report := exec.ErrorMsg
+ if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, report); reportErr != nil {
+ p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr)
+ }
+ }
}
if t.StoryID != "" && exec.Status == "FAILED" {
storyID := t.StoryID
@@ -425,7 +444,23 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.mu.Lock()
p.consecutiveFailures[agentType] = 0
p.mu.Unlock()
- if t.ParentTaskID == "" {
+ if t.CheckerForTaskID != "" {
+ // Checker task succeeded — auto-accept the checked task.
+ exec.Status = "COMPLETED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
+ p.logger.Error("handleRunResult: failed to complete checker task", "taskID", t.ID, "error", err)
+ }
+ checkedTask, getErr := p.store.GetTask(t.CheckerForTaskID)
+ if getErr == nil {
+ if acceptErr := p.store.UpdateTaskState(t.CheckerForTaskID, task.StateCompleted); acceptErr != nil {
+ p.logger.Error("handleRunResult: failed to auto-accept checked task", "taskID", t.CheckerForTaskID, "error", acceptErr)
+ } else if checkedTask.StoryID != "" {
+ go p.checkStoryCompletion(context.Background(), checkedTask.StoryID)
+ }
+ } else {
+ p.logger.Error("handleRunResult: failed to get checked task", "taskID", t.CheckerForTaskID, "error", getErr)
+ }
+ } else if t.ParentTaskID == "" {
subtasks, subErr := p.store.ListSubtasks(t.ID)
if subErr != nil {
p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
@@ -440,6 +475,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
}
+ go p.spawnCheckerTask(context.Background(), t)
}
} else {
exec.Status = "COMPLETED"
@@ -474,6 +510,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
}
}
+ if t.CheckerForTaskID != "" && exec.Status == "FAILED" && summary != "" {
+ // Overwrite the initial error-message report with the richer summary.
+ if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil {
+ p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr)
+ }
+ }
if exec.StdoutPath != "" {
if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil {
exec.Changestats = cs
@@ -528,6 +570,82 @@ func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) {
go p.triggerStoryDeploy(ctx, storyID)
}
+// spawnCheckerTask creates, persists and submits a "checker" task whose agent
+// is instructed to validate the work of the given task without modifying the
+// repository.
+//
+// No checker is spawned when checked is a subtask, is itself a checker task,
+// has no repository URL, or already has a checker recorded in the store.
+// The function returns nothing; every failure is logged together with the IDs
+// of both the checked task and the checker so it can be traced afterwards.
+func (p *Pool) spawnCheckerTask(ctx context.Context, checked *task.Task) {
+	// Never spawn a checker for subtasks, checker tasks, or tasks without a repository.
+	if checked.ParentTaskID != "" || checked.CheckerForTaskID != "" || checked.RepositoryURL == "" {
+		return
+	}
+	// Idempotent: don't create a second checker if one already exists.
+	existing, err := p.store.GetCheckerTask(checked.ID)
+	if err != nil {
+		p.logger.Error("spawnCheckerTask: GetCheckerTask failed", "taskID", checked.ID, "error", err)
+		return
+	}
+	if existing != nil {
+		return
+	}
+
+	// Fall back to the implementor's instructions when the task carries no
+	// explicit acceptance criteria.
+	criteria := checked.AcceptanceCriteria
+	if criteria == "" {
+		criteria = checked.Agent.Instructions
+	}
+
+	instructions := fmt.Sprintf(`You are validating a completed task. Do not make any changes to the code or repository.
+
+Task: %s
+Instructions given to the implementor:
+%s
+
+Acceptance criteria:
+%s
+
+Steps:
+1. Clone the repository and review the changes made.
+2. Verify each acceptance criterion is met. Run tests or make HTTP requests as needed.
+3. If all criteria are satisfied, exit normally (success).
+4. If any criterion is not met, use the Bash tool to exit with a non-zero code:
+ bash -c "exit 1"
+ Before exiting, write a brief summary of what failed.`, checked.Name, checked.Agent.Instructions, criteria)
+
+	now := time.Now().UTC()
+	checker := &task.Task{
+		ID:               uuid.New().String(),
+		Name:             "Check: " + checked.Name,
+		CheckerForTaskID: checked.ID,
+		RepositoryURL:    checked.RepositoryURL,
+		Agent: task.AgentConfig{
+			Type:         "claude",
+			Instructions: instructions,
+			MaxBudgetUSD: 0.50,
+			AllowedTools: []string{"Bash", "Read", "Glob", "Grep"},
+		},
+		Timeout:   task.Duration{Duration: 10 * time.Minute},
+		Priority:  task.PriorityNormal,
+		Tags:      []string{},
+		DependsOn: []string{},
+		Retry:     task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
+		State:     task.StatePending,
+		CreatedAt: now,
+		UpdatedAt: now,
+	}
+
+	// Persist as PENDING first, then advance to QUEUED before handing the task
+	// to the pool.
+	if err := p.store.CreateTask(checker); err != nil {
+		p.logger.Error("spawnCheckerTask: CreateTask failed", "taskID", checked.ID, "checkerID", checker.ID, "error", err)
+		return
+	}
+	checker.State = task.StateQueued
+	if err := p.store.UpdateTaskState(checker.ID, task.StateQueued); err != nil {
+		p.logger.Error("spawnCheckerTask: UpdateTaskState failed", "taskID", checked.ID, "checkerID", checker.ID, "error", err)
+		return
+	}
+	if err := p.Submit(ctx, checker); err != nil {
+		p.logger.Error("spawnCheckerTask: Submit failed", "taskID", checked.ID, "checkerID", checker.ID, "error", err)
+	}
+}
+
// triggerStoryDeploy runs the project deploy script for a SHIPPABLE story
// and advances it to DEPLOYED on success.
func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) {
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 267d9ca..f5ac8b8 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1109,6 +1109,8 @@ func (m *minimalMockStore) GetStory(_ string) (*task.Story, error)
func (m *minimalMockStore) ListTasksByStory(_ string) ([]*task.Task, error) { return nil, nil }
func (m *minimalMockStore) UpdateStoryStatus(_ string, _ task.StoryState) error { return nil }
func (m *minimalMockStore) CreateTask(_ *task.Task) error { return nil }
+func (m *minimalMockStore) UpdateTaskCheckerReport(_ string, _ string) error { return nil }
+func (m *minimalMockStore) GetCheckerTask(_ string) (*task.Task, error) { return nil, nil }
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()
@@ -2083,3 +2085,139 @@ func TestPool_Shutdown_TimesOut(t *testing.T) {
}
close(unblock) // cleanup
}
+
+func TestPool_CheckerSpawned_OnReady(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{} // succeeds instantly
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("checker-spawn-1")
+ tk.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ <-pool.Results() // wait for original task to finish
+
+ // Give the async spawnCheckerTask goroutine a moment to run.
+ time.Sleep(200 * time.Millisecond)
+
+ checker, err := store.GetCheckerTask("checker-spawn-1")
+ if err != nil {
+ t.Fatalf("GetCheckerTask: %v", err)
+ }
+ if checker == nil {
+ t.Fatal("expected a checker task to be created, got nil")
+ }
+ if checker.CheckerForTaskID != "checker-spawn-1" {
+ t.Errorf("expected CheckerForTaskID=checker-spawn-1, got %q", checker.CheckerForTaskID)
+ }
+}
+
+func TestPool_CheckerNotSpawned_ForSubtask(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ parent := makeTask("no-checker-parent")
+ parent.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(parent)
+
+ sub := makeTask("no-checker-sub")
+ sub.ParentTaskID = "no-checker-parent"
+ sub.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(sub)
+
+ pool.Submit(context.Background(), sub)
+ <-pool.Results()
+
+ time.Sleep(100 * time.Millisecond)
+
+ checker, err := store.GetCheckerTask("no-checker-sub")
+ if err != nil {
+ t.Fatalf("GetCheckerTask: %v", err)
+ }
+ if checker != nil {
+ t.Error("expected no checker for subtask, but one was created")
+ }
+}
+
+// TestPool_CheckerPass_AutoAcceptsTask verifies that when both the original
+// task and its checker succeed, the original task ends up COMPLETED.
+func TestPool_CheckerPass_AutoAcceptsTask(t *testing.T) {
+	store := testStore(t)
+	// Two-phase: the runner succeeds for the original task and for the checker.
+	runner := &mockRunner{
+		onRun: func(_ *task.Task, _ *storage.Execution) error {
+			return nil // both original and checker succeed
+		},
+	}
+	pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+	tk := makeTask("autoaccept-1")
+	tk.RepositoryURL = "https://github.com/x/y"
+	if err := store.CreateTask(tk); err != nil {
+		t.Fatalf("CreateTask: %v", err)
+	}
+	if err := pool.Submit(context.Background(), tk); err != nil {
+		t.Fatalf("Submit: %v", err)
+	}
+	<-pool.Results() // original finishes → READY + checker spawned
+
+	// Wait for checker to run and complete. The receive is guarded by a
+	// timeout so the loop re-checks the deadline instead of blocking forever
+	// when no further result is delivered.
+	deadline := time.Now().Add(5 * time.Second)
+	for time.Now().Before(deadline) {
+		got, _ := store.GetTask("autoaccept-1")
+		if got != nil && got.State == task.StateCompleted {
+			break
+		}
+		select {
+		case <-pool.Results():
+		case <-time.After(100 * time.Millisecond):
+		}
+	}
+
+	got, err := store.GetTask("autoaccept-1")
+	if err != nil {
+		t.Fatalf("GetTask: %v", err)
+	}
+	if got.State != task.StateCompleted {
+		t.Errorf("expected COMPLETED after checker pass, got %s", got.State)
+	}
+}
+
+func TestPool_CheckerFail_AttachesReport(t *testing.T) {
+ store := testStore(t)
+ callCount := 0
+ runner := &mockRunner{
+ onRun: func(t *task.Task, e *storage.Execution) error {
+ callCount++
+ if t.CheckerForTaskID != "" {
+ return fmt.Errorf("test suite failed: 3 failures")
+ }
+ return nil // original task succeeds
+ },
+ }
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("fail-checker-1")
+ tk.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ <-pool.Results() // original → READY
+
+ // Wait for checker to fail.
+ deadline := time.Now().Add(5 * time.Second)
+ for time.Now().Before(deadline) {
+ got, _ := store.GetTask("fail-checker-1")
+ if got != nil && got.CheckerReport != "" {
+ break
+ }
+ select {
+ case <-pool.Results():
+ case <-time.After(100 * time.Millisecond):
+ }
+ }
+
+ got, err := store.GetTask("fail-checker-1")
+ if err != nil {
+ t.Fatalf("GetTask: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("expected task to stay READY after checker fail, got %s", got.State)
+ }
+ if got.CheckerReport == "" {
+ t.Error("expected checker_report to be set after checker failure")
+ }
+}