diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 77 |
1 files changed, 57 insertions, 20 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c04f68e..76c8ac7 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -273,6 +273,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" + exec.SandboxDir = blockedErr.SandboxDir // preserve so resume runs in same dir if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) } @@ -343,30 +344,66 @@ func (p *Pool) ActiveCount() int { return p.active } -func (p *Pool) execute(ctx context.Context, t *task.Task) { - // 1. Classification - if p.Classifier != nil { - p.mu.Lock() - activeTasks := make(map[string]int) - rateLimited := make(map[string]bool) - now := time.Now() - for agent := range p.runners { - activeTasks[agent] = p.activePerAgent[agent] - if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { - delete(p.rateLimited, agent) - } - rateLimited[agent] = now.Before(p.rateLimited[agent]) +// pickAgent selects the best agent from the given SystemStatus using explicit +// load balancing: prefer the available (non-rate-limited) agent with the fewest +// active tasks. If all agents are rate-limited, fall back to fewest active. +func pickAgent(status SystemStatus) string { + best := "" + bestActive := -1 + + // First pass: only consider non-rate-limited agents. + for agent, active := range status.ActiveTasks { + if status.RateLimited[agent] { + continue } - status := SystemStatus{ - ActiveTasks: activeTasks, - RateLimited: rateLimited, + if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) { + best = agent + bestActive = active } - p.mu.Unlock() + } + if best != "" { + return best + } + + // Fallback: all rate-limited — pick least active anyway. + for agent, active := range status.ActiveTasks { + if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) { + best = agent + bestActive = active + } + } + return best +} - cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status) +func (p *Pool) execute(ctx context.Context, t *task.Task) { + // 1. Load-balanced agent selection + model classification. + p.mu.Lock() + activeTasks := make(map[string]int) + rateLimited := make(map[string]bool) + now := time.Now() + for agent := range p.runners { + activeTasks[agent] = p.activePerAgent[agent] + if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { + delete(p.rateLimited, agent) + } + rateLimited[agent] = now.Before(p.rateLimited[agent]) + } + status := SystemStatus{ + ActiveTasks: activeTasks, + RateLimited: rateLimited, + } + p.mu.Unlock() + + // Deterministically pick the agent with fewest active tasks. + selectedAgent := pickAgent(status) + if selectedAgent != "" { + t.Agent.Type = selectedAgent + } + + if p.Classifier != nil { + cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status, t.Agent.Type) if err == nil { - p.logger.Info("task classified", "taskID", t.ID, "agent", cls.AgentType, "model", cls.Model, "reason", cls.Reason) - t.Agent.Type = cls.AgentType + p.logger.Info("task classified", "taskID", t.ID, "agent", t.Agent.Type, "model", cls.Model, "reason", cls.Reason) t.Agent.Model = cls.Model } else { p.logger.Error("classification failed", "error", err, "taskID", t.ID) |
