summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-06 00:07:18 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-06 00:07:18 +0000
commit7466b1751c4126735769a3304e1db80dab166a9e (patch)
treec5d0fe9d1018e62e3857480d471a0f6f8ebee104 /internal/executor/executor.go
parenta33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff)
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to $CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and returns BlockedError; the pool transitions the task to BLOCKED and stores the question JSON on the task record. The user answers via POST /api/tasks/{id}/answer. The server looks up the claude session_id from the most recent execution and submits a resume execution (claude --resume <session-id> "<answer>"), freeing the executor slot entirely while waiting. Changes: - task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED - storage: add session_id to executions, question_json to tasks; add GetLatestExecution and UpdateTaskQuestion methods - executor: BlockedError type; ClaudeRunner pre-assigns --session-id, sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit; buildArgs handles --resume mode; Pool.SubmitResume for resume path - api: handleAnswerQuestion rewritten to create resume execution - preamble: add question protocol instructions for agents - web: BLOCKED state badge (indigo), question text + option buttons or free-text input with Submit on the task card footer Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go96
1 files changed, 95 insertions, 1 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index eb23c02..f6932f4 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -2,6 +2,7 @@ package executor
import (
"context"
+ "errors"
"fmt"
"log/slog"
"path/filepath"
@@ -80,6 +81,94 @@ func (p *Pool) Results() <-chan *Result {
return p.resultCh
}
+// SubmitResume re-queues a blocked task using the provided resume execution.
+// The execution must have ResumeSessionID and ResumeAnswer set.
+func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+ p.mu.Lock()
+ if p.active >= p.maxConcurrent {
+ active := p.active
+ max := p.maxConcurrent
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+ }
+ p.active++
+ p.mu.Unlock()
+
+ go p.executeResume(ctx, t, exec)
+ return nil
+}
+
+func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
+ defer func() {
+ p.mu.Lock()
+ p.active--
+ p.mu.Unlock()
+ }()
+
+ // Pre-populate log paths.
+ if lp, ok := p.runner.(LogPather); ok {
+ if logDir := lp.ExecLogDir(exec.ID); logDir != "" {
+ exec.StdoutPath = filepath.Join(logDir, "stdout.log")
+ exec.StderrPath = filepath.Join(logDir, "stderr.log")
+ exec.ArtifactDir = logDir
+ }
+ }
+ exec.StartTime = time.Now().UTC()
+ exec.Status = "RUNNING"
+
+ if err := p.store.CreateExecution(exec); err != nil {
+ p.logger.Error("failed to create resume execution record", "error", err)
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
+ p.logger.Error("failed to update task state", "error", err)
+ }
+
+ var cancel context.CancelFunc
+ if t.Timeout.Duration > 0 {
+ ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
+ } else {
+ ctx, cancel = context.WithCancel(ctx)
+ }
+ defer cancel()
+
+ err := p.runner.Run(ctx, t, exec)
+ exec.EndTime = time.Now().UTC()
+
+ if err != nil {
+ var blockedErr *BlockedError
+ if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ p.store.UpdateTaskState(t.ID, task.StateBlocked)
+ p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+ } else if ctx.Err() == context.DeadlineExceeded {
+ exec.Status = "TIMED_OUT"
+ exec.ErrorMsg = "execution timed out"
+ p.store.UpdateTaskState(t.ID, task.StateTimedOut)
+ } else if ctx.Err() == context.Canceled {
+ exec.Status = "CANCELLED"
+ exec.ErrorMsg = "execution cancelled"
+ p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else {
+ exec.Status = "FAILED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateFailed)
+ }
+ } else {
+ if t.ParentTaskID == "" {
+ exec.Status = "READY"
+ p.store.UpdateTaskState(t.ID, task.StateReady)
+ } else {
+ exec.Status = "COMPLETED"
+ p.store.UpdateTaskState(t.ID, task.StateCompleted)
+ }
+ }
+
+ if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
+ p.logger.Error("failed to update resume execution", "error", updateErr)
+ }
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+}
+
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -155,7 +244,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
exec.EndTime = time.Now().UTC()
if err != nil {
- if ctx.Err() == context.DeadlineExceeded {
+ var blockedErr *BlockedError
+ if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ p.store.UpdateTaskState(t.ID, task.StateBlocked)
+ p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+ } else if ctx.Err() == context.DeadlineExceeded {
exec.Status = "TIMED_OUT"
exec.ErrorMsg = "execution timed out"
p.store.UpdateTaskState(t.ID, task.StateTimedOut)