diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
| commit | 7466b1751c4126735769a3304e1db80dab166a9e (patch) | |
| tree | c5d0fe9d1018e62e3857480d471a0f6f8ebee104 | |
| parent | a33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff) | |
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to
$CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and
returns BlockedError; the pool transitions the task to BLOCKED and
stores the question JSON on the task record.
The user answers via POST /api/tasks/{id}/answer. The server looks up
the claude session_id from the most recent execution and submits a
resume execution (claude --resume <session-id> "<answer>"), freeing the
executor slot entirely while waiting.
Changes:
- task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED
- storage: add session_id to executions, question_json to tasks;
add GetLatestExecution and UpdateTaskQuestion methods
- executor: BlockedError type; ClaudeRunner pre-assigns --session-id,
sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit;
buildArgs handles --resume mode; Pool.SubmitResume for resume path
- api: handleAnswerQuestion rewritten to create resume execution
- preamble: add question protocol instructions for agents
- web: BLOCKED state badge (indigo), question text + option buttons or
free-text input with Submit on the task card footer
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| -rw-r--r-- | internal/api/server.go | 39 | ||||
| -rw-r--r-- | internal/api/server_test.go | 70 | ||||
| -rw-r--r-- | internal/executor/claude.go | 73 | ||||
| -rw-r--r-- | internal/executor/claude_test.go | 16 | ||||
| -rw-r--r-- | internal/executor/executor.go | 96 | ||||
| -rw-r--r-- | internal/executor/preamble.go | 19 | ||||
| -rw-r--r-- | internal/storage/db.go | 45 | ||||
| -rw-r--r-- | internal/storage/db_test.go | 68 | ||||
| -rw-r--r-- | internal/task/task.go | 5 | ||||
| -rw-r--r-- | internal/task/task_test.go | 2 | ||||
| -rwxr-xr-x | scripts/deploy | 6 | ||||
| -rwxr-xr-x | scripts/start-next-task | 2 | ||||
| -rw-r--r-- | web/app.js | 77 | ||||
| -rw-r--r-- | web/style.css | 44 |
14 files changed, 482 insertions, 80 deletions
diff --git a/internal/api/server.go b/internal/api/server.go index bac98b6..5758347 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -112,31 +112,52 @@ func (s *Server) BroadcastQuestion(taskID, toolUseID string, questionData json.R func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) { taskID := r.PathValue("id") - if _, err := s.store.GetTask(taskID); err != nil { + tk, err := s.store.GetTask(taskID) + if err != nil { writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"}) return } + if tk.State != task.StateBlocked { + writeJSON(w, http.StatusConflict, map[string]string{"error": "task is not blocked"}) + return + } var input struct { - QuestionID string `json:"question_id"` - Answer string `json:"answer"` + Answer string `json:"answer"` } if err := json.NewDecoder(r.Body).Decode(&input); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()}) return } - if input.QuestionID == "" { - writeJSON(w, http.StatusBadRequest, map[string]string{"error": "question_id is required"}) + if input.Answer == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "answer is required"}) + return + } + + // Look up the session ID from the most recent execution. + latest, err := s.store.GetLatestExecution(taskID) + if err != nil || latest.SessionID == "" { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "no resumable session found"}) return } - ok := s.pool.Questions.Answer(input.QuestionID, input.Answer) - if !ok { - writeJSON(w, http.StatusNotFound, map[string]string{"error": "no pending question with that ID"}) + // Clear the question and transition to QUEUED. + s.store.UpdateTaskQuestion(taskID, "") + s.store.UpdateTaskState(taskID, task.StateQueued) + + // Submit a resume execution. + resumeExec := &storage.Execution{ + ID: uuid.New().String(), + TaskID: taskID, + ResumeSessionID: latest.SessionID, + ResumeAnswer: input.Answer, + } + if err := s.pool.SubmitResume(r.Context(), tk, resumeExec); err != nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()}) return } - writeJSON(w, http.StatusOK, map[string]string{"status": "delivered"}) + writeJSON(w, http.StatusOK, map[string]string{"status": "queued"}) } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/server_test.go b/internal/api/server_test.go index af93a77..2325b0b 100644 --- a/internal/api/server_test.go +++ b/internal/api/server_test.go @@ -339,8 +339,7 @@ func TestCORS_Headers(t *testing.T) { func TestAnswerQuestion_NoTask_Returns404(t *testing.T) { srv, _ := testServer(t) - payload := `{"question_id": "toolu_abc", "answer": "blue"}` - req := httptest.NewRequest("POST", "/api/tasks/nonexistent/answer", bytes.NewBufferString(payload)) + req := httptest.NewRequest("POST", "/api/tasks/nonexistent/answer", bytes.NewBufferString(`{"answer":"blue"}`)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() @@ -351,64 +350,65 @@ func TestAnswerQuestion_NoTask_Returns404(t *testing.T) { } } -func TestAnswerQuestion_NoPendingQuestion_Returns404(t *testing.T) { +func TestAnswerQuestion_TaskNotBlocked_Returns409(t *testing.T) { srv, store := testServer(t) createTaskWithState(t, store, "answer-task-1", task.StatePending) - payload := `{"question_id": "toolu_nonexistent", "answer": "blue"}` - req := httptest.NewRequest("POST", "/api/tasks/answer-task-1/answer", bytes.NewBufferString(payload)) + req := httptest.NewRequest("POST", "/api/tasks/answer-task-1/answer", bytes.NewBufferString(`{"answer":"blue"}`)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() srv.Handler().ServeHTTP(w, req) - if w.Code != http.StatusNotFound { - t.Errorf("status: want 404, got %d; body: %s", w.Code, w.Body.String()) - } - var body map[string]string - json.NewDecoder(w.Body).Decode(&body) - if body["error"] != "no pending question with that ID" { - t.Errorf("error: want 'no pending question with that ID', got %q", body["error"]) + if w.Code != http.StatusConflict { + t.Errorf("status: want 409, got %d; body: %s", w.Code, w.Body.String()) } } -func TestAnswerQuestion_WithPendingQuestion_Returns200(t *testing.T) { +func TestAnswerQuestion_MissingAnswer_Returns400(t *testing.T) { srv, store := testServer(t) - createTaskWithState(t, store, "answer-task-2", task.StateRunning) - - ch := srv.pool.Questions.Register("answer-task-2", "toolu_Q1", []byte(`{}`)) + createTaskWithState(t, store, "answer-task-2", task.StateBlocked) - go func() { - payload := `{"question_id": "toolu_Q1", "answer": "red"}` - req := httptest.NewRequest("POST", "/api/tasks/answer-task-2/answer", bytes.NewBufferString(payload)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - srv.Handler().ServeHTTP(w, req) + req := httptest.NewRequest("POST", "/api/tasks/answer-task-2/answer", bytes.NewBufferString(`{}`)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() - if w.Code != http.StatusOK { - t.Errorf("status: want 200, got %d; body: %s", w.Code, w.Body.String()) - } - }() + srv.Handler().ServeHTTP(w, req) - answer := <-ch - if answer != "red" { - t.Errorf("answer: want 'red', got %q", answer) + if w.Code != http.StatusBadRequest { + t.Errorf("status: want 400, got %d; body: %s", w.Code, w.Body.String()) } } -func TestAnswerQuestion_MissingQuestionID_Returns400(t *testing.T) { +func TestAnswerQuestion_BlockedTask_QueuesResume(t *testing.T) { srv, store := testServer(t) - createTaskWithState(t, store, "answer-task-3", task.StateRunning) + createTaskWithState(t, store, "answer-task-3", task.StateBlocked) + + // Create an execution with a session ID, as the runner would have. + exec := &storage.Execution{ + ID: "exec-blocked-1", + TaskID: "answer-task-3", + SessionID: "550e8400-e29b-41d4-a716-446655440001", + Status: "BLOCKED", + } + if err := store.CreateExecution(exec); err != nil { + t.Fatalf("create execution: %v", err) + } - payload := `{"answer": "blue"}` - req := httptest.NewRequest("POST", "/api/tasks/answer-task-3/answer", bytes.NewBufferString(payload)) + req := httptest.NewRequest("POST", "/api/tasks/answer-task-3/answer", bytes.NewBufferString(`{"answer":"main"}`)) req.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() srv.Handler().ServeHTTP(w, req) - if w.Code != http.StatusBadRequest { - t.Errorf("status: want 400, got %d", w.Code) + if w.Code != http.StatusOK { + t.Errorf("status: want 200, got %d; body: %s", w.Code, w.Body.String()) + } + + // Task should now be QUEUED (or RUNNING since the mock runner is instant). + got, _ := store.GetTask("answer-task-3") + if got.State != task.StateQueued && got.State != task.StateRunning && got.State != task.StateReady { + t.Errorf("task state: want QUEUED/RUNNING/READY after answer, got %v", got.State) } } diff --git a/internal/executor/claude.go b/internal/executor/claude.go index d3f5751..b97f202 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -27,6 +27,15 @@ type ClaudeRunner struct { APIURL string // base URL of the Claudomator API, passed to subprocesses } +// BlockedError is returned by Run when the agent wrote a question file and exited. +// The pool transitions the task to BLOCKED and stores the question for the user. +type BlockedError struct { + QuestionJSON string // raw JSON from the question file + SessionID string // claude session to resume once the user answers +} + +func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) } + // ExecLogDir returns the log directory for the given execution ID. // Implements LogPather so the pool can persist paths before execution starts. func (r *ClaudeRunner) ExecLogDir(execID string) string { @@ -45,9 +54,8 @@ func (r *ClaudeRunner) binaryPath() string { // Run executes a claude -p invocation, streaming output to log files. // It retries up to 3 times on rate-limit errors using exponential backoff. +// If the agent writes a question file and exits, Run returns *BlockedError. func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - args := r.buildArgs(t) - if t.Claude.WorkingDir != "" { if _, err := os.Stat(t.Claude.WorkingDir); err != nil { return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err) @@ -55,11 +63,9 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi } // Setup log directory once; retries overwrite the log files. - // Use pre-set paths if the pool already populated them via LogPather; - // otherwise fall back to computing from LogDir + execID. logDir := r.ExecLogDir(e.ID) if logDir == "" { - logDir = e.ID // fallback: use execID as relative dir (tests without LogDir set) + logDir = e.ID // fallback for tests without LogDir set } if err := os.MkdirAll(logDir, 0700); err != nil { return fmt.Errorf("creating log dir: %w", err) @@ -70,8 +76,17 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi e.ArtifactDir = logDir } + // Pre-assign session ID so we can resume after a BLOCKED state. + // If this is a resume execution the session ID is already set. + if e.SessionID == "" { + e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + attempt := 0 - return runWithBackoff(ctx, 3, 5*time.Second, func() error { + err := runWithBackoff(ctx, 3, 5*time.Second, func() error { if attempt > 0 { delay := 5 * time.Second * (1 << (attempt - 1)) r.Logger.Warn("rate-limited by Claude API, retrying", @@ -80,22 +95,34 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi ) } attempt++ - return r.execOnce(ctx, t, args, e) + return r.execOnce(ctx, args, t.Claude.WorkingDir, e) }) + if err != nil { + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) // consumed + return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID} + } + return nil } // execOnce runs the claude subprocess once, streaming output to e's log paths. -func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error { +func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error { cmd := exec.CommandContext(ctx, r.binaryPath(), args...) cmd.Env = append(os.Environ(), "CLAUDOMATOR_API_URL="+r.APIURL, - "CLAUDOMATOR_TASK_ID="+t.ID, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), ) // Put the subprocess in its own process group so we can SIGKILL the entire // group (MCP servers, bash children, etc.) on cancellation. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if t.Claude.WorkingDir != "" { - cmd.Dir = t.Claude.WorkingDir + if workingDir != "" { + cmd.Dir = workingDir } stdoutFile, err := os.Create(e.StdoutPath) @@ -172,13 +199,32 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string return nil } -func (r *ClaudeRunner) buildArgs(t *task.Task) []string { +func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Resume execution: the agent already has context; just deliver the answer. + if e.ResumeSessionID != "" { + args := []string{ + "-p", e.ResumeAnswer, + "--resume", e.ResumeSessionID, + "--output-format", "stream-json", + "--verbose", + } + permMode := t.Claude.PermissionMode + if permMode == "" { + permMode = "bypassPermissions" + } + args = append(args, "--permission-mode", permMode) + if t.Claude.Model != "" { + args = append(args, "--model", t.Claude.Model) + } + return args + } + instructions := t.Claude.Instructions allowedTools := t.Claude.AllowedTools if !t.Claude.SkipPlanning { instructions = withPlanningPreamble(instructions) - // Ensure Bash is available so the agent can POST subtasks. + // Ensure Bash is available so the agent can POST subtasks and ask questions. hasBash := false for _, tool := range allowedTools { if tool == "Bash" { @@ -193,6 +239,7 @@ func (r *ClaudeRunner) buildArgs(t *task.Task) []string { args := []string{ "-p", instructions, + "--session-id", e.SessionID, "--output-format", "stream-json", "--verbose", } diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go index edf13b5..056c7e8 100644 --- a/internal/executor/claude_test.go +++ b/internal/executor/claude_test.go @@ -21,7 +21,7 @@ func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") argMap := make(map[string]bool) for _, a := range args { @@ -51,7 +51,7 @@ func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // Check key args are present. argMap := make(map[string]bool) @@ -86,7 +86,7 @@ func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") found := false for i, a := range args { @@ -109,7 +109,7 @@ func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") for i, a := range args { if a == "--permission-mode" && i+1 < len(args) { @@ -131,7 +131,7 @@ func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") found := false for _, a := range args { @@ -154,7 +154,7 @@ func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // The -p value should start with the preamble and end with the original instructions. if len(args) < 2 || args[0] != "-p" { @@ -178,7 +178,7 @@ func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // Bash should be appended to allowed tools. foundBash := false @@ -202,7 +202,7 @@ func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) { }, } - args := r.buildArgs(tk) + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") // Count Bash occurrences in --allowedTools values. bashCount := 0 diff --git a/internal/executor/executor.go b/internal/executor/executor.go index eb23c02..f6932f4 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "errors" "fmt" "log/slog" "path/filepath" @@ -80,6 +81,94 @@ func (p *Pool) Results() <-chan *Result { return p.resultCh } +// SubmitResume re-queues a blocked task using the provided resume execution. +// The execution must have ResumeSessionID and ResumeAnswer set. +func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { + p.mu.Lock() + if p.active >= p.maxConcurrent { + active := p.active + max := p.maxConcurrent + p.mu.Unlock() + return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) + } + p.active++ + p.mu.Unlock() + + go p.executeResume(ctx, t, exec) + return nil +} + +func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { + defer func() { + p.mu.Lock() + p.active-- + p.mu.Unlock() + }() + + // Pre-populate log paths. + if lp, ok := p.runner.(LogPather); ok { + if logDir := lp.ExecLogDir(exec.ID); logDir != "" { + exec.StdoutPath = filepath.Join(logDir, "stdout.log") + exec.StderrPath = filepath.Join(logDir, "stderr.log") + exec.ArtifactDir = logDir + } + } + exec.StartTime = time.Now().UTC() + exec.Status = "RUNNING" + + if err := p.store.CreateExecution(exec); err != nil { + p.logger.Error("failed to create resume execution record", "error", err) + } + if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { + p.logger.Error("failed to update task state", "error", err) + } + + var cancel context.CancelFunc + if t.Timeout.Duration > 0 { + ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + err := p.runner.Run(ctx, t, exec) + exec.EndTime = time.Now().UTC() + + if err != nil { + var blockedErr *BlockedError + if errors.As(err, &blockedErr) { + exec.Status = "BLOCKED" + p.store.UpdateTaskState(t.ID, task.StateBlocked) + p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) + } else if ctx.Err() == context.DeadlineExceeded { + exec.Status = "TIMED_OUT" + exec.ErrorMsg = "execution timed out" + p.store.UpdateTaskState(t.ID, task.StateTimedOut) + } else if ctx.Err() == context.Canceled { + exec.Status = "CANCELLED" + exec.ErrorMsg = "execution cancelled" + p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else { + exec.Status = "FAILED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateFailed) + } + } else { + if t.ParentTaskID == "" { + exec.Status = "READY" + p.store.UpdateTaskState(t.ID, task.StateReady) + } else { + exec.Status = "COMPLETED" + p.store.UpdateTaskState(t.ID, task.StateCompleted) + } + } + + if updateErr := p.store.UpdateExecution(exec); updateErr != nil { + p.logger.Error("failed to update resume execution", "error", updateErr) + } + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} +} + // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() @@ -155,7 +244,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { exec.EndTime = time.Now().UTC() if err != nil { - if ctx.Err() == context.DeadlineExceeded { + var blockedErr *BlockedError + if errors.As(err, &blockedErr) { + exec.Status = "BLOCKED" + p.store.UpdateTaskState(t.ID, task.StateBlocked) + p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) + } else if ctx.Err() == context.DeadlineExceeded { exec.Status = "TIMED_OUT" exec.ErrorMsg = "execution timed out" p.store.UpdateTaskState(t.ID, task.StateTimedOut) diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go index cd1a2cc..8a2cce3 100644 --- a/internal/executor/preamble.go +++ b/internal/executor/preamble.go @@ -1,6 +1,23 @@ package executor -const planningPreamble = `## Planning Step (do this first) +const planningPreamble = `## Runtime Environment + +You are running as a background agent inside Claudomator. You cannot interact +with the user directly. However, if you need a decision or clarification: + +**To ask the user a question and pause:** +1. Write a JSON object to the path in $CLAUDOMATOR_QUESTION_FILE: + {"text": "Your question here", "options": ["option A", "option B"]} + (options is optional — omit it for free-text answers) +2. Exit immediately. Do not wait. The task will be resumed with the user's answer + as the next message in this conversation. + +Only ask a question when truly blocked. Prefer making a reasonable decision +and noting it in your output. + +--- + +## Planning Step (do this first) Before doing any implementation work: diff --git a/internal/storage/db.go b/internal/storage/db.go index e656f98..0e4d6f1 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -88,6 +88,8 @@ func (s *DB) migrate() error { migrations := []string{ `ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`, `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, + `ALTER TABLE tasks ADD COLUMN question_json TEXT`, + `ALTER TABLE executions ADD COLUMN session_id TEXT`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -136,13 +138,13 @@ func (s *DB) CreateTask(t *task.Task) error { // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { - row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { - query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1` + query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { @@ -174,7 +176,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { - rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) + rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } @@ -308,28 +310,33 @@ type Execution struct { ArtifactDir string CostUSD float64 ErrorMsg string + SessionID string // claude --session-id; persisted for resume + + // In-memory only: set when creating a resume execution, not stored in DB. + ResumeSessionID string + ResumeAnswer string } // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { _, err := s.db.Exec(` - INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, - e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { - row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { - rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) + rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } @@ -346,6 +353,20 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { return execs, rows.Err() } +// GetLatestExecution returns the most recent execution for a task. +func (s *DB) GetLatestExecution(taskID string) (*Execution, error) { + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) + return scanExecution(row) +} + +// UpdateTaskQuestion stores the pending question JSON on a task. +// Pass empty string to clear the question after it has been answered. +func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { + _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`, + questionJSON, time.Now().UTC(), taskID) + return err +} + // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` @@ -374,10 +395,12 @@ func scanTask(row scanner) (*task.Task, error) { timeoutNS int64 parentTaskID sql.NullString rejectionComment sql.NullString + questionJSON sql.NullString ) - err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment) + err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON) t.ParentTaskID = parentTaskID.String t.RejectionComment = rejectionComment.String + t.QuestionJSON = questionJSON.String if err != nil { return nil, err } @@ -405,11 +428,13 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) { func scanExecution(row scanner) (*Execution, error) { var e Execution + var sessionID sql.NullString err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, - &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg) + &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID) if err != nil { return nil, err } + e.SessionID = sessionID.String return &e, nil } diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index 4f9069a..395574c 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -421,3 +421,71 @@ func TestUpdateExecution(t *testing.T) { t.Errorf("artifact_dir: want /tmp/exec, got %q", got.ArtifactDir) } } + +func makeTestTask(id string, now time.Time) *task.Task { + return &task.Task{ + ID: id, Name: "T-" + id, Claude: task.ClaudeConfig{Instructions: "x"}, + Priority: task.PriorityNormal, Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"}, + Tags: []string{}, DependsOn: []string{}, + State: task.StatePending, CreatedAt: now, UpdatedAt: now, + } +} + +func TestStorage_SessionID_RoundTrip(t *testing.T) { + db := testDB(t) + now := time.Now().UTC() + db.CreateTask(makeTestTask("sid-task", now)) + + exec := &Execution{ + ID: "sid-exec", TaskID: "sid-task", StartTime: now, Status: "RUNNING", + SessionID: "550e8400-e29b-41d4-a716-446655440000", + } + if err := db.CreateExecution(exec); err != nil { + t.Fatalf("create: %v", err) + } + + got, err := db.GetExecution("sid-exec") + if err != nil { + t.Fatalf("get: %v", err) + } + if got.SessionID != exec.SessionID { + t.Errorf("session_id: want %q, got %q", exec.SessionID, got.SessionID) + } +} + +func TestStorage_UpdateTaskQuestion(t *testing.T) { + db := testDB(t) + now := time.Now().UTC() + tk := makeTestTask("q-task", now) + db.CreateTask(tk) + + q := `{"text":"Which branch?","options":["main","develop"]}` + if err := db.UpdateTaskQuestion("q-task", q); err != nil { + t.Fatalf("update question: %v", err) + } + + got, err := db.GetTask("q-task") + if err != nil { + t.Fatalf("get: %v", err) + } + if got.QuestionJSON != q { + t.Errorf("question_json: want %q, got %q", q, got.QuestionJSON) + } +} + +func TestStorage_GetLatestExecution(t *testing.T) { + db := testDB(t) + now := time.Now().UTC() + db.CreateTask(makeTestTask("le-task", now)) + + db.CreateExecution(&Execution{ID: "le-1", TaskID: "le-task", StartTime: now, Status: "COMPLETED"}) + db.CreateExecution(&Execution{ID: "le-2", TaskID: "le-task", StartTime: now.Add(time.Minute), Status: "RUNNING"}) + + got, err := db.GetLatestExecution("le-task") + if err != nil { + t.Fatalf("get latest: %v", err) + } + if got.ID != "le-2" { + t.Errorf("want le-2, got %q", got.ID) + } +} diff --git a/internal/task/task.go b/internal/task/task.go index 3e74a82..587993f 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -14,6 +14,7 @@ const ( StateTimedOut State = "TIMED_OUT" StateCancelled State = "CANCELLED" StateBudgetExceeded State = "BUDGET_EXCEEDED" + StateBlocked State = "BLOCKED" ) type Priority string @@ -56,6 +57,7 @@ type Task struct { DependsOn []string `yaml:"depends_on" json:"depends_on"` State State `yaml:"-" json:"state"` RejectionComment string `yaml:"-" json:"rejection_comment,omitempty"` + QuestionJSON string `yaml:"-" json:"question,omitempty"` CreatedAt time.Time `yaml:"-" json:"created_at"` UpdatedAt time.Time `yaml:"-" json:"updated_at"` } @@ -92,10 +94,11 @@ func ValidTransition(from, to State) bool { transitions := map[State][]State{ StatePending: {StateQueued, StateCancelled}, StateQueued: {StateRunning, StateCancelled}, - StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded}, + StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded, StateBlocked}, StateReady: {StateCompleted, StatePending}, StateFailed: {StateQueued}, // retry StateTimedOut: {StateQueued}, // retry + StateBlocked: {StateQueued}, // answer received → re-queue as resume execution } for _, allowed := range transitions[from] { if allowed == to { diff --git a/internal/task/task_test.go b/internal/task/task_test.go index 6498271..5d997ac 100644 --- a/internal/task/task_test.go +++ b/internal/task/task_test.go @@ -22,6 +22,8 @@ func TestValidTransition_AllowedTransitions(t *testing.T) { {"running to budget exceeded", StateRunning, StateBudgetExceeded}, {"failed to queued (retry)", StateFailed, StateQueued}, {"timed out to queued (retry)", StateTimedOut, StateQueued}, + {"running to blocked (question)", StateRunning, StateBlocked}, + {"blocked to queued (answer resume)", StateBlocked, StateQueued}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/scripts/deploy b/scripts/deploy index 4722dab..0b8e162 100755 --- a/scripts/deploy +++ b/scripts/deploy @@ -18,6 +18,12 @@ export GOPATH="${SITE_DIR}/cache/gopath" mkdir -p "${GOCACHE}" "${GOPATH}" go build -o "${BIN_DIR}/claudomator" ./cmd/claudomator/ +echo "==> Copying scripts..." +mkdir -p "${SITE_DIR}/scripts" +cp "${REPO_DIR}/scripts/"* "${SITE_DIR}/scripts/" +chown -R www-data:www-data "${SITE_DIR}/scripts" +chmod +x "${SITE_DIR}/scripts/"* + echo "==> Fixing permissions..." chown www-data:www-data "${BIN_DIR}/claudomator" chmod +x "${BIN_DIR}/claudomator" diff --git a/scripts/start-next-task b/scripts/start-next-task index 1e45eb3..da019b6 100755 --- a/scripts/start-next-task +++ b/scripts/start-next-task @@ -12,4 +12,4 @@ if [[ -z "$task_id" ]]; then exit 0 fi -echo claudomator start "$task_id" +claudomator start "$task_id" @@ -71,7 +71,7 @@ function createTaskCard(task) { // Footer: action buttons based on state const RESTART_STATES = new Set(['FAILED', 'TIMED_OUT', 'CANCELLED']); - if (task.state === 'PENDING' || task.state === 'RUNNING' || task.state === 'READY' || RESTART_STATES.has(task.state)) { + if (task.state === 'PENDING' || task.state === 'RUNNING' || task.state === 'READY' || task.state === 'BLOCKED' || RESTART_STATES.has(task.state)) { const footer = document.createElement('div'); footer.className = 'task-card-footer'; @@ -110,6 +110,8 @@ function createTaskCard(task) { }); footer.appendChild(acceptBtn); footer.appendChild(rejectBtn); + } else if (task.state === 'BLOCKED') { + renderQuestionFooter(task, footer); } else if (RESTART_STATES.has(task.state)) { const btn = document.createElement('button'); btn.className = 'btn-restart'; @@ -312,6 +314,79 @@ async function restartTask(taskId) { return res.json(); } +function renderQuestionFooter(task, footer) { + let question = { text: 'Waiting for your input.', options: [] }; + if (task.question) { + try { question = JSON.parse(task.question); } catch {} + } + + const questionEl = document.createElement('p'); + questionEl.className = 'task-question-text'; + questionEl.textContent = question.text; + footer.appendChild(questionEl); + + if (question.options && question.options.length > 0) { + question.options.forEach(opt => { + const btn = document.createElement('button'); + btn.className = 'btn-answer'; + btn.textContent = opt; + btn.addEventListener('click', (e) => { + e.stopPropagation(); + handleAnswer(task.id, opt, footer); + }); + footer.appendChild(btn); + }); + } else { + const row = document.createElement('div'); + row.className = 'task-answer-row'; + const input = document.createElement('input'); + input.type = 'text'; + input.className = 'task-answer-input'; + input.placeholder = 'Your answer…'; + const btn = document.createElement('button'); + btn.className = 'btn-answer'; + btn.textContent = 'Submit'; + btn.addEventListener('click', (e) => { + e.stopPropagation(); + if (input.value.trim()) handleAnswer(task.id, input.value.trim(), footer); + }); + input.addEventListener('keydown', (e) => { + if (e.key === 'Enter' && input.value.trim()) { + e.stopPropagation(); + handleAnswer(task.id, input.value.trim(), footer); + } + }); + row.append(input, btn); + footer.appendChild(row); + } +} + +async function handleAnswer(taskId, answer, footer) { + const btns = footer.querySelectorAll('button, input'); + btns.forEach(el => { el.disabled = true; }); + const prev = footer.querySelector('.task-error'); + if (prev) prev.remove(); + + try { + const res = await fetch(`${API_BASE}/api/tasks/${taskId}/answer`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ answer }), + }); + if (!res.ok) { + const body = await res.json().catch(() => ({})); + throw new Error(body.error || `HTTP ${res.status}`); + } + await poll(); + } catch (err) { + btns.forEach(el => { el.disabled = false; }); + const errEl = document.createElement('span'); + errEl.className = 'task-error'; + errEl.textContent = `Failed: ${err.message}`; + footer.appendChild(errEl); + } +} + async function handleCancel(taskId, btn, footer) { btn.disabled = true; btn.textContent = 'Cancelling…'; diff --git a/web/style.css b/web/style.css index 1478b36..91466ee 100644 --- a/web/style.css +++ b/web/style.css @@ -8,6 +8,7 @@ --state-timed-out: #c084fc; --state-cancelled: #9ca3af; --state-budget-exceeded: #fb923c; + --state-blocked: #818cf8; --bg: #0f172a; --surface: #1e293b; @@ -181,6 +182,7 @@ main { .state-badge[data-state="TIMED_OUT"] { background: var(--state-timed-out); } .state-badge[data-state="CANCELLED"] { background: var(--state-cancelled); } .state-badge[data-state="BUDGET_EXCEEDED"] { background: var(--state-budget-exceeded); } +.state-badge[data-state="BLOCKED"] { background: var(--state-blocked); } /* Task meta */ .task-meta { @@ -294,6 +296,48 @@ main { cursor: not-allowed; } +.task-question-text { + font-size: 0.82rem; + color: var(--text); + margin: 0 0 0.5rem 0; + line-height: 1.4; + width: 100%; +} + +.task-answer-row { + display: flex; + gap: 0.375rem; + width: 100%; +} + +.task-answer-input { + flex: 1; + font-size: 0.8rem; + padding: 0.3em 0.6em; + border-radius: 0.375rem; + border: 1px solid var(--border); + background: var(--bg); + color: var(--text); +} + +.btn-answer { + font-size: 0.8rem; + font-weight: 600; + padding: 0.35em 0.85em; + border-radius: 0.375rem; + border: none; + cursor: pointer; + background: var(--state-blocked); + color: #0f172a; + transition: opacity 0.15s; + margin-right: 0.375rem; +} + +.btn-answer:disabled { + opacity: 0.5; + cursor: not-allowed; +} + .task-error { font-size: 0.78rem; color: var(--state-failed); |
