diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
| commit | 7466b1751c4126735769a3304e1db80dab166a9e (patch) | |
| tree | c5d0fe9d1018e62e3857480d471a0f6f8ebee104 /internal/executor | |
| parent | a33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff) | |
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to
$CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and
returns BlockedError; the pool transitions the task to BLOCKED and
stores the question JSON on the task record.
The user answers via POST /api/tasks/{id}/answer. The server looks up
the claude session_id from the most recent execution and submits a
resume execution (claude -p "<answer>" --resume <session-id>). Because
the agent has already exited, no executor slot is held while waiting.
Changes:
- task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED
- storage: add session_id to executions, question_json to tasks;
  add GetLatestExecution and UpdateTaskQuestion methods
- executor: BlockedError type; ClaudeRunner pre-assigns --session-id,
  sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit;
  buildArgs handles --resume mode; Pool.SubmitResume for resume path
- api: handleAnswerQuestion rewritten to create resume execution
- preamble: add question protocol instructions for agents
- web: BLOCKED state badge (indigo), question text + option buttons or
  free-text input with Submit on the task card footer
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/claude.go | 73 | ||||
| -rw-r--r-- | internal/executor/claude_test.go | 16 | ||||
| -rw-r--r-- | internal/executor/executor.go | 96 | ||||
| -rw-r--r-- | internal/executor/preamble.go | 19 |
4 files changed, 181 insertions, 23 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go index d3f5751..b97f202 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -27,6 +27,15 @@ type ClaudeRunner struct { APIURL string // base URL of the Claudomator API, passed to subprocesses } +// BlockedError is returned by Run when the agent wrote a question file and exited. +// The pool transitions the task to BLOCKED and stores the question for the user. +type BlockedError struct { + QuestionJSON string // raw JSON from the question file + SessionID string // claude session to resume once the user answers +} + +func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) } + // ExecLogDir returns the log directory for the given execution ID. // Implements LogPather so the pool can persist paths before execution starts. func (r *ClaudeRunner) ExecLogDir(execID string) string { @@ -45,9 +54,8 @@ func (r *ClaudeRunner) binaryPath() string { // Run executes a claude -p invocation, streaming output to log files. // It retries up to 3 times on rate-limit errors using exponential backoff. +// If the agent writes a question file and exits, Run returns *BlockedError. func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - args := r.buildArgs(t) - if t.Claude.WorkingDir != "" { if _, err := os.Stat(t.Claude.WorkingDir); err != nil { return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err) @@ -55,11 +63,9 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi } // Setup log directory once; retries overwrite the log files. - // Use pre-set paths if the pool already populated them via LogPather; - // otherwise fall back to computing from LogDir + execID. 
logDir := r.ExecLogDir(e.ID) if logDir == "" { - logDir = e.ID // fallback: use execID as relative dir (tests without LogDir set) + logDir = e.ID // fallback for tests without LogDir set } if err := os.MkdirAll(logDir, 0700); err != nil { return fmt.Errorf("creating log dir: %w", err) @@ -70,8 +76,17 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi e.ArtifactDir = logDir } + // Pre-assign session ID so we can resume after a BLOCKED state. + // If this is a resume execution the session ID is already set. + if e.SessionID == "" { + e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + attempt := 0 - return runWithBackoff(ctx, 3, 5*time.Second, func() error { + err := runWithBackoff(ctx, 3, 5*time.Second, func() error { if attempt > 0 { delay := 5 * time.Second * (1 << (attempt - 1)) r.Logger.Warn("rate-limited by Claude API, retrying", @@ -80,22 +95,34 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi ) } attempt++ - return r.execOnce(ctx, t, args, e) + return r.execOnce(ctx, args, t.Claude.WorkingDir, e) }) + if err != nil { + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) // consumed + return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID} + } + return nil } // execOnce runs the claude subprocess once, streaming output to e's log paths. -func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error { +func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error { cmd := exec.CommandContext(ctx, r.binaryPath(), args...) 
cmd.Env = append(os.Environ(), "CLAUDOMATOR_API_URL="+r.APIURL, - "CLAUDOMATOR_TASK_ID="+t.ID, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), ) // Put the subprocess in its own process group so we can SIGKILL the entire // group (MCP servers, bash children, etc.) on cancellation. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if t.Claude.WorkingDir != "" { - cmd.Dir = t.Claude.WorkingDir + if workingDir != "" { + cmd.Dir = workingDir } stdoutFile, err := os.Create(e.StdoutPath) @@ -172,13 +199,32 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string return nil } -func (r *ClaudeRunner) buildArgs(t *task.Task) []string { +func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Resume execution: the agent already has context; just deliver the answer. + if e.ResumeSessionID != "" { + args := []string{ + "-p", e.ResumeAnswer, + "--resume", e.ResumeSessionID, + "--output-format", "stream-json", + "--verbose", + } + permMode := t.Claude.PermissionMode + if permMode == "" { + permMode = "bypassPermissions" + } + args = append(args, "--permission-mode", permMode) + if t.Claude.Model != "" { + args = append(args, "--model", t.Claude.Model) + } + return args + } + instructions := t.Claude.Instructions allowedTools := t.Claude.AllowedTools if !t.Claude.SkipPlanning { instructions = withPlanningPreamble(instructions) - // Ensure Bash is available so the agent can POST subtasks. + // Ensure Bash is available so the agent can POST subtasks and ask questions. 
hasBash := false for _, tool := range allowedTools { if tool == "Bash" { @@ -193,6 +239,7 @@ func (r *ClaudeRunner) buildArgs(t *task.Task) []string { args := []string{ "-p", instructions, + "--session-id", e.SessionID, "--output-format", "stream-json", "--verbose", } diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go index edf13b5..056c7e8 100644 --- a/internal/executor/claude_test.go +++ b/internal/executor/claude_test.go @@ -21,7 +21,7 @@ func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") argMap := make(map[string]bool) for _, a := range args { @@ -51,7 +51,7 @@ func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // Check key args are present. argMap := make(map[string]bool) @@ -86,7 +86,7 @@ func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") found := false for i, a := range args { @@ -109,7 +109,7 @@ func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") for i, a := range args { if a == "--permission-mode" && i+1 < len(args) { @@ -131,7 +131,7 @@ func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") found := false for _, a := range args { @@ -154,7 +154,7 @@ func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // The -p value should start with the preamble and end with the original instructions. 
if len(args) < 2 || args[0] != "-p" { @@ -178,7 +178,7 @@ func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // Bash should be appended to allowed tools. foundBash := false @@ -202,7 +202,7 @@ func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // Count Bash occurrences in --allowedTools values. bashCount := 0 diff --git a/internal/executor/executor.go b/internal/executor/executor.go index eb23c02..f6932f4 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "errors" "fmt" "log/slog" "path/filepath" @@ -80,6 +81,94 @@ func (p *Pool) Results() <-chan *Result { return p.resultCh } +// SubmitResume re-queues a blocked task using the provided resume execution. +// The execution must have ResumeSessionID and ResumeAnswer set. +func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { + p.mu.Lock() + if p.active >= p.maxConcurrent { + active := p.active + max := p.maxConcurrent + p.mu.Unlock() + return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) + } + p.active++ + p.mu.Unlock() + + go p.executeResume(ctx, t, exec) + return nil +} + +func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { + defer func() { + p.mu.Lock() + p.active-- + p.mu.Unlock() + }() + + // Pre-populate log paths. 
+ if lp, ok := p.runner.(LogPather); ok { + if logDir := lp.ExecLogDir(exec.ID); logDir != "" { + exec.StdoutPath = filepath.Join(logDir, "stdout.log") + exec.StderrPath = filepath.Join(logDir, "stderr.log") + exec.ArtifactDir = logDir + } + } + exec.StartTime = time.Now().UTC() + exec.Status = "RUNNING" + + if err := p.store.CreateExecution(exec); err != nil { + p.logger.Error("failed to create resume execution record", "error", err) + } + if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { + p.logger.Error("failed to update task state", "error", err) + } + + var cancel context.CancelFunc + if t.Timeout.Duration > 0 { + ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + err := p.runner.Run(ctx, t, exec) + exec.EndTime = time.Now().UTC() + + if err != nil { + var blockedErr *BlockedError + if errors.As(err, &blockedErr) { + exec.Status = "BLOCKED" + p.store.UpdateTaskState(t.ID, task.StateBlocked) + p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) + } else if ctx.Err() == context.DeadlineExceeded { + exec.Status = "TIMED_OUT" + exec.ErrorMsg = "execution timed out" + p.store.UpdateTaskState(t.ID, task.StateTimedOut) + } else if ctx.Err() == context.Canceled { + exec.Status = "CANCELLED" + exec.ErrorMsg = "execution cancelled" + p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else { + exec.Status = "FAILED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateFailed) + } + } else { + if t.ParentTaskID == "" { + exec.Status = "READY" + p.store.UpdateTaskState(t.ID, task.StateReady) + } else { + exec.Status = "COMPLETED" + p.store.UpdateTaskState(t.ID, task.StateCompleted) + } + } + + if updateErr := p.store.UpdateExecution(exec); updateErr != nil { + p.logger.Error("failed to update resume execution", "error", updateErr) + } + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} +} + // ActiveCount returns the number of 
currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() @@ -155,7 +244,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { exec.EndTime = time.Now().UTC() if err != nil { - if ctx.Err() == context.DeadlineExceeded { + var blockedErr *BlockedError + if errors.As(err, &blockedErr) { + exec.Status = "BLOCKED" + p.store.UpdateTaskState(t.ID, task.StateBlocked) + p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) + } else if ctx.Err() == context.DeadlineExceeded { exec.Status = "TIMED_OUT" exec.ErrorMsg = "execution timed out" p.store.UpdateTaskState(t.ID, task.StateTimedOut) diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go index cd1a2cc..8a2cce3 100644 --- a/internal/executor/preamble.go +++ b/internal/executor/preamble.go @@ -1,6 +1,23 @@ package executor -const planningPreamble = `## Planning Step (do this first) +const planningPreamble = `## Runtime Environment + +You are running as a background agent inside Claudomator. You cannot interact +with the user directly. However, if you need a decision or clarification: + +**To ask the user a question and pause:** +1. Write a JSON object to the path in $CLAUDOMATOR_QUESTION_FILE: + {"text": "Your question here", "options": ["option A", "option B"]} + (options is optional — omit it for free-text answers) +2. Exit immediately. Do not wait. The task will be resumed with the user's answer + as the next message in this conversation. + +Only ask a question when truly blocked. Prefer making a reasonable decision +and noting it in your output. + +--- + +## Planning Step (do this first) Before doing any implementation work: |
