package executor

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// capturingHandler is a slog.Handler that records log records for assertions.
// It accepts every level and ignores attrs/groups; tests only inspect messages.
type capturingHandler struct {
	mu      sync.Mutex // guards records (Handle may be called from pool goroutines)
	records []slog.Record
}

// Enabled reports true for all levels so every log call is captured.
func (h *capturingHandler) Enabled(_ context.Context, _ slog.Level) bool { return true }

// Handle appends the record for later inspection.
func (h *capturingHandler) Handle(_ context.Context, r slog.Record) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.records = append(h.records, r)
	return nil
}

// WithAttrs returns the handler unchanged; attrs are not needed for message matching.
func (h *capturingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return h }

// WithGroup returns the handler unchanged; groups are not needed for message matching.
func (h *capturingHandler) WithGroup(name string) slog.Handler { return h }

// hasMessageContaining reports whether any captured record's message contains substr.
func (h *capturingHandler) hasMessageContaining(substr string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, r := range h.records {
		if strings.Contains(r.Message, substr) {
			return true
		}
	}
	return false
}

// failingStore wraps a real DB but returns an error for UpdateTaskState calls.
// All other Store methods are delegated to the embedded *storage.DB.
type failingStore struct {
	*storage.DB
	updateStateErr error // returned by every UpdateTaskState call
}

// UpdateTaskState always fails with the configured error, simulating a DB write failure.
func (f *failingStore) UpdateTaskState(id string, newState task.State) error {
	return f.updateStateErr
}

// mockRunner implements Runner for testing.
// mockRunner counts Run invocations and can simulate delay, error, exit code,
// or arbitrary per-call behavior via onRun.
type mockRunner struct {
	mu       sync.Mutex // guards calls and onRun
	calls    int
	delay    time.Duration // if > 0, Run blocks this long (or until ctx cancellation)
	err      error         // if set (and onRun is nil), Run returns this error
	exitCode int           // written to e.ExitCode when err fires
	onRun    func(*task.Task, *storage.Execution) error
}

// Run records the call, honors the optional delay (respecting ctx cancellation),
// then dispatches to onRun or the configured error.
// NOTE(review): delay/err/exitCode are read without the lock; tests set them
// before the runner is used, so there is no race in practice.
func (m *mockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.calls++
	cb := m.onRun
	m.mu.Unlock()
	if m.delay > 0 {
		select {
		case <-time.After(m.delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	if cb != nil {
		return cb(t, e)
	}
	if m.err != nil {
		e.ExitCode = m.exitCode
		return m.err
	}
	return nil
}

// callCount returns how many times Run has been invoked.
func (m *mockRunner) callCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.calls
}

// testStore opens a fresh SQLite-backed store in a temp dir, closed on cleanup.
func testStore(t *testing.T) *storage.DB {
	t.Helper()
	dbPath := filepath.Join(t.TempDir(), "test.db")
	db, err := storage.Open(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { db.Close() })
	return db
}

// makeTask builds a minimal QUEUED claude task with the given ID.
func makeTask(id string) *task.Task {
	now := time.Now().UTC()
	return &task.Task{
		ID:        id,
		Name:      "Test " + id,
		Agent:     task.AgentConfig{Type: "claude", Instructions: "test"},
		Priority:  task.PriorityNormal,
		Retry:     task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
		Tags:      []string{},
		DependsOn: []string{},
		State:     task.StateQueued,
		CreatedAt: now,
		UpdatedAt: now,
	}
}

func TestPickAgent_PrefersLessActiveAgent(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 3, "gemini": 1},
		RateLimited: map[string]bool{"claude": false, "gemini": false},
	}
	if got := pickAgent(status); got != "gemini" {
		t.Errorf("expected gemini (fewer active tasks), got %s", got)
	}
}

func TestPickAgent_SkipsRateLimitedAgent(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 0, "gemini": 5},
		RateLimited: map[string]bool{"claude": true, "gemini": false},
	}
	if got := pickAgent(status); got != "gemini" {
		t.Errorf("expected gemini (claude rate limited), got %s", got)
	}
}

func TestPickAgent_FallsBackWhenAllRateLimited(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 2, "gemini": 5},
		RateLimited: map[string]bool{"claude": true, "gemini": true},
	}
	// Falls back to least active regardless of rate limit.
	if got := pickAgent(status); got != "claude" {
		t.Errorf("expected claude (fewer active tasks among all), got %s", got)
	}
}

// NOTE(review): the name promises an alphabetical tie-break, but the assertion
// accepts either agent — it only pins that *some* registered agent is chosen.
// Tighten to `got != "claude"` if the tie-break is meant to be deterministic.
func TestPickAgent_TieBreakPrefersFirstAlpha(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 2, "gemini": 2},
		RateLimited: map[string]bool{"claude": false, "gemini": false},
	}
	got := pickAgent(status)
	if got != "claude" && got != "gemini" {
		t.Errorf("unexpected agent %q on tie", got)
	}
}

// A successful top-level task (no parent) lands in READY, not COMPLETED.
func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("ps-1") // no ParentTaskID → top-level
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "READY" {
		t.Errorf("status: want READY, got %q", result.Execution.Status)
	}
	got, _ := store.GetTask("ps-1")
	if got.State != task.StateReady {
		t.Errorf("task state: want READY, got %v", got.State)
	}
}

// A successful subtask (has ParentTaskID) goes straight to COMPLETED.
func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("sub-1")
	tk.ParentTaskID = "parent-99" // subtask
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "COMPLETED" {
		t.Errorf("status: want COMPLETED, got %q", result.Execution.Status)
	}
	got, _ := store.GetTask("sub-1")
	if got.State != task.StateCompleted {
		t.Errorf("task state: want COMPLETED, got %v", got.State)
	}
}

// A runner error surfaces as a FAILED execution and a non-nil result error.
func TestPool_Submit_Failure(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("pf-1")
	store.CreateTask(tk)
	pool.Submit(context.Background(), tk)
	result := <-pool.Results()
	if result.Err == nil {
		t.Fatal("expected error")
	}
	if result.Execution.Status != "FAILED" {
		t.Errorf("status: want FAILED, got %q", result.Execution.Status)
	}
}

