package executor

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// capturingHandler is a slog.Handler that records log records for assertions.
type capturingHandler struct {
	mu      sync.Mutex // guards records
	records []slog.Record
}

// Enabled always returns true so every level is captured.
func (h *capturingHandler) Enabled(_ context.Context, _ slog.Level) bool { return true }

// Handle appends the record to the in-memory list under the mutex.
func (h *capturingHandler) Handle(_ context.Context, r slog.Record) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.records = append(h.records, r)
	return nil
}

// WithAttrs returns the handler itself; attrs are not needed for the
// message-substring assertions this handler supports.
func (h *capturingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return h }

// WithGroup likewise returns the handler unchanged.
func (h *capturingHandler) WithGroup(name string) slog.Handler { return h }

// hasMessageContaining reports whether any captured log message contains substr.
func (h *capturingHandler) hasMessageContaining(substr string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, r := range h.records {
		if strings.Contains(r.Message, substr) {
			return true
		}
	}
	return false
}

// failingStore wraps a real DB but returns an error for UpdateTaskState calls.
type failingStore struct {
	*storage.DB
	updateStateErr error
}

// UpdateTaskState always returns the injected error, ignoring its arguments.
func (f *failingStore) UpdateTaskState(id string, newState task.State) error {
	return f.updateStateErr
}

// mockRunner implements Runner for testing.
type mockRunner struct {
	mu       sync.Mutex    // guards calls and onRun
	calls    int           // number of Run invocations
	delay    time.Duration // artificial run duration (0 = return immediately)
	err      error         // error to return from Run (when onRun is nil)
	exitCode int           // exit code written to the execution when err fires
	onRun    func(*task.Task, *storage.Execution) error
}

// Run counts the invocation, optionally sleeps (respecting ctx cancellation),
// then delegates to onRun if set, otherwise returns the configured err.
func (m *mockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.calls++
	cb := m.onRun
	m.mu.Unlock()
	if m.delay > 0 {
		select {
		case <-time.After(m.delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	if cb != nil {
		return cb(t, e)
	}
	if m.err != nil {
		e.ExitCode = m.exitCode
		return m.err
	}
	return nil
}

// callCount returns the number of times Run has been invoked.
func (m *mockRunner) callCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.calls
}

// testStore opens a throwaway SQLite-backed store in a temp dir, closed at cleanup.
func testStore(t *testing.T) *storage.DB {
	t.Helper()
	dbPath := filepath.Join(t.TempDir(), "test.db")
	db, err := storage.Open(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { db.Close() })
	return db
}

// makeTask builds a minimal QUEUED claude task with the given ID.
func makeTask(id string) *task.Task {
	now := time.Now().UTC()
	return &task.Task{
		ID:        id,
		Name:      "Test " + id,
		Agent:     task.AgentConfig{Type: "claude", Instructions: "test"},
		Priority:  task.PriorityNormal,
		Retry:     task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
		Tags:      []string{},
		DependsOn: []string{},
		State:     task.StateQueued,
		CreatedAt: now,
		UpdatedAt: now,
	}
}

func TestPickAgent_PrefersLessActiveAgent(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 3, "gemini": 1},
		RateLimited: map[string]bool{"claude": false, "gemini": false},
	}
	if got := pickAgent(status); got != "gemini" {
		t.Errorf("expected gemini (fewer active tasks), got %s", got)
	}
}

func TestPickAgent_SkipsRateLimitedAgent(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 0, "gemini": 5},
		RateLimited: map[string]bool{"claude": true, "gemini": false},
	}
	if got := pickAgent(status); got != "gemini" {
		t.Errorf("expected gemini (claude rate limited), got %s", got)
	}
}

func TestPickAgent_FallsBackWhenAllRateLimited(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 2, "gemini": 5},
		RateLimited: map[string]bool{"claude": true, "gemini": true},
	}
	// Falls back to least active regardless of rate limit.
	if got := pickAgent(status); got != "claude" {
		t.Errorf("expected claude (fewer active tasks among all), got %s", got)
	}
}

