summaryrefslogtreecommitdiff
path: root/internal/executor/executor_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor_test.go')
-rw-r--r--internal/executor/executor_test.go911
1 files changed, 897 insertions, 14 deletions
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index b1173cb..9214872 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"os"
+ "os/exec"
"path/filepath"
"strings"
"sync"
@@ -600,10 +601,17 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
// Execution record should be closed as FAILED.
execs, _ := store.ListExecutions(tk.ID)
- if len(execs) == 0 || execs[0].Status != "FAILED" {
+ var failedExec *storage.Execution
+ for _, e := range execs {
+ if e.ID == "exec-stale-1" {
+ failedExec = e
+ break
+ }
+ }
+ if failedExec == nil || failedExec.Status != "FAILED" {
t.Errorf("execution status: want FAILED, got %+v", execs)
}
- if execs[0].ErrorMsg == "" {
+ if failedExec.ErrorMsg == "" {
t.Error("expected non-empty error message on recovered execution")
}
@@ -739,6 +747,119 @@ func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T
}
}
+func TestPool_RecoverStaleBlocked_PromotesQueuedParentWithAllSubtasksDone(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ now := time.Now().UTC()
+ story := &task.Story{
+ ID: "story-queued-parent", Name: "Queued Parent Story",
+ Status: task.StoryInProgress, CreatedAt: now, UpdatedAt: now,
+ }
+ store.CreateStory(story)
+
+ // Parent task stuck QUEUED (approved with pre-created subtasks, never run).
+ parent := makeTask("queued-parent-1")
+ parent.State = task.StateQueued
+ parent.StoryID = story.ID
+ store.CreateTask(parent)
+
+ for i := 0; i < 2; i++ {
+ sub := makeTask(fmt.Sprintf("queued-sub-%d", i))
+ sub.ParentTaskID = parent.ID
+ sub.StoryID = story.ID
+ sub.State = task.StateCompleted
+ store.CreateTask(sub)
+ }
+
+ pool.RecoverStaleBlocked()
+
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("GetTask: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("parent state: want READY, got %s", got.State)
+ }
+
+ // Story should still be IN_PROGRESS — READY tasks don't satisfy the completion check;
+ // the task must be accepted (READY → COMPLETED) before the story advances to SHIPPABLE.
+ s, err := store.GetStory(story.ID)
+ if err != nil {
+ t.Fatalf("GetStory: %v", err)
+ }
+ if s.Status != task.StoryInProgress {
+ t.Errorf("story status: want IN_PROGRESS, got %s", s.Status)
+ }
+}
+
+// TestPool_RecoverStaleBlocked_DoesNotPromoteQueuedLeafTask verifies that a top-level
+// QUEUED task with NO subtasks is not promoted to READY by RecoverStaleBlocked.
+// This guards against the bug where a task that failed to start (stuck in QUEUED due
+// to a DB error) was incorrectly promoted to READY because the "all subtasks done"
+// check is vacuously true when there are no subtasks.
+func TestPool_RecoverStaleBlocked_DoesNotPromoteQueuedLeafTask(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ // A top-level task stuck in QUEUED with no subtasks (e.g. DB lock prevented RUNNING transition).
+ leaf := makeTask("queued-leaf-no-subtasks")
+ leaf.State = task.StateQueued
+ store.CreateTask(leaf)
+
+ pool.RecoverStaleBlocked()
+
+ got, err := store.GetTask(leaf.ID)
+ if err != nil {
+ t.Fatalf("GetTask: %v", err)
+ }
+ if got.State != task.StateQueued {
+ t.Errorf("leaf task state: want QUEUED (unchanged), got %s", got.State)
+ }
+}
+
+// TestPool_CheckStoryCompletion_ReadyTasksNotSufficient verifies that READY tasks
+// alone do not advance a story to SHIPPABLE — tasks must be COMPLETED.
+func TestPool_CheckStoryCompletion_ReadyTasksNotSufficient(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ now := time.Now().UTC()
+ story := &task.Story{
+ ID: "story-ready-only",
+ Name: "Ready Only Story",
+ Status: task.StoryInProgress,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ store.CreateStory(story)
+
+ // One task driven to READY (checker pending), one COMPLETED.
+ tk1 := makeTask("ro-task-1")
+ tk1.StoryID = story.ID
+ store.CreateTask(tk1)
+ for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady} {
+ store.UpdateTaskState(tk1.ID, s)
+ }
+
+ tk2 := makeTask("ro-task-2")
+ tk2.StoryID = story.ID
+ store.CreateTask(tk2)
+ for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady, task.StateCompleted} {
+ store.UpdateTaskState(tk2.ID, s)
+ }
+
+ pool.checkStoryCompletion(context.Background(), story.ID)
+
+ got, _ := store.GetStory(story.ID)
+ if got.Status != task.StoryInProgress {
+ t.Errorf("story status: want IN_PROGRESS (tk1 still READY/checker pending), got %s", got.Status)
+ }
+}
+
func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
@@ -1014,7 +1135,10 @@ func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) {
return nil, nil
}
func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil }
-func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }
+func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }
+func (m *minimalMockStore) CreateExecutionAndSetRunning(e *storage.Execution) error {
+ return nil
+}
func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error {
return m.updateExecErr
}
@@ -1064,6 +1188,14 @@ func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task
m.mu.Unlock()
return nil
}
+func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error { return nil }
+func (m *minimalMockStore) GetProject(_ string) (*task.Project, error) { return nil, nil }
+func (m *minimalMockStore) GetStory(_ string) (*task.Story, error) { return nil, nil }
+func (m *minimalMockStore) ListTasksByStory(_ string) ([]*task.Task, error) { return nil, nil }
+func (m *minimalMockStore) UpdateStoryStatus(_ string, _ task.StoryState) error { return nil }
+func (m *minimalMockStore) CreateTask(_ *task.Task) error { return nil }
+func (m *minimalMockStore) UpdateTaskCheckerReport(_ string, _ string) error { return nil }
+func (m *minimalMockStore) GetCheckerTask(_ string) (*task.Task, error) { return nil, nil }
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()
@@ -1078,17 +1210,18 @@ func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
func newPoolWithMockStore(store Store) *Pool {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
return &Pool{
- maxConcurrent: 2,
- runners: map[string]Runner{"claude": &mockRunner{}},
- store: store,
- logger: logger,
- activePerAgent: make(map[string]int),
- rateLimited: make(map[string]time.Time),
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, 4),
- workCh: make(chan workItem, 4),
- doneCh: make(chan struct{}, 2),
- Questions: NewQuestionRegistry(),
+ maxConcurrent: 2,
+ maxPerAgent: 1,
+ runners: map[string]Runner{"claude": &mockRunner{}},
+ store: store,
+ logger: logger,
+ activePerAgent: make(map[string]int),
+ rateLimited: make(map[string]time.Time),
+ cancels: make(map[string]context.CancelFunc),
+ consecutiveFailures: make(map[string]int),
+ resultCh: make(chan *Result, 4),
+ workCh: make(chan workItem, 4),
+ doneCh: make(chan struct{}, 2),
}
}
@@ -1236,6 +1369,11 @@ func TestPool_SpecificAgent_SkipsLoadBalancing(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
pool := NewPool(4, runners, store, logger)
+ // Raise per-agent limit so the concurrency gate doesn't interfere with this test.
+ // The injected activePerAgent is only to make pickAgent prefer "claude",
+ // verifying that explicit agent type bypasses load balancing.
+ pool.maxPerAgent = 10
+
// Inject 2 active tasks for gemini, 0 for claude.
// pickAgent would normally pick "claude".
pool.mu.Lock()
@@ -1425,3 +1563,748 @@ func TestExecute_MalformedChangestats(t *testing.T) {
t.Errorf("expected nil changestats for malformed output, got %+v", execs[0].Changestats)
}
}
+
+func TestPool_MaxPerAgent_BlocksSecondTask(t *testing.T) {
+ store := testStore(t)
+
+ var mu sync.Mutex
+ concurrentRuns := 0
+ maxConcurrent := 0
+
+ runner := &mockRunner{
+ delay: 100 * time.Millisecond,
+ onRun: func(tk *task.Task, e *storage.Execution) error {
+ mu.Lock()
+ concurrentRuns++
+ if concurrentRuns > maxConcurrent {
+ maxConcurrent = concurrentRuns
+ }
+ mu.Unlock()
+ time.Sleep(100 * time.Millisecond)
+ mu.Lock()
+ concurrentRuns--
+ mu.Unlock()
+ return nil
+ },
+ }
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger) // pool size 2, but maxPerAgent=1
+ pool.requeueDelay = 50 * time.Millisecond // speed up test
+
+ tk1 := makeTask("mpa-1")
+ tk2 := makeTask("mpa-2")
+ store.CreateTask(tk1)
+ store.CreateTask(tk2)
+
+ pool.Submit(context.Background(), tk1)
+ pool.Submit(context.Background(), tk2)
+
+ for i := 0; i < 2; i++ {
+ select {
+ case <-pool.Results():
+ case <-time.After(10 * time.Second):
+ t.Fatal("timed out waiting for result")
+ }
+ }
+
+ mu.Lock()
+ got := maxConcurrent
+ mu.Unlock()
+ if got > 1 {
+ t.Errorf("maxPerAgent=1 violated: %d claude tasks ran concurrently", got)
+ }
+}
+
+func TestPool_MaxPerAgent_AllowsDifferentAgents(t *testing.T) {
+ store := testStore(t)
+
+ var mu sync.Mutex
+ concurrentRuns := 0
+ maxConcurrent := 0
+
+ makeSlowRunner := func() *mockRunner {
+ return &mockRunner{
+ onRun: func(tk *task.Task, e *storage.Execution) error {
+ mu.Lock()
+ concurrentRuns++
+ if concurrentRuns > maxConcurrent {
+ maxConcurrent = concurrentRuns
+ }
+ mu.Unlock()
+ time.Sleep(80 * time.Millisecond)
+ mu.Lock()
+ concurrentRuns--
+ mu.Unlock()
+ return nil
+ },
+ }
+ }
+
+ runners := map[string]Runner{
+ "claude": makeSlowRunner(),
+ "gemini": makeSlowRunner(),
+ }
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ tk1 := makeTask("da-1")
+ tk1.Agent.Type = "claude"
+ tk2 := makeTask("da-2")
+ tk2.Agent.Type = "gemini"
+ store.CreateTask(tk1)
+ store.CreateTask(tk2)
+
+ pool.Submit(context.Background(), tk1)
+ pool.Submit(context.Background(), tk2)
+
+ for i := 0; i < 2; i++ {
+ select {
+ case <-pool.Results():
+ case <-time.After(5 * time.Second):
+ t.Fatal("timed out waiting for result")
+ }
+ }
+
+ mu.Lock()
+ got := maxConcurrent
+ mu.Unlock()
+ if got < 2 {
+ t.Errorf("different agents should run concurrently; max concurrent was %d", got)
+ }
+}
+
+func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) {
+ store := testStore(t)
+
+ callCount := 0
+ runner := &mockRunner{
+ onRun: func(tk *task.Task, e *storage.Execution) error {
+ callCount++
+ if callCount == 1 {
+ return fmt.Errorf("first failure")
+ }
+ return nil // second call succeeds
+ },
+ }
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ // First task fails
+ tk1 := makeTask("rs-1")
+ store.CreateTask(tk1)
+ pool.Submit(context.Background(), tk1)
+ <-pool.Results()
+
+ pool.mu.Lock()
+ failsBefore := pool.consecutiveFailures["claude"]
+ pool.mu.Unlock()
+ if failsBefore != 1 {
+ t.Errorf("expected 1 failure after first task, got %d", failsBefore)
+ }
+
+ // Second task succeeds — counter resets.
+ tk2 := makeTask("rs-2")
+ store.CreateTask(tk2)
+ pool.Submit(context.Background(), tk2)
+ <-pool.Results()
+
+ pool.mu.Lock()
+ failsAfter := pool.consecutiveFailures["claude"]
+ pool.mu.Unlock()
+
+ if failsAfter != 0 {
+ t.Errorf("expected consecutiveFailures reset to 0 after success, got %d", failsAfter)
+ }
+}
+
+func TestPool_CheckStoryCompletion_AllComplete(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ // Create a story in IN_PROGRESS state.
+ now := time.Now().UTC()
+ story := &task.Story{
+ ID: "story-comp-1",
+ Name: "Completion Test",
+ Status: task.StoryInProgress,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("CreateStory: %v", err)
+ }
+
+ // Create two top-level story tasks and drive them through valid transitions to COMPLETED.
+ for i, id := range []string{"sctask-1", "sctask-2"} {
+ tk := makeTask(id)
+ tk.StoryID = "story-comp-1"
+ tk.State = task.StatePending
+ if err := store.CreateTask(tk); err != nil {
+ t.Fatalf("CreateTask %d: %v", i, err)
+ }
+ for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady, task.StateCompleted} {
+ if err := store.UpdateTaskState(id, s); err != nil {
+ t.Fatalf("UpdateTaskState %s → %s: %v", id, s, err)
+ }
+ }
+ }
+
+ pool.checkStoryCompletion(context.Background(), "story-comp-1")
+
+ got, err := store.GetStory("story-comp-1")
+ if err != nil {
+ t.Fatalf("GetStory: %v", err)
+ }
+ if got.Status != task.StoryShippable {
+ t.Errorf("story status: want SHIPPABLE, got %v", got.Status)
+ }
+}
+
+func TestPool_CheckStoryCompletion_PartialComplete(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ now := time.Now().UTC()
+ story := &task.Story{
+ ID: "story-partial-1",
+ Name: "Partial Test",
+ Status: task.StoryInProgress,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("CreateStory: %v", err)
+ }
+
+ // First top-level task driven to READY.
+ tk1 := makeTask("sptask-1")
+ tk1.StoryID = "story-partial-1"
+ store.CreateTask(tk1)
+ for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateReady} {
+ store.UpdateTaskState("sptask-1", s)
+ }
+
+ // Second top-level task still in PENDING (not done).
+ tk2 := makeTask("sptask-2")
+ tk2.StoryID = "story-partial-1"
+ store.CreateTask(tk2)
+
+ pool.checkStoryCompletion(context.Background(), "story-partial-1")
+
+ got, err := store.GetStory("story-partial-1")
+ if err != nil {
+ t.Fatalf("GetStory: %v", err)
+ }
+ if got.Status != task.StoryInProgress {
+ t.Errorf("story status: want IN_PROGRESS (no transition), got %v", got.Status)
+ }
+}
+
+func TestPool_StoryDeploy_RunsDeployScript(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ // Create a deploy script that writes a marker file.
+ tmpDir := t.TempDir()
+ markerFile := filepath.Join(tmpDir, "deployed.marker")
+ scriptPath := filepath.Join(tmpDir, "deploy.sh")
+ scriptContent := "#!/bin/sh\ntouch " + markerFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(scriptContent), 0755); err != nil {
+ t.Fatalf("write deploy script: %v", err)
+ }
+
+ proj := &task.Project{
+ ID: "proj-deploy-1",
+ Name: "Deploy Test Project",
+ DeployScript: scriptPath,
+ }
+ if err := store.CreateProject(proj); err != nil {
+ t.Fatalf("create project: %v", err)
+ }
+
+ story := &task.Story{
+ ID: "story-deploy-1",
+ Name: "Deploy Test Story",
+ ProjectID: proj.ID,
+ Status: task.StoryShippable,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("create story: %v", err)
+ }
+
+ pool.triggerStoryDeploy(context.Background(), story.ID)
+
+ if _, err := os.Stat(markerFile); os.IsNotExist(err) {
+ t.Error("deploy script did not run: marker file not found")
+ }
+
+ got, err := store.GetStory(story.ID)
+ if err != nil {
+ t.Fatalf("get story: %v", err)
+ }
+ if got.Status != task.StoryDeployed {
+ t.Errorf("story status: want DEPLOYED, got %q", got.Status)
+ }
+}
+
+func runGit(t *testing.T, dir string, args ...string) {
+ t.Helper()
+ cmd := exec.Command("git", args...)
+ if dir != "" {
+ cmd.Dir = dir
+ }
+ if out, err := cmd.CombinedOutput(); err != nil {
+ t.Fatalf("git %v: %v\n%s", args, err, out)
+ }
+}
+
+func TestPool_StoryDeploy_MergesStoryBranch(t *testing.T) {
+ tmpDir := t.TempDir()
+
+ // Set up bare repo + working copy with a story branch.
+ bareDir := filepath.Join(tmpDir, "bare.git")
+ localDir := filepath.Join(tmpDir, "local")
+ runGit(t, "", "init", "--bare", bareDir)
+ runGit(t, "", "clone", bareDir, localDir)
+ runGit(t, localDir, "config", "user.email", "test@test.com")
+ runGit(t, localDir, "config", "user.name", "Test")
+
+ // Initial commit on main.
+ runGit(t, localDir, "checkout", "-b", "main")
+ os.WriteFile(filepath.Join(localDir, "README.md"), []byte("initial"), 0644)
+ runGit(t, localDir, "add", ".")
+ runGit(t, localDir, "commit", "-m", "initial")
+ runGit(t, localDir, "push", "-u", "origin", "main")
+
+ // Story branch with a feature commit.
+ runGit(t, localDir, "checkout", "-b", "story/test-feature")
+ os.WriteFile(filepath.Join(localDir, "feature.go"), []byte("package main"), 0644)
+ runGit(t, localDir, "add", ".")
+ runGit(t, localDir, "commit", "-m", "feature work")
+ runGit(t, localDir, "push", "origin", "story/test-feature")
+ runGit(t, localDir, "checkout", "main")
+
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ scriptPath := filepath.Join(tmpDir, "deploy.sh")
+ os.WriteFile(scriptPath, []byte("#!/bin/sh\nexit 0\n"), 0755)
+
+ proj := &task.Project{
+ ID: "proj-merge-1", Name: "Merge Test",
+ LocalPath: localDir, DeployScript: scriptPath,
+ }
+ if err := store.CreateProject(proj); err != nil {
+ t.Fatalf("create project: %v", err)
+ }
+ story := &task.Story{
+ ID: "story-merge-1", Name: "Merge Test Story",
+ ProjectID: proj.ID, BranchName: "story/test-feature",
+ Status: task.StoryShippable,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("create story: %v", err)
+ }
+
+ pool.triggerStoryDeploy(context.Background(), story.ID)
+
+ // feature.go should now be on main in the working copy.
+ if _, err := os.Stat(filepath.Join(localDir, "feature.go")); os.IsNotExist(err) {
+ t.Error("story branch was not merged to main: feature.go missing")
+ }
+ got, _ := store.GetStory(story.ID)
+ if got.Status != task.StoryDeployed {
+ t.Errorf("story status: want DEPLOYED, got %q", got.Status)
+ }
+}
+
+func TestPool_PostDeploy_CreatesValidationTask(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ now := time.Now().UTC()
+ validationSpec := `{"type":"smoke","steps":["curl /health"],"success_criteria":"status 200"}`
+ story := &task.Story{
+ ID: "story-postdeploy-1",
+ Name: "Post Deploy Test",
+ Status: task.StoryDeployed,
+ ValidationJSON: validationSpec,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("CreateStory: %v", err)
+ }
+
+ pool.createValidationTask(context.Background(), story.ID)
+
+ // Story should now be VALIDATING.
+ got, err := store.GetStory(story.ID)
+ if err != nil {
+ t.Fatalf("GetStory: %v", err)
+ }
+ if got.Status != task.StoryValidating {
+ t.Errorf("story status: want VALIDATING, got %q", got.Status)
+ }
+
+ // A validation task should have been created.
+ tasks, err := store.ListTasksByStory(story.ID)
+ if err != nil {
+ t.Fatalf("ListTasksByStory: %v", err)
+ }
+ if len(tasks) == 0 {
+ t.Fatal("expected a validation task to be created, got none")
+ }
+ vtask := tasks[0]
+ if !strings.Contains(strings.ToLower(vtask.Name), "validation") {
+ t.Errorf("task name %q does not contain 'validation'", vtask.Name)
+ }
+ if vtask.StoryID != story.ID {
+ t.Errorf("task story_id: want %q, got %q", story.ID, vtask.StoryID)
+ }
+ if !strings.Contains(vtask.Agent.Instructions, "smoke") {
+ t.Errorf("task instructions %q do not reference validation spec content", vtask.Agent.Instructions)
+ }
+}
+
+func TestPool_ValidationTask_Pass_SetsReviewReady(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ now := time.Now().UTC()
+ story := &task.Story{
+ ID: "story-val-pass-1",
+ Name: "Validation Pass",
+ Status: task.StoryValidating,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("CreateStory: %v", err)
+ }
+
+ pool.checkValidationResult(context.Background(), story.ID, task.StateCompleted, "")
+
+ got, err := store.GetStory(story.ID)
+ if err != nil {
+ t.Fatalf("GetStory: %v", err)
+ }
+ if got.Status != task.StoryReviewReady {
+ t.Errorf("story status: want REVIEW_READY, got %q", got.Status)
+ }
+}
+
+// TestPool_DependsOn_NoDeadlock verifies that a task waiting for a dependency
+// does NOT hold the per-agent slot, allowing the dependency to run first.
+func TestPool_DependsOn_NoDeadlock(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{} // succeeds immediately
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store,
+ slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+ pool.requeueDelay = 10 * time.Millisecond
+
+ // Task A has no deps; Task B depends on A.
+ taskA := makeTask("dep-a")
+ taskA.State = task.StateQueued
+ taskB := makeTask("dep-b")
+ taskB.DependsOn = []string{"dep-a"}
+ taskB.State = task.StateQueued
+
+ store.CreateTask(taskA)
+ store.CreateTask(taskB)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // Submit B first — it should not deadlock by holding the slot while waiting for A.
+ pool.Submit(ctx, taskB)
+ pool.Submit(ctx, taskA)
+
+ var gotA, gotB bool
+ for i := 0; i < 2; i++ {
+ select {
+ case res := <-pool.Results():
+ if res.TaskID == "dep-a" {
+ gotA = true
+ }
+ if res.TaskID == "dep-b" {
+ gotB = true
+ }
+ case <-ctx.Done():
+ t.Fatal("timeout: likely deadlock — dep-b held the slot while waiting for dep-a")
+ }
+ }
+ if !gotA || !gotB {
+ t.Errorf("expected both tasks to complete: gotA=%v gotB=%v", gotA, gotB)
+ }
+
+ // B must complete after A.
+ ta, _ := store.GetTask("dep-a")
+ tb, _ := store.GetTask("dep-b")
+ if ta.State != task.StateReady && ta.State != task.StateCompleted {
+ t.Errorf("dep-a should be READY/COMPLETED, got %s", ta.State)
+ }
+ if tb.State != task.StateReady && tb.State != task.StateCompleted {
+ t.Errorf("dep-b should be READY/COMPLETED, got %s", tb.State)
+ }
+}
+
+func TestPool_ValidationTask_Fail_SetsNeedsFix(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ now := time.Now().UTC()
+ story := &task.Story{
+ ID: "story-val-fail-1",
+ Name: "Validation Fail",
+ Status: task.StoryValidating,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ if err := store.CreateStory(story); err != nil {
+ t.Fatalf("CreateStory: %v", err)
+ }
+
+ execErr := "smoke test failed: /health returned 503"
+ pool.checkValidationResult(context.Background(), story.ID, task.StateFailed, execErr)
+
+ got, err := store.GetStory(story.ID)
+ if err != nil {
+ t.Fatalf("GetStory: %v", err)
+ }
+ if got.Status != task.StoryNeedsFix {
+ t.Errorf("story status: want NEEDS_FIX, got %q", got.Status)
+ }
+}
+
+func TestPool_Shutdown_WaitsForWorkers(t *testing.T) {
+ store := testStore(t)
+ started := make(chan struct{})
+ unblock := make(chan struct{})
+ runner := &mockRunner{
+ onRun: func(t *task.Task, e *storage.Execution) error {
+ close(started)
+ <-unblock
+ return nil
+ },
+ }
+ pool := NewPool(1, map[string]Runner{"claude": runner}, store,
+ slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("shutdown-task")
+ tk.State = task.StateQueued
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+
+ // Wait until the worker has started.
+ select {
+ case <-started:
+ case <-time.After(5 * time.Second):
+ t.Fatal("worker did not start")
+ }
+
+ // Shutdown should block until we unblock the worker.
+ done := make(chan error, 1)
+ go func() {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ done <- pool.Shutdown(ctx)
+ }()
+
+ // Shutdown should not have returned yet.
+ select {
+ case err := <-done:
+ t.Fatalf("Shutdown returned early: %v", err)
+ case <-time.After(50 * time.Millisecond):
+ }
+
+ close(unblock) // let the worker finish
+
+ select {
+ case err := <-done:
+ if err != nil {
+ t.Errorf("Shutdown returned error: %v", err)
+ }
+ case <-time.After(5 * time.Second):
+ t.Fatal("Shutdown did not return after worker finished")
+ }
+}
+
+func TestPool_Shutdown_TimesOut(t *testing.T) {
+ store := testStore(t)
+ unblock := make(chan struct{})
+ runner := &mockRunner{
+ onRun: func(t *task.Task, e *storage.Execution) error {
+ <-unblock // never unblocked
+ return nil
+ },
+ }
+ pool := NewPool(1, map[string]Runner{"claude": runner}, store,
+ slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("shutdown-timeout-task")
+ tk.State = task.StateQueued
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+
+ // Give worker a moment to start.
+ time.Sleep(50 * time.Millisecond)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
+ err := pool.Shutdown(ctx)
+ if err == nil {
+ t.Error("expected timeout error, got nil")
+ }
+ close(unblock) // cleanup
+}
+
+func TestPool_CheckerSpawned_OnReady(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{} // succeeds instantly
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("checker-spawn-1")
+ tk.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ <-pool.Results() // wait for original task to finish
+
+ // Poll until the async spawnCheckerTask goroutine has written the checker task.
+ var checker *task.Task
+ var err error
+ deadline := time.Now().Add(5 * time.Second)
+ for time.Now().Before(deadline) {
+ checker, err = store.GetCheckerTask("checker-spawn-1")
+ if err != nil {
+ t.Fatalf("GetCheckerTask: %v", err)
+ }
+ if checker != nil {
+ break
+ }
+ time.Sleep(50 * time.Millisecond)
+ }
+ if checker == nil {
+ t.Fatal("expected a checker task to be created, got nil")
+ }
+ if checker.CheckerForTaskID != "checker-spawn-1" {
+ t.Errorf("expected CheckerForTaskID=checker-spawn-1, got %q", checker.CheckerForTaskID)
+ }
+}
+
+func TestPool_CheckerNotSpawned_ForSubtask(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ parent := makeTask("no-checker-parent")
+ parent.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(parent)
+
+ sub := makeTask("no-checker-sub")
+ sub.ParentTaskID = "no-checker-parent"
+ sub.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(sub)
+
+ pool.Submit(context.Background(), sub)
+ <-pool.Results()
+
+ time.Sleep(100 * time.Millisecond)
+
+ checker, err := store.GetCheckerTask("no-checker-sub")
+ if err != nil {
+ t.Fatalf("GetCheckerTask: %v", err)
+ }
+ if checker != nil {
+ t.Error("expected no checker for subtask, but one was created")
+ }
+}
+
+func TestPool_CheckerPass_AutoAcceptsTask(t *testing.T) {
+ store := testStore(t)
+ // Two-phase: first runner succeeds (original task), second also succeeds (checker).
+ runner := &mockRunner{
+ onRun: func(t *task.Task, e *storage.Execution) error {
+ return nil // both original and checker succeed
+ },
+ }
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("autoaccept-1")
+ tk.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ <-pool.Results() // original finishes → READY + checker spawned
+
+ // Wait for checker to run and complete.
+ deadline := time.Now().Add(5 * time.Second)
+ for time.Now().Before(deadline) {
+ got, _ := store.GetTask("autoaccept-1")
+ if got != nil && got.State == task.StateCompleted {
+ break
+ }
+ <-pool.Results()
+ }
+
+ got, err := store.GetTask("autoaccept-1")
+ if err != nil {
+ t.Fatalf("GetTask: %v", err)
+ }
+ if got.State != task.StateCompleted {
+ t.Errorf("expected COMPLETED after checker pass, got %s", got.State)
+ }
+}
+
+func TestPool_CheckerFail_AttachesReport(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{
+ onRun: func(t *task.Task, e *storage.Execution) error {
+ if t.CheckerForTaskID != "" {
+ return fmt.Errorf("test suite failed: 3 failures")
+ }
+ return nil // original task succeeds
+ },
+ }
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})))
+
+ tk := makeTask("fail-checker-1")
+ tk.RepositoryURL = "https://github.com/x/y"
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ <-pool.Results() // original → READY
+
+ // Wait for checker to fail.
+ deadline := time.Now().Add(5 * time.Second)
+ for time.Now().Before(deadline) {
+ got, _ := store.GetTask("fail-checker-1")
+ if got != nil && got.CheckerReport != "" {
+ break
+ }
+ select {
+ case <-pool.Results():
+ case <-time.After(100 * time.Millisecond):
+ }
+ }
+
+ got, err := store.GetTask("fail-checker-1")
+ if err != nil {
+ t.Fatalf("GetTask: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("expected task to stay READY after checker fail, got %s", got.State)
+ }
+ if got.CheckerReport == "" {
+ t.Error("expected checker_report to be set after checker failure")
+ }
+}