// TestPool_UpdateTaskState_DBError_IsLoggedAndResultDelivered verifies that
// when UpdateTaskState returns an error, the error is logged with structured
// context (taskID, state) and the execution result is still sent to resultCh.
func TestPool_UpdateTaskState_DBError_IsLoggedAndResultDelivered(t *testing.T) {
	db := testStore(t)
	store := &failingStore{DB: db, updateStateErr: fmt.Errorf("db write failed")}
	handler := &capturingHandler{}
	logger := slog.New(handler)
	runner := &mockRunner{err: fmt.Errorf("runner error")}
	runners := map[string]Runner{"claude": runner}
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("dberr-1")
	db.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	select {
	case result := <-pool.Results():
		// Result must still arrive despite the DB error.
		if result == nil {
			t.Fatal("expected non-nil result")
		}
		if result.TaskID != tk.ID {
			t.Errorf("taskID: want %q, got %q", tk.ID, result.TaskID)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for result — result not delivered despite DB error")
	}
	if !handler.hasMessageContaining("failed to update task state") {
		t.Error("expected 'failed to update task state' log entry, but none found")
	}
}

// Task-level timeout (50ms) fires well before the runner's 5s delay finishes.
func TestPool_Submit_Timeout(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("pt-1")
	tk.Timeout.Duration = 50 * time.Millisecond
	store.CreateTask(tk)
	pool.Submit(context.Background(), tk)
	result := <-pool.Results()
	if result.Execution.Status != "TIMED_OUT" {
		t.Errorf("status: want TIMED_OUT, got %q", result.Execution.Status)
	}
}

// Cancelling the submit context mid-run produces a CANCELLED execution.
func TestPool_Submit_Cancellation(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	ctx, cancel := context.WithCancel(context.Background())
	tk := makeTask("pc-1")
	store.CreateTask(tk)
	pool.Submit(ctx, tk)
	time.Sleep(20 * time.Millisecond) // give the worker goroutine time to start
	cancel()
	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

// Pool.Cancel on a running task stops it and reports CANCELLED.
func TestPool_Cancel_StopsRunningTask(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("cancel-1")
	store.CreateTask(tk)
	pool.Submit(context.Background(), tk)
	time.Sleep(20 * time.Millisecond) // let goroutine start
	if ok := pool.Cancel("cancel-1"); !ok {
		t.Fatal("Cancel returned false for a running task")
	}
	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

// Cancel on an ID the pool has never seen returns false.
func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	if ok := pool.Cancel("nonexistent"); ok {
		t.Error("Cancel returned true for unknown task")
	}
}

// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than
// returning an error when the pool is at capacity. Both tasks should eventually complete.
func TestPool_QueuedWhenAtCapacity(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 100 * time.Millisecond}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(1, runners, store, logger)
	tk1 := makeTask("queue-1")
	store.CreateTask(tk1)
	if err := pool.Submit(context.Background(), tk1); err != nil {
		t.Fatalf("first submit: %v", err)
	}
	// Second submit must succeed (queued) even though pool slot is taken.
	tk2 := makeTask("queue-2")
	store.CreateTask(tk2)
	if err := pool.Submit(context.Background(), tk2); err != nil {
		t.Fatalf("second submit: %v — expected task to be queued, not rejected", err)
	}
	// Both tasks must complete.
	for i := 0; i < 2; i++ {
		r := <-pool.Results()
		if r.Err != nil {
			t.Errorf("task %s error: %v", r.TaskID, r.Err)
		}
	}
}

// logPatherMockRunner is a mockRunner that also implements LogPather,
// and captures the StdoutPath seen when Run() is called.
// logPatherMockRunner embeds mockRunner, implements ExecLogDir (LogPather),
// and snapshots e.StdoutPath at the moment Run is invoked.
type logPatherMockRunner struct {
	mockRunner
	logDir       string // base directory handed back by ExecLogDir
	capturedPath string // e.StdoutPath observed on entry to Run; guarded by embedded mu
}

// ExecLogDir returns a per-execution log directory under logDir.
func (m *logPatherMockRunner) ExecLogDir(execID string) string {
	return filepath.Join(m.logDir, execID)
}

// Run records the StdoutPath the pool pre-set, then delegates to the embedded mockRunner.
func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.capturedPath = e.StdoutPath
	m.mu.Unlock()
	return m.mockRunner.Run(ctx, t, e)
}

// TestPool_Execute_LogPathsPreSetBeforeRun verifies that when the runner
// implements LogPather, log paths are set on the execution before Run() is
// called — so they land in the DB at CreateExecution time, not just at
// UpdateExecution time.
func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) {
	store := testStore(t)
	runner := &logPatherMockRunner{logDir: t.TempDir()}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("lp-1")
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	runner.mu.Lock()
	captured := runner.capturedPath
	runner.mu.Unlock()
	if captured == "" {
		t.Fatal("StdoutPath was empty when Run() was called; expected pre-set path")
	}
	if !strings.HasSuffix(captured, "stdout.log") {
		t.Errorf("expected stdout.log suffix, got: %s", captured)
	}
	// Path in the returned execution record should match.
	if result.Execution.StdoutPath != captured {
		t.Errorf("execution StdoutPath %q != captured %q", result.Execution.StdoutPath, captured)
	}
}