// NOTE(review): despite the name, this test accepts either agent on a tie —
// it only rejects unknown values, so the alphabetical tie-break is not
// actually pinned. Confirm intended behavior before tightening.
func TestPickAgent_TieBreakPrefersFirstAlpha(t *testing.T) {
	status := SystemStatus{
		ActiveTasks: map[string]int{"claude": 2, "gemini": 2},
		RateLimited: map[string]bool{"claude": false, "gemini": false},
	}
	got := pickAgent(status)
	if got != "claude" && got != "gemini" {
		t.Errorf("unexpected agent %q on tie", got)
	}
}

func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("ps-1") // no ParentTaskID → top-level
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "READY" {
		t.Errorf("status: want READY, got %q", result.Execution.Status)
	}
	got, _ := store.GetTask("ps-1")
	if got.State != task.StateReady {
		t.Errorf("task state: want READY, got %v", got.State)
	}
}

func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("sub-1")
	tk.ParentTaskID = "parent-99" // subtask
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "COMPLETED" {
		t.Errorf("status: want COMPLETED, got %q", result.Execution.Status)
	}
	got, _ := store.GetTask("sub-1")
	if got.State != task.StateCompleted {
		t.Errorf("task state: want COMPLETED, got %v", got.State)
	}
}

func TestPool_Submit_Failure(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("pf-1")
	store.CreateTask(tk)
	pool.Submit(context.Background(), tk)
	result := <-pool.Results()
	if result.Err == nil {
		t.Fatal("expected error")
	}
	if result.Execution.Status != "FAILED" {
		t.Errorf("status: want FAILED, got %q", result.Execution.Status)
	}
}

// TestPool_UpdateTaskState_DBError_IsLoggedAndResultDelivered verifies that
// when UpdateTaskState returns an error, the error is logged with structured
// context (taskID, state) and the execution result is still sent to resultCh.
func TestPool_UpdateTaskState_DBError_IsLoggedAndResultDelivered(t *testing.T) {
	db := testStore(t)
	store := &failingStore{DB: db, updateStateErr: fmt.Errorf("db write failed")}
	handler := &capturingHandler{}
	logger := slog.New(handler)
	runner := &mockRunner{err: fmt.Errorf("runner error")}
	runners := map[string]Runner{"claude": runner}
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("dberr-1")
	db.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	select {
	case result := <-pool.Results():
		// Result must still arrive despite the DB error.
		if result == nil {
			t.Fatal("expected non-nil result")
		}
		if result.TaskID != tk.ID {
			t.Errorf("taskID: want %q, got %q", tk.ID, result.TaskID)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for result — result not delivered despite DB error")
	}
	if !handler.hasMessageContaining("failed to update task state") {
		t.Error("expected 'failed to update task state' log entry, but none found")
	}
}

func TestPool_Submit_Timeout(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("pt-1")
	tk.Timeout.Duration = 50 * time.Millisecond
	store.CreateTask(tk)
	pool.Submit(context.Background(), tk)
	result := <-pool.Results()
	if result.Execution.Status != "TIMED_OUT" {
		t.Errorf("status: want TIMED_OUT, got %q", result.Execution.Status)
	}
}

func TestPool_Submit_Cancellation(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	ctx, cancel := context.WithCancel(context.Background())
	tk := makeTask("pc-1")
	store.CreateTask(tk)
	pool.Submit(ctx, tk)
	time.Sleep(20 * time.Millisecond)
	cancel()
	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

func TestPool_Cancel_StopsRunningTask(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("cancel-1")
	store.CreateTask(tk)
	pool.Submit(context.Background(), tk)
	time.Sleep(20 * time.Millisecond) // let goroutine start
	if ok := pool.Cancel("cancel-1"); !ok {
		t.Fatal("Cancel returned false for a running task")
	}
	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	if ok := pool.Cancel("nonexistent"); ok {
		t.Error("Cancel returned true for unknown task")
	}
}

// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than
// returning an error when the pool is at capacity. Both tasks should eventually complete.
func TestPool_QueuedWhenAtCapacity(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 100 * time.Millisecond}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(1, runners, store, logger)
	tk1 := makeTask("queue-1")
	store.CreateTask(tk1)
	if err := pool.Submit(context.Background(), tk1); err != nil {
		t.Fatalf("first submit: %v", err)
	}
	// Second submit must succeed (queued) even though pool slot is taken.
	tk2 := makeTask("queue-2")
	store.CreateTask(tk2)
	if err := pool.Submit(context.Background(), tk2); err != nil {
		t.Fatalf("second submit: %v — expected task to be queued, not rejected", err)
	}
	// Both tasks must complete.
	for i := 0; i < 2; i++ {
		r := <-pool.Results()
		if r.Err != nil {
			t.Errorf("task %s error: %v", r.TaskID, r.Err)
		}
	}
}

// logPatherMockRunner is a mockRunner that also implements LogPather,
// and captures the StdoutPath seen when Run() is called.
type logPatherMockRunner struct {
	mockRunner
	logDir       string // base directory returned by ExecLogDir
	capturedPath string // e.StdoutPath observed at Run() time
}

// ExecLogDir returns a per-execution log directory under logDir.
func (m *logPatherMockRunner) ExecLogDir(execID string) string {
	return filepath.Join(m.logDir, execID)
}

// Run records the StdoutPath present on the execution before delegating to
// the embedded mockRunner.
func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.capturedPath = e.StdoutPath
	m.mu.Unlock()
	return m.mockRunner.Run(ctx, t, e)
}

