From 0bed6ec4377dd47b414d975304ae5bfae5d2b4c0 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Sun, 22 Mar 2026 00:13:13 +0000 Subject: fix: make requeueDelay configurable to fix test timeout in TestPool_MaxPerAgent_BlocksSecondTask --- internal/executor/executor.go | 6 ++++-- internal/executor/executor_test.go | 6 ++++++ 2 files changed, 10 insertions(+), 2 deletions(-) (limited to 'internal/executor') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 7513916..972c254 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -60,7 +60,8 @@ type Pool struct { runners map[string]Runner store Store logger *slog.Logger - depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s + depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s + requeueDelay time.Duration // how long to wait before requeuing a blocked-per-agent task; defaults to 30s mu sync.Mutex active int @@ -94,6 +95,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * store: store, logger: logger, depPollInterval: 5 * time.Second, + requeueDelay: 30 * time.Second, activePerAgent: make(map[string]int), rateLimited: make(map[string]time.Time), cancels: make(map[string]context.CancelFunc), @@ -574,7 +576,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } if p.activePerAgent[agentType] >= p.maxPerAgent { p.mu.Unlock() - time.AfterFunc(30*time.Second, func() { p.workCh <- workItem{ctx: ctx, task: t} }) + time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} }) return } if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) { diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index ac09cfc..9dfd860 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1232,6 +1232,11 @@ func TestPool_SpecificAgent_SkipsLoadBalancing(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(4, runners, store, logger) + // Raise per-agent limit so the concurrency gate doesn't interfere with this test. + // The injected activePerAgent is only to make pickAgent prefer "claude", + // verifying that explicit agent type bypasses load balancing. + pool.maxPerAgent = 10 + // Inject 2 active tasks for gemini, 0 for claude. // pickAgent would normally pick "claude". pool.mu.Lock() @@ -1448,6 +1453,7 @@ func TestPool_MaxPerAgent_BlocksSecondTask(t *testing.T) { runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) // pool size 2, but maxPerAgent=1 + pool.requeueDelay = 50 * time.Millisecond // speed up test tk1 := makeTask("mpa-1") tk2 := makeTask("mpa-2") -- cgit v1.2.3