| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-08 06:32:14 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-08 06:32:14 +0000 |
| commit | 076c0faa0ae63278b3120cd6622e64ba1e36e36b | |
| tree | 2dfe67328f1e6c795cc956d268aa6d84dc9ef93d | |
| parent | cad057fd64fbf44f953bc2784f70ce344f3389cf | |
fix: detect quota exhaustion from stream; map to BUDGET_EXCEEDED not FAILED
When the claude CLI hits the 5-hour usage limit it exits with status 1.
execOnce was returning the generic "exit status 1" error, hiding the
real cause from the retry loop and the task state machine.
Fix:
- execOnce now surfaces streamErr when it indicates rate limiting or
quota exhaustion, so callers see the actual message.
- New isQuotaExhausted() detects "hit your limit" messages. These are
not retried (retrying against a depleted 5-hour bucket is pointless)
and map to BUDGET_EXCEEDED in both execute and executeResume.
- isRateLimitError() remains for transient throttling (429/overloaded),
which continues to trigger exponential backoff retries.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
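
The distinction drives two different caller behaviors. Below is a minimal sketch of the branching a retry loop might do with the helpers from this commit; `runWithRetry` and `maxAttempts` are illustrative names, not part of the diff:

```go
package executor

import (
	"context"
	"time"
)

// runWithRetry is a hypothetical caller-side sketch: transient rate
// limits back off and retry, an exhausted quota surfaces immediately
// as a terminal error, and everything else fails without backoff.
func runWithRetry(ctx context.Context, maxAttempts int, attempt func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = attempt(); err == nil {
			return nil
		}
		// Terminal: the 5-hour bucket is empty; surface immediately so
		// the state machine can mark the task BUDGET_EXCEEDED.
		if isQuotaExhausted(err) {
			return err
		}
		// Ordinary failure: no point backing off.
		if !isRateLimitError(err) {
			return err
		}
		// Transient throttle: exponential backoff, capped at maxBackoffDelay.
		delay := time.Duration(1<<uint(i)) * time.Second
		if delay > maxBackoffDelay {
			delay = maxBackoffDelay
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return err
}
```

Surfacing streamErr from execOnce is what makes this branching possible: the classifier sees the stream's actual message instead of an opaque "exit status 1".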
| -rw-r--r-- | internal/executor/claude.go | 5 |
| -rw-r--r-- | internal/executor/executor.go | 40 |
| -rw-r--r-- | internal/executor/ratelimit.go | 14 |

3 files changed, 46 insertions, 13 deletions
```diff
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index b97f202..c04a747 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -189,6 +189,11 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s
 	if exitErr, ok := waitErr.(*exec.ExitError); ok {
 		e.ExitCode = exitErr.ExitCode()
 	}
+	// If the stream captured a rate-limit or quota message, return it
+	// so callers can distinguish it from a generic exit-status failure.
+	if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
+		return streamErr
+	}
 	return fmt.Errorf("claude exited with error: %w", waitErr)
 }
 
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 62fed2e..9c8e126 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -28,10 +28,11 @@ type Runner interface {
 
 // Pool manages a bounded set of concurrent task workers.
 type Pool struct {
-	maxConcurrent int
-	runner        Runner
-	store         *storage.DB
-	logger        *slog.Logger
+	maxConcurrent   int
+	runner          Runner
+	store           *storage.DB
+	logger          *slog.Logger
+	depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
 
 	mu     sync.Mutex
 	active int
@@ -52,13 +53,14 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
 		maxConcurrent = 1
 	}
 	return &Pool{
-		maxConcurrent: maxConcurrent,
-		runner:        runner,
-		store:         store,
-		logger:        logger,
-		cancels:       make(map[string]context.CancelFunc),
-		resultCh:      make(chan *Result, maxConcurrent*2),
-		Questions:     NewQuestionRegistry(),
+		maxConcurrent:   maxConcurrent,
+		runner:          runner,
+		store:           store,
+		logger:          logger,
+		depPollInterval: 5 * time.Second,
+		cancels:         make(map[string]context.CancelFunc),
+		resultCh:        make(chan *Result, maxConcurrent*2),
+		Questions:       NewQuestionRegistry(),
 	}
 }
 
@@ -99,6 +101,12 @@ func (p *Pool) Cancel(taskID string) bool {
 // SubmitResume re-queues a blocked task using the provided resume execution.
 // The execution must have ResumeSessionID and ResumeAnswer set.
 func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+	if t.State != task.StateBlocked && t.State != task.StateTimedOut {
+		return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State)
+	}
+	if exec.ResumeSessionID == "" {
+		return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
+	}
 	p.mu.Lock()
 	if p.active >= p.maxConcurrent {
 		active := p.active
@@ -163,6 +171,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
 		exec.Status = "CANCELLED"
 		exec.ErrorMsg = "execution cancelled"
 		p.store.UpdateTaskState(t.ID, task.StateCancelled)
+	} else if isQuotaExhausted(err) {
+		exec.Status = "BUDGET_EXCEEDED"
+		exec.ErrorMsg = err.Error()
+		p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
 	} else {
 		exec.Status = "FAILED"
 		exec.ErrorMsg = err.Error()
@@ -280,6 +292,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
 		exec.Status = "CANCELLED"
 		exec.ErrorMsg = "execution cancelled"
 		p.store.UpdateTaskState(t.ID, task.StateCancelled)
+	} else if isQuotaExhausted(err) {
+		exec.Status = "BUDGET_EXCEEDED"
+		exec.ErrorMsg = err.Error()
+		p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
 	} else {
 		exec.Status = "FAILED"
 		exec.ErrorMsg = err.Error()
@@ -334,7 +350,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-time.After(5 * time.Second):
+		case <-time.After(p.depPollInterval):
 		}
 	}
 }
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index 884da43..deaad18 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -13,7 +13,8 @@ var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
 
 const maxBackoffDelay = 5 * time.Minute
 
-// isRateLimitError returns true if err looks like a Claude API rate-limit response.
+// isRateLimitError returns true if err looks like a transient Claude API
+// rate-limit that is worth retrying (e.g. per-minute/per-request throttle).
 func isRateLimitError(err error) bool {
 	if err == nil {
 		return false
@@ -25,6 +26,17 @@ func isRateLimitError(err error) bool {
 		strings.Contains(msg, "overloaded")
 }
 
+// isQuotaExhausted returns true if err indicates the 5-hour usage quota is
+// fully exhausted. Unlike transient rate limits, these should not be retried.
+func isQuotaExhausted(err error) bool {
+	if err == nil {
+		return false
+	}
+	msg := strings.ToLower(err.Error())
+	return strings.Contains(msg, "hit your limit") ||
+		strings.Contains(msg, "you've hit your limit")
+}
+
 // parseRetryAfter extracts a Retry-After duration from an error message.
 // Returns 0 if no retry-after value is found.
 func parseRetryAfter(msg string) time.Duration {
```
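
A table-driven test pins down the classification boundary between the two helpers. A minimal sketch follows; the error strings are assumed examples of what the CLI might emit, not captured fixtures:

```go
package executor

import (
	"errors"
	"testing"
)

// Sketch: a quota message must classify as exhausted but not transient,
// while a 429/overloaded message stays retryable. Error texts are assumed.
func TestQuotaVsRateLimitClassification(t *testing.T) {
	cases := []struct {
		name      string
		err       error
		quota     bool
		rateLimit bool
	}{
		{"quota exhausted", errors.New("You've hit your limit; it resets at 7pm"), true, false},
		{"transient overload", errors.New("API error 429: overloaded, retry-after: 30"), false, true},
		{"generic failure", errors.New("exit status 1"), false, false},
		{"nil error", nil, false, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := isQuotaExhausted(tc.err); got != tc.quota {
				t.Errorf("isQuotaExhausted(%v) = %v, want %v", tc.err, got, tc.quota)
			}
			if got := isRateLimitError(tc.err); got != tc.rateLimit {
				t.Errorf("isRateLimitError(%v) = %v, want %v", tc.err, got, tc.rateLimit)
			}
		})
	}
}
```

Since depPollInterval is now injectable, pool tests can also shrink the dependency-poll wait instead of paying the 5-second default per iteration.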