// TestPool_Execute_LogPathsPreSetBeforeRun verifies that when the runner
// implements LogPather, log paths are set on the execution before Run() is
// called — so they land in the DB at CreateExecution time, not just at
// UpdateExecution time.
func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) {
	store := testStore(t)
	runner := &logPatherMockRunner{logDir: t.TempDir()}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("lp-1")
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	runner.mu.Lock()
	captured := runner.capturedPath
	runner.mu.Unlock()
	if captured == "" {
		t.Fatal("StdoutPath was empty when Run() was called; expected pre-set path")
	}
	if !strings.HasSuffix(captured, "stdout.log") {
		t.Errorf("expected stdout.log suffix, got: %s", captured)
	}
	// Path in the returned execution record should match.
	if result.Execution.StdoutPath != captured {
		t.Errorf("execution StdoutPath %q != captured %q", result.Execution.StdoutPath, captured)
	}
}

// TestPool_Execute_NoLogPather_PathsEmptyBeforeRun verifies that a runner
// without LogPather doesn't panic and paths remain empty until Run() sets them.
func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { store := testStore(t) runner := &mockRunner{} // does NOT implement LogPather runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("nolp-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("unexpected error: %v", result.Err) } } func TestPool_ConcurrentExecution(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 50 * time.Millisecond} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(3, runners, store, logger) for i := 0; i < 3; i++ { tk := makeTask(fmt.Sprintf("cc-%d", i)) store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit %d: %v", i, err) } } for i := 0; i < 3; i++ { result := <-pool.Results() if result.Execution.Status != "READY" { t.Errorf("task %s: want READY, got %q", result.TaskID, result.Execution.Status) } } if runner.callCount() != 3 { t.Errorf("calls: want 3, got %d", runner.callCount()) } } func TestWithFailureHistory_NoFailures_ReturnsUnchanged(t *testing.T) { tk := makeTask("no-fail") result := withFailureHistory(tk, nil, nil) if result != tk { t.Error("expected same pointer when no prior executions") } } func TestWithFailureHistory_WithError_ReturnsUnchanged(t *testing.T) { tk := makeTask("err-case") result := withFailureHistory(tk, nil, fmt.Errorf("db error")) if result != tk { t.Error("expected same pointer when ListExecutions errors") } } func TestWithFailureHistory_InjectsFailedHistory(t *testing.T) { tk := makeTask("with-fail") tk.Agent.Instructions = "do the work" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: 
"sandbox: uncommitted changes", StartTime: time.Now()}, {ID: "e2", Status: "COMPLETED", ErrorMsg: "", StartTime: time.Now()}, // not a failure, should be ignored } result := withFailureHistory(tk, execs, nil) if result == tk { t.Fatal("expected a new task copy, got same pointer") } if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Errorf("expected history header in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } if !strings.Contains(result.Agent.SystemPromptAppend, "sandbox: uncommitted changes") { t.Errorf("expected error message in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } // COMPLETED execution should not appear if strings.Contains(result.Agent.SystemPromptAppend, "e2") { t.Errorf("COMPLETED execution should not appear in history") } // Original instructions unchanged if result.Agent.Instructions != "do the work" { t.Errorf("instructions should be unchanged, got: %q", result.Agent.Instructions) } } func TestWithFailureHistory_PreservesExistingSystemPrompt(t *testing.T) { tk := makeTask("with-prompt") tk.Agent.SystemPromptAppend = "existing prompt" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: "some error", StartTime: time.Now()}, } result := withFailureHistory(tk, execs, nil) if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Error("expected history prepended") } if !strings.Contains(result.Agent.SystemPromptAppend, "existing prompt") { t.Error("expected existing prompt preserved") } // History must come BEFORE the existing prompt histIdx := strings.Index(result.Agent.SystemPromptAppend, "Prior Attempt History") existIdx := strings.Index(result.Agent.SystemPromptAppend, "existing prompt") if histIdx > existIdx { t.Error("history should be prepended before existing system prompt") } } func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) { store := testStore(t) var capturedPrompt string runner := &mockRunner{} runner.onRun = func(t 
*task.Task, _ *storage.Execution) error { capturedPrompt = t.Agent.SystemPromptAppend return nil } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("retry-hist") store.CreateTask(tk) // Simulate a prior failed execution. store.CreateExecution(&storage.Execution{ ID: "prior-exec", TaskID: tk.ID, StartTime: time.Now(), EndTime: time.Now(), Status: "FAILED", ErrorMsg: "prior failure reason", }) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() if !strings.Contains(capturedPrompt, "prior failure reason") { t.Errorf("expected prior failure in system prompt, got: %q", capturedPrompt) } } func TestPool_RecoverStaleRunning(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) // Create a task already in RUNNING state (simulating a crashed server). tk := makeTask("stale-1") tk.State = task.StateRunning store.CreateTask(tk) // Add an open execution record (no end time, status RUNNING). store.CreateExecution(&storage.Execution{ ID: "exec-stale-1", TaskID: tk.ID, StartTime: time.Now().Add(-5 * time.Minute), Status: "RUNNING", }) pool.RecoverStaleRunning(context.Background()) // Execution record should be closed as FAILED. execs, _ := store.ListExecutions(tk.ID) if len(execs) == 0 || execs[0].Status != "FAILED" { t.Errorf("execution status: want FAILED, got %+v", execs) } if execs[0].ErrorMsg == "" { t.Error("expected non-empty error message on recovered execution") } // Task should be re-queued for retry and complete. 
select { case result := <-pool.Results(): if result.TaskID != tk.ID { t.Errorf("unexpected task in results: %s", result.TaskID) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for stale RUNNING task to be re-queued and run") } recovered, err := store.GetTask(tk.ID) if err != nil { t.Fatalf("get task: %v", err) } // Top-level tasks (no parent) go to READY after a successful run. if recovered.State != task.StateReady { t.Errorf("state after re-queue: want READY, got %q", recovered.State) } } func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) { store := testStore(t) runner := &mockRunner{} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) // Create a task already in QUEUED state (persisted from before a server restart). tk := makeTask("stale-queued-1") tk.State = task.StateQueued store.CreateTask(tk) pool.RecoverStaleQueued(context.Background()) // Wait for the pool to pick it up and complete it. select { case result := <-pool.Results(): if result.TaskID != tk.ID { t.Errorf("unexpected task in results: %s", result.TaskID) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for stale QUEUED task to complete") } got, err := store.GetTask(tk.ID) if err != nil { t.Fatalf("get task: %v", err) } if got.State != task.StateCompleted && got.State != task.StateReady { t.Errorf("state: want COMPLETED or READY, got %q", got.State) } if runner.callCount() != 1 { t.Errorf("runner call count: want 1, got %d", runner.callCount()) } } func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { store := testStore(t) runner := &mockRunner{} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) // PENDING task should NOT be resubmitted. 
tk := makeTask("pending-1") tk.State = task.StatePending store.CreateTask(tk) pool.RecoverStaleQueued(context.Background()) time.Sleep(50 * time.Millisecond) if runner.callCount() != 0 { t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount()) } } func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner, "gemini": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("apa-1") store.CreateTask(tk) // Agent.Type = "claude" pool.Submit(context.Background(), tk) <-pool.Results() pool.mu.Lock() _, exists := pool.activePerAgent["claude"] pool.mu.Unlock() if exists { t.Error("activePerAgent should not have a zero-count entry for claude after task completes") } } func TestPool_RateLimited_StaleEntryCleaned(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // Inject a stale rate-limit entry (deadline already passed). pool.mu.Lock() pool.rateLimited["claude"] = time.Now().Add(-1 * time.Minute) pool.mu.Unlock() // Submit a task — the execute() path reads rateLimited during classification. tk := makeTask("rl-stale-1") store.CreateTask(tk) pool.Submit(context.Background(), tk) <-pool.Results() pool.mu.Lock() _, exists := pool.rateLimited["claude"] pool.mu.Unlock() if exists { t.Error("stale rate-limit entry should be deleted after deadline passes") } } // TestPool_Submit_TopLevel_NoSubtasks_GoesReady verifies that a top-level task // with no subtasks still transitions to READY after successful execution. 
func TestPool_Submit_TopLevel_NoSubtasks_GoesReady(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	tk := makeTask("no-subtasks-1") // no ParentTaskID, no subtasks
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "READY" {
		t.Errorf("status: want READY, got %q", result.Execution.Status)
	}
	got, _ := store.GetTask(tk.ID)
	if got.State != task.StateReady {
		t.Errorf("task state: want READY, got %v", got.State)
	}
}

// TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked verifies that when a
// top-level task finishes successfully but has subtasks, it transitions to
// BLOCKED (waiting for subtasks) rather than READY.
func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	parent := makeTask("parent-with-subtasks")
	store.CreateTask(parent)
	// Create a subtask in the store but do NOT submit it.
	sub := makeTask("sub-of-parent")
	sub.ParentTaskID = parent.ID
	store.CreateTask(sub)
	if err := pool.Submit(context.Background(), parent); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "BLOCKED" {
		t.Errorf("status: want BLOCKED, got %q", result.Execution.Status)
	}
	got, _ := store.GetTask(parent.ID)
	if got.State != task.StateBlocked {
		t.Errorf("task state: want BLOCKED, got %v", got.State)
	}
}

// TestPool_Submit_LastSubtask_UnblocksParent verifies that when the last
// remaining subtask completes, the parent task transitions from BLOCKED to READY.
func TestPool_Submit_LastSubtask_UnblocksParent(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	// Parent starts BLOCKED (waiting for subtasks).
	parent := makeTask("unblock-parent-1")
	parent.State = task.StateBlocked
	store.CreateTask(parent)
	// First subtask already completed.
	sub1 := makeTask("unblock-sub-1a")
	sub1.ParentTaskID = parent.ID
	sub1.State = task.StateCompleted
	store.CreateTask(sub1)
	// Second (last) subtask — the one we submit.
	sub2 := makeTask("unblock-sub-1b")
	sub2.ParentTaskID = parent.ID
	store.CreateTask(sub2)
	if err := pool.Submit(context.Background(), sub2); err != nil {
		t.Fatalf("submit: %v", err)
	}
	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "COMPLETED" {
		t.Errorf("subtask status: want COMPLETED, got %q", result.Execution.Status)
	}
	// Parent must now be READY.
	got, err := store.GetTask(parent.ID)
	if err != nil {
		t.Fatalf("get parent: %v", err)
	}
	if got.State != task.StateReady {
		t.Errorf("parent state: want READY, got %v", got.State)
	}
}

