From 076c0faa0ae63278b3120cd6622e64ba1e36e36b Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Sun, 8 Mar 2026 06:32:14 +0000 Subject: fix: detect quota exhaustion from stream; map to BUDGET_EXCEEDED not FAILED MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When claude hits the 5-hour usage limit it exits 1. execOnce was returning the generic "exit status 1" error, hiding the real cause from the retry loop and the task state machine. Fix: - execOnce now surfaces streamErr when it indicates rate limiting or quota exhaustion, so callers see the actual message. - New isQuotaExhausted() detects "hit your limit" messages — these are not retried (retrying a depleted 5h bucket wastes nothing but is pointless), and map to BUDGET_EXCEEDED in both execute/executeResume. - isRateLimitError() remains for transient throttling (429/overloaded), which continues to trigger exponential backoff retries. Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/executor.go | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) (limited to 'internal/executor/executor.go') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 62fed2e..9c8e126 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -28,10 +28,11 @@ type Runner interface { // Pool manages a bounded set of concurrent task workers. type Pool struct { - maxConcurrent int - runner Runner - store *storage.DB - logger *slog.Logger + maxConcurrent int + runner Runner + store *storage.DB + logger *slog.Logger + depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s mu sync.Mutex active int @@ -52,13 +53,14 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L maxConcurrent = 1 } return &Pool{ - maxConcurrent: maxConcurrent, - runner: runner, - store: store, - logger: logger, - cancels: make(map[string]context.CancelFunc), - resultCh: make(chan *Result, maxConcurrent*2), - Questions: NewQuestionRegistry(), + maxConcurrent: maxConcurrent, + runner: runner, + store: store, + logger: logger, + depPollInterval: 5 * time.Second, + cancels: make(map[string]context.CancelFunc), + resultCh: make(chan *Result, maxConcurrent*2), + Questions: NewQuestionRegistry(), } } @@ -99,6 +101,12 @@ func (p *Pool) Cancel(taskID string) bool { // SubmitResume re-queues a blocked task using the provided resume execution. // The execution must have ResumeSessionID and ResumeAnswer set. func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { + if t.State != task.StateBlocked && t.State != task.StateTimedOut { + return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State) + } + if exec.ResumeSessionID == "" { + return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID) + } p.mu.Lock() if p.active >= p.maxConcurrent { active := p.active @@ -163,6 +171,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else if isQuotaExhausted(err) { + exec.Status = "BUDGET_EXCEEDED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() @@ -280,6 +292,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else if isQuotaExhausted(err) { + exec.Status = "BUDGET_EXCEEDED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() @@ -334,7 +350,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-time.After(p.depPollInterval): } } } -- cgit v1.2.3