diff options
Diffstat (limited to 'internal/executor/executor_test.go')
| -rw-r--r-- | internal/executor/executor_test.go | 911 |
1 files changed, 897 insertions, 14 deletions
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index b1173cb..9214872 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "os" + "os/exec" "path/filepath" "strings" "sync" @@ -600,10 +601,17 @@ func TestPool_RecoverStaleRunning(t *testing.T) { // Execution record should be closed as FAILED. execs, _ := store.ListExecutions(tk.ID) - if len(execs) == 0 || execs[0].Status != "FAILED" { + var failedExec *storage.Execution + for _, e := range execs { + if e.ID == "exec-stale-1" { + failedExec = e + break + } + } + if failedExec == nil || failedExec.Status != "FAILED" { t.Errorf("execution status: want FAILED, got %+v", execs) } - if execs[0].ErrorMsg == "" { + if failedExec.ErrorMsg == "" { t.Error("expected non-empty error message on recovered execution") } @@ -739,6 +747,119 @@ func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T } } +func TestPool_RecoverStaleBlocked_PromotesQueuedParentWithAllSubtasksDone(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + now := time.Now().UTC() + story := &task.Story{ + ID: "story-queued-parent", Name: "Queued Parent Story", + Status: task.StoryInProgress, CreatedAt: now, UpdatedAt: now, + } + store.CreateStory(story) + + // Parent task stuck QUEUED (approved with pre-created subtasks, never run). + parent := makeTask("queued-parent-1") + parent.State = task.StateQueued + parent.StoryID = story.ID + store.CreateTask(parent) + + for i := 0; i < 2; i++ { + sub := makeTask(fmt.Sprintf("queued-sub-%d", i)) + sub.ParentTaskID = parent.ID + sub.StoryID = story.ID + sub.State = task.StateCompleted + store.CreateTask(sub) + } + + pool.RecoverStaleBlocked() + + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.State != task.StateReady { + t.Errorf("parent state: want READY, got %s", got.State) + } + + // Story should still be IN_PROGRESS — READY tasks don't satisfy the completion check; + // the task must be accepted (READY → COMPLETED) before the story advances to SHIPPABLE. + s, err := store.GetStory(story.ID) + if err != nil { + t.Fatalf("GetStory: %v", err) + } + if s.Status != task.StoryInProgress { + t.Errorf("story status: want IN_PROGRESS, got %s", s.Status) + } +} + +// TestPool_RecoverStaleBlocked_DoesNotPromoteQueuedLeafTask verifies that a top-level +// QUEUED task with NO subtasks is not promoted to READY by RecoverStaleBlocked. +// This guards against the bug where a task that failed to start (stuck in QUEUED due +// to a DB error) was incorrectly promoted to READY because the "all subtasks done" +// check is vacuously true when there are no subtasks. +func TestPool_RecoverStaleBlocked_DoesNotPromoteQueuedLeafTask(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + // A top-level task stuck in QUEUED with no subtasks (e.g. DB lock prevented RUNNING transition). + leaf := makeTask("queued-leaf-no-subtasks") + leaf.State = task.StateQueued + store.CreateTask(leaf) + + pool.RecoverStaleBlocked() + + got, err := store.GetTask(leaf.ID) + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.State != task.StateQueued { + t.Errorf("leaf task state: want QUEUED (unchanged), got %s", got.State) + } +} + +// TestPool_CheckStoryCompletion_ReadyTasksNotSufficient verifies that READY tasks +// alone do not advance a story to SHIPPABLE — tasks must be COMPLETED. +func TestPool_CheckStoryCompletion_ReadyTasksNotSufficient(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + now := time.Now().UTC() + story := &task.Story{ + ID: "story-ready-only", + Name: "Ready Only Story", + Status: task.StoryInProgress, + CreatedAt: now, + UpdatedAt: now, + } + store.CreateStory(story) + + // One task driven to READY (checker pending), one COMPLETED. + tk1 := makeTask("ro-task-1") + tk1.StoryID = story.ID + store.CreateTask(tk1) + for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady} { + store.UpdateTaskState(tk1.ID, s) + } + + tk2 := makeTask("ro-task-2") + tk2.StoryID = story.ID + store.CreateTask(tk2) + for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady, task.StateCompleted} { + store.UpdateTaskState(tk2.ID, s) + } + + pool.checkStoryCompletion(context.Background(), story.ID) + + got, _ := store.GetStory(story.ID) + if got.Status != task.StoryInProgress { + t.Errorf("story status: want IN_PROGRESS (tk1 still READY/checker pending), got %s", got.Status) + } +} + func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} @@ -1014,7 +1135,10 @@ func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) { return nil, nil } func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil } -func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil } +func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil } +func (m *minimalMockStore) CreateExecutionAndSetRunning(e *storage.Execution) error { + return nil +} func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error { return m.updateExecErr } @@ -1064,6 +1188,14 @@ func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task m.mu.Unlock() return nil } +func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error { return nil } +func (m *minimalMockStore) GetProject(_ string) (*task.Project, error) { return nil, nil } +func (m *minimalMockStore) GetStory(_ string) (*task.Story, error) { return nil, nil } +func (m *minimalMockStore) ListTasksByStory(_ string) ([]*task.Task, error) { return nil, nil } +func (m *minimalMockStore) UpdateStoryStatus(_ string, _ task.StoryState) error { return nil } +func (m *minimalMockStore) CreateTask(_ *task.Task) error { return nil } +func (m *minimalMockStore) UpdateTaskCheckerReport(_ string, _ string) error { return nil } +func (m *minimalMockStore) GetCheckerTask(_ string) (*task.Task, error) { return nil, nil } func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() @@ -1078,17 +1210,18 @@ func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { func newPoolWithMockStore(store Store) *Pool { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) return &Pool{ - maxConcurrent: 2, - runners: map[string]Runner{"claude": &mockRunner{}}, - store: store, - logger: logger, - activePerAgent: make(map[string]int), - rateLimited: make(map[string]time.Time), - cancels: make(map[string]context.CancelFunc), - resultCh: make(chan *Result, 4), - workCh: make(chan workItem, 4), - doneCh: make(chan struct{}, 2), - Questions: NewQuestionRegistry(), + maxConcurrent: 2, + maxPerAgent: 1, + runners: map[string]Runner{"claude": &mockRunner{}}, + store: store, + logger: logger, + activePerAgent: make(map[string]int), + rateLimited: make(map[string]time.Time), + cancels: make(map[string]context.CancelFunc), + consecutiveFailures: make(map[string]int), + resultCh: make(chan *Result, 4), + workCh: make(chan workItem, 4), + doneCh: make(chan struct{}, 2), } } @@ -1236,6 +1369,11 @@ func TestPool_SpecificAgent_SkipsLoadBalancing(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(4, runners, store, logger) + // Raise per-agent limit so the concurrency gate doesn't interfere with this test. + // The injected activePerAgent is only to make pickAgent prefer "claude", + // verifying that explicit agent type bypasses load balancing. + pool.maxPerAgent = 10 + // Inject 2 active tasks for gemini, 0 for claude. // pickAgent would normally pick "claude". pool.mu.Lock() @@ -1425,3 +1563,748 @@ func TestExecute_MalformedChangestats(t *testing.T) { t.Errorf("expected nil changestats for malformed output, got %+v", execs[0].Changestats) } } + +func TestPool_MaxPerAgent_BlocksSecondTask(t *testing.T) { + store := testStore(t) + + var mu sync.Mutex + concurrentRuns := 0 + maxConcurrent := 0 + + runner := &mockRunner{ + delay: 100 * time.Millisecond, + onRun: func(tk *task.Task, e *storage.Execution) error { + mu.Lock() + concurrentRuns++ + if concurrentRuns > maxConcurrent { + maxConcurrent = concurrentRuns + } + mu.Unlock() + time.Sleep(100 * time.Millisecond) + mu.Lock() + concurrentRuns-- + mu.Unlock() + return nil + }, + } + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) // pool size 2, but maxPerAgent=1 + pool.requeueDelay = 50 * time.Millisecond // speed up test + + tk1 := makeTask("mpa-1") + tk2 := makeTask("mpa-2") + store.CreateTask(tk1) + store.CreateTask(tk2) + + pool.Submit(context.Background(), tk1) + pool.Submit(context.Background(), tk2) + + for i := 0; i < 2; i++ { + select { + case <-pool.Results(): + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for result") + } + } + + mu.Lock() + got := maxConcurrent + mu.Unlock() + if got > 1 { + t.Errorf("maxPerAgent=1 violated: %d claude tasks ran concurrently", got) + } +} + +func TestPool_MaxPerAgent_AllowsDifferentAgents(t *testing.T) { + store := testStore(t) + + var mu sync.Mutex + concurrentRuns := 0 + maxConcurrent := 0 + + makeSlowRunner := func() *mockRunner { + return &mockRunner{ + onRun: func(tk *task.Task, e *storage.Execution) error { + mu.Lock() + concurrentRuns++ + if concurrentRuns > maxConcurrent { + maxConcurrent = concurrentRuns + } + mu.Unlock() + time.Sleep(80 * time.Millisecond) + mu.Lock() + concurrentRuns-- + mu.Unlock() + return nil + }, + } + } + + runners := map[string]Runner{ + "claude": makeSlowRunner(), + "gemini": makeSlowRunner(), + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + tk1 := makeTask("da-1") + tk1.Agent.Type = "claude" + tk2 := makeTask("da-2") + tk2.Agent.Type = "gemini" + store.CreateTask(tk1) + store.CreateTask(tk2) + + pool.Submit(context.Background(), tk1) + pool.Submit(context.Background(), tk2) + + for i := 0; i < 2; i++ { + select { + case <-pool.Results(): + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for result") + } + } + + mu.Lock() + got := maxConcurrent + mu.Unlock() + if got < 2 { + t.Errorf("different agents should run concurrently; max concurrent was %d", got) + } +} + +func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) { + store := testStore(t) + + callCount := 0 + runner := &mockRunner{ + onRun: func(tk *task.Task, e *storage.Execution) error { + callCount++ + if callCount == 1 { + return fmt.Errorf("first failure") + } + return nil // second call succeeds + }, + } + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + // First task fails + tk1 := makeTask("rs-1") + store.CreateTask(tk1) + pool.Submit(context.Background(), tk1) + <-pool.Results() + + pool.mu.Lock() + failsBefore := pool.consecutiveFailures["claude"] + pool.mu.Unlock() + if failsBefore != 1 { + t.Errorf("expected 1 failure after first task, got %d", failsBefore) + } + + // Second task succeeds — counter resets. + tk2 := makeTask("rs-2") + store.CreateTask(tk2) + pool.Submit(context.Background(), tk2) + <-pool.Results() + + pool.mu.Lock() + failsAfter := pool.consecutiveFailures["claude"] + pool.mu.Unlock() + + if failsAfter != 0 { + t.Errorf("expected consecutiveFailures reset to 0 after success, got %d", failsAfter) + } +} + +func TestPool_CheckStoryCompletion_AllComplete(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + // Create a story in IN_PROGRESS state. + now := time.Now().UTC() + story := &task.Story{ + ID: "story-comp-1", + Name: "Completion Test", + Status: task.StoryInProgress, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("CreateStory: %v", err) + } + + // Create two top-level story tasks and drive them through valid transitions to COMPLETED. + for i, id := range []string{"sctask-1", "sctask-2"} { + tk := makeTask(id) + tk.StoryID = "story-comp-1" + tk.State = task.StatePending + if err := store.CreateTask(tk); err != nil { + t.Fatalf("CreateTask %d: %v", i, err) + } + for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady, task.StateCompleted} { + if err := store.UpdateTaskState(id, s); err != nil { + t.Fatalf("UpdateTaskState %s → %s: %v", id, s, err) + } + } + } + + pool.checkStoryCompletion(context.Background(), "story-comp-1") + + got, err := store.GetStory("story-comp-1") + if err != nil { + t.Fatalf("GetStory: %v", err) + } + if got.Status != task.StoryShippable { + t.Errorf("story status: want SHIPPABLE, got %v", got.Status) + } +} + +func TestPool_CheckStoryCompletion_PartialComplete(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + now := time.Now().UTC() + story := &task.Story{ + ID: "story-partial-1", + Name: "Partial Test", + Status: task.StoryInProgress, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("CreateStory: %v", err) + } + + // First top-level task driven to READY. + tk1 := makeTask("sptask-1") + tk1.StoryID = "story-partial-1" + store.CreateTask(tk1) + for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady} { + store.UpdateTaskState("sptask-1", s) + } + + // Second top-level task still in PENDING (not done). + tk2 := makeTask("sptask-2") + tk2.StoryID = "story-partial-1" + store.CreateTask(tk2) + + pool.checkStoryCompletion(context.Background(), "story-partial-1") + + got, err := store.GetStory("story-partial-1") + if err != nil { + t.Fatalf("GetStory: %v", err) + } + if got.Status != task.StoryInProgress { + t.Errorf("story status: want IN_PROGRESS (no transition), got %v", got.Status) + } +} + +func TestPool_StoryDeploy_RunsDeployScript(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + // Create a deploy script that writes a marker file. + tmpDir := t.TempDir() + markerFile := filepath.Join(tmpDir, "deployed.marker") + scriptPath := filepath.Join(tmpDir, "deploy.sh") + scriptContent := "#!/bin/sh\ntouch " + markerFile + "\n" + if err := os.WriteFile(scriptPath, []byte(scriptContent), 0755); err != nil { + t.Fatalf("write deploy script: %v", err) + } + + proj := &task.Project{ + ID: "proj-deploy-1", + Name: "Deploy Test Project", + DeployScript: scriptPath, + } + if err := store.CreateProject(proj); err != nil { + t.Fatalf("create project: %v", err) + } + + story := &task.Story{ + ID: "story-deploy-1", + Name: "Deploy Test Story", + ProjectID: proj.ID, + Status: task.StoryShippable, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("create story: %v", err) + } + + pool.triggerStoryDeploy(context.Background(), story.ID) + + if _, err := os.Stat(markerFile); os.IsNotExist(err) { + t.Error("deploy script did not run: marker file not found") + } + + got, err := store.GetStory(story.ID) + if err != nil { + t.Fatalf("get story: %v", err) + } + if got.Status != task.StoryDeployed { + t.Errorf("story status: want DEPLOYED, got %q", got.Status) + } +} + +func runGit(t *testing.T, dir string, args ...string) { + t.Helper() + cmd := exec.Command("git", args...) + if dir != "" { + cmd.Dir = dir + } + if out, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("git %v: %v\n%s", args, err, out) + } +} + +func TestPool_StoryDeploy_MergesStoryBranch(t *testing.T) { + tmpDir := t.TempDir() + + // Set up bare repo + working copy with a story branch. + bareDir := filepath.Join(tmpDir, "bare.git") + localDir := filepath.Join(tmpDir, "local") + runGit(t, "", "init", "--bare", bareDir) + runGit(t, "", "clone", bareDir, localDir) + runGit(t, localDir, "config", "user.email", "test@test.com") + runGit(t, localDir, "config", "user.name", "Test") + + // Initial commit on main. + runGit(t, localDir, "checkout", "-b", "main") + os.WriteFile(filepath.Join(localDir, "README.md"), []byte("initial"), 0644) + runGit(t, localDir, "add", ".") + runGit(t, localDir, "commit", "-m", "initial") + runGit(t, localDir, "push", "-u", "origin", "main") + + // Story branch with a feature commit. + runGit(t, localDir, "checkout", "-b", "story/test-feature") + os.WriteFile(filepath.Join(localDir, "feature.go"), []byte("package main"), 0644) + runGit(t, localDir, "add", ".") + runGit(t, localDir, "commit", "-m", "feature work") + runGit(t, localDir, "push", "origin", "story/test-feature") + runGit(t, localDir, "checkout", "main") + + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + scriptPath := filepath.Join(tmpDir, "deploy.sh") + os.WriteFile(scriptPath, []byte("#!/bin/sh\nexit 0\n"), 0755) + + proj := &task.Project{ + ID: "proj-merge-1", Name: "Merge Test", + LocalPath: localDir, DeployScript: scriptPath, + } + if err := store.CreateProject(proj); err != nil { + t.Fatalf("create project: %v", err) + } + story := &task.Story{ + ID: "story-merge-1", Name: "Merge Test Story", + ProjectID: proj.ID, BranchName: "story/test-feature", + Status: task.StoryShippable, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("create story: %v", err) + } + + pool.triggerStoryDeploy(context.Background(), story.ID) + + // feature.go should now be on main in the working copy. + if _, err := os.Stat(filepath.Join(localDir, "feature.go")); os.IsNotExist(err) { + t.Error("story branch was not merged to main: feature.go missing") + } + got, _ := store.GetStory(story.ID) + if got.Status != task.StoryDeployed { + t.Errorf("story status: want DEPLOYED, got %q", got.Status) + } +} + +func TestPool_PostDeploy_CreatesValidationTask(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + now := time.Now().UTC() + validationSpec := `{"type":"smoke","steps":["curl /health"],"success_criteria":"status 200"}` + story := &task.Story{ + ID: "story-postdeploy-1", + Name: "Post Deploy Test", + Status: task.StoryDeployed, + ValidationJSON: validationSpec, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("CreateStory: %v", err) + } + + pool.createValidationTask(context.Background(), story.ID) + + // Story should now be VALIDATING. + got, err := store.GetStory(story.ID) + if err != nil { + t.Fatalf("GetStory: %v", err) + } + if got.Status != task.StoryValidating { + t.Errorf("story status: want VALIDATING, got %q", got.Status) + } + + // A validation task should have been created. + tasks, err := store.ListTasksByStory(story.ID) + if err != nil { + t.Fatalf("ListTasksByStory: %v", err) + } + if len(tasks) == 0 { + t.Fatal("expected a validation task to be created, got none") + } + vtask := tasks[0] + if !strings.Contains(strings.ToLower(vtask.Name), "validation") { + t.Errorf("task name %q does not contain 'validation'", vtask.Name) + } + if vtask.StoryID != story.ID { + t.Errorf("task story_id: want %q, got %q", story.ID, vtask.StoryID) + } + if !strings.Contains(vtask.Agent.Instructions, "smoke") { + t.Errorf("task instructions %q do not reference validation spec content", vtask.Agent.Instructions) + } +} + +func TestPool_ValidationTask_Pass_SetsReviewReady(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + now := time.Now().UTC() + story := &task.Story{ + ID: "story-val-pass-1", + Name: "Validation Pass", + Status: task.StoryValidating, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("CreateStory: %v", err) + } + + pool.checkValidationResult(context.Background(), story.ID, task.StateCompleted, "") + + got, err := store.GetStory(story.ID) + if err != nil { + t.Fatalf("GetStory: %v", err) + } + if got.Status != task.StoryReviewReady { + t.Errorf("story status: want REVIEW_READY, got %q", got.Status) + } +} + +// TestPool_DependsOn_NoDeadlock verifies that a task waiting for a dependency +// does NOT hold the per-agent slot, allowing the dependency to run first. +func TestPool_DependsOn_NoDeadlock(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} // succeeds immediately + pool := NewPool(2, map[string]Runner{"claude": runner}, store, + slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + pool.requeueDelay = 10 * time.Millisecond + + // Task A has no deps; Task B depends on A. + taskA := makeTask("dep-a") + taskA.State = task.StateQueued + taskB := makeTask("dep-b") + taskB.DependsOn = []string{"dep-a"} + taskB.State = task.StateQueued + + store.CreateTask(taskA) + store.CreateTask(taskB) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Submit B first — it should not deadlock by holding the slot while waiting for A. + pool.Submit(ctx, taskB) + pool.Submit(ctx, taskA) + + var gotA, gotB bool + for i := 0; i < 2; i++ { + select { + case res := <-pool.Results(): + if res.TaskID == "dep-a" { + gotA = true + } + if res.TaskID == "dep-b" { + gotB = true + } + case <-ctx.Done(): + t.Fatal("timeout: likely deadlock — dep-b held the slot while waiting for dep-a") + } + } + if !gotA || !gotB { + t.Errorf("expected both tasks to complete: gotA=%v gotB=%v", gotA, gotB) + } + + // B must complete after A. + ta, _ := store.GetTask("dep-a") + tb, _ := store.GetTask("dep-b") + if ta.State != task.StateReady && ta.State != task.StateCompleted { + t.Errorf("dep-a should be READY/COMPLETED, got %s", ta.State) + } + if tb.State != task.StateReady && tb.State != task.StateCompleted { + t.Errorf("dep-b should be READY/COMPLETED, got %s", tb.State) + } +} + +func TestPool_ValidationTask_Fail_SetsNeedsFix(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + now := time.Now().UTC() + story := &task.Story{ + ID: "story-val-fail-1", + Name: "Validation Fail", + Status: task.StoryValidating, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateStory(story); err != nil { + t.Fatalf("CreateStory: %v", err) + } + + execErr := "smoke test failed: /health returned 503" + pool.checkValidationResult(context.Background(), story.ID, task.StateFailed, execErr) + + got, err := store.GetStory(story.ID) + if err != nil { + t.Fatalf("GetStory: %v", err) + } + if got.Status != task.StoryNeedsFix { + t.Errorf("story status: want NEEDS_FIX, got %q", got.Status) + } +} + +func TestPool_Shutdown_WaitsForWorkers(t *testing.T) { + store := testStore(t) + started := make(chan struct{}) + unblock := make(chan struct{}) + runner := &mockRunner{ + onRun: func(t *task.Task, e *storage.Execution) error { + close(started) + <-unblock + return nil + }, + } + pool := NewPool(1, map[string]Runner{"claude": runner}, store, + slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("shutdown-task") + tk.State = task.StateQueued + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + + // Wait until the worker has started. + select { + case <-started: + case <-time.After(5 * time.Second): + t.Fatal("worker did not start") + } + + // Shutdown should block until we unblock the worker. + done := make(chan error, 1) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + done <- pool.Shutdown(ctx) + }() + + // Shutdown should not have returned yet. + select { + case err := <-done: + t.Fatalf("Shutdown returned early: %v", err) + case <-time.After(50 * time.Millisecond): + } + + close(unblock) // let the worker finish + + select { + case err := <-done: + if err != nil { + t.Errorf("Shutdown returned error: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Shutdown did not return after worker finished") + } +} + +func TestPool_Shutdown_TimesOut(t *testing.T) { + store := testStore(t) + unblock := make(chan struct{}) + runner := &mockRunner{ + onRun: func(t *task.Task, e *storage.Execution) error { + <-unblock // never unblocked + return nil + }, + } + pool := NewPool(1, map[string]Runner{"claude": runner}, store, + slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("shutdown-timeout-task") + tk.State = task.StateQueued + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + + // Give worker a moment to start. + time.Sleep(50 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + err := pool.Shutdown(ctx) + if err == nil { + t.Error("expected timeout error, got nil") + } + close(unblock) // cleanup +} + +func TestPool_CheckerSpawned_OnReady(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} // succeeds instantly + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("checker-spawn-1") + tk.RepositoryURL = "https://github.com/x/y" + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() // wait for original task to finish + + // Poll until the async spawnCheckerTask goroutine has written the checker task. + var checker *task.Task + var err error + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + checker, err = store.GetCheckerTask("checker-spawn-1") + if err != nil { + t.Fatalf("GetCheckerTask: %v", err) + } + if checker != nil { + break + } + time.Sleep(50 * time.Millisecond) + } + if checker == nil { + t.Fatal("expected a checker task to be created, got nil") + } + if checker.CheckerForTaskID != "checker-spawn-1" { + t.Errorf("expected CheckerForTaskID=checker-spawn-1, got %q", checker.CheckerForTaskID) + } +} + +func TestPool_CheckerNotSpawned_ForSubtask(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + parent := makeTask("no-checker-parent") + parent.RepositoryURL = "https://github.com/x/y" + store.CreateTask(parent) + + sub := makeTask("no-checker-sub") + sub.ParentTaskID = "no-checker-parent" + sub.RepositoryURL = "https://github.com/x/y" + store.CreateTask(sub) + + pool.Submit(context.Background(), sub) + <-pool.Results() + + time.Sleep(100 * time.Millisecond) + + checker, err := store.GetCheckerTask("no-checker-sub") + if err != nil { + t.Fatalf("GetCheckerTask: %v", err) + } + if checker != nil { + t.Error("expected no checker for subtask, but one was created") + } +} + +func TestPool_CheckerPass_AutoAcceptsTask(t *testing.T) { + store := testStore(t) + // Two-phase: first runner succeeds (original task), second also succeeds (checker). + runner := &mockRunner{ + onRun: func(t *task.Task, e *storage.Execution) error { + return nil // both original and checker succeed + }, + } + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("autoaccept-1") + tk.RepositoryURL = "https://github.com/x/y" + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() // original finishes → READY + checker spawned + + // Wait for checker to run and complete. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + got, _ := store.GetTask("autoaccept-1") + if got != nil && got.State == task.StateCompleted { + break + } + <-pool.Results() + } + + got, err := store.GetTask("autoaccept-1") + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.State != task.StateCompleted { + t.Errorf("expected COMPLETED after checker pass, got %s", got.State) + } +} + +func TestPool_CheckerFail_AttachesReport(t *testing.T) { + store := testStore(t) + runner := &mockRunner{ + onRun: func(t *task.Task, e *storage.Execution) error { + if t.CheckerForTaskID != "" { + return fmt.Errorf("test suite failed: 3 failures") + } + return nil // original task succeeds + }, + } + pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) + + tk := makeTask("fail-checker-1") + tk.RepositoryURL = "https://github.com/x/y" + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() // original → READY + + // Wait for checker to fail. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + got, _ := store.GetTask("fail-checker-1") + if got != nil && got.CheckerReport != "" { + break + } + select { + case <-pool.Results(): + case <-time.After(100 * time.Millisecond): + } + } + + got, err := store.GetTask("fail-checker-1") + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.State != task.StateReady { + t.Errorf("expected task to stay READY after checker fail, got %s", got.State) + } + if got.CheckerReport == "" { + t.Error("expected checker_report to be set after checker failure") + } +} |