// TestPool_Execute_NoLogPather_PathsEmptyBeforeRun verifies that a runner
// without LogPather doesn't panic and paths remain empty until Run() sets them.
func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { store := testStore(t) runner := &mockRunner{} // does NOT implement LogPather runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("nolp-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("unexpected error: %v", result.Err) } } func TestPool_ConcurrentExecution(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 50 * time.Millisecond} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(3, runners, store, logger) for i := 0; i < 3; i++ { tk := makeTask(fmt.Sprintf("cc-%d", i)) store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit %d: %v", i, err) } } for i := 0; i < 3; i++ { result := <-pool.Results() if result.Execution.Status != "READY" { t.Errorf("task %s: want READY, got %q", result.TaskID, result.Execution.Status) } } if runner.callCount() != 3 { t.Errorf("calls: want 3, got %d", runner.callCount()) } } func TestWithFailureHistory_NoFailures_ReturnsUnchanged(t *testing.T) { tk := makeTask("no-fail") result := withFailureHistory(tk, nil, nil) if result != tk { t.Error("expected same pointer when no prior executions") } } func TestWithFailureHistory_WithError_ReturnsUnchanged(t *testing.T) { tk := makeTask("err-case") result := withFailureHistory(tk, nil, fmt.Errorf("db error")) if result != tk { t.Error("expected same pointer when ListExecutions errors") } } func TestWithFailureHistory_InjectsFailedHistory(t *testing.T) { tk := makeTask("with-fail") tk.Agent.Instructions = "do the work" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: 
"sandbox: uncommitted changes", StartTime: time.Now()}, {ID: "e2", Status: "COMPLETED", ErrorMsg: "", StartTime: time.Now()}, // not a failure, should be ignored } result := withFailureHistory(tk, execs, nil) if result == tk { t.Fatal("expected a new task copy, got same pointer") } if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Errorf("expected history header in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } if !strings.Contains(result.Agent.SystemPromptAppend, "sandbox: uncommitted changes") { t.Errorf("expected error message in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } // COMPLETED execution should not appear if strings.Contains(result.Agent.SystemPromptAppend, "e2") { t.Errorf("COMPLETED execution should not appear in history") } // Original instructions unchanged if result.Agent.Instructions != "do the work" { t.Errorf("instructions should be unchanged, got: %q", result.Agent.Instructions) } } func TestWithFailureHistory_PreservesExistingSystemPrompt(t *testing.T) { tk := makeTask("with-prompt") tk.Agent.SystemPromptAppend = "existing prompt" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: "some error", StartTime: time.Now()}, } result := withFailureHistory(tk, execs, nil) if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Error("expected history prepended") } if !strings.Contains(result.Agent.SystemPromptAppend, "existing prompt") { t.Error("expected existing prompt preserved") } // History must come BEFORE the existing prompt histIdx := strings.Index(result.Agent.SystemPromptAppend, "Prior Attempt History") existIdx := strings.Index(result.Agent.SystemPromptAppend, "existing prompt") if histIdx > existIdx { t.Error("history should be prepended before existing system prompt") } } func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) { store := testStore(t) var capturedPrompt string runner := &mockRunner{} runner.onRun = func(t 
*task.Task, _ *storage.Execution) error { capturedPrompt = t.Agent.SystemPromptAppend return nil } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("retry-hist") store.CreateTask(tk) // Simulate a prior failed execution. store.CreateExecution(&storage.Execution{ ID: "prior-exec", TaskID: tk.ID, StartTime: time.Now(), EndTime: time.Now(), Status: "FAILED", ErrorMsg: "prior failure reason", }) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() if !strings.Contains(capturedPrompt, "prior failure reason") { t.Errorf("expected prior failure in system prompt, got: %q", capturedPrompt) } } func TestPool_RecoverStaleRunning(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) // Create a task already in RUNNING state (simulating a crashed server). tk := makeTask("stale-1") tk.State = task.StateRunning store.CreateTask(tk) // Add an open execution record (no end time, status RUNNING). store.CreateExecution(&storage.Execution{ ID: "exec-stale-1", TaskID: tk.ID, StartTime: time.Now().Add(-5 * time.Minute), Status: "RUNNING", }) pool.RecoverStaleRunning(context.Background()) // Execution record should be closed as FAILED. execs, _ := store.ListExecutions(tk.ID) var failedExec *storage.Execution for _, e := range execs { if e.ID == "exec-stale-1" { failedExec = e break } } if failedExec == nil || failedExec.Status != "FAILED" { t.Errorf("execution status: want FAILED, got %+v", execs) } if failedExec.ErrorMsg == "" { t.Error("expected non-empty error message on recovered execution") } // Task should be re-queued for retry and complete. 
select { case result := <-pool.Results(): if result.TaskID != tk.ID { t.Errorf("unexpected task in results: %s", result.TaskID) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for stale RUNNING task to be re-queued and run") } recovered, err := store.GetTask(tk.ID) if err != nil { t.Fatalf("get task: %v", err) } // Top-level tasks (no parent) go to READY after a successful run. if recovered.State != task.StateReady { t.Errorf("state after re-queue: want READY, got %q", recovered.State) } } func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) { store := testStore(t) runner := &mockRunner{} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) // Create a task already in QUEUED state (persisted from before a server restart). tk := makeTask("stale-queued-1") tk.State = task.StateQueued store.CreateTask(tk) pool.RecoverStaleQueued(context.Background()) // Wait for the pool to pick it up and complete it. select { case result := <-pool.Results(): if result.TaskID != tk.ID { t.Errorf("unexpected task in results: %s", result.TaskID) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for stale QUEUED task to complete") } got, err := store.GetTask(tk.ID) if err != nil { t.Fatalf("get task: %v", err) } if got.State != task.StateCompleted && got.State != task.StateReady { t.Errorf("state: want COMPLETED or READY, got %q", got.State) } if runner.callCount() != 1 { t.Errorf("runner call count: want 1, got %d", runner.callCount()) } } func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { store := testStore(t) runner := &mockRunner{} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) // PENDING task should NOT be resubmitted. 
tk := makeTask("pending-1") tk.State = task.StatePending store.CreateTask(tk) pool.RecoverStaleQueued(context.Background()) time.Sleep(50 * time.Millisecond) if runner.callCount() != 0 { t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount()) } } func TestPool_RecoverStaleBlocked_UnblocksWhenAllSubtasksCompleted(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) // Parent task stuck in BLOCKED state (server restarted after subtasks completed). parent := makeTask("parent-stale-blocked") parent.State = task.StateBlocked store.CreateTask(parent) // All subtasks completed. for i := 0; i < 3; i++ { sub := makeTask(fmt.Sprintf("sub-%d", i)) sub.ParentTaskID = parent.ID sub.State = task.StateCompleted store.CreateTask(sub) } pool.RecoverStaleBlocked() got, err := store.GetTask(parent.ID) if err != nil { t.Fatalf("get task: %v", err) } if got.State != task.StateReady { t.Errorf("parent state: want READY, got %q", got.State) } } func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) parent := makeTask("parent-still-blocked") parent.State = task.StateBlocked store.CreateTask(parent) sub1 := makeTask("sub-done") sub1.ParentTaskID = parent.ID sub1.State = task.StateCompleted store.CreateTask(sub1) sub2 := makeTask("sub-running") sub2.ParentTaskID = parent.ID sub2.State = task.StateRunning store.CreateTask(sub2) pool.RecoverStaleBlocked() got, err := store.GetTask(parent.ID) if err != nil { t.Fatalf("get task: %v", err) } if got.State != task.StateBlocked { t.Errorf("parent state: want BLOCKED, got %q", got.State) } } func 
TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner, "gemini": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("apa-1") store.CreateTask(tk) // Agent.Type = "claude" pool.Submit(context.Background(), tk) <-pool.Results() pool.mu.Lock() _, exists := pool.activePerAgent["claude"] pool.mu.Unlock() if exists { t.Error("activePerAgent should not have a zero-count entry for claude after task completes") } } func TestPool_RateLimited_StaleEntryCleaned(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // Inject a stale rate-limit entry (deadline already passed). pool.mu.Lock() pool.rateLimited["claude"] = time.Now().Add(-1 * time.Minute) pool.mu.Unlock() // Submit a task — the execute() path reads rateLimited during classification. tk := makeTask("rl-stale-1") store.CreateTask(tk) pool.Submit(context.Background(), tk) <-pool.Results() pool.mu.Lock() _, exists := pool.rateLimited["claude"] pool.mu.Unlock() if exists { t.Error("stale rate-limit entry should be deleted after deadline passes") } } // TestPool_Submit_TopLevel_NoSubtasks_GoesReady verifies that a top-level task // with no subtasks still transitions to READY after successful execution. 
func TestPool_Submit_TopLevel_NoSubtasks_GoesReady(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("no-subtasks-1") // no ParentTaskID, no subtasks store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "READY" { t.Errorf("status: want READY, got %q", result.Execution.Status) } got, _ := store.GetTask(tk.ID) if got.State != task.StateReady { t.Errorf("task state: want READY, got %v", got.State) } } // TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked verifies that when a // top-level task finishes successfully but has subtasks, it transitions to // BLOCKED (waiting for subtasks) rather than READY. func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) parent := makeTask("parent-with-subtasks") store.CreateTask(parent) // Create a subtask in the store but do NOT submit it. 
sub := makeTask("sub-of-parent") sub.ParentTaskID = parent.ID store.CreateTask(sub) if err := pool.Submit(context.Background(), parent); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "BLOCKED" { t.Errorf("status: want BLOCKED, got %q", result.Execution.Status) } got, _ := store.GetTask(parent.ID) if got.State != task.StateBlocked { t.Errorf("task state: want BLOCKED, got %v", got.State) } } // TestPool_Submit_LastSubtask_UnblocksParent verifies that when the last // remaining subtask completes, the parent task transitions from BLOCKED to READY. func TestPool_Submit_LastSubtask_UnblocksParent(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // Parent starts BLOCKED (waiting for subtasks). parent := makeTask("unblock-parent-1") parent.State = task.StateBlocked store.CreateTask(parent) // First subtask already completed. sub1 := makeTask("unblock-sub-1a") sub1.ParentTaskID = parent.ID sub1.State = task.StateCompleted store.CreateTask(sub1) // Second (last) subtask — the one we submit. sub2 := makeTask("unblock-sub-1b") sub2.ParentTaskID = parent.ID store.CreateTask(sub2) if err := pool.Submit(context.Background(), sub2); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "COMPLETED" { t.Errorf("subtask status: want COMPLETED, got %q", result.Execution.Status) } // Parent must now be READY. 
got, err := store.GetTask(parent.ID) if err != nil { t.Fatalf("get parent: %v", err) } if got.State != task.StateReady { t.Errorf("parent state: want READY, got %v", got.State) } } // TestPool_Submit_NotLastSubtask_ParentStaysBlocked verifies that when a subtask // completes but another sibling subtask is still running, the parent stays BLOCKED. func TestPool_Submit_NotLastSubtask_ParentStaysBlocked(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) parent := makeTask("blocked-parent-2") parent.State = task.StateBlocked store.CreateTask(parent) // First subtask still RUNNING — not done yet. sub1 := makeTask("blocked-sub-2a") sub1.ParentTaskID = parent.ID sub1.State = task.StateRunning store.CreateTask(sub1) // Second subtask — the one we submit. sub2 := makeTask("blocked-sub-2b") sub2.ParentTaskID = parent.ID store.CreateTask(sub2) if err := pool.Submit(context.Background(), sub2); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() // Parent must remain BLOCKED because sub1 is still RUNNING. got, err := store.GetTask(parent.ID) if err != nil { t.Fatalf("get parent: %v", err) } if got.State != task.StateBlocked { t.Errorf("parent state: want BLOCKED, got %v", got.State) } } // TestPool_Submit_ParentNotBlocked_NoTransition verifies that completing a subtask // does not change the parent's state when the parent is not BLOCKED. func TestPool_Submit_ParentNotBlocked_NoTransition(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // Parent is already READY (not BLOCKED). 
	// Tail of a test whose opening is above this chunk: a subtask finishing
	// must not alter its READY parent.
	parent := makeTask("ready-parent-3")
	parent.State = task.StateReady
	store.CreateTask(parent)
	sub1 := makeTask("ready-sub-3a")
	sub1.ParentTaskID = parent.ID
	store.CreateTask(sub1)
	if err := pool.Submit(context.Background(), sub1); err != nil {
		t.Fatalf("submit: %v", err)
	}
	<-pool.Results()
	// Parent must remain READY — no spurious state transition.
	got, err := store.GetTask(parent.ID)
	if err != nil {
		t.Fatalf("get parent: %v", err)
	}
	if got.State != task.StateReady {
		t.Errorf("parent state: want READY, got %v", got.State)
	}
}

