package executor import ( "context" "fmt" "log/slog" "os" "path/filepath" "strings" "sync" "testing" "time" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" ) // capturingHandler is a slog.Handler that records log records for assertions. type capturingHandler struct { mu sync.Mutex records []slog.Record } func (h *capturingHandler) Enabled(_ context.Context, _ slog.Level) bool { return true } func (h *capturingHandler) Handle(_ context.Context, r slog.Record) error { h.mu.Lock() defer h.mu.Unlock() h.records = append(h.records, r) return nil } func (h *capturingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return h } func (h *capturingHandler) WithGroup(name string) slog.Handler { return h } func (h *capturingHandler) hasMessageContaining(substr string) bool { h.mu.Lock() defer h.mu.Unlock() for _, r := range h.records { if strings.Contains(r.Message, substr) { return true } } return false } // failingStore wraps a real DB but returns an error for UpdateTaskState calls. type failingStore struct { *storage.DB updateStateErr error } func (f *failingStore) UpdateTaskState(id string, newState task.State) error { return f.updateStateErr } // mockRunner implements Runner for testing. type mockRunner struct { mu sync.Mutex calls int delay time.Duration err error exitCode int onRun func(*task.Task, *storage.Execution) error } func (m *mockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { m.mu.Lock() m.calls++ cb := m.onRun m.mu.Unlock() if m.delay > 0 { select { case <-time.After(m.delay): case <-ctx.Done(): return ctx.Err() } } if cb != nil { return cb(t, e) } if m.err != nil { e.ExitCode = m.exitCode return m.err } return nil } func (m *mockRunner) callCount() int { m.mu.Lock() defer m.mu.Unlock() return m.calls } func testStore(t *testing.T) *storage.DB { t.Helper() dbPath := filepath.Join(t.TempDir(), "test.db") db, err := storage.Open(dbPath) if err != nil { t.Fatal(err) } t.Cleanup(func() { db.Close() }) return db } func makeTask(id string) *task.Task { now := time.Now().UTC() return &task.Task{ ID: id, Name: "Test " + id, Agent: task.AgentConfig{Type: "claude", Instructions: "test"}, Priority: task.PriorityNormal, Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"}, Tags: []string{}, DependsOn: []string{}, State: task.StateQueued, CreatedAt: now, UpdatedAt: now, } } func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("ps-1") // no ParentTaskID → top-level store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "READY" { t.Errorf("status: want READY, got %q", result.Execution.Status) } got, _ := store.GetTask("ps-1") if got.State != task.StateReady { t.Errorf("task state: want READY, got %v", got.State) } } func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("sub-1") tk.ParentTaskID = "parent-99" // subtask store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "COMPLETED" { t.Errorf("status: want COMPLETED, got %q", result.Execution.Status) } got, _ := store.GetTask("sub-1") if got.State != task.StateCompleted { t.Errorf("task state: want COMPLETED, got %v", got.State) } } func TestPool_Submit_Failure(t *testing.T) { store := testStore(t) runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("pf-1") store.CreateTask(tk) pool.Submit(context.Background(), tk) result := <-pool.Results() if result.Err == nil { t.Fatal("expected error") } if result.Execution.Status != "FAILED" { t.Errorf("status: want FAILED, got %q", result.Execution.Status) } } // TestPool_UpdateTaskState_DBError_IsLoggedAndResultDelivered verifies that // when UpdateTaskState returns an error, the error is logged with structured // context (taskID, state) and the execution result is still sent to resultCh. func TestPool_UpdateTaskState_DBError_IsLoggedAndResultDelivered(t *testing.T) { db := testStore(t) store := &failingStore{DB: db, updateStateErr: fmt.Errorf("db write failed")} handler := &capturingHandler{} logger := slog.New(handler) runner := &mockRunner{err: fmt.Errorf("runner error")} runners := map[string]Runner{"claude": runner} pool := NewPool(2, runners, store, logger) tk := makeTask("dberr-1") db.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } select { case result := <-pool.Results(): // Result must still arrive despite the DB error. if result == nil { t.Fatal("expected non-nil result") } if result.TaskID != tk.ID { t.Errorf("taskID: want %q, got %q", tk.ID, result.TaskID) } case <-time.After(5 * time.Second): t.Fatal("timed out waiting for result — result not delivered despite DB error") } if !handler.hasMessageContaining("failed to update task state") { t.Error("expected 'failed to update task state' log entry, but none found") } } func TestPool_Submit_Timeout(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 5 * time.Second} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("pt-1") tk.Timeout.Duration = 50 * time.Millisecond store.CreateTask(tk) pool.Submit(context.Background(), tk) result := <-pool.Results() if result.Execution.Status != "TIMED_OUT" { t.Errorf("status: want TIMED_OUT, got %q", result.Execution.Status) } } func TestPool_Submit_Cancellation(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 5 * time.Second} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) ctx, cancel := context.WithCancel(context.Background()) tk := makeTask("pc-1") store.CreateTask(tk) pool.Submit(ctx, tk) time.Sleep(20 * time.Millisecond) cancel() result := <-pool.Results() if result.Execution.Status != "CANCELLED" { t.Errorf("status: want CANCELLED, got %q", result.Execution.Status) } } func TestPool_Cancel_StopsRunningTask(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 5 * time.Second} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("cancel-1") store.CreateTask(tk) pool.Submit(context.Background(), tk) time.Sleep(20 * time.Millisecond) // let goroutine start if ok := pool.Cancel("cancel-1"); !ok { t.Fatal("Cancel returned false for a running task") } result := <-pool.Results() if result.Execution.Status != "CANCELLED" { t.Errorf("status: want CANCELLED, got %q", result.Execution.Status) } } func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) if ok := pool.Cancel("nonexistent"); ok { t.Error("Cancel returned true for unknown task") } } // TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than // returning an error when the pool is at capacity. Both tasks should eventually complete. func TestPool_QueuedWhenAtCapacity(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 100 * time.Millisecond} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(1, runners, store, logger) tk1 := makeTask("queue-1") store.CreateTask(tk1) if err := pool.Submit(context.Background(), tk1); err != nil { t.Fatalf("first submit: %v", err) } // Second submit must succeed (queued) even though pool slot is taken. tk2 := makeTask("queue-2") store.CreateTask(tk2) if err := pool.Submit(context.Background(), tk2); err != nil { t.Fatalf("second submit: %v — expected task to be queued, not rejected", err) } // Both tasks must complete. for i := 0; i < 2; i++ { r := <-pool.Results() if r.Err != nil { t.Errorf("task %s error: %v", r.TaskID, r.Err) } } } // logPatherMockRunner is a mockRunner that also implements LogPather, // and captures the StdoutPath seen when Run() is called. type logPatherMockRunner struct { mockRunner logDir string capturedPath string } func (m *logPatherMockRunner) ExecLogDir(execID string) string { return filepath.Join(m.logDir, execID) } func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { m.mu.Lock() m.capturedPath = e.StdoutPath m.mu.Unlock() return m.mockRunner.Run(ctx, t, e) } // TestPool_Execute_LogPathsPreSetBeforeRun verifies that when the runner // implements LogPather, log paths are set on the execution before Run() is // called — so they land in the DB at CreateExecution time, not just at // UpdateExecution time. func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) { store := testStore(t) runner := &logPatherMockRunner{logDir: t.TempDir()} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("lp-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() runner.mu.Lock() captured := runner.capturedPath runner.mu.Unlock() if captured == "" { t.Fatal("StdoutPath was empty when Run() was called; expected pre-set path") } if !strings.HasSuffix(captured, "stdout.log") { t.Errorf("expected stdout.log suffix, got: %s", captured) } // Path in the returned execution record should match. if result.Execution.StdoutPath != captured { t.Errorf("execution StdoutPath %q != captured %q", result.Execution.StdoutPath, captured) } } // TestPool_Execute_NoLogPather_PathsEmptyBeforeRun verifies that a runner // without LogPather doesn't panic and paths remain empty until Run() sets them. func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { store := testStore(t) runner := &mockRunner{} // does NOT implement LogPather runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("nolp-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("unexpected error: %v", result.Err) } } func TestPool_ConcurrentExecution(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 50 * time.Millisecond} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(3, runners, store, logger) for i := 0; i < 3; i++ { tk := makeTask(fmt.Sprintf("cc-%d", i)) store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit %d: %v", i, err) } } for i := 0; i < 3; i++ { result := <-pool.Results() if result.Execution.Status != "READY" { t.Errorf("task %s: want READY, got %q", result.TaskID, result.Execution.Status) } } if runner.callCount() != 3 { t.Errorf("calls: want 3, got %d", runner.callCount()) } } func TestWithFailureHistory_NoFailures_ReturnsUnchanged(t *testing.T) { tk := makeTask("no-fail") result := withFailureHistory(tk, nil, nil) if result != tk { t.Error("expected same pointer when no prior executions") } } func TestWithFailureHistory_WithError_ReturnsUnchanged(t *testing.T) { tk := makeTask("err-case") result := withFailureHistory(tk, nil, fmt.Errorf("db error")) if result != tk { t.Error("expected same pointer when ListExecutions errors") } } func TestWithFailureHistory_InjectsFailedHistory(t *testing.T) { tk := makeTask("with-fail") tk.Agent.Instructions = "do the work" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: "sandbox: uncommitted changes", StartTime: time.Now()}, {ID: "e2", Status: "COMPLETED", ErrorMsg: "", StartTime: time.Now()}, // not a failure, should be ignored } result := withFailureHistory(tk, execs, nil) if result == tk { t.Fatal("expected a new task copy, got same pointer") } if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Errorf("expected history header in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } if !strings.Contains(result.Agent.SystemPromptAppend, "sandbox: uncommitted changes") { t.Errorf("expected error message in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } // COMPLETED execution should not appear if strings.Contains(result.Agent.SystemPromptAppend, "e2") { t.Errorf("COMPLETED execution should not appear in history") } // Original instructions unchanged if result.Agent.Instructions != "do the work" { t.Errorf("instructions should be unchanged, got: %q", result.Agent.Instructions) } } func TestWithFailureHistory_PreservesExistingSystemPrompt(t *testing.T) { tk := makeTask("with-prompt") tk.Agent.SystemPromptAppend = "existing prompt" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: "some error", StartTime: time.Now()}, } result := withFailureHistory(tk, execs, nil) if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Error("expected history prepended") } if !strings.Contains(result.Agent.SystemPromptAppend, "existing prompt") { t.Error("expected existing prompt preserved") } // History must come BEFORE the existing prompt histIdx := strings.Index(result.Agent.SystemPromptAppend, "Prior Attempt History") existIdx := strings.Index(result.Agent.SystemPromptAppend, "existing prompt") if histIdx > existIdx { t.Error("history should be prepended before existing system prompt") } } func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) { store := testStore(t) var capturedPrompt string runner := &mockRunner{} runner.onRun = func(t *task.Task, _ *storage.Execution) error { capturedPrompt = t.Agent.SystemPromptAppend return nil } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("retry-hist") store.CreateTask(tk) // Simulate a prior failed execution. store.CreateExecution(&storage.Execution{ ID: "prior-exec", TaskID: tk.ID, StartTime: time.Now(), EndTime: time.Now(), Status: "FAILED", ErrorMsg: "prior failure reason", }) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() if !strings.Contains(capturedPrompt, "prior failure reason") { t.Errorf("expected prior failure in system prompt, got: %q", capturedPrompt) } } func TestPool_RecoverStaleRunning(t *testing.T) { store := testStore(t) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) // Create a task already in RUNNING state (simulating a crashed server). tk := makeTask("stale-1") tk.State = task.StateRunning store.CreateTask(tk) // Add an open execution record (no end time, status RUNNING). store.CreateExecution(&storage.Execution{ ID: "exec-stale-1", TaskID: tk.ID, StartTime: time.Now().Add(-5 * time.Minute), Status: "RUNNING", }) pool.RecoverStaleRunning() recovered, err := store.GetTask(tk.ID) if err != nil { t.Fatalf("get task: %v", err) } if recovered.State != task.StateFailed { t.Errorf("state: want FAILED, got %q", recovered.State) } execs, _ := store.ListExecutions(tk.ID) if len(execs) == 0 || execs[0].Status != "FAILED" { t.Errorf("execution status: want FAILED, got %+v", execs) } if execs[0].ErrorMsg == "" { t.Error("expected non-empty error message on recovered execution") } } func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner, "gemini": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("apa-1") store.CreateTask(tk) // Agent.Type = "claude" pool.Submit(context.Background(), tk) <-pool.Results() pool.mu.Lock() _, exists := pool.activePerAgent["claude"] pool.mu.Unlock() if exists { t.Error("activePerAgent should not have a zero-count entry for claude after task completes") } } func TestPool_RateLimited_StaleEntryCleaned(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // Inject a stale rate-limit entry (deadline already passed). pool.mu.Lock() pool.rateLimited["claude"] = time.Now().Add(-1 * time.Minute) pool.mu.Unlock() // Submit a task — the execute() path reads rateLimited during classification. tk := makeTask("rl-stale-1") store.CreateTask(tk) pool.Submit(context.Background(), tk) <-pool.Results() pool.mu.Lock() _, exists := pool.rateLimited["claude"] pool.mu.Unlock() if exists { t.Error("stale rate-limit entry should be deleted after deadline passes") } } // TestPool_Submit_TopLevel_NoSubtasks_GoesReady verifies that a top-level task // with no subtasks still transitions to READY after successful execution. func TestPool_Submit_TopLevel_NoSubtasks_GoesReady(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("no-subtasks-1") // no ParentTaskID, no subtasks store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "READY" { t.Errorf("status: want READY, got %q", result.Execution.Status) } got, _ := store.GetTask(tk.ID) if got.State != task.StateReady { t.Errorf("task state: want READY, got %v", got.State) } } // TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked verifies that when a // top-level task finishes successfully but has subtasks, it transitions to // BLOCKED (waiting for subtasks) rather than READY. func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) { store := testStore(t) runner := &mockRunner{} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) parent := makeTask("parent-with-subtasks") store.CreateTask(parent) // Create a subtask in the store but do NOT submit it. sub := makeTask("sub-of-parent") sub.ParentTaskID = parent.ID store.CreateTask(sub) if err := pool.Submit(context.Background(), parent); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Errorf("expected no error, got: %v", result.Err) } if result.Execution.Status != "BLOCKED" { t.Errorf("status: want BLOCKED, got %q", result.Execution.Status) } got, _ := store.GetTask(parent.ID) if got.State != task.StateBlocked { t.Errorf("task state: want BLOCKED, got %v", got.State) } } func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("bad-agent") tk.Agent.Type = "super-ai" store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err == nil { t.Fatal("expected error for unsupported agent") } if !strings.Contains(result.Err.Error(), "unsupported agent type") { t.Errorf("expected 'unsupported agent type' in error, got: %v", result.Err) } if result.Execution.Status != "FAILED" { t.Errorf("status: want FAILED, got %q", result.Execution.Status) } }