// TestPool_Submit_NotLastSubtask_ParentStaysBlocked verifies that when a subtask
// completes but another sibling subtask is still running, the parent stays BLOCKED.
func TestPool_Submit_NotLastSubtask_ParentStaysBlocked(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	parent := makeTask("blocked-parent-2")
	parent.State = task.StateBlocked
	store.CreateTask(parent)
	// First subtask still RUNNING — not done yet.
	sub1 := makeTask("blocked-sub-2a")
	sub1.ParentTaskID = parent.ID
	sub1.State = task.StateRunning
	store.CreateTask(sub1)
	// Second subtask — the one we submit.
	sub2 := makeTask("blocked-sub-2b")
	sub2.ParentTaskID = parent.ID
	store.CreateTask(sub2)
	if err := pool.Submit(context.Background(), sub2); err != nil {
		t.Fatalf("submit: %v", err)
	}
	<-pool.Results()
	// Parent must remain BLOCKED because sub1 is still RUNNING.
	got, err := store.GetTask(parent.ID)
	if err != nil {
		t.Fatalf("get parent: %v", err)
	}
	if got.State != task.StateBlocked {
		t.Errorf("parent state: want BLOCKED, got %v", got.State)
	}
}

// TestPool_Submit_ParentNotBlocked_NoTransition verifies that completing a subtask
// does not change the parent's state when the parent is not BLOCKED.
func TestPool_Submit_ParentNotBlocked_NoTransition(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)
	// Parent is already READY (not BLOCKED).
	parent := makeTask("ready-parent-3")
	parent.State = task.StateReady
	store.CreateTask(parent)
	sub1 := makeTask("ready-sub-3a")
	sub1.ParentTaskID = parent.ID
	store.CreateTask(sub1)
	if err := pool.Submit(context.Background(), sub1); err != nil {
		t.Fatalf("submit: %v", err)
	}
	<-pool.Results()
	// Parent must remain READY — no spurious state transition.
	got, err := store.GetTask(parent.ID)
	if err != nil {
		t.Fatalf("get parent: %v", err)
	}
	if got.State != task.StateReady {
		t.Errorf("parent state: want READY, got %v", got.State)
	}
}

