summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-08 06:32:14 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-08 06:32:14 +0000
commit076c0faa0ae63278b3120cd6622e64ba1e36e36b (patch)
tree2dfe67328f1e6c795cc956d268aa6d84dc9ef93d /internal/executor
parentcad057fd64fbf44f953bc2784f70ce344f3389cf (diff)
fix: detect quota exhaustion from stream; map to BUDGET_EXCEEDED not FAILED
When claude hits the 5-hour usage limit it exits 1. execOnce was returning the generic "exit status 1" error, hiding the real cause from the retry loop and the task state machine. Fix: - execOnce now surfaces streamErr when it indicates rate limiting or quota exhaustion, so callers see the actual message. - New isQuotaExhausted() detects "hit your limit" messages — these are not retried (retrying a depleted 5h bucket wastes nothing but is pointless), and map to BUDGET_EXCEEDED in both execute/executeResume. - isRateLimitError() remains for transient throttling (429/overloaded), which continues to trigger exponential backoff retries. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/claude.go5
-rw-r--r--internal/executor/executor.go40
-rw-r--r--internal/executor/ratelimit.go14
3 files changed, 46 insertions, 13 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index b97f202..c04a747 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -189,6 +189,11 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s
if exitErr, ok := waitErr.(*exec.ExitError); ok {
e.ExitCode = exitErr.ExitCode()
}
+ // If the stream captured a rate-limit or quota message, return it
+ // so callers can distinguish it from a generic exit-status failure.
+ if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
+ return streamErr
+ }
return fmt.Errorf("claude exited with error: %w", waitErr)
}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 62fed2e..9c8e126 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -28,10 +28,11 @@ type Runner interface {
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
- maxConcurrent int
- runner Runner
- store *storage.DB
- logger *slog.Logger
+ maxConcurrent int
+ runner Runner
+ store *storage.DB
+ logger *slog.Logger
+ depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
mu sync.Mutex
active int
@@ -52,13 +53,14 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
maxConcurrent = 1
}
return &Pool{
- maxConcurrent: maxConcurrent,
- runner: runner,
- store: store,
- logger: logger,
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, maxConcurrent*2),
- Questions: NewQuestionRegistry(),
+ maxConcurrent: maxConcurrent,
+ runner: runner,
+ store: store,
+ logger: logger,
+ depPollInterval: 5 * time.Second,
+ cancels: make(map[string]context.CancelFunc),
+ resultCh: make(chan *Result, maxConcurrent*2),
+ Questions: NewQuestionRegistry(),
}
}
@@ -99,6 +101,12 @@ func (p *Pool) Cancel(taskID string) bool {
// SubmitResume re-queues a blocked task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+ if t.State != task.StateBlocked && t.State != task.StateTimedOut {
+ return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State)
+ }
+ if exec.ResumeSessionID == "" {
+ return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
+ }
p.mu.Lock()
if p.active >= p.maxConcurrent {
active := p.active
@@ -163,6 +171,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
exec.Status = "CANCELLED"
exec.ErrorMsg = "execution cancelled"
p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else if isQuotaExhausted(err) {
+ exec.Status = "BUDGET_EXCEEDED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
} else {
exec.Status = "FAILED"
exec.ErrorMsg = err.Error()
@@ -280,6 +292,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
exec.Status = "CANCELLED"
exec.ErrorMsg = "execution cancelled"
p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else if isQuotaExhausted(err) {
+ exec.Status = "BUDGET_EXCEEDED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
} else {
exec.Status = "FAILED"
exec.ErrorMsg = err.Error()
@@ -334,7 +350,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
select {
case <-ctx.Done():
return ctx.Err()
- case <-time.After(5 * time.Second):
+ case <-time.After(p.depPollInterval):
}
}
}
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index 884da43..deaad18 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -13,7 +13,8 @@ var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
const maxBackoffDelay = 5 * time.Minute
-// isRateLimitError returns true if err looks like a Claude API rate-limit response.
+// isRateLimitError returns true if err looks like a transient Claude API
+// rate-limit that is worth retrying (e.g. per-minute/per-request throttle).
func isRateLimitError(err error) bool {
if err == nil {
return false
@@ -25,6 +26,17 @@ func isRateLimitError(err error) bool {
strings.Contains(msg, "overloaded")
}
+// isQuotaExhausted returns true if err indicates the 5-hour usage quota is
+// fully exhausted. Unlike transient rate limits, these should not be retried.
+func isQuotaExhausted(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := strings.ToLower(err.Error())
+ return strings.Contains(msg, "hit your limit") ||
+ strings.Contains(msg, "you've hit your limit")
+}
+
// parseRetryAfter extracts a Retry-After duration from an error message.
// Returns 0 if no retry-after value is found.
func parseRetryAfter(msg string) time.Duration {