// minimalMockStore is a standalone Store implementation for unit-testing Pool
// methods that do not require a real SQLite database.
type minimalMockStore struct {
	mu         sync.Mutex // guards all fields below
	tasks      map[string]*task.Task
	executions map[string]*storage.Execution
	// stateUpdates records every UpdateTaskState call in order.
	stateUpdates []struct{ id string; state task.State }
	// questionUpdates records the JSON payloads passed to UpdateTaskQuestion.
	questionUpdates []string
	// changestatCalls records every UpdateExecutionChangestats call.
	changestatCalls []struct {
		execID string
		stats  *task.Changestats
	}
	// subtasksFunc, when set, overrides ListSubtasks.
	subtasksFunc func(parentID string) ([]*task.Task, error)
	// updateExecErr / updateStateErr force the corresponding methods to fail.
	updateExecErr  error
	updateStateErr error
}

// newMinimalMockStore returns a mock store with empty task/execution maps.
func newMinimalMockStore() *minimalMockStore {
	return &minimalMockStore{
		tasks:      make(map[string]*task.Task),
		executions: make(map[string]*storage.Execution),
	}
}

// GetTask returns the in-memory task or an error if absent.
func (m *minimalMockStore) GetTask(id string) (*task.Task, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	t, ok := m.tasks[id]
	if !ok {
		return nil, fmt.Errorf("task %q not found", id)
	}
	return t, nil
}

func (m *minimalMockStore) ListTasks(_ storage.TaskFilter) ([]*task.Task, error) { return nil, nil }

// ListSubtasks delegates to subtasksFunc when provided; otherwise no subtasks.
func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) {
	if m.subtasksFunc != nil {
		return m.subtasksFunc(parentID)
	}
	return nil, nil
}

func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil }
func (m *minimalMockStore) CreateExecution(e *storage.Execution) error            { return nil }
func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error            { return m.updateExecErr }

// UpdateTaskState records the transition and mutates the stored task (if any).
// Returns updateStateErr when the test has armed a failure.
func (m *minimalMockStore) UpdateTaskState(id string, newState task.State) error {
	if m.updateStateErr != nil {
		return m.updateStateErr
	}
	m.mu.Lock()
	m.stateUpdates = append(m.stateUpdates, struct{ id string; state task.State }{id, newState})
	if t, ok := m.tasks[id]; ok {
		t.State = newState
	}
	m.mu.Unlock()
	return nil
}

// UpdateTaskQuestion records the question payload for later assertions.
func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error {
	m.mu.Lock()
	m.questionUpdates = append(m.questionUpdates, questionJSON)
	m.mu.Unlock()
	return nil
}

func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error                 { return nil }
func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { return nil }
func (m *minimalMockStore) UpdateTaskAgent(id string, agent task.AgentConfig) error       { return nil }

// UpdateExecutionChangestats records the call for later assertions.
func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task.Changestats) error {
	m.mu.Lock()
	m.changestatCalls = append(m.changestatCalls, struct {
		execID string
		stats  *task.Changestats
	}{execID, stats})
	m.mu.Unlock()
	return nil
}

// Remaining Store methods are inert no-ops for these unit tests.
func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error         { return nil }
func (m *minimalMockStore) GetProject(_ string) (*task.Project, error)          { return nil, nil }
func (m *minimalMockStore) GetStory(_ string) (*task.Story, error)              { return nil, nil }
func (m *minimalMockStore) ListTasksByStory(_ string) ([]*task.Task, error)     { return nil, nil }
func (m *minimalMockStore) UpdateStoryStatus(_ string, _ task.StoryState) error { return nil }
func (m *minimalMockStore) CreateTask(_ *task.Task) error                       { return nil }

// lastStateUpdate returns the most recent recorded state transition,
// or ok=false when none has been recorded.
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.stateUpdates) == 0 {
		return "", "", false
	}
	u := m.stateUpdates[len(m.stateUpdates)-1]
	return u.id, u.state, true
}

// newPoolWithMockStore builds a Pool wired to the given (mock) store with
// small buffered channels, suitable for calling Pool methods directly
// without starting worker goroutines.
func newPoolWithMockStore(store Store) *Pool {
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	return &Pool{
		maxConcurrent:       2,
		maxPerAgent:         1,
		runners:             map[string]Runner{"claude": &mockRunner{}},
		store:               store,
		logger:              logger,
		activePerAgent:      make(map[string]int),
		rateLimited:         make(map[string]time.Time),
		cancels:             make(map[string]context.CancelFunc),
		consecutiveFailures: make(map[string]int),
		drained:             make(map[string]bool),
		resultCh:            make(chan *Result, 4),
		workCh:              make(chan workItem, 4),
		doneCh:              make(chan struct{}, 2),
		Questions:           NewQuestionRegistry(),
	}
}