// minimalMockStore is a standalone Store implementation for unit-testing Pool
// methods that do not require a real SQLite database.
type minimalMockStore struct {
	mu              sync.Mutex // guards tasks, stateUpdates, questionUpdates
	tasks           map[string]*task.Task
	executions      map[string]*storage.Execution
	stateUpdates    []struct{ id string; state task.State } // recorded UpdateTaskState calls, in order
	questionUpdates []string
	subtasksFunc    func(parentID string) ([]*task.Task, error) // optional ListSubtasks override
	updateExecErr   error                                       // injected UpdateExecution error
	updateStateErr  error                                       // injected UpdateTaskState error
}

// newMinimalMockStore returns a mock store with empty task/execution maps.
func newMinimalMockStore() *minimalMockStore {
	return &minimalMockStore{
		tasks:      make(map[string]*task.Task),
		executions: make(map[string]*storage.Execution),
	}
}

func (m *minimalMockStore) GetTask(id string) (*task.Task, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	t, ok := m.tasks[id]
	if !ok {
		return nil, fmt.Errorf("task %q not found", id)
	}
	return t, nil
}

func (m *minimalMockStore) ListTasks(_ storage.TaskFilter) ([]*task.Task, error) { return nil, nil }

func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) {
	if m.subtasksFunc != nil {
		return m.subtasksFunc(parentID)
	}
	return nil, nil
}

