From 5ad8356b5ce4d525af079f902791559d53b67bba Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Mon, 9 Mar 2026 01:08:51 +0000 Subject: executor: fix Claude rate-limit detection and prioritize Gemini when limited Updated parseStream to detect 'rate_limit_event' and 'assistant' error:rate_limit messages from the Claude CLI. Updated Classifier to strongly prefer non-rate-limited agents. Added logging to Pool to track rate-limit status during classification. --- internal/executor/classifier.go | 2 +- internal/executor/claude.go | 11 +++++++++++ internal/executor/executor.go | 10 +++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) (limited to 'internal/executor') diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go index 28581ea..bab3ea9 100644 --- a/internal/executor/classifier.go +++ b/internal/executor/classifier.go @@ -45,7 +45,7 @@ Gemini: - gemini-1.5-pro (more powerful, larger context) Selection Criteria: -- Agent: Prefer the one with least running tasks and no active rate limit. +- Agent: You MUST prefer an agent that is NOT rate limited. If an agent is rate limited, do NOT select it unless all available agents are rate limited. - Model: Select based on task complexity. Use powerful models (opus, pro, pro-preview) for complex reasoning/coding, flash-lite/flash/haiku for simple tasks. Task: diff --git a/internal/executor/claude.go b/internal/executor/claude.go index d6d92cb..2faeff3 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -421,6 +421,17 @@ func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) msgType, _ := msg["type"].(string) switch msgType { + case "rate_limit_event": + if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok { + status, _ := info["status"].(string) + if status == "rejected" { + streamErr = fmt.Errorf("claude rate limit reached: %v", msg) + } + } + case "assistant": + if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" { + streamErr = fmt.Errorf("claude rate limit reached: %v", msg) + } case "result": if isErr, _ := msg["is_error"].(bool); isErr { result, _ := msg["result"].(string) diff --git a/internal/executor/executor.go b/internal/executor/executor.go index df222f8..f404ddf 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -238,6 +238,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex retryAfter = 1 * time.Minute } p.rateLimited[agentType] = time.Now().Add(retryAfter) + p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter) p.mu.Unlock() } @@ -295,12 +296,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { now := time.Now() for agent := range p.runners { activeTasks[agent] = p.activePerAgent[agent] - rateLimited[agent] = now.Before(p.rateLimited[agent]) + activeUntil := p.rateLimited[agent] + isLimited := now.Before(activeUntil) + rateLimited[agent] = isLimited + if isLimited { + p.logger.Debug("agent rate limited", "agent", agent, "until", activeUntil) + } } status := SystemStatus{ ActiveTasks: activeTasks, RateLimited: rateLimited, } + p.logger.Debug("classifying task", "taskID", t.ID, "status", status) p.mu.Unlock() cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status) @@ -433,6 +440,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { retryAfter = 1 * time.Minute } p.rateLimited[agentType] = time.Now().Add(retryAfter) + p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter) p.mu.Unlock() } -- cgit v1.2.3