package executor

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// mockRunner implements Runner for testing.
//
// It counts how many times Run is invoked, can optionally block for delay
// (or until the context is done), and can be configured to return err while
// recording exitCode on the execution.
type mockRunner struct {
	mu       sync.Mutex
	calls    int
	delay    time.Duration
	err      error
	exitCode int
}

// Run records the call, waits for the configured delay unless ctx is done
// first (in which case ctx.Err() is returned), and then returns the
// configured error. When an error is configured, the execution's ExitCode is
// set to the configured exit code before returning.
func (m *mockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	m.mu.Lock()
	m.calls++
	m.mu.Unlock()

	if m.delay > 0 {
		select {
		case <-time.After(m.delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	if m.err != nil {
		e.ExitCode = m.exitCode
		return m.err
	}
	return nil
}

// callCount returns the number of times Run has been invoked so far.
func (m *mockRunner) callCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.calls
}

// testStore opens a storage.DB backed by a file inside the test's temporary
// directory and registers a cleanup that closes it when the test finishes.
func testStore(t *testing.T) *storage.DB {
	t.Helper()
	dbPath := filepath.Join(t.TempDir(), "test.db")
	db, err := storage.Open(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { db.Close() })
	return db
}

// makeTask builds a queued, normal-priority task with the given ID, a single
// retry attempt with linear backoff, and creation/update timestamps set to
// the current UTC time.
func makeTask(id string) *task.Task {
	now := time.Now().UTC()
	return &task.Task{
		ID:        id,
		Name:      "Test " + id,
		Claude:    task.ClaudeConfig{Instructions: "test"},
		Priority:  task.PriorityNormal,
		Retry:     task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
		Tags:      []string{},
		DependsOn: []string{},
		State:     task.StateQueued,
		CreatedAt: now,
		UpdatedAt: now,
	}
}

// TestPool_Submit_TopLevel_GoesToReady checks that a successfully run task
// without a ParentTaskID reports status READY and is stored in StateReady.
func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runner, store, logger)

	tk := makeTask("ps-1") // no ParentTaskID → top-level
	// NOTE(review): CreateTask's result is not checked here or in the other
	// tests in this file; confirm whether it returns an error worth asserting.
	store.CreateTask(tk)

	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "READY" {
		t.Errorf("status: want READY, got %q", result.Execution.Status)
	}

	// Fail with a message instead of dereferencing a task that could not be loaded.
	got, err := store.GetTask("ps-1")
	if err != nil {
		t.Fatalf("get task: %v", err)
	}
	if got.State != task.StateReady {
		t.Errorf("task state: want READY, got %v", got.State)
	}
}

// TestPool_Submit_Subtask_GoesToCompleted checks that a successfully run task
// with a ParentTaskID reports status COMPLETED and is stored in
// StateCompleted.
func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runner, store, logger)

	tk := makeTask("sub-1")
	tk.ParentTaskID = "parent-99" // subtask
	store.CreateTask(tk)

	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Err != nil {
		t.Errorf("expected no error, got: %v", result.Err)
	}
	if result.Execution.Status != "COMPLETED" {
		t.Errorf("status: want COMPLETED, got %q", result.Execution.Status)
	}

	// Fail with a message instead of dereferencing a task that could not be loaded.
	got, err := store.GetTask("sub-1")
	if err != nil {
		t.Fatalf("get task: %v", err)
	}
	if got.State != task.StateCompleted {
		t.Errorf("task state: want COMPLETED, got %v", got.State)
	}
}

// TestPool_Submit_Failure checks that a runner error surfaces on the result
// and that the execution status is FAILED.
func TestPool_Submit_Failure(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runner, store, logger)

	tk := makeTask("pf-1")
	store.CreateTask(tk)

	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Err == nil {
		t.Fatal("expected error")
	}
	if result.Execution.Status != "FAILED" {
		t.Errorf("status: want FAILED, got %q", result.Execution.Status)
	}
}

// TestPool_Submit_Timeout checks that a task whose timeout (50ms) is shorter
// than the runner's delay (5s) ends with status TIMED_OUT.
func TestPool_Submit_Timeout(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runner, store, logger)

	tk := makeTask("pt-1")
	tk.Timeout.Duration = 50 * time.Millisecond
	store.CreateTask(tk)

	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(context.Background(), tk); err != nil {
		t.Fatalf("submit: %v", err)
	}

	result := <-pool.Results()
	if result.Execution.Status != "TIMED_OUT" {
		t.Errorf("status: want TIMED_OUT, got %q", result.Execution.Status)
	}
}

// TestPool_Submit_Cancellation checks that cancelling the submit context
// while the runner is still delaying ends with status CANCELLED.
func TestPool_Submit_Cancellation(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 5 * time.Second}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(2, runner, store, logger)

	ctx, cancel := context.WithCancel(context.Background())
	tk := makeTask("pc-1")
	store.CreateTask(tk)

	// A rejected submit would leave the receive below blocked forever.
	if err := pool.Submit(ctx, tk); err != nil {
		cancel()
		t.Fatalf("submit: %v", err)
	}

	time.Sleep(20 * time.Millisecond)
	cancel()

	result := <-pool.Results()
	if result.Execution.Status != "CANCELLED" {
		t.Errorf("status: want CANCELLED, got %q", result.Execution.Status)
	}
}

// TestPool_AtCapacity checks that a pool created with capacity 1 rejects a
// second submit while the first task is still running.
func TestPool_AtCapacity(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: time.Second}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(1, runner, store, logger)

	tk1 := makeTask("cap-1")
	store.CreateTask(tk1)
	// The first submit must succeed, otherwise the capacity assertion below
	// is meaningless and the final drain would block forever.
	if err := pool.Submit(context.Background(), tk1); err != nil {
		t.Fatalf("submit first task: %v", err)
	}

	// Pool is at capacity, second submit should fail.
	time.Sleep(10 * time.Millisecond) // let goroutine start
	tk2 := makeTask("cap-2")
	store.CreateTask(tk2)
	err := pool.Submit(context.Background(), tk2)
	if err == nil {
		t.Fatal("expected capacity error")
	}

	<-pool.Results() // drain
}

// TestPool_ConcurrentExecution submits three tasks to a pool of capacity
// three and checks that each reports status READY and that the runner was
// invoked exactly three times.
func TestPool_ConcurrentExecution(t *testing.T) {
	store := testStore(t)
	runner := &mockRunner{delay: 50 * time.Millisecond}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	pool := NewPool(3, runner, store, logger)

	for i := 0; i < 3; i++ {
		tk := makeTask(fmt.Sprintf("cc-%d", i))
		store.CreateTask(tk)
		if err := pool.Submit(context.Background(), tk); err != nil {
			t.Fatalf("submit %d: %v", i, err)
		}
	}

	for i := 0; i < 3; i++ {
		result := <-pool.Results()
		if result.Execution.Status != "READY" {
			t.Errorf("task %s: want READY, got %q", result.TaskID, result.Execution.Status)
		}
	}

	if runner.callCount() != 3 {
		t.Errorf("calls: want 3, got %d", runner.callCount())
	}
}