summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-08 06:32:14 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-08 06:32:14 +0000
commit076c0faa0ae63278b3120cd6622e64ba1e36e36b (patch)
tree2dfe67328f1e6c795cc956d268aa6d84dc9ef93d /internal/executor/executor.go
parentcad057fd64fbf44f953bc2784f70ce344f3389cf (diff)
fix: detect quota exhaustion from stream; map to BUDGET_EXCEEDED not FAILED
When claude hits the 5-hour usage limit it exits 1. execOnce was returning the generic "exit status 1" error, hiding the real cause from the retry loop and the task state machine. Fix: - execOnce now surfaces streamErr when it indicates rate limiting or quota exhaustion, so callers see the actual message. - New isQuotaExhausted() detects "hit your limit" messages — these are not retried (retrying a depleted 5h bucket gains nothing and is pointless), and map to BUDGET_EXCEEDED in both execute/executeResume. - isRateLimitError() remains for transient throttling (429/overloaded), which continues to trigger exponential backoff retries. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go40
1 file changed, 28 insertions, 12 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 62fed2e..9c8e126 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -28,10 +28,11 @@ type Runner interface {
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
- maxConcurrent int
- runner Runner
- store *storage.DB
- logger *slog.Logger
+ maxConcurrent int
+ runner Runner
+ store *storage.DB
+ logger *slog.Logger
+ depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
mu sync.Mutex
active int
@@ -52,13 +53,14 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
maxConcurrent = 1
}
return &Pool{
- maxConcurrent: maxConcurrent,
- runner: runner,
- store: store,
- logger: logger,
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, maxConcurrent*2),
- Questions: NewQuestionRegistry(),
+ maxConcurrent: maxConcurrent,
+ runner: runner,
+ store: store,
+ logger: logger,
+ depPollInterval: 5 * time.Second,
+ cancels: make(map[string]context.CancelFunc),
+ resultCh: make(chan *Result, maxConcurrent*2),
+ Questions: NewQuestionRegistry(),
}
}
@@ -99,6 +101,12 @@ func (p *Pool) Cancel(taskID string) bool {
// SubmitResume re-queues a blocked task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+ if t.State != task.StateBlocked && t.State != task.StateTimedOut {
+ return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State)
+ }
+ if exec.ResumeSessionID == "" {
+ return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
+ }
p.mu.Lock()
if p.active >= p.maxConcurrent {
active := p.active
@@ -163,6 +171,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
exec.Status = "CANCELLED"
exec.ErrorMsg = "execution cancelled"
p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else if isQuotaExhausted(err) {
+ exec.Status = "BUDGET_EXCEEDED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
} else {
exec.Status = "FAILED"
exec.ErrorMsg = err.Error()
@@ -280,6 +292,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
exec.Status = "CANCELLED"
exec.ErrorMsg = "execution cancelled"
p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else if isQuotaExhausted(err) {
+ exec.Status = "BUDGET_EXCEEDED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
} else {
exec.Status = "FAILED"
exec.ErrorMsg = err.Error()
@@ -334,7 +350,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
select {
case <-ctx.Done():
return ctx.Err()
- case <-time.After(5 * time.Second):
+ case <-time.After(p.depPollInterval):
}
}
}