diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:07:18 +0000 |
| commit | 7466b1751c4126735769a3304e1db80dab166a9e (patch) | |
| tree | c5d0fe9d1018e62e3857480d471a0f6f8ebee104 /internal/storage/db.go | |
| parent | a33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff) | |
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to
$CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and
returns BlockedError; the pool transitions the task to BLOCKED and
stores the question JSON on the task record.
The user answers via POST /api/tasks/{id}/answer. The server looks up
the claude session_id from the most recent execution and submits a
resume execution (claude --resume <session-id> "<answer>"), freeing the
executor slot entirely while waiting.
Changes:
- task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED
- storage: add session_id to executions, question_json to tasks;
add GetLatestExecution and UpdateTaskQuestion methods
- executor: BlockedError type; ClaudeRunner pre-assigns --session-id,
sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit;
buildArgs handles --resume mode; Pool.SubmitResume for resume path
- api: handleAnswerQuestion rewritten to create resume execution
- preamble: add question protocol instructions for agents
- web: BLOCKED state badge (indigo), question text + option buttons or
free-text input with Submit on the task card footer
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/storage/db.go')
| -rw-r--r-- | internal/storage/db.go | 45 |
1 files changed, 35 insertions, 10 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go index e656f98..0e4d6f1 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -88,6 +88,8 @@ func (s *DB) migrate() error { migrations := []string{ `ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`, `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, + `ALTER TABLE tasks ADD COLUMN question_json TEXT`, + `ALTER TABLE executions ADD COLUMN session_id TEXT`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -136,13 +138,13 @@ func (s *DB) CreateTask(t *task.Task) error { // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { - row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { - query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1` + query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { @@ -174,7 +176,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { - rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) + rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } @@ -308,28 +310,33 @@ type Execution struct { ArtifactDir string CostUSD float64 ErrorMsg string + SessionID string // claude --session-id; persisted for resume + + // In-memory only: set when creating a resume execution, not stored in DB. + ResumeSessionID string + ResumeAnswer string } // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { _, err := s.db.Exec(` - INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, - e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { - row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { - rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) + rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } @@ -346,6 +353,20 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { return execs, rows.Err() } +// GetLatestExecution returns the most recent execution for a task. +func (s *DB) GetLatestExecution(taskID string) (*Execution, error) { + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) + return scanExecution(row) +} + +// UpdateTaskQuestion stores the pending question JSON on a task. +// Pass empty string to clear the question after it has been answered. +func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { + _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`, + questionJSON, time.Now().UTC(), taskID) + return err +} + // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` @@ -374,10 +395,12 @@ func scanTask(row scanner) (*task.Task, error) { timeoutNS int64 parentTaskID sql.NullString rejectionComment sql.NullString + questionJSON sql.NullString ) - err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment) + err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON) t.ParentTaskID = parentTaskID.String t.RejectionComment = rejectionComment.String + t.QuestionJSON = questionJSON.String if err != nil { return nil, err } @@ -405,11 +428,13 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) { func scanExecution(row scanner) (*Execution, error) { var e Execution + var sessionID sql.NullString err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, - &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg) + &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID) if err != nil { return nil, err } + e.SessionID = sessionID.String return &e, nil } |
