diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-10 16:48:15 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-10 16:48:15 +0000 |
| commit | e0335047e063830ca000a8cb3a9ec31a8ab411a7 (patch) | |
| tree | 9f38a4b669f35a0b9590b13b0fd0a0aeb37d99a9 /internal/executor/executor.go | |
| parent | e392f99727aa2f399033896f2cda5b22e3277700 (diff) | |
executor: explicit load balancing — code picks agent, classifier picks model
pickAgent() deterministically selects the agent with the fewest active tasks,
skipping rate-limited agents. The classifier now only selects the model for the
pre-assigned agent, so Gemini gets tasks from the start rather than only as a
fallback when Claude's quota is exhausted.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 76 |
1 files changed, 56 insertions, 20 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c04f68e..f54773a 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -343,30 +343,66 @@ func (p *Pool) ActiveCount() int { return p.active } -func (p *Pool) execute(ctx context.Context, t *task.Task) { - // 1. Classification - if p.Classifier != nil { - p.mu.Lock() - activeTasks := make(map[string]int) - rateLimited := make(map[string]bool) - now := time.Now() - for agent := range p.runners { - activeTasks[agent] = p.activePerAgent[agent] - if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { - delete(p.rateLimited, agent) - } - rateLimited[agent] = now.Before(p.rateLimited[agent]) +// pickAgent selects the best agent from the given SystemStatus using explicit +// load balancing: prefer the available (non-rate-limited) agent with the fewest +// active tasks. If all agents are rate-limited, fall back to fewest active. +func pickAgent(status SystemStatus) string { + best := "" + bestActive := -1 + + // First pass: only consider non-rate-limited agents. + for agent, active := range status.ActiveTasks { + if status.RateLimited[agent] { + continue } - status := SystemStatus{ - ActiveTasks: activeTasks, - RateLimited: rateLimited, + if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) { + best = agent + bestActive = active } - p.mu.Unlock() + } + if best != "" { + return best + } + + // Fallback: all rate-limited — pick least active anyway. + for agent, active := range status.ActiveTasks { + if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) { + best = agent + bestActive = active + } + } + return best +} - cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status) +func (p *Pool) execute(ctx context.Context, t *task.Task) { + // 1. Load-balanced agent selection + model classification. + p.mu.Lock() + activeTasks := make(map[string]int) + rateLimited := make(map[string]bool) + now := time.Now() + for agent := range p.runners { + activeTasks[agent] = p.activePerAgent[agent] + if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { + delete(p.rateLimited, agent) + } + rateLimited[agent] = now.Before(p.rateLimited[agent]) + } + status := SystemStatus{ + ActiveTasks: activeTasks, + RateLimited: rateLimited, + } + p.mu.Unlock() + + // Deterministically pick the agent with fewest active tasks. + selectedAgent := pickAgent(status) + if selectedAgent != "" { + t.Agent.Type = selectedAgent + } + + if p.Classifier != nil { + cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status, t.Agent.Type) if err == nil { - p.logger.Info("task classified", "taskID", t.ID, "agent", cls.AgentType, "model", cls.Model, "reason", cls.Reason) - t.Agent.Type = cls.AgentType + p.logger.Info("task classified", "taskID", t.ID, "agent", t.Agent.Type, "model", cls.Model, "reason", cls.Reason) t.Agent.Model = cls.Model } else { p.logger.Error("classification failed", "error", err, "taskID", t.ID) |