// TestHandleRunResult_SharedPath verifies that handleRunResult correctly
// classifies runner errors and transitions task state via the store.
func TestHandleRunResult_SharedPath(t *testing.T) {
	t.Run("generic error sets FAILED", func(t *testing.T) {
		store := newMinimalMockStore()
		pool := newPoolWithMockStore(store)
		tk := makeTask("hrr-fail")
		store.tasks[tk.ID] = tk
		exec := &storage.Execution{ID: "e1", TaskID: tk.ID, Status: "RUNNING"}
		ctx := context.Background()
		pool.handleRunResult(ctx, tk, exec, fmt.Errorf("something broke"), "claude")
		if exec.Status != "FAILED" {
			t.Errorf("exec.Status: want FAILED, got %q", exec.Status)
		}
		if exec.ErrorMsg != "something broke" {
			t.Errorf("exec.ErrorMsg: want %q, got %q", "something broke", exec.ErrorMsg)
		}
		_, state, ok := store.lastStateUpdate()
		if !ok || state != task.StateFailed {
			t.Errorf("expected UpdateTaskState(FAILED), got state=%v ok=%v", state, ok)
		}
		// handleRunResult publishes the result on resultCh; drain and check it.
		result := <-pool.resultCh
		if result.Err == nil || result.Execution.Status != "FAILED" {
			t.Errorf("unexpected result: %+v", result)
		}
	})
	t.Run("nil error top-level no subtasks sets READY", func(t *testing.T) {
		store := newMinimalMockStore()
		pool := newPoolWithMockStore(store)
		tk := makeTask("hrr-ready")
		store.tasks[tk.ID] = tk
		exec := &storage.Execution{ID: "e2", TaskID: tk.ID, Status: "RUNNING"}
		ctx := context.Background()
		pool.handleRunResult(ctx, tk, exec, nil, "claude")
		if exec.Status != "READY" {
			t.Errorf("exec.Status: want READY, got %q", exec.Status)
		}
		_, state, ok := store.lastStateUpdate()
		if !ok || state != task.StateReady {
			t.Errorf("expected UpdateTaskState(READY), got state=%v ok=%v", state, ok)
		}
		result := <-pool.resultCh
		if result.Err != nil || result.Execution.Status != "READY" {
			t.Errorf("unexpected result: %+v", result)
		}
	})
	t.Run("nil error subtask sets COMPLETED", func(t *testing.T) {
		store := newMinimalMockStore()
		pool := newPoolWithMockStore(store)
		parent := makeTask("hrr-parent")
		parent.State = task.StateBlocked
		store.tasks[parent.ID] = parent
		// A task with a ParentTaskID is a subtask: success means COMPLETED,
		// not READY (contrast with the previous subtest).
		tk := makeTask("hrr-sub")
		tk.ParentTaskID = parent.ID
		store.tasks[tk.ID] = tk
		exec := &storage.Execution{ID: "e3", TaskID: tk.ID, Status: "RUNNING"}
		ctx := context.Background()
		pool.handleRunResult(ctx, tk, exec, nil, "claude")
		if exec.Status != "COMPLETED" {
			t.Errorf("exec.Status: want COMPLETED, got %q", exec.Status)
		}
		result := <-pool.resultCh
		if result.Err != nil || result.Execution.Status != "COMPLETED" {
			t.Errorf("unexpected result: %+v", result)
		}
	})
	t.Run("timeout sets TIMED_OUT", func(t *testing.T) {
		store := newMinimalMockStore()
		pool := newPoolWithMockStore(store)
		tk := makeTask("hrr-timeout")
		store.tasks[tk.ID] = tk
		exec := &storage.Execution{ID: "e4", TaskID: tk.ID, Status: "RUNNING"}
		// NOTE(review): ctx/cancel below are created and then never used for the
		// call (only `_ = ctx` keeps the compiler quiet); dctx is what drives the
		// assertion. Consider deleting the ctx/cancel pair.
		ctx, cancel := context.WithCancel(context.Background())
		cancel() // make ctx.Err() == context.Canceled
		// Simulate deadline exceeded by using a deadline-exceeded context.
		dctx, dcancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
		defer dcancel()
		pool.handleRunResult(dctx, tk, exec, context.DeadlineExceeded, "claude")
		if exec.Status != "TIMED_OUT" {
			t.Errorf("exec.Status: want TIMED_OUT, got %q", exec.Status)
		}
		_ = ctx
		<-pool.resultCh
	})
}

// TestPool_LoadBalancing_OverridesAgentType verifies that load balancing picks
// from registered runners, overriding any pre-set Agent.Type on the task.
func TestPool_LoadBalancing_OverridesAgentType(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	// Task has a non-existent agent type; load balancing should route to "claude".
	tk := makeTask("lb-override")
	tk.Agent.Type = "super-ai"
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Fatalf("expected success (load balancing overrides agent type), got: %v", result.Err)
	}
	// The only registered runner must have handled the task exactly once.
	if runner.callCount() != 1 {
		t.Errorf("expected claude runner to be called once, got %d", runner.callCount())
	}
}

// TestPool_SpecificAgent_SkipsLoadBalancing verifies that if a specific
// registered agent is requested (claude or gemini), it is used directly
// and load balancing (pickAgent) is skipped.
func TestPool_SpecificAgent_SkipsLoadBalancing(t *testing.T) {
	store := testStore(t)
	claudeRunner := &mockRunner{}
	geminiRunner := &mockRunner{}
	runners := map[string]Runner{
		"claude": claudeRunner,
		"gemini": geminiRunner,
	}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(4, runners, store, logger)
	// Raise per-agent limit so the concurrency gate doesn't interfere with this test.
	// The injected activePerAgent is only to make pickAgent prefer "claude",
	// verifying that explicit agent type bypasses load balancing.
	pool.maxPerAgent = 10
	// Inject 2 active tasks for gemini, 0 for claude.
	// pickAgent would normally pick "claude".
	pool.mu.Lock()
	pool.activePerAgent["gemini"] = 2
	pool.mu.Unlock()
	tk := makeTask("specific-gemini")
	tk.Agent.Type = "gemini"
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	<-pool.Results()
	if geminiRunner.callCount() != 1 {
		t.Errorf("expected gemini runner to be called once, got %d", geminiRunner.callCount())
	}
	if claudeRunner.callCount() != 0 {
		t.Errorf("expected claude runner to NOT be called, got %d", claudeRunner.callCount())
	}
}

// TestPool_SpecificAgent_PersistsToDB verifies that if a specific agent
// is requested, it is persisted to the database before the task runs.
func TestPool_SpecificAgent_PersistsToDB(t *testing.T) {
	store := testStore(t)
	geminiRunner := &mockRunner{}
	runners := map[string]Runner{
		"gemini": geminiRunner,
	}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(4, runners, store, logger)
	tk := makeTask("persist-gemini")
	tk.Agent.Type = "gemini"
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	<-pool.Results()
	// Check the task in the database.
	reloaded, err := store.GetTask(tk.ID)
	if err != nil {
		t.Fatalf("get task: %v", err)
	}
	if reloaded.Agent.Type != "gemini" {
		t.Errorf("expected agent type gemini in DB, got %q", reloaded.Agent.Type)
	}
}

