summary refs log tree commit diff
path: root/internal/executor/executor_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor_test.go')
-rw-r--r--  internal/executor/executor_test.go  263
1 files changed, 252 insertions, 11 deletions
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 91d0137..ac09cfc 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1071,17 +1071,20 @@ func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
func newPoolWithMockStore(store Store) *Pool {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
return &Pool{
- maxConcurrent: 2,
- runners: map[string]Runner{"claude": &mockRunner{}},
- store: store,
- logger: logger,
- activePerAgent: make(map[string]int),
- rateLimited: make(map[string]time.Time),
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, 4),
- workCh: make(chan workItem, 4),
- doneCh: make(chan struct{}, 2),
- Questions: NewQuestionRegistry(),
+ maxConcurrent: 2,
+ maxPerAgent: 1,
+ runners: map[string]Runner{"claude": &mockRunner{}},
+ store: store,
+ logger: logger,
+ activePerAgent: make(map[string]int),
+ rateLimited: make(map[string]time.Time),
+ cancels: make(map[string]context.CancelFunc),
+ consecutiveFailures: make(map[string]int),
+ drained: make(map[string]bool),
+ resultCh: make(chan *Result, 4),
+ workCh: make(chan workItem, 4),
+ doneCh: make(chan struct{}, 2),
+ Questions: NewQuestionRegistry(),
}
}
@@ -1418,3 +1421,241 @@ func TestExecute_MalformedChangestats(t *testing.T) {
t.Errorf("expected nil changestats for malformed output, got %+v", execs[0].Changestats)
}
}
+
+func TestPool_MaxPerAgent_BlocksSecondTask(t *testing.T) {
+ store := testStore(t)
+
+ var mu sync.Mutex
+ concurrentRuns := 0
+ maxConcurrent := 0
+
+ runner := &mockRunner{
+ delay: 100 * time.Millisecond,
+ onRun: func(tk *task.Task, e *storage.Execution) error {
+ mu.Lock()
+ concurrentRuns++
+ if concurrentRuns > maxConcurrent {
+ maxConcurrent = concurrentRuns
+ }
+ mu.Unlock()
+ time.Sleep(100 * time.Millisecond)
+ mu.Lock()
+ concurrentRuns--
+ mu.Unlock()
+ return nil
+ },
+ }
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger) // pool size 2, but maxPerAgent=1
+
+ tk1 := makeTask("mpa-1")
+ tk2 := makeTask("mpa-2")
+ store.CreateTask(tk1)
+ store.CreateTask(tk2)
+
+ pool.Submit(context.Background(), tk1)
+ pool.Submit(context.Background(), tk2)
+
+ for i := 0; i < 2; i++ {
+ select {
+ case <-pool.Results():
+ case <-time.After(10 * time.Second):
+ t.Fatal("timed out waiting for result")
+ }
+ }
+
+ mu.Lock()
+ got := maxConcurrent
+ mu.Unlock()
+ if got > 1 {
+ t.Errorf("maxPerAgent=1 violated: %d claude tasks ran concurrently", got)
+ }
+}
+
+func TestPool_MaxPerAgent_AllowsDifferentAgents(t *testing.T) {
+ store := testStore(t)
+
+ var mu sync.Mutex
+ concurrentRuns := 0
+ maxConcurrent := 0
+
+ makeSlowRunner := func() *mockRunner {
+ return &mockRunner{
+ onRun: func(tk *task.Task, e *storage.Execution) error {
+ mu.Lock()
+ concurrentRuns++
+ if concurrentRuns > maxConcurrent {
+ maxConcurrent = concurrentRuns
+ }
+ mu.Unlock()
+ time.Sleep(80 * time.Millisecond)
+ mu.Lock()
+ concurrentRuns--
+ mu.Unlock()
+ return nil
+ },
+ }
+ }
+
+ runners := map[string]Runner{
+ "claude": makeSlowRunner(),
+ "gemini": makeSlowRunner(),
+ }
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ tk1 := makeTask("da-1")
+ tk1.Agent.Type = "claude"
+ tk2 := makeTask("da-2")
+ tk2.Agent.Type = "gemini"
+ store.CreateTask(tk1)
+ store.CreateTask(tk2)
+
+ pool.Submit(context.Background(), tk1)
+ pool.Submit(context.Background(), tk2)
+
+ for i := 0; i < 2; i++ {
+ select {
+ case <-pool.Results():
+ case <-time.After(5 * time.Second):
+ t.Fatal("timed out waiting for result")
+ }
+ }
+
+ mu.Lock()
+ got := maxConcurrent
+ mu.Unlock()
+ if got < 2 {
+ t.Errorf("different agents should run concurrently; max concurrent was %d", got)
+ }
+}
+
+func TestPool_ConsecutiveFailures_DrainAtTwo(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{err: fmt.Errorf("boom")}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ // Submit two failing tasks
+ for _, id := range []string{"cf-1", "cf-2"} {
+ tk := makeTask(id)
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ <-pool.Results() // drain
+ }
+
+ pool.mu.Lock()
+ drained := pool.drained["claude"]
+ failures := pool.consecutiveFailures["claude"]
+ pool.mu.Unlock()
+
+ if !drained {
+ t.Error("expected claude to be drained after 2 consecutive failures")
+ }
+ if failures < 2 {
+ t.Errorf("expected consecutiveFailures >= 2, got %d", failures)
+ }
+
+ // The second task should have a drain question set
+ tk2, err := store.GetTask("cf-2")
+ if err != nil {
+ t.Fatalf("GetTask: %v", err)
+ }
+ if tk2.QuestionJSON == "" {
+ t.Error("expected drain question to be set on task after drain")
+ }
+}
+
+func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) {
+ store := testStore(t)
+
+ callCount := 0
+ runner := &mockRunner{
+ onRun: func(tk *task.Task, e *storage.Execution) error {
+ callCount++
+ if callCount == 1 {
+ return fmt.Errorf("first failure")
+ }
+ return nil // second call succeeds
+ },
+ }
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ // First task fails
+ tk1 := makeTask("rs-1")
+ store.CreateTask(tk1)
+ pool.Submit(context.Background(), tk1)
+ <-pool.Results()
+
+ pool.mu.Lock()
+ failsBefore := pool.consecutiveFailures["claude"]
+ pool.mu.Unlock()
+ if failsBefore != 1 {
+ t.Errorf("expected 1 failure after first task, got %d", failsBefore)
+ }
+
+ // Second task succeeds
+ tk2 := makeTask("rs-2")
+ store.CreateTask(tk2)
+ pool.Submit(context.Background(), tk2)
+ <-pool.Results()
+
+ pool.mu.Lock()
+ failsAfter := pool.consecutiveFailures["claude"]
+ isDrained := pool.drained["claude"]
+ pool.mu.Unlock()
+
+ if failsAfter != 0 {
+ t.Errorf("expected consecutiveFailures reset to 0 after success, got %d", failsAfter)
+ }
+ if isDrained {
+ t.Error("expected drained to be false after success")
+ }
+}
+
+func TestPool_Undrain_ResumesExecution(t *testing.T) {
+ store := testStore(t)
+
+ // Force drain state
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ pool.mu.Lock()
+ pool.drained["claude"] = true
+ pool.consecutiveFailures["claude"] = 3
+ pool.mu.Unlock()
+
+ // Undrain
+ pool.UndrainingAgent("claude")
+
+ pool.mu.Lock()
+ drained := pool.drained["claude"]
+ failures := pool.consecutiveFailures["claude"]
+ pool.mu.Unlock()
+
+ if drained {
+ t.Error("expected drained=false after UndrainingAgent")
+ }
+ if failures != 0 {
+ t.Errorf("expected consecutiveFailures=0 after UndrainingAgent, got %d", failures)
+ }
+
+ // Verify a task can now run
+ tk := makeTask("undrain-1")
+ store.CreateTask(tk)
+ pool.Submit(context.Background(), tk)
+ select {
+ case result := <-pool.Results():
+ if result.Err != nil {
+ t.Errorf("unexpected error after undrain: %v", result.Err)
+ }
+ case <-time.After(5 * time.Second):
+ t.Fatal("timed out waiting for task after undrain")
+ }
+}