summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Peter Stone <thepeterstone@gmail.com> 2026-03-06 00:07:18 +0000
committer: Peter Stone <thepeterstone@gmail.com> 2026-03-06 00:07:18 +0000
commit: 7466b1751c4126735769a3304e1db80dab166a9e (patch)
tree: c5d0fe9d1018e62e3857480d471a0f6f8ebee104
parent: a33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff)
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to
$CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and
returns BlockedError; the pool transitions the task to BLOCKED and stores
the question JSON on the task record.

The user answers via POST /api/tasks/{id}/answer. The server looks up the
claude session_id from the most recent execution and submits a resume
execution (claude --resume <session-id> "<answer>"), freeing the executor
slot entirely while waiting.

Changes:
- task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED
- storage: add session_id to executions, question_json to tasks;
  add GetLatestExecution and UpdateTaskQuestion methods
- executor: BlockedError type; ClaudeRunner pre-assigns --session-id,
  sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit;
  buildArgs handles --resume mode; Pool.SubmitResume for resume path
- api: handleAnswerQuestion rewritten to create resume execution
- preamble: add question protocol instructions for agents
- web: BLOCKED state badge (indigo), question text + option buttons
  or free-text input with Submit on the task card footer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
-rw-r--r--  internal/api/server.go            39
-rw-r--r--  internal/api/server_test.go       70
-rw-r--r--  internal/executor/claude.go       73
-rw-r--r--  internal/executor/claude_test.go  16
-rw-r--r--  internal/executor/executor.go     96
-rw-r--r--  internal/executor/preamble.go     19
-rw-r--r--  internal/storage/db.go            45
-rw-r--r--  internal/storage/db_test.go       68
-rw-r--r--  internal/task/task.go              5
-rw-r--r--  internal/task/task_test.go         2
-rwxr-xr-x  scripts/deploy                     6
-rwxr-xr-x  scripts/start-next-task            2
-rw-r--r--  web/app.js                        77
-rw-r--r--  web/style.css                     44
14 files changed, 482 insertions, 80 deletions
diff --git a/internal/api/server.go b/internal/api/server.go
index bac98b6..5758347 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -112,31 +112,52 @@ func (s *Server) BroadcastQuestion(taskID, toolUseID string, questionData json.R
func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) {
taskID := r.PathValue("id")
- if _, err := s.store.GetTask(taskID); err != nil {
+ tk, err := s.store.GetTask(taskID)
+ if err != nil {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"})
return
}
+ if tk.State != task.StateBlocked {
+ writeJSON(w, http.StatusConflict, map[string]string{"error": "task is not blocked"})
+ return
+ }
var input struct {
- QuestionID string `json:"question_id"`
- Answer string `json:"answer"`
+ Answer string `json:"answer"`
}
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()})
return
}
- if input.QuestionID == "" {
- writeJSON(w, http.StatusBadRequest, map[string]string{"error": "question_id is required"})
+ if input.Answer == "" {
+ writeJSON(w, http.StatusBadRequest, map[string]string{"error": "answer is required"})
+ return
+ }
+
+ // Look up the session ID from the most recent execution.
+ latest, err := s.store.GetLatestExecution(taskID)
+ if err != nil || latest.SessionID == "" {
+ writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "no resumable session found"})
return
}
- ok := s.pool.Questions.Answer(input.QuestionID, input.Answer)
- if !ok {
- writeJSON(w, http.StatusNotFound, map[string]string{"error": "no pending question with that ID"})
+ // Clear the question and transition to QUEUED.
+ s.store.UpdateTaskQuestion(taskID, "")
+ s.store.UpdateTaskState(taskID, task.StateQueued)
+
+ // Submit a resume execution.
+ resumeExec := &storage.Execution{
+ ID: uuid.New().String(),
+ TaskID: taskID,
+ ResumeSessionID: latest.SessionID,
+ ResumeAnswer: input.Answer,
+ }
+ if err := s.pool.SubmitResume(r.Context(), tk, resumeExec); err != nil {
+ writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()})
return
}
- writeJSON(w, http.StatusOK, map[string]string{"status": "delivered"})
+ writeJSON(w, http.StatusOK, map[string]string{"status": "queued"})
}
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
diff --git a/internal/api/server_test.go b/internal/api/server_test.go
index af93a77..2325b0b 100644
--- a/internal/api/server_test.go
+++ b/internal/api/server_test.go
@@ -339,8 +339,7 @@ func TestCORS_Headers(t *testing.T) {
func TestAnswerQuestion_NoTask_Returns404(t *testing.T) {
srv, _ := testServer(t)
- payload := `{"question_id": "toolu_abc", "answer": "blue"}`
- req := httptest.NewRequest("POST", "/api/tasks/nonexistent/answer", bytes.NewBufferString(payload))
+ req := httptest.NewRequest("POST", "/api/tasks/nonexistent/answer", bytes.NewBufferString(`{"answer":"blue"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
@@ -351,64 +350,65 @@ func TestAnswerQuestion_NoTask_Returns404(t *testing.T) {
}
}
-func TestAnswerQuestion_NoPendingQuestion_Returns404(t *testing.T) {
+func TestAnswerQuestion_TaskNotBlocked_Returns409(t *testing.T) {
srv, store := testServer(t)
createTaskWithState(t, store, "answer-task-1", task.StatePending)
- payload := `{"question_id": "toolu_nonexistent", "answer": "blue"}`
- req := httptest.NewRequest("POST", "/api/tasks/answer-task-1/answer", bytes.NewBufferString(payload))
+ req := httptest.NewRequest("POST", "/api/tasks/answer-task-1/answer", bytes.NewBufferString(`{"answer":"blue"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
srv.Handler().ServeHTTP(w, req)
- if w.Code != http.StatusNotFound {
- t.Errorf("status: want 404, got %d; body: %s", w.Code, w.Body.String())
- }
- var body map[string]string
- json.NewDecoder(w.Body).Decode(&body)
- if body["error"] != "no pending question with that ID" {
- t.Errorf("error: want 'no pending question with that ID', got %q", body["error"])
+ if w.Code != http.StatusConflict {
+ t.Errorf("status: want 409, got %d; body: %s", w.Code, w.Body.String())
}
}
-func TestAnswerQuestion_WithPendingQuestion_Returns200(t *testing.T) {
+func TestAnswerQuestion_MissingAnswer_Returns400(t *testing.T) {
srv, store := testServer(t)
- createTaskWithState(t, store, "answer-task-2", task.StateRunning)
-
- ch := srv.pool.Questions.Register("answer-task-2", "toolu_Q1", []byte(`{}`))
+ createTaskWithState(t, store, "answer-task-2", task.StateBlocked)
- go func() {
- payload := `{"question_id": "toolu_Q1", "answer": "red"}`
- req := httptest.NewRequest("POST", "/api/tasks/answer-task-2/answer", bytes.NewBufferString(payload))
- req.Header.Set("Content-Type", "application/json")
- w := httptest.NewRecorder()
- srv.Handler().ServeHTTP(w, req)
+ req := httptest.NewRequest("POST", "/api/tasks/answer-task-2/answer", bytes.NewBufferString(`{}`))
+ req.Header.Set("Content-Type", "application/json")
+ w := httptest.NewRecorder()
- if w.Code != http.StatusOK {
- t.Errorf("status: want 200, got %d; body: %s", w.Code, w.Body.String())
- }
- }()
+ srv.Handler().ServeHTTP(w, req)
- answer := <-ch
- if answer != "red" {
- t.Errorf("answer: want 'red', got %q", answer)
+ if w.Code != http.StatusBadRequest {
+ t.Errorf("status: want 400, got %d; body: %s", w.Code, w.Body.String())
}
}
-func TestAnswerQuestion_MissingQuestionID_Returns400(t *testing.T) {
+func TestAnswerQuestion_BlockedTask_QueuesResume(t *testing.T) {
srv, store := testServer(t)
- createTaskWithState(t, store, "answer-task-3", task.StateRunning)
+ createTaskWithState(t, store, "answer-task-3", task.StateBlocked)
+
+ // Create an execution with a session ID, as the runner would have.
+ exec := &storage.Execution{
+ ID: "exec-blocked-1",
+ TaskID: "answer-task-3",
+ SessionID: "550e8400-e29b-41d4-a716-446655440001",
+ Status: "BLOCKED",
+ }
+ if err := store.CreateExecution(exec); err != nil {
+ t.Fatalf("create execution: %v", err)
+ }
- payload := `{"answer": "blue"}`
- req := httptest.NewRequest("POST", "/api/tasks/answer-task-3/answer", bytes.NewBufferString(payload))
+ req := httptest.NewRequest("POST", "/api/tasks/answer-task-3/answer", bytes.NewBufferString(`{"answer":"main"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
srv.Handler().ServeHTTP(w, req)
- if w.Code != http.StatusBadRequest {
- t.Errorf("status: want 400, got %d", w.Code)
+ if w.Code != http.StatusOK {
+ t.Errorf("status: want 200, got %d; body: %s", w.Code, w.Body.String())
+ }
+
+ // Task should now be QUEUED (or RUNNING since the mock runner is instant).
+ got, _ := store.GetTask("answer-task-3")
+ if got.State != task.StateQueued && got.State != task.StateRunning && got.State != task.StateReady {
+ t.Errorf("task state: want QUEUED/RUNNING/READY after answer, got %v", got.State)
}
}
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index d3f5751..b97f202 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -27,6 +27,15 @@ type ClaudeRunner struct {
APIURL string // base URL of the Claudomator API, passed to subprocesses
}
+// BlockedError is returned by Run when the agent wrote a question file and exited.
+// The pool transitions the task to BLOCKED and stores the question for the user.
+type BlockedError struct {
+ QuestionJSON string // raw JSON from the question file
+ SessionID string // claude session to resume once the user answers
+}
+
+func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }
+
// ExecLogDir returns the log directory for the given execution ID.
// Implements LogPather so the pool can persist paths before execution starts.
func (r *ClaudeRunner) ExecLogDir(execID string) string {
@@ -45,9 +54,8 @@ func (r *ClaudeRunner) binaryPath() string {
// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
+// If the agent writes a question file and exits, Run returns *BlockedError.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
- args := r.buildArgs(t)
-
if t.Claude.WorkingDir != "" {
if _, err := os.Stat(t.Claude.WorkingDir); err != nil {
return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err)
@@ -55,11 +63,9 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
}
// Setup log directory once; retries overwrite the log files.
- // Use pre-set paths if the pool already populated them via LogPather;
- // otherwise fall back to computing from LogDir + execID.
logDir := r.ExecLogDir(e.ID)
if logDir == "" {
- logDir = e.ID // fallback: use execID as relative dir (tests without LogDir set)
+ logDir = e.ID // fallback for tests without LogDir set
}
if err := os.MkdirAll(logDir, 0700); err != nil {
return fmt.Errorf("creating log dir: %w", err)
@@ -70,8 +76,17 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
e.ArtifactDir = logDir
}
+ // Pre-assign session ID so we can resume after a BLOCKED state.
+ // If this is a resume execution the session ID is already set.
+ if e.SessionID == "" {
+ e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
+ }
+
+ questionFile := filepath.Join(logDir, "question.json")
+ args := r.buildArgs(t, e, questionFile)
+
attempt := 0
- return runWithBackoff(ctx, 3, 5*time.Second, func() error {
+ err := runWithBackoff(ctx, 3, 5*time.Second, func() error {
if attempt > 0 {
delay := 5 * time.Second * (1 << (attempt - 1))
r.Logger.Warn("rate-limited by Claude API, retrying",
@@ -80,22 +95,34 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
)
}
attempt++
- return r.execOnce(ctx, t, args, e)
+ return r.execOnce(ctx, args, t.Claude.WorkingDir, e)
})
+ if err != nil {
+ return err
+ }
+
+ // Check whether the agent left a question before exiting.
+ data, readErr := os.ReadFile(questionFile)
+ if readErr == nil {
+ os.Remove(questionFile) // consumed
+ return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
+ }
+ return nil
}
// execOnce runs the claude subprocess once, streaming output to e's log paths.
-func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error {
+func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error {
cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
cmd.Env = append(os.Environ(),
"CLAUDOMATOR_API_URL="+r.APIURL,
- "CLAUDOMATOR_TASK_ID="+t.ID,
+ "CLAUDOMATOR_TASK_ID="+e.TaskID,
+ "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
)
// Put the subprocess in its own process group so we can SIGKILL the entire
// group (MCP servers, bash children, etc.) on cancellation.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
- if t.Claude.WorkingDir != "" {
- cmd.Dir = t.Claude.WorkingDir
+ if workingDir != "" {
+ cmd.Dir = workingDir
}
stdoutFile, err := os.Create(e.StdoutPath)
@@ -172,13 +199,32 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string
return nil
}
-func (r *ClaudeRunner) buildArgs(t *task.Task) []string {
+func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
+ // Resume execution: the agent already has context; just deliver the answer.
+ if e.ResumeSessionID != "" {
+ args := []string{
+ "-p", e.ResumeAnswer,
+ "--resume", e.ResumeSessionID,
+ "--output-format", "stream-json",
+ "--verbose",
+ }
+ permMode := t.Claude.PermissionMode
+ if permMode == "" {
+ permMode = "bypassPermissions"
+ }
+ args = append(args, "--permission-mode", permMode)
+ if t.Claude.Model != "" {
+ args = append(args, "--model", t.Claude.Model)
+ }
+ return args
+ }
+
instructions := t.Claude.Instructions
allowedTools := t.Claude.AllowedTools
if !t.Claude.SkipPlanning {
instructions = withPlanningPreamble(instructions)
- // Ensure Bash is available so the agent can POST subtasks.
+ // Ensure Bash is available so the agent can POST subtasks and ask questions.
hasBash := false
for _, tool := range allowedTools {
if tool == "Bash" {
@@ -193,6 +239,7 @@ func (r *ClaudeRunner) buildArgs(t *task.Task) []string {
args := []string{
"-p", instructions,
+ "--session-id", e.SessionID,
"--output-format", "stream-json",
"--verbose",
}
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
index edf13b5..056c7e8 100644
--- a/internal/executor/claude_test.go
+++ b/internal/executor/claude_test.go
@@ -21,7 +21,7 @@ func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
argMap := make(map[string]bool)
for _, a := range args {
@@ -51,7 +51,7 @@ func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// Check key args are present.
argMap := make(map[string]bool)
@@ -86,7 +86,7 @@ func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
found := false
for i, a := range args {
@@ -109,7 +109,7 @@ func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
for i, a := range args {
if a == "--permission-mode" && i+1 < len(args) {
@@ -131,7 +131,7 @@ func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
found := false
for _, a := range args {
@@ -154,7 +154,7 @@ func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// The -p value should start with the preamble and end with the original instructions.
if len(args) < 2 || args[0] != "-p" {
@@ -178,7 +178,7 @@ func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// Bash should be appended to allowed tools.
foundBash := false
@@ -202,7 +202,7 @@ func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// Count Bash occurrences in --allowedTools values.
bashCount := 0
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index eb23c02..f6932f4 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -2,6 +2,7 @@ package executor
import (
"context"
+ "errors"
"fmt"
"log/slog"
"path/filepath"
@@ -80,6 +81,94 @@ func (p *Pool) Results() <-chan *Result {
return p.resultCh
}
+// SubmitResume re-queues a blocked task using the provided resume execution.
+// The execution must have ResumeSessionID and ResumeAnswer set.
+func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+ p.mu.Lock()
+ if p.active >= p.maxConcurrent {
+ active := p.active
+ max := p.maxConcurrent
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+ }
+ p.active++
+ p.mu.Unlock()
+
+ go p.executeResume(ctx, t, exec)
+ return nil
+}
+
+func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
+ defer func() {
+ p.mu.Lock()
+ p.active--
+ p.mu.Unlock()
+ }()
+
+ // Pre-populate log paths.
+ if lp, ok := p.runner.(LogPather); ok {
+ if logDir := lp.ExecLogDir(exec.ID); logDir != "" {
+ exec.StdoutPath = filepath.Join(logDir, "stdout.log")
+ exec.StderrPath = filepath.Join(logDir, "stderr.log")
+ exec.ArtifactDir = logDir
+ }
+ }
+ exec.StartTime = time.Now().UTC()
+ exec.Status = "RUNNING"
+
+ if err := p.store.CreateExecution(exec); err != nil {
+ p.logger.Error("failed to create resume execution record", "error", err)
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
+ p.logger.Error("failed to update task state", "error", err)
+ }
+
+ var cancel context.CancelFunc
+ if t.Timeout.Duration > 0 {
+ ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
+ } else {
+ ctx, cancel = context.WithCancel(ctx)
+ }
+ defer cancel()
+
+ err := p.runner.Run(ctx, t, exec)
+ exec.EndTime = time.Now().UTC()
+
+ if err != nil {
+ var blockedErr *BlockedError
+ if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ p.store.UpdateTaskState(t.ID, task.StateBlocked)
+ p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+ } else if ctx.Err() == context.DeadlineExceeded {
+ exec.Status = "TIMED_OUT"
+ exec.ErrorMsg = "execution timed out"
+ p.store.UpdateTaskState(t.ID, task.StateTimedOut)
+ } else if ctx.Err() == context.Canceled {
+ exec.Status = "CANCELLED"
+ exec.ErrorMsg = "execution cancelled"
+ p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else {
+ exec.Status = "FAILED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateFailed)
+ }
+ } else {
+ if t.ParentTaskID == "" {
+ exec.Status = "READY"
+ p.store.UpdateTaskState(t.ID, task.StateReady)
+ } else {
+ exec.Status = "COMPLETED"
+ p.store.UpdateTaskState(t.ID, task.StateCompleted)
+ }
+ }
+
+ if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
+ p.logger.Error("failed to update resume execution", "error", updateErr)
+ }
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+}
+
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -155,7 +244,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
exec.EndTime = time.Now().UTC()
if err != nil {
- if ctx.Err() == context.DeadlineExceeded {
+ var blockedErr *BlockedError
+ if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ p.store.UpdateTaskState(t.ID, task.StateBlocked)
+ p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+ } else if ctx.Err() == context.DeadlineExceeded {
exec.Status = "TIMED_OUT"
exec.ErrorMsg = "execution timed out"
p.store.UpdateTaskState(t.ID, task.StateTimedOut)
diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go
index cd1a2cc..8a2cce3 100644
--- a/internal/executor/preamble.go
+++ b/internal/executor/preamble.go
@@ -1,6 +1,23 @@
package executor
-const planningPreamble = `## Planning Step (do this first)
+const planningPreamble = `## Runtime Environment
+
+You are running as a background agent inside Claudomator. You cannot interact
+with the user directly. However, if you need a decision or clarification:
+
+**To ask the user a question and pause:**
+1. Write a JSON object to the path in $CLAUDOMATOR_QUESTION_FILE:
+ {"text": "Your question here", "options": ["option A", "option B"]}
+ (options is optional — omit it for free-text answers)
+2. Exit immediately. Do not wait. The task will be resumed with the user's answer
+ as the next message in this conversation.
+
+Only ask a question when truly blocked. Prefer making a reasonable decision
+and noting it in your output.
+
+---
+
+## Planning Step (do this first)
Before doing any implementation work:
diff --git a/internal/storage/db.go b/internal/storage/db.go
index e656f98..0e4d6f1 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -88,6 +88,8 @@ func (s *DB) migrate() error {
migrations := []string{
`ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`,
`ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`,
+ `ALTER TABLE tasks ADD COLUMN question_json TEXT`,
+ `ALTER TABLE executions ADD COLUMN session_id TEXT`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -136,13 +138,13 @@ func (s *DB) CreateTask(t *task.Task) error {
// GetTask retrieves a task by ID.
func (s *DB) GetTask(id string) (*task.Task, error) {
- row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id)
return scanTask(row)
}
// ListTasks returns tasks matching the given filter.
func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
- query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1`
+ query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1`
var args []interface{}
if filter.State != "" {
@@ -174,7 +176,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
// ListSubtasks returns all tasks whose parent_task_id matches the given ID.
func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) {
- rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
+ rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
if err != nil {
return nil, err
}
@@ -308,28 +310,33 @@ type Execution struct {
ArtifactDir string
CostUSD float64
ErrorMsg string
+ SessionID string // claude --session-id; persisted for resume
+
+ // In-memory only: set when creating a resume execution, not stored in DB.
+ ResumeSessionID string
+ ResumeAnswer string
}
// CreateExecution inserts an execution record.
func (s *DB) CreateExecution(e *Execution) error {
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -346,6 +353,20 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
return execs, rows.Err()
}
+// GetLatestExecution returns the most recent execution for a task.
+func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ return scanExecution(row)
+}
+
+// UpdateTaskQuestion stores the pending question JSON on a task.
+// Pass empty string to clear the question after it has been answered.
+func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error {
+ _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`,
+ questionJSON, time.Now().UTC(), taskID)
+ return err
+}
+
// UpdateExecution updates a completed execution.
func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
@@ -374,10 +395,12 @@ func scanTask(row scanner) (*task.Task, error) {
timeoutNS int64
parentTaskID sql.NullString
rejectionComment sql.NullString
+ questionJSON sql.NullString
)
- err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment)
+ err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON)
t.ParentTaskID = parentTaskID.String
t.RejectionComment = rejectionComment.String
+ t.QuestionJSON = questionJSON.String
if err != nil {
return nil, err
}
@@ -405,11 +428,13 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) {
func scanExecution(row scanner) (*Execution, error) {
var e Execution
+ var sessionID sql.NullString
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID)
if err != nil {
return nil, err
}
+ e.SessionID = sessionID.String
return &e, nil
}
diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go
index 4f9069a..395574c 100644
--- a/internal/storage/db_test.go
+++ b/internal/storage/db_test.go
@@ -421,3 +421,71 @@ func TestUpdateExecution(t *testing.T) {
t.Errorf("artifact_dir: want /tmp/exec, got %q", got.ArtifactDir)
}
}
+
+func makeTestTask(id string, now time.Time) *task.Task {
+ return &task.Task{
+ ID: id, Name: "T-" + id, Claude: task.ClaudeConfig{Instructions: "x"},
+ Priority: task.PriorityNormal, Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
+ Tags: []string{}, DependsOn: []string{},
+ State: task.StatePending, CreatedAt: now, UpdatedAt: now,
+ }
+}
+
+func TestStorage_SessionID_RoundTrip(t *testing.T) {
+ db := testDB(t)
+ now := time.Now().UTC()
+ db.CreateTask(makeTestTask("sid-task", now))
+
+ exec := &Execution{
+ ID: "sid-exec", TaskID: "sid-task", StartTime: now, Status: "RUNNING",
+ SessionID: "550e8400-e29b-41d4-a716-446655440000",
+ }
+ if err := db.CreateExecution(exec); err != nil {
+ t.Fatalf("create: %v", err)
+ }
+
+ got, err := db.GetExecution("sid-exec")
+ if err != nil {
+ t.Fatalf("get: %v", err)
+ }
+ if got.SessionID != exec.SessionID {
+ t.Errorf("session_id: want %q, got %q", exec.SessionID, got.SessionID)
+ }
+}
+
+func TestStorage_UpdateTaskQuestion(t *testing.T) {
+ db := testDB(t)
+ now := time.Now().UTC()
+ tk := makeTestTask("q-task", now)
+ db.CreateTask(tk)
+
+ q := `{"text":"Which branch?","options":["main","develop"]}`
+ if err := db.UpdateTaskQuestion("q-task", q); err != nil {
+ t.Fatalf("update question: %v", err)
+ }
+
+ got, err := db.GetTask("q-task")
+ if err != nil {
+ t.Fatalf("get: %v", err)
+ }
+ if got.QuestionJSON != q {
+ t.Errorf("question_json: want %q, got %q", q, got.QuestionJSON)
+ }
+}
+
+func TestStorage_GetLatestExecution(t *testing.T) {
+ db := testDB(t)
+ now := time.Now().UTC()
+ db.CreateTask(makeTestTask("le-task", now))
+
+ db.CreateExecution(&Execution{ID: "le-1", TaskID: "le-task", StartTime: now, Status: "COMPLETED"})
+ db.CreateExecution(&Execution{ID: "le-2", TaskID: "le-task", StartTime: now.Add(time.Minute), Status: "RUNNING"})
+
+ got, err := db.GetLatestExecution("le-task")
+ if err != nil {
+ t.Fatalf("get latest: %v", err)
+ }
+ if got.ID != "le-2" {
+ t.Errorf("want le-2, got %q", got.ID)
+ }
+}
diff --git a/internal/task/task.go b/internal/task/task.go
index 3e74a82..587993f 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -14,6 +14,7 @@ const (
StateTimedOut State = "TIMED_OUT"
StateCancelled State = "CANCELLED"
StateBudgetExceeded State = "BUDGET_EXCEEDED"
+ StateBlocked State = "BLOCKED"
)
type Priority string
@@ -56,6 +57,7 @@ type Task struct {
DependsOn []string `yaml:"depends_on" json:"depends_on"`
State State `yaml:"-" json:"state"`
RejectionComment string `yaml:"-" json:"rejection_comment,omitempty"`
+ QuestionJSON string `yaml:"-" json:"question,omitempty"`
CreatedAt time.Time `yaml:"-" json:"created_at"`
UpdatedAt time.Time `yaml:"-" json:"updated_at"`
}
@@ -92,10 +94,11 @@ func ValidTransition(from, to State) bool {
transitions := map[State][]State{
StatePending: {StateQueued, StateCancelled},
StateQueued: {StateRunning, StateCancelled},
- StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded},
+ StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded, StateBlocked},
StateReady: {StateCompleted, StatePending},
StateFailed: {StateQueued}, // retry
StateTimedOut: {StateQueued}, // retry
+ StateBlocked: {StateQueued}, // answer received → re-queue as resume execution
}
for _, allowed := range transitions[from] {
if allowed == to {
diff --git a/internal/task/task_test.go b/internal/task/task_test.go
index 6498271..5d997ac 100644
--- a/internal/task/task_test.go
+++ b/internal/task/task_test.go
@@ -22,6 +22,8 @@ func TestValidTransition_AllowedTransitions(t *testing.T) {
{"running to budget exceeded", StateRunning, StateBudgetExceeded},
{"failed to queued (retry)", StateFailed, StateQueued},
{"timed out to queued (retry)", StateTimedOut, StateQueued},
+ {"running to blocked (question)", StateRunning, StateBlocked},
+ {"blocked to queued (answer resume)", StateBlocked, StateQueued},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff --git a/scripts/deploy b/scripts/deploy
index 4722dab..0b8e162 100755
--- a/scripts/deploy
+++ b/scripts/deploy
@@ -18,6 +18,12 @@ export GOPATH="${SITE_DIR}/cache/gopath"
mkdir -p "${GOCACHE}" "${GOPATH}"
go build -o "${BIN_DIR}/claudomator" ./cmd/claudomator/
+echo "==> Copying scripts..."
+mkdir -p "${SITE_DIR}/scripts"
+cp "${REPO_DIR}/scripts/"* "${SITE_DIR}/scripts/"
+chown -R www-data:www-data "${SITE_DIR}/scripts"
+chmod +x "${SITE_DIR}/scripts/"*
+
echo "==> Fixing permissions..."
chown www-data:www-data "${BIN_DIR}/claudomator"
chmod +x "${BIN_DIR}/claudomator"
diff --git a/scripts/start-next-task b/scripts/start-next-task
index 1e45eb3..da019b6 100755
--- a/scripts/start-next-task
+++ b/scripts/start-next-task
@@ -12,4 +12,4 @@ if [[ -z "$task_id" ]]; then
exit 0
fi
-echo claudomator start "$task_id"
+claudomator start "$task_id"
diff --git a/web/app.js b/web/app.js
index 20d7a03..271a18e 100644
--- a/web/app.js
+++ b/web/app.js
@@ -71,7 +71,7 @@ function createTaskCard(task) {
// Footer: action buttons based on state
const RESTART_STATES = new Set(['FAILED', 'TIMED_OUT', 'CANCELLED']);
- if (task.state === 'PENDING' || task.state === 'RUNNING' || task.state === 'READY' || RESTART_STATES.has(task.state)) {
+ if (task.state === 'PENDING' || task.state === 'RUNNING' || task.state === 'READY' || task.state === 'BLOCKED' || RESTART_STATES.has(task.state)) {
const footer = document.createElement('div');
footer.className = 'task-card-footer';
@@ -110,6 +110,8 @@ function createTaskCard(task) {
});
footer.appendChild(acceptBtn);
footer.appendChild(rejectBtn);
+ } else if (task.state === 'BLOCKED') {
+ renderQuestionFooter(task, footer);
} else if (RESTART_STATES.has(task.state)) {
const btn = document.createElement('button');
btn.className = 'btn-restart';
@@ -312,6 +314,79 @@ async function restartTask(taskId) {
return res.json();
}
+// Renders the question UI for a task into `footer`: the question text,
+// followed by either one button per suggested option or, when there are no
+// options, a free-text input with a Submit button. The chosen or typed answer
+// is handed to handleAnswer.
+//
+// task.question is read as a JSON string of the form
+// {"text": "...", "options": ["..."]}. Anything missing or malformed falls
+// back to a generic prompt with the free-text input.
+function renderQuestionFooter(task, footer) {
+  const fallbackText = 'Waiting for your input.';
+  let question = {};
+  if (task.question) {
+    try {
+      const parsed = JSON.parse(task.question);
+      // JSON.parse accepts any JSON value (null, "str", 42, [...]), so keep
+      // only plain objects; reading .text off null would otherwise throw and
+      // break rendering of the card.
+      if (parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)) {
+        question = parsed;
+      }
+    } catch {
+      // Malformed JSON: keep the fallback prompt instead of failing the render.
+    }
+  }
+  // A missing or non-string text would render as an empty paragraph.
+  const text = typeof question.text === 'string' && question.text !== ''
+    ? question.text
+    : fallbackText;
+  // Guard against a non-array `options` (e.g. a string), which has a length
+  // but no forEach.
+  const options = Array.isArray(question.options) ? question.options : [];
+
+  const questionEl = document.createElement('p');
+  questionEl.className = 'task-question-text';
+  questionEl.textContent = text;
+  footer.appendChild(questionEl);
+
+  if (options.length > 0) {
+    options.forEach(opt => {
+      const btn = document.createElement('button');
+      btn.className = 'btn-answer';
+      btn.textContent = opt;
+      btn.addEventListener('click', (e) => {
+        // Keep the click from reaching handlers on enclosing elements.
+        e.stopPropagation();
+        handleAnswer(task.id, opt, footer);
+      });
+      footer.appendChild(btn);
+    });
+  } else {
+    const row = document.createElement('div');
+    row.className = 'task-answer-row';
+    const input = document.createElement('input');
+    input.type = 'text';
+    input.className = 'task-answer-input';
+    input.placeholder = 'Your answer…';
+    const btn = document.createElement('button');
+    btn.className = 'btn-answer';
+    btn.textContent = 'Submit';
+    btn.addEventListener('click', (e) => {
+      e.stopPropagation();
+      // Whitespace-only input is ignored rather than submitted.
+      if (input.value.trim()) handleAnswer(task.id, input.value.trim(), footer);
+    });
+    input.addEventListener('keydown', (e) => {
+      // Enter submits, with the same non-empty rule as the Submit button.
+      if (e.key === 'Enter' && input.value.trim()) {
+        e.stopPropagation();
+        handleAnswer(task.id, input.value.trim(), footer);
+      }
+    });
+    row.append(input, btn);
+    footer.appendChild(row);
+  }
+}
+
+// Submits `answer` for task `taskId` via POST /api/tasks/{id}/answer.
+//
+// While the request is in flight every button and input inside `footer` is
+// disabled and any earlier error message is removed. On success poll() is
+// awaited (presumably to refresh the task list; confirm against poll's
+// definition). On any failure the controls are re-enabled and a
+// "Failed: ..." message is appended to `footer`.
+async function handleAnswer(taskId, answer, footer) {
+  const controls = footer.querySelectorAll('button, input');
+  const setDisabled = (flag) => {
+    controls.forEach(el => { el.disabled = flag; });
+  };
+
+  setDisabled(true);
+  const staleError = footer.querySelector('.task-error');
+  if (staleError) {
+    staleError.remove();
+  }
+
+  try {
+    const url = `${API_BASE}/api/tasks/${taskId}/answer`;
+    const payload = JSON.stringify({ answer });
+    const res = await fetch(url, {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json' },
+      body: payload,
+    });
+    if (!res.ok) {
+      // Prefer the server's error text; fall back to the status code when the
+      // response body is not JSON or carries no `error` field.
+      const details = await res.json().catch(() => ({}));
+      throw new Error(details.error || `HTTP ${res.status}`);
+    }
+    await poll();
+  } catch (err) {
+    setDisabled(false);
+    const message = document.createElement('span');
+    message.className = 'task-error';
+    message.textContent = `Failed: ${err.message}`;
+    footer.appendChild(message);
+  }
+}
+
async function handleCancel(taskId, btn, footer) {
btn.disabled = true;
btn.textContent = 'Cancelling…';
diff --git a/web/style.css b/web/style.css
index 1478b36..91466ee 100644
--- a/web/style.css
+++ b/web/style.css
@@ -8,6 +8,7 @@
--state-timed-out: #c084fc;
--state-cancelled: #9ca3af;
--state-budget-exceeded: #fb923c;
+ --state-blocked: #818cf8;
--bg: #0f172a;
--surface: #1e293b;
@@ -181,6 +182,7 @@ main {
.state-badge[data-state="TIMED_OUT"] { background: var(--state-timed-out); }
.state-badge[data-state="CANCELLED"] { background: var(--state-cancelled); }
.state-badge[data-state="BUDGET_EXCEEDED"] { background: var(--state-budget-exceeded); }
+.state-badge[data-state="BLOCKED"] { background: var(--state-blocked); }
/* Task meta */
.task-meta {
@@ -294,6 +296,48 @@ main {
cursor: not-allowed;
}
+/* Question text shown in the footer of a BLOCKED task card */
+.task-question-text {
+ font-size: 0.82rem;
+ color: var(--text);
+ margin: 0 0 0.5rem 0;
+ line-height: 1.4;
+ width: 100%;
+}
+
+/* Row holding the free-text answer input and its Submit button */
+.task-answer-row {
+ display: flex;
+ gap: 0.375rem;
+ width: 100%;
+}
+
+/* Free-text answer input; flex: 1 lets it take the remaining row width */
+.task-answer-input {
+ flex: 1;
+ font-size: 0.8rem;
+ padding: 0.3em 0.6em;
+ border-radius: 0.375rem;
+ border: 1px solid var(--border);
+ background: var(--bg);
+ color: var(--text);
+}
+
+/* Answer buttons (option choices and Submit), coloured with the BLOCKED state colour */
+.btn-answer {
+ font-size: 0.8rem;
+ font-weight: 600;
+ padding: 0.35em 0.85em;
+ border-radius: 0.375rem;
+ border: none;
+ cursor: pointer;
+ background: var(--state-blocked);
+ color: #0f172a;
+ transition: opacity 0.15s;
+ margin-right: 0.375rem;
+}
+
+/* Dimmed appearance for disabled answer buttons */
+.btn-answer:disabled {
+ opacity: 0.5;
+ cursor: not-allowed;
+}
+
+
.task-error {
font-size: 0.78rem;
color: var(--state-failed);