func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil }

func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }

func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error { return m.updateExecErr }

// UpdateTaskState records the call and mirrors the new state onto the stored
// task (if present), unless an error has been injected.
func (m *minimalMockStore) UpdateTaskState(id string, newState task.State) error {
	if m.updateStateErr != nil {
		return m.updateStateErr
	}
	m.mu.Lock()
	m.stateUpdates = append(m.stateUpdates, struct{ id string; state task.State }{id, newState})
	if t, ok := m.tasks[id]; ok {
		t.State = newState
	}
	m.mu.Unlock()
	return nil
}

func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error {
	m.mu.Lock()
	m.questionUpdates = append(m.questionUpdates, questionJSON)
	m.mu.Unlock()
	return nil
}

func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil }

func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { return nil }

func (m *minimalMockStore) UpdateTaskAgent(id string, agent task.AgentConfig) error { return nil }

// lastStateUpdate returns the most recent recorded state transition, or
// ok=false if none were recorded.
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.stateUpdates) == 0 {
		return "", "", false
	}
	u := m.stateUpdates[len(m.stateUpdates)-1]
	return u.id, u.state, true
}

// newPoolWithMockStore builds a Pool directly (bypassing NewPool) so tests can
// drive internal methods against an arbitrary Store implementation.
func newPoolWithMockStore(store Store) *Pool {
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	return &Pool{
		maxConcurrent:  2,
		runners:        map[string]Runner{"claude": &mockRunner{}},
		store:          store,
		logger:         logger,
		activePerAgent: make(map[string]int),
		rateLimited:    make(map[string]time.Time),
		cancels:        make(map[string]context.CancelFunc),
		resultCh:       make(chan *Result, 4),
		workCh:         make(chan workItem, 4),
		doneCh:         make(chan struct{}, 2),
		Questions:      NewQuestionRegistry(),
	}
}

