summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-22 00:13:13 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-22 00:13:13 +0000
commit0bed6ec4377dd47b414d975304ae5bfae5d2b4c0 (patch)
treeb3bc1ce89502c946e6be5d47a930e7fe55ba3ef8 /internal/executor
parent8dca9bbb0baee59ffe0d3127180ef0958dda8b91 (diff)
fix: make requeueDelay configurable to fix test timeout in TestPool_MaxPerAgent_BlocksSecondTask
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go6
-rw-r--r--internal/executor/executor_test.go6
2 files changed, 10 insertions, 2 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 7513916..972c254 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -60,7 +60,8 @@ type Pool struct {
runners map[string]Runner
store Store
logger *slog.Logger
- depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
+ depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
+ requeueDelay time.Duration // how long to wait before requeuing a blocked-per-agent task; defaults to 30s
mu sync.Mutex
active int
@@ -94,6 +95,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
store: store,
logger: logger,
depPollInterval: 5 * time.Second,
+ requeueDelay: 30 * time.Second,
activePerAgent: make(map[string]int),
rateLimited: make(map[string]time.Time),
cancels: make(map[string]context.CancelFunc),
@@ -574,7 +576,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
if p.activePerAgent[agentType] >= p.maxPerAgent {
p.mu.Unlock()
- time.AfterFunc(30*time.Second, func() { p.workCh <- workItem{ctx: ctx, task: t} })
+ time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} })
return
}
if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) {
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index ac09cfc..9dfd860 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1232,6 +1232,11 @@ func TestPool_SpecificAgent_SkipsLoadBalancing(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
pool := NewPool(4, runners, store, logger)
+ // Raise per-agent limit so the concurrency gate doesn't interfere with this test.
+ // The injected activePerAgent is only to make pickAgent prefer "claude",
+ // verifying that explicit agent type bypasses load balancing.
+ pool.maxPerAgent = 10
+
// Inject 2 active tasks for gemini, 0 for claude.
// pickAgent would normally pick "claude".
pool.mu.Lock()
@@ -1448,6 +1453,7 @@ func TestPool_MaxPerAgent_BlocksSecondTask(t *testing.T) {
runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
pool := NewPool(2, runners, store, logger) // pool size 2, but maxPerAgent=1
+ pool.requeueDelay = 50 * time.Millisecond // speed up test
tk1 := makeTask("mpa-1")
tk2 := makeTask("mpa-2")