package executor

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// mockRunner implements Runner for testing. It counts invocations and can be
// configured to delay, to fail with a fixed error and exit code, or to hand
// control to a callback.
type mockRunner struct {
	mu       sync.Mutex
	calls    int           // number of Run invocations; guarded by mu
	delay    time.Duration // when > 0, Run waits this long (or until ctx is done)
	err      error         // returned by Run when onRun is nil
	exitCode int           // copied onto the execution when err is returned
	onRun    func(*task.Task, *storage.Execution) error
}

// Run records the call and then waits for the configured delay, returning
// ctx.Err() if ctx is done first. If onRun is set, its result is returned.
// Otherwise the configured err is returned (after copying exitCode onto e),
// or nil when no error is configured.
func (m *mockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.calls++
	cb := m.onRun
	m.mu.Unlock()

	if m.delay > 0 {
		select {
		case <-time.After(m.delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	if cb != nil {
		return cb(t, e)
	}
	if m.err != nil {
		e.ExitCode = m.exitCode
		return m.err
	}
	return nil
}

// callCount returns how many times Run has been invoked so far.
func (m *mockRunner) callCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.calls
}

// testStore opens a database at test.db inside a per-test temporary directory
// and registers a cleanup that closes it. The test fails immediately if the
// database cannot be opened.
func testStore(t *testing.T) *storage.DB {
	t.Helper()
	dbPath := filepath.Join(t.TempDir(), "test.db")
	db, err := storage.Open(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { db.Close() })
	return db
}

// makeTask builds a queued "claude" task with the given ID, normal priority,
// a single linear-backoff attempt, and empty tag and dependency lists.
func makeTask(id string) *task.Task {
	now := time.Now().UTC()
	return &task.Task{
		ID:        id,
		Name:      "Test " + id,
		Agent:     task.AgentConfig{Type: "claude", Instructions: "test"},
		Priority:  task.PriorityNormal,
		Retry:     task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
		Tags:      []string{},
		DependsOn: []string{},
		State:     task.StateQueued,
		CreatedAt: now,
		UpdatedAt: now,
	}
}

// TestPool_Submit_TopLevel_GoesToReady verifies that a successfully run task
// without a ParentTaskID finishes with execution status READY and is stored
// in state task.StateReady.
func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	tk := makeTask("ps-1") // no ParentTaskID → top-level
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "READY" {
		t.Errorf("status: want READY, got %q", result.Execution.Status)
	}

	// Fail with a clear message instead of dereferencing an unusable result.
	got, err := store.GetTask("ps-1")
	if err != nil {
		t.Fatalf("get task: %v", err)
	}
	if got.State != task.StateReady {
		t.Errorf("task state: want READY, got %v", got.State)
	}
}

// TestPool_Submit_Subtask_GoesToCompleted verifies that a successfully run
// task with a ParentTaskID finishes with execution status COMPLETED and is
// stored in state task.StateCompleted.
func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	tk := makeTask("sub-1")
	tk.ParentTaskID = "parent-99" // subtask
	store.CreateTask(tk)
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "COMPLETED" {
		t.Errorf("status: want COMPLETED, got %q", result.Execution.Status)
	}

	// Fail with a clear message instead of dereferencing an unusable result.
	got, err := store.GetTask("sub-1")
	if err != nil {
		t.Fatalf("get task: %v", err)
	}
	if got.State != task.StateCompleted {
		t.Errorf("task state: want COMPLETED, got %v", got.State)
	}
}

// TestPool_Submit_Failure verifies that a runner error is surfaced on the
// result and that the execution status is FAILED.
func TestPool_Submit_Failure(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	tk := makeTask("pf-1")
	store.CreateTask(tk)
	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Err == nil {
		t.Fatal("expected error")
	}
	if result.Execution.Status != "FAILED" {
		t.Errorf("status: want FAILED, got %q", result.Execution.Status)
	}
}

// TestPool_Submit_Timeout verifies that a task whose 50ms timeout is far
// shorter than the runner's 5s delay ends with execution status TIMED_OUT.
func TestPool_Submit_Timeout(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	tk := makeTask("pt-1")
	tk.Timeout.Duration = 50 * time.Millisecond
	store.CreateTask(tk)
	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Execution.Status != "TIMED_OUT" {
		t.Errorf("status: want TIMED_OUT, got %q", result.Execution.Status)
	}
}