// TestExecute_ExtractAndStoreChangestats verifies that when the execution stdout
// contains a git diff --stat summary line, the changestats are parsed and stored.
func TestExecute_ExtractAndStoreChangestats(t *testing.T) {
	store := testStore(t)
	logDir := t.TempDir()
	runner := &logPatherMockRunner{logDir: logDir}
	// The runner writes a stdout log ending with a standard `git diff --stat`
	// summary line for the pool to parse after the run.
	runner.onRun = func(tk *task.Task, e *storage.Execution) error {
		if err := os.MkdirAll(filepath.Dir(e.StdoutPath), 0755); err != nil {
			return err
		}
		content := "some output\n5 files changed, 127 insertions(+), 43 deletions(-)\n"
		return os.WriteFile(e.StdoutPath, []byte(content), 0644)
	}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("cs-extract-1")
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Fatalf("unexpected error: %v", result.Err)
	}
	execs, err := store.ListExecutions(tk.ID)
	if err != nil {
		t.Fatalf("list executions: %v", err)
	}
	if len(execs) == 0 {
		t.Fatal("no executions found")
	}
	// The parsed stats must match the numbers in the stdout summary line.
	cs := execs[0].Changestats
	if cs == nil {
		t.Fatal("expected changestats to be populated, got nil")
	}
	if cs.FilesChanged != 5 {
		t.Errorf("FilesChanged: want 5, got %d", cs.FilesChanged)
	}
	if cs.LinesAdded != 127 {
		t.Errorf("LinesAdded: want 127, got %d", cs.LinesAdded)
	}
	if cs.LinesRemoved != 43 {
		t.Errorf("LinesRemoved: want 43, got %d", cs.LinesRemoved)
	}
}