// TestHandleRunResult_SharedPath verifies that handleRunResult correctly
// classifies runner errors and transitions task state via the store.
func TestHandleRunResult_SharedPath(t *testing.T) { t.Run("generic error sets FAILED", func(t *testing.T) { store := newMinimalMockStore() pool := newPoolWithMockStore(store) tk := makeTask("hrr-fail") store.tasks[tk.ID] = tk exec := &storage.Execution{ID: "e1", TaskID: tk.ID, Status: "RUNNING"} ctx := context.Background() pool.handleRunResult(ctx, tk, exec, fmt.Errorf("something broke"), "claude") if exec.Status != "FAILED" { t.Errorf("exec.Status: want FAILED, got %q", exec.Status) } if exec.ErrorMsg != "something broke" { t.Errorf("exec.ErrorMsg: want %q, got %q", "something broke", exec.ErrorMsg) } _, state, ok := store.lastStateUpdate() if !ok || state != task.StateFailed { t.Errorf("expected UpdateTaskState(FAILED), got state=%v ok=%v", state, ok) } result := <-pool.resultCh if result.Err == nil || result.Execution.Status != "FAILED" { t.Errorf("unexpected result: %+v", result) } }) t.Run("nil error top-level no subtasks sets READY", func(t *testing.T) { store := newMinimalMockStore() pool := newPoolWithMockStore(store) tk := makeTask("hrr-ready") store.tasks[tk.ID] = tk exec := &storage.Execution{ID: "e2", TaskID: tk.ID, Status: "RUNNING"} ctx := context.Background() pool.handleRunResult(ctx, tk, exec, nil, "claude") if exec.Status != "READY" { t.Errorf("exec.Status: want READY, got %q", exec.Status) } _, state, ok := store.lastStateUpdate() if !ok || state != task.StateReady { t.Errorf("expected UpdateTaskState(READY), got state=%v ok=%v", state, ok) } result := <-pool.resultCh if result.Err != nil || result.Execution.Status != "READY" { t.Errorf("unexpected result: %+v", result) } }) t.Run("nil error subtask sets COMPLETED", func(t *testing.T) { store := newMinimalMockStore() pool := newPoolWithMockStore(store) parent := makeTask("hrr-parent") parent.State = task.StateBlocked store.tasks[parent.ID] = parent tk := makeTask("hrr-sub") tk.ParentTaskID = parent.ID store.tasks[tk.ID] = tk exec := &storage.Execution{ID: "e3", TaskID: tk.ID, Status: "RUNNING"} 
ctx := context.Background() pool.handleRunResult(ctx, tk, exec, nil, "claude") if exec.Status != "COMPLETED" { t.Errorf("exec.Status: want COMPLETED, got %q", exec.Status) } result := <-pool.resultCh if result.Err != nil || result.Execution.Status != "COMPLETED" { t.Errorf("unexpected result: %+v", result) } }) t.Run("timeout sets TIMED_OUT", func(t *testing.T) { store := newMinimalMockStore() pool := newPoolWithMockStore(store) tk := makeTask("hrr-timeout") store.tasks[tk.ID] = tk exec := &storage.Execution{ID: "e4", TaskID: tk.ID, Status: "RUNNING"} ctx, cancel := context.WithCancel(context.Background()) cancel() // make ctx.Err() == context.Canceled // Simulate deadline exceeded by using a deadline-exceeded context. dctx, dcancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second)) defer dcancel() pool.handleRunResult(dctx, tk, exec, context.DeadlineExceeded, "claude") if exec.Status != "TIMED_OUT" { t.Errorf("exec.Status: want TIMED_OUT, got %q", exec.Status) } _ = ctx <-pool.resultCh }) } // TestPool_LoadBalancing_OverridesAgentType verifies that load balancing picks // from registered runners, overriding any pre-set Agent.Type on the task. func TestPool_LoadBalancing_OverridesAgentType(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // Task has a non-existent agent type; load balancing should route to "claude". 
tk := makeTask("lb-override") tk.Agent.Type = "super-ai" store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("expected success (load balancing overrides agent type), got: %v", result.Err) } if runner.callCount() != 1 { t.Errorf("expected claude runner to be called once, got %d", runner.callCount()) } } // TestPool_SpecificAgent_SkipsLoadBalancing verifies that if a specific // registered agent is requested (claude or gemini), it is used directly // and load balancing (pickAgent) is skipped. func TestPool_SpecificAgent_SkipsLoadBalancing(t *testing.T) { store := testStore(t) claudeRunner := &mockRunner{} geminiRunner := &mockRunner{} runners := map[string]Runner{ "claude": claudeRunner, "gemini": geminiRunner, } logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(4, runners, store, logger) // Inject 2 active tasks for gemini, 0 for claude. // pickAgent would normally pick "claude". pool.mu.Lock() pool.activePerAgent["gemini"] = 2 pool.mu.Unlock() tk := makeTask("specific-gemini") tk.Agent.Type = "gemini" store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() if geminiRunner.callCount() != 1 { t.Errorf("expected gemini runner to be called once, got %d", geminiRunner.callCount()) } if claudeRunner.callCount() != 0 { t.Errorf("expected claude runner to NOT be called, got %d", claudeRunner.callCount()) } } // TestPool_SpecificAgent_PersistsToDB verifies that if a specific agent // is requested, it is persisted to the database before the task runs. 
func TestPool_SpecificAgent_PersistsToDB(t *testing.T) { store := testStore(t) geminiRunner := &mockRunner{} runners := map[string]Runner{ "gemini": geminiRunner, } logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(4, runners, store, logger) tk := makeTask("persist-gemini") tk.Agent.Type = "gemini" store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() // Check the task in the database. reloaded, err := store.GetTask(tk.ID) if err != nil { t.Fatalf("get task: %v", err) } if reloaded.Agent.Type != "gemini" { t.Errorf("expected agent type gemini in DB, got %q", reloaded.Agent.Type) } }