diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
| commit | 7466b1751c4126735769a3304e1db80dab166a9e (patch) | |
| tree | c5d0fe9d1018e62e3857480d471a0f6f8ebee104 /internal/executor/executor.go | |
| parent | a33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff) | |
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to
$CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and
returns BlockedError; the pool transitions the task to BLOCKED and
stores the question JSON on the task record.
The user answers via POST /api/tasks/{id}/answer. The server looks up
the claude session_id from the most recent execution and submits a
resume execution (claude --resume <session-id> "<answer>"). Because the
agent exits after writing its question, no executor slot is held while
the task waits for the answer.
Changes:
- task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED
- storage: add session_id to executions, question_json to tasks;
add GetLatestExecution and UpdateTaskQuestion methods
- executor: BlockedError type; ClaudeRunner pre-assigns --session-id,
sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit;
buildArgs handles --resume mode; Pool.SubmitResume for resume path
- api: handleAnswerQuestion rewritten to create resume execution
- preamble: add question protocol instructions for agents
- web: BLOCKED state badge (indigo), question text + option buttons or
free-text input with Submit on the task card footer
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 96 |
1 files changed, 95 insertions, 1 deletions
```diff
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index eb23c02..f6932f4 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -2,6 +2,7 @@ package executor
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log/slog"
 	"path/filepath"
@@ -80,6 +81,94 @@ func (p *Pool) Results() <-chan *Result {
 	return p.resultCh
 }
 
+// SubmitResume re-queues a blocked task using the provided resume execution.
+// The execution must have ResumeSessionID and ResumeAnswer set.
+func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+	p.mu.Lock()
+	if p.active >= p.maxConcurrent {
+		active := p.active
+		max := p.maxConcurrent
+		p.mu.Unlock()
+		return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+	}
+	p.active++
+	p.mu.Unlock()
+
+	go p.executeResume(ctx, t, exec)
+	return nil
+}
+
+func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
+	defer func() {
+		p.mu.Lock()
+		p.active--
+		p.mu.Unlock()
+	}()
+
+	// Pre-populate log paths.
+	if lp, ok := p.runner.(LogPather); ok {
+		if logDir := lp.ExecLogDir(exec.ID); logDir != "" {
+			exec.StdoutPath = filepath.Join(logDir, "stdout.log")
+			exec.StderrPath = filepath.Join(logDir, "stderr.log")
+			exec.ArtifactDir = logDir
+		}
+	}
+	exec.StartTime = time.Now().UTC()
+	exec.Status = "RUNNING"
+
+	if err := p.store.CreateExecution(exec); err != nil {
+		p.logger.Error("failed to create resume execution record", "error", err)
+	}
+	if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
+		p.logger.Error("failed to update task state", "error", err)
+	}
+
+	var cancel context.CancelFunc
+	if t.Timeout.Duration > 0 {
+		ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
+	} else {
+		ctx, cancel = context.WithCancel(ctx)
+	}
+	defer cancel()
+
+	err := p.runner.Run(ctx, t, exec)
+	exec.EndTime = time.Now().UTC()
+
+	if err != nil {
+		var blockedErr *BlockedError
+		if errors.As(err, &blockedErr) {
+			exec.Status = "BLOCKED"
+			p.store.UpdateTaskState(t.ID, task.StateBlocked)
+			p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+		} else if ctx.Err() == context.DeadlineExceeded {
+			exec.Status = "TIMED_OUT"
+			exec.ErrorMsg = "execution timed out"
+			p.store.UpdateTaskState(t.ID, task.StateTimedOut)
+		} else if ctx.Err() == context.Canceled {
+			exec.Status = "CANCELLED"
+			exec.ErrorMsg = "execution cancelled"
+			p.store.UpdateTaskState(t.ID, task.StateCancelled)
+		} else {
+			exec.Status = "FAILED"
+			exec.ErrorMsg = err.Error()
+			p.store.UpdateTaskState(t.ID, task.StateFailed)
+		}
+	} else {
+		if t.ParentTaskID == "" {
+			exec.Status = "READY"
+			p.store.UpdateTaskState(t.ID, task.StateReady)
+		} else {
+			exec.Status = "COMPLETED"
+			p.store.UpdateTaskState(t.ID, task.StateCompleted)
+		}
+	}
+
+	if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
+		p.logger.Error("failed to update resume execution", "error", updateErr)
+	}
+	p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+}
+
 // ActiveCount returns the number of currently running tasks.
 func (p *Pool) ActiveCount() int {
 	p.mu.Lock()
@@ -155,7 +244,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
 	exec.EndTime = time.Now().UTC()
 
 	if err != nil {
-		if ctx.Err() == context.DeadlineExceeded {
+		var blockedErr *BlockedError
+		if errors.As(err, &blockedErr) {
+			exec.Status = "BLOCKED"
+			p.store.UpdateTaskState(t.ID, task.StateBlocked)
+			p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+		} else if ctx.Err() == context.DeadlineExceeded {
 			exec.Status = "TIMED_OUT"
 			exec.ErrorMsg = "execution timed out"
 			p.store.UpdateTaskState(t.ID, task.StateTimedOut)
```