// TestPool_Submit_Cancellation verifies that cancelling the context passed to
// Submit while the runner is still delaying yields execution status CANCELLED.
func TestPool_Submit_Cancellation(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release the context even if the test exits early

	tk := makeTask("pc-1")
	store.CreateTask(tk)
	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(ctx, tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	time.Sleep(20 * time.Millisecond)
	cancel()

	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

// TestPool_Cancel_StopsRunningTask verifies that Cancel returns true for a
// task that is running and that the task then reports status CANCELLED.
func TestPool_Cancel_StopsRunningTask(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	tk := makeTask("cancel-1")
	store.CreateTask(tk)
	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}
	time.Sleep(20 * time.Millisecond) // let goroutine start

	if ok := pool.Cancel("cancel-1"); !ok {
		t.Fatal("Cancel returned false for a running task")
	}

	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

// TestPool_Cancel_UnknownTask_ReturnsFalse verifies that Cancel reports false
// for a task ID that was never submitted.
func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runners, store, logger)

	if ok := pool.Cancel("nonexistent"); ok {
		t.Error("Cancel returned true for unknown task")
	}
}

// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than
// returning an error when the pool is at capacity. Both tasks should eventually complete.
func TestPool_QueuedWhenAtCapacity(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 100 * time.Millisecond}
	runners := map[string]Runner{"claude": runner}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(1, runners, store, logger)

	tk1 := makeTask("queue-1")
	store.CreateTask(tk1)
	if err := pool.Submit(context.Background(), tk1); err != nil {
		t.Fatalf("first submit: %v", err)
	}

	// Second submit must succeed (queued) even though pool slot is taken.
	tk2 := makeTask("queue-2")
	store.CreateTask(tk2)
	if err := pool.Submit(context.Background(), tk2); err != nil {
		t.Fatalf("second submit: %v — expected task to be queued, not rejected", err)
	}

	// Both tasks must complete.
	for i := 0; i < 2; i++ {
		r := <-pool.Results()
		if r.Err != nil {
			t.Errorf("task %s error: %v", r.TaskID, r.Err)
		}
	}
}

// logPatherMockRunner is a mockRunner that also implements LogPather,
// and captures the StdoutPath seen when Run() is called.
type logPatherMockRunner struct {
	mockRunner
	logDir       string // base directory joined with the execution ID by ExecLogDir
	capturedPath string // StdoutPath observed at the start of Run; guarded by mu
}

// ExecLogDir returns the log directory for execID, located under logDir.
func (m *logPatherMockRunner) ExecLogDir(execID string) string {
	return filepath.Join(m.logDir, execID)
}

// Run records the execution's StdoutPath as it is on entry, then delegates to
// the embedded mockRunner.
func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.capturedPath = e.StdoutPath
	m.mu.Unlock()
	return m.mockRunner.Run(ctx, t, e)
}

