diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 40 |
1 files changed, 28 insertions, 12 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 62fed2e..9c8e126 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -28,10 +28,11 @@ type Runner interface { // Pool manages a bounded set of concurrent task workers. type Pool struct { - maxConcurrent int - runner Runner - store *storage.DB - logger *slog.Logger + maxConcurrent int + runner Runner + store *storage.DB + logger *slog.Logger + depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s mu sync.Mutex active int @@ -52,13 +53,14 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L maxConcurrent = 1 } return &Pool{ - maxConcurrent: maxConcurrent, - runner: runner, - store: store, - logger: logger, - cancels: make(map[string]context.CancelFunc), - resultCh: make(chan *Result, maxConcurrent*2), - Questions: NewQuestionRegistry(), + maxConcurrent: maxConcurrent, + runner: runner, + store: store, + logger: logger, + depPollInterval: 5 * time.Second, + cancels: make(map[string]context.CancelFunc), + resultCh: make(chan *Result, maxConcurrent*2), + Questions: NewQuestionRegistry(), } } @@ -99,6 +101,12 @@ func (p *Pool) Cancel(taskID string) bool { // SubmitResume re-queues a blocked task using the provided resume execution. // The execution must have ResumeSessionID and ResumeAnswer set. func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { + if t.State != task.StateBlocked && t.State != task.StateTimedOut { + return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State) + } + if exec.ResumeSessionID == "" { + return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID) + } p.mu.Lock() if p.active >= p.maxConcurrent { active := p.active @@ -163,6 +171,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else if isQuotaExhausted(err) { + exec.Status = "BUDGET_EXCEEDED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() @@ -280,6 +292,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else if isQuotaExhausted(err) { + exec.Status = "BUDGET_EXCEEDED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() @@ -334,7 +350,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-time.After(p.depPollInterval): } } } |