// TestExecute_NoChangestats verifies that when execution output contains no git
// diff stat line, changestats are not stored (remain nil).
func TestExecute_NoChangestats(t *testing.T) { store := testStore(t) logDir := t.TempDir() runner := &logPatherMockRunner{logDir: logDir} runner.onRun = func(tk *task.Task, e *storage.Execution) error { if err := os.MkdirAll(filepath.Dir(e.StdoutPath), 0755); err != nil { return err } return os.WriteFile(e.StdoutPath, []byte("no git output here\n"), 0644) } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("cs-none-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("unexpected error: %v", result.Err) } execs, err := store.ListExecutions(tk.ID) if err != nil { t.Fatalf("list executions: %v", err) } if len(execs) == 0 { t.Fatal("no executions found") } if execs[0].Changestats != nil { t.Errorf("expected changestats to be nil for output with no git stats, got %+v", execs[0].Changestats) } } // TestExecute_MalformedChangestats verifies that malformed git-stat-like output // does not produce changestats (parser returns nil, nothing is stored). func TestExecute_MalformedChangestats(t *testing.T) { store := testStore(t) logDir := t.TempDir() runner := &logPatherMockRunner{logDir: logDir} runner.onRun = func(tk *task.Task, e *storage.Execution) error { if err := os.MkdirAll(filepath.Dir(e.StdoutPath), 0755); err != nil { return err } // Looks like a git stat line but doesn't match the regex. 
return os.WriteFile(e.StdoutPath, []byte("lots of cheese changed, many insertions\n"), 0644) } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("cs-malformed-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("unexpected error: %v", result.Err) } execs, err := store.ListExecutions(tk.ID) if err != nil { t.Fatalf("list executions: %v", err) } if len(execs) == 0 { t.Fatal("no executions found") } if execs[0].Changestats != nil { t.Errorf("expected nil changestats for malformed output, got %+v", execs[0].Changestats) } } func TestPool_MaxPerAgent_BlocksSecondTask(t *testing.T) { store := testStore(t) var mu sync.Mutex concurrentRuns := 0 maxConcurrent := 0 runner := &mockRunner{ delay: 100 * time.Millisecond, onRun: func(tk *task.Task, e *storage.Execution) error { mu.Lock() concurrentRuns++ if concurrentRuns > maxConcurrent { maxConcurrent = concurrentRuns } mu.Unlock() time.Sleep(100 * time.Millisecond) mu.Lock() concurrentRuns-- mu.Unlock() return nil }, } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // pool size 2, but maxPerAgent=1 pool.requeueDelay = 50 * time.Millisecond // speed up test tk1 := makeTask("mpa-1") tk2 := makeTask("mpa-2") store.CreateTask(tk1) store.CreateTask(tk2) pool.Submit(context.Background(), tk1) pool.Submit(context.Background(), tk2) for i := 0; i < 2; i++ { select { case <-pool.Results(): case <-time.After(10 * time.Second): t.Fatal("timed out waiting for result") } } mu.Lock() got := maxConcurrent mu.Unlock() if got > 1 { t.Errorf("maxPerAgent=1 violated: %d claude tasks ran concurrently", got) } } func 
TestPool_MaxPerAgent_AllowsDifferentAgents(t *testing.T) { store := testStore(t) var mu sync.Mutex concurrentRuns := 0 maxConcurrent := 0 makeSlowRunner := func() *mockRunner { return &mockRunner{ onRun: func(tk *task.Task, e *storage.Execution) error { mu.Lock() concurrentRuns++ if concurrentRuns > maxConcurrent { maxConcurrent = concurrentRuns } mu.Unlock() time.Sleep(80 * time.Millisecond) mu.Lock() concurrentRuns-- mu.Unlock() return nil }, } } runners := map[string]Runner{ "claude": makeSlowRunner(), "gemini": makeSlowRunner(), } logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk1 := makeTask("da-1") tk1.Agent.Type = "claude" tk2 := makeTask("da-2") tk2.Agent.Type = "gemini" store.CreateTask(tk1) store.CreateTask(tk2) pool.Submit(context.Background(), tk1) pool.Submit(context.Background(), tk2) for i := 0; i < 2; i++ { select { case <-pool.Results(): case <-time.After(5 * time.Second): t.Fatal("timed out waiting for result") } } mu.Lock() got := maxConcurrent mu.Unlock() if got < 2 { t.Errorf("different agents should run concurrently; max concurrent was %d", got) } } func TestPool_ConsecutiveFailures_DrainAtThree(t *testing.T) { store := testStore(t) runner := &mockRunner{err: fmt.Errorf("boom")} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(3, runners, store, logger) // Two failures should NOT drain. for _, id := range []string{"cf-1", "cf-2"} { tk := makeTask(id) store.CreateTask(tk) pool.Submit(context.Background(), tk) <-pool.Results() } pool.mu.Lock() draineEarly := pool.drained["claude"] pool.mu.Unlock() if draineEarly { t.Error("expected claude NOT drained after only 2 failures") } // Third failure should drain. 
tk3 := makeTask("cf-3") store.CreateTask(tk3) pool.Submit(context.Background(), tk3) <-pool.Results() pool.mu.Lock() drained := pool.drained["claude"] failures := pool.consecutiveFailures["claude"] pool.mu.Unlock() if !drained { t.Error("expected claude to be drained after 3 consecutive failures") } if failures < 3 { t.Errorf("expected consecutiveFailures >= 3, got %d", failures) } // The third task should have a drain question set. tk3fetched, err := store.GetTask("cf-3") if err != nil { t.Fatalf("GetTask: %v", err) } if tk3fetched.QuestionJSON == "" { t.Error("expected drain question to be set on task after drain") } } func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) { store := testStore(t) callCount := 0 runner := &mockRunner{ onRun: func(tk *task.Task, e *storage.Execution) error { callCount++ if callCount == 1 { return fmt.Errorf("first failure") } return nil // second call succeeds }, } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // First task fails tk1 := makeTask("rs-1") store.CreateTask(tk1) pool.Submit(context.Background(), tk1) <-pool.Results() pool.mu.Lock() failsBefore := pool.consecutiveFailures["claude"] pool.mu.Unlock() if failsBefore != 1 { t.Errorf("expected 1 failure after first task, got %d", failsBefore) } // Second task succeeds tk2 := makeTask("rs-2") store.CreateTask(tk2) pool.Submit(context.Background(), tk2) <-pool.Results() pool.mu.Lock() failsAfter := pool.consecutiveFailures["claude"] isDrained := pool.drained["claude"] pool.mu.Unlock() if failsAfter != 0 { t.Errorf("expected consecutiveFailures reset to 0 after success, got %d", failsAfter) } if isDrained { t.Error("expected drained to be false after success") } } func TestPool_CheckStoryCompletion_AllComplete(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: 
slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) // Create a story in IN_PROGRESS state. now := time.Now().UTC() story := &task.Story{ ID: "story-comp-1", Name: "Completion Test", Status: task.StoryInProgress, CreatedAt: now, UpdatedAt: now, } if err := store.CreateStory(story); err != nil { t.Fatalf("CreateStory: %v", err) } // Create two story tasks and drive them through valid transitions to COMPLETED. for i, id := range []string{"sctask-1", "sctask-2"} { tk := makeTask(id) tk.StoryID = "story-comp-1" tk.ParentTaskID = "fake-parent" // so it goes to COMPLETED tk.State = task.StatePending if err := store.CreateTask(tk); err != nil { t.Fatalf("CreateTask %d: %v", i, err) } for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateCompleted} { if err := store.UpdateTaskState(id, s); err != nil { t.Fatalf("UpdateTaskState %s → %s: %v", id, s, err) } } } pool.checkStoryCompletion(context.Background(), "story-comp-1") got, err := store.GetStory("story-comp-1") if err != nil { t.Fatalf("GetStory: %v", err) } if got.Status != task.StoryShippable { t.Errorf("story status: want SHIPPABLE, got %v", got.Status) } } func TestPool_CheckStoryCompletion_PartialComplete(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) now := time.Now().UTC() story := &task.Story{ ID: "story-partial-1", Name: "Partial Test", Status: task.StoryInProgress, CreatedAt: now, UpdatedAt: now, } if err := store.CreateStory(story); err != nil { t.Fatalf("CreateStory: %v", err) } // First task driven to COMPLETED. 
tk1 := makeTask("sptask-1") tk1.StoryID = "story-partial-1" tk1.ParentTaskID = "fake-parent" store.CreateTask(tk1) for _, s := range []task.State{task.StateQueued, task.StateRunning, task.StateCompleted} { store.UpdateTaskState("sptask-1", s) } // Second task still in PENDING (not done). tk2 := makeTask("sptask-2") tk2.StoryID = "story-partial-1" tk2.ParentTaskID = "fake-parent" store.CreateTask(tk2) pool.checkStoryCompletion(context.Background(), "story-partial-1") got, err := store.GetStory("story-partial-1") if err != nil { t.Fatalf("GetStory: %v", err) } if got.Status != task.StoryInProgress { t.Errorf("story status: want IN_PROGRESS (no transition), got %v", got.Status) } } func TestPool_Undrain_ResumesExecution(t *testing.T) { store := testStore(t) // Force drain state runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) pool.mu.Lock() pool.drained["claude"] = true pool.consecutiveFailures["claude"] = 3 pool.mu.Unlock() // Undrain pool.UndrainingAgent("claude") pool.mu.Lock() drained := pool.drained["claude"] failures := pool.consecutiveFailures["claude"] pool.mu.Unlock() if drained { t.Error("expected drained=false after UndrainingAgent") } if failures != 0 { t.Errorf("expected consecutiveFailures=0 after UndrainingAgent, got %d", failures) } // Verify a task can now run tk := makeTask("undrain-1") store.CreateTask(tk) pool.Submit(context.Background(), tk) select { case result := <-pool.Results(): if result.Err != nil { t.Errorf("unexpected error after undrain: %v", result.Err) } case <-time.After(5 * time.Second): t.Fatal("timed out waiting for task after undrain") } } func TestPool_StoryDeploy_RunsDeployScript(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: 
slog.LevelError})) pool := NewPool(2, runners, store, logger) // Create a deploy script that writes a marker file. tmpDir := t.TempDir() markerFile := filepath.Join(tmpDir, "deployed.marker") scriptPath := filepath.Join(tmpDir, "deploy.sh") scriptContent := "#!/bin/sh\ntouch " + markerFile + "\n" if err := os.WriteFile(scriptPath, []byte(scriptContent), 0755); err != nil { t.Fatalf("write deploy script: %v", err) } proj := &task.Project{ ID: "proj-deploy-1", Name: "Deploy Test Project", DeployScript: scriptPath, } if err := store.CreateProject(proj); err != nil { t.Fatalf("create project: %v", err) } story := &task.Story{ ID: "story-deploy-1", Name: "Deploy Test Story", ProjectID: proj.ID, Status: task.StoryShippable, } if err := store.CreateStory(story); err != nil { t.Fatalf("create story: %v", err) } pool.triggerStoryDeploy(context.Background(), story.ID) if _, err := os.Stat(markerFile); os.IsNotExist(err) { t.Error("deploy script did not run: marker file not found") } got, err := store.GetStory(story.ID) if err != nil { t.Fatalf("get story: %v", err) } if got.Status != task.StoryDeployed { t.Errorf("story status: want DEPLOYED, got %q", got.Status) } } func runGit(t *testing.T, dir string, args ...string) { t.Helper() cmd := exec.Command("git", args...) if dir != "" { cmd.Dir = dir } if out, err := cmd.CombinedOutput(); err != nil { t.Fatalf("git %v: %v\n%s", args, err, out) } } func TestPool_StoryDeploy_MergesStoryBranch(t *testing.T) { tmpDir := t.TempDir() // Set up bare repo + working copy with a story branch. bareDir := filepath.Join(tmpDir, "bare.git") localDir := filepath.Join(tmpDir, "local") runGit(t, "", "init", "--bare", bareDir) runGit(t, "", "clone", bareDir, localDir) runGit(t, localDir, "config", "user.email", "test@test.com") runGit(t, localDir, "config", "user.name", "Test") // Initial commit on main. 
runGit(t, localDir, "checkout", "-b", "main") os.WriteFile(filepath.Join(localDir, "README.md"), []byte("initial"), 0644) runGit(t, localDir, "add", ".") runGit(t, localDir, "commit", "-m", "initial") runGit(t, localDir, "push", "-u", "origin", "main") // Story branch with a feature commit. runGit(t, localDir, "checkout", "-b", "story/test-feature") os.WriteFile(filepath.Join(localDir, "feature.go"), []byte("package main"), 0644) runGit(t, localDir, "add", ".") runGit(t, localDir, "commit", "-m", "feature work") runGit(t, localDir, "push", "origin", "story/test-feature") runGit(t, localDir, "checkout", "main") store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) scriptPath := filepath.Join(tmpDir, "deploy.sh") os.WriteFile(scriptPath, []byte("#!/bin/sh\nexit 0\n"), 0755) proj := &task.Project{ ID: "proj-merge-1", Name: "Merge Test", LocalPath: localDir, DeployScript: scriptPath, } if err := store.CreateProject(proj); err != nil { t.Fatalf("create project: %v", err) } story := &task.Story{ ID: "story-merge-1", Name: "Merge Test Story", ProjectID: proj.ID, BranchName: "story/test-feature", Status: task.StoryShippable, } if err := store.CreateStory(story); err != nil { t.Fatalf("create story: %v", err) } pool.triggerStoryDeploy(context.Background(), story.ID) // feature.go should now be on main in the working copy. 
if _, err := os.Stat(filepath.Join(localDir, "feature.go")); os.IsNotExist(err) { t.Error("story branch was not merged to main: feature.go missing") } got, _ := store.GetStory(story.ID) if got.Status != task.StoryDeployed { t.Errorf("story status: want DEPLOYED, got %q", got.Status) } } func TestPool_PostDeploy_CreatesValidationTask(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) now := time.Now().UTC() validationSpec := `{"type":"smoke","steps":["curl /health"],"success_criteria":"status 200"}` story := &task.Story{ ID: "story-postdeploy-1", Name: "Post Deploy Test", Status: task.StoryDeployed, ValidationJSON: validationSpec, CreatedAt: now, UpdatedAt: now, } if err := store.CreateStory(story); err != nil { t.Fatalf("CreateStory: %v", err) } pool.createValidationTask(context.Background(), story.ID) // Story should now be VALIDATING. got, err := store.GetStory(story.ID) if err != nil { t.Fatalf("GetStory: %v", err) } if got.Status != task.StoryValidating { t.Errorf("story status: want VALIDATING, got %q", got.Status) } // A validation task should have been created. 
tasks, err := store.ListTasksByStory(story.ID) if err != nil { t.Fatalf("ListTasksByStory: %v", err) } if len(tasks) == 0 { t.Fatal("expected a validation task to be created, got none") } vtask := tasks[0] if !strings.Contains(strings.ToLower(vtask.Name), "validation") { t.Errorf("task name %q does not contain 'validation'", vtask.Name) } if vtask.StoryID != story.ID { t.Errorf("task story_id: want %q, got %q", story.ID, vtask.StoryID) } if !strings.Contains(vtask.Agent.Instructions, "smoke") { t.Errorf("task instructions %q do not reference validation spec content", vtask.Agent.Instructions) } } func TestPool_ValidationTask_Pass_SetsReviewReady(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) now := time.Now().UTC() story := &task.Story{ ID: "story-val-pass-1", Name: "Validation Pass", Status: task.StoryValidating, CreatedAt: now, UpdatedAt: now, } if err := store.CreateStory(story); err != nil { t.Fatalf("CreateStory: %v", err) } pool.checkValidationResult(context.Background(), story.ID, task.StateCompleted, "") got, err := store.GetStory(story.ID) if err != nil { t.Fatalf("GetStory: %v", err) } if got.Status != task.StoryReviewReady { t.Errorf("story status: want REVIEW_READY, got %q", got.Status) } } // TestPool_DependsOn_NoDeadlock verifies that a task waiting for a dependency // does NOT hold the per-agent slot, allowing the dependency to run first. func TestPool_DependsOn_NoDeadlock(t *testing.T) { store := testStore(t) runner := &mockRunner{} // succeeds immediately pool := NewPool(2, map[string]Runner{"claude": runner}, store, slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))) pool.requeueDelay = 10 * time.Millisecond // Task A has no deps; Task B depends on A. 
taskA := makeTask("dep-a") taskA.State = task.StateQueued taskB := makeTask("dep-b") taskB.DependsOn = []string{"dep-a"} taskB.State = task.StateQueued store.CreateTask(taskA) store.CreateTask(taskB) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Submit B first — it should not deadlock by holding the slot while waiting for A. pool.Submit(ctx, taskB) pool.Submit(ctx, taskA) var gotA, gotB bool for i := 0; i < 2; i++ { select { case res := <-pool.Results(): if res.TaskID == "dep-a" { gotA = true } if res.TaskID == "dep-b" { gotB = true } case <-ctx.Done(): t.Fatal("timeout: likely deadlock — dep-b held the slot while waiting for dep-a") } } if !gotA || !gotB { t.Errorf("expected both tasks to complete: gotA=%v gotB=%v", gotA, gotB) } // B must complete after A. ta, _ := store.GetTask("dep-a") tb, _ := store.GetTask("dep-b") if ta.State != task.StateReady && ta.State != task.StateCompleted { t.Errorf("dep-a should be READY/COMPLETED, got %s", ta.State) } if tb.State != task.StateReady && tb.State != task.StateCompleted { t.Errorf("dep-b should be READY/COMPLETED, got %s", tb.State) } } func TestCreateValidationTask_InstructionsIncludeNamedCriteria(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) now := time.Now().UTC() story := &task.Story{ ID: "story-named-criteria-1", Name: "Named Criteria Story", Status: task.StoryDeployed, CreatedAt: now, UpdatedAt: now, ValidationJSON: `{ "type": "automated", "acceptance_criteria": [ {"name": "API returns 200", "verification": "GET /health returns HTTP 200", "test_ref": "TestHealthEndpoint"}, {"name": "DB migration applied", "verification": "table users exists", "test_ref": ""}, {"name": "Auth header required", "verification": "requests without token get 401"} ] }`, } if err := 
store.CreateStory(story); err != nil { t.Fatalf("CreateStory: %v", err) } pool.createValidationTask(context.Background(), story.ID) // Drain the submitted task from results. select { case <-pool.Results(): case <-time.After(5 * time.Second): t.Fatal("timed out waiting for validation task result") } tasks, err := store.ListTasksByStory(story.ID) if err != nil { t.Fatalf("ListTasksByStory: %v", err) } if len(tasks) != 1 { t.Fatalf("expected 1 validation task, got %d", len(tasks)) } instructions := tasks[0].Agent.Instructions // Each criterion name must appear as a formatted list item (not merely in raw JSON). for _, line := range []string{ "- API returns 200:", "- DB migration applied:", "- Auth header required:", } { if !strings.Contains(instructions, line) { t.Errorf("instructions missing formatted criterion line %q\n\nfull instructions:\n%s", line, instructions) } } // test_ref should appear for criteria that have one. if !strings.Contains(instructions, "TestHealthEndpoint") { t.Errorf("instructions should include test_ref 'TestHealthEndpoint'\n\nfull instructions:\n%s", instructions) } } func TestPool_ValidationTask_Fail_SetsNeedsFix(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) now := time.Now().UTC() story := &task.Story{ ID: "story-val-fail-1", Name: "Validation Fail", Status: task.StoryValidating, CreatedAt: now, UpdatedAt: now, } if err := store.CreateStory(story); err != nil { t.Fatalf("CreateStory: %v", err) } execErr := "smoke test failed: /health returned 503" pool.checkValidationResult(context.Background(), story.ID, task.StateFailed, execErr) got, err := store.GetStory(story.ID) if err != nil { t.Fatalf("GetStory: %v", err) } if got.Status != task.StoryNeedsFix { t.Errorf("story status: want NEEDS_FIX, got %q", got.Status) } }