// TestPool_Execute_LogPathsPreSetBeforeRun verifies that when the runner
// implements LogPather, log paths are set on the execution before Run() is
// called — so they land in the DB at CreateExecution time, not just at
// UpdateExecution time.
func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) { store := testStore(t) runner := &logPatherMockRunner{logDir: t.TempDir()} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("lp-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() runner.mu.Lock() captured := runner.capturedPath runner.mu.Unlock() if captured == "" { t.Fatal("StdoutPath was empty when Run() was called; expected pre-set path") } if !strings.HasSuffix(captured, "stdout.log") { t.Errorf("expected stdout.log suffix, got: %s", captured) } // Path in the returned execution record should match. if result.Execution.StdoutPath != captured { t.Errorf("execution StdoutPath %q != captured %q", result.Execution.StdoutPath, captured) } } // TestPool_Execute_NoLogPather_PathsEmptyBeforeRun verifies that a runner // without LogPather doesn't panic and paths remain empty until Run() sets them. 
func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { store := testStore(t) runner := &mockRunner{} // does NOT implement LogPather runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("nolp-1") store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err != nil { t.Fatalf("unexpected error: %v", result.Err) } } func TestPool_ConcurrentExecution(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 50 * time.Millisecond} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(3, runners, store, logger) for i := 0; i < 3; i++ { tk := makeTask(fmt.Sprintf("cc-%d", i)) store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit %d: %v", i, err) } } for i := 0; i < 3; i++ { result := <-pool.Results() if result.Execution.Status != "READY" { t.Errorf("task %s: want READY, got %q", result.TaskID, result.Execution.Status) } } if runner.callCount() != 3 { t.Errorf("calls: want 3, got %d", runner.callCount()) } } func TestWithFailureHistory_NoFailures_ReturnsUnchanged(t *testing.T) { tk := makeTask("no-fail") result := withFailureHistory(tk, nil, nil) if result != tk { t.Error("expected same pointer when no prior executions") } } func TestWithFailureHistory_WithError_ReturnsUnchanged(t *testing.T) { tk := makeTask("err-case") result := withFailureHistory(tk, nil, fmt.Errorf("db error")) if result != tk { t.Error("expected same pointer when ListExecutions errors") } } func TestWithFailureHistory_InjectsFailedHistory(t *testing.T) { tk := makeTask("with-fail") tk.Agent.Instructions = "do the work" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: 
"sandbox: uncommitted changes", StartTime: time.Now()}, {ID: "e2", Status: "COMPLETED", ErrorMsg: "", StartTime: time.Now()}, // not a failure, should be ignored } result := withFailureHistory(tk, execs, nil) if result == tk { t.Fatal("expected a new task copy, got same pointer") } if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Errorf("expected history header in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } if !strings.Contains(result.Agent.SystemPromptAppend, "sandbox: uncommitted changes") { t.Errorf("expected error message in SystemPromptAppend, got: %q", result.Agent.SystemPromptAppend) } // COMPLETED execution should not appear if strings.Contains(result.Agent.SystemPromptAppend, "e2") { t.Errorf("COMPLETED execution should not appear in history") } // Original instructions unchanged if result.Agent.Instructions != "do the work" { t.Errorf("instructions should be unchanged, got: %q", result.Agent.Instructions) } } func TestWithFailureHistory_PreservesExistingSystemPrompt(t *testing.T) { tk := makeTask("with-prompt") tk.Agent.SystemPromptAppend = "existing prompt" execs := []*storage.Execution{ {ID: "e1", Status: "FAILED", ErrorMsg: "some error", StartTime: time.Now()}, } result := withFailureHistory(tk, execs, nil) if !strings.Contains(result.Agent.SystemPromptAppend, "Prior Attempt History") { t.Error("expected history prepended") } if !strings.Contains(result.Agent.SystemPromptAppend, "existing prompt") { t.Error("expected existing prompt preserved") } // History must come BEFORE the existing prompt histIdx := strings.Index(result.Agent.SystemPromptAppend, "Prior Attempt History") existIdx := strings.Index(result.Agent.SystemPromptAppend, "existing prompt") if histIdx > existIdx { t.Error("history should be prepended before existing system prompt") } } func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) { store := testStore(t) var capturedPrompt string runner := &mockRunner{} runner.onRun = func(t 
*task.Task, _ *storage.Execution) error { capturedPrompt = t.Agent.SystemPromptAppend return nil } runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("retry-hist") store.CreateTask(tk) // Simulate a prior failed execution. store.CreateExecution(&storage.Execution{ ID: "prior-exec", TaskID: tk.ID, StartTime: time.Now(), EndTime: time.Now(), Status: "FAILED", ErrorMsg: "prior failure reason", }) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } <-pool.Results() if !strings.Contains(capturedPrompt, "prior failure reason") { t.Errorf("expected prior failure in system prompt, got: %q", capturedPrompt) } } func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) tk := makeTask("bad-agent") tk.Agent.Type = "super-ai" store.CreateTask(tk) if err := pool.Submit(context.Background(), tk); err != nil { t.Fatalf("submit: %v", err) } result := <-pool.Results() if result.Err == nil { t.Fatal("expected error for unsupported agent") } if !strings.Contains(result.Err.Error(), "unsupported agent type") { t.Errorf("expected 'unsupported agent type' in error, got: %v", result.Err) } if result.Execution.Status != "FAILED" { t.Errorf("status: want FAILED, got %q", result.Execution.Status) } }