summaryrefslogtreecommitdiff
path: root/internal/storage/db.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-06 00:07:18 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-06 00:07:18 +0000
commit7466b1751c4126735769a3304e1db80dab166a9e (patch)
treec5d0fe9d1018e62e3857480d471a0f6f8ebee104 /internal/storage/db.go
parenta33211d0ad07f5aaf2d8bb51ba18e6790a153bb4 (diff)
feat: blocked task state for agent questions via session resume
When an agent needs user input it writes a question to $CLAUDOMATOR_QUESTION_FILE and exits. The runner detects the file and returns BlockedError; the pool transitions the task to BLOCKED and stores the question JSON on the task record. The user answers via POST /api/tasks/{id}/answer. The server looks up the claude session_id from the most recent execution and submits a resume execution (claude --resume <session-id> "<answer>"), freeing the executor slot entirely while waiting. Changes: - task: add StateBlocked, transitions RUNNING→BLOCKED, BLOCKED→QUEUED - storage: add session_id to executions, question_json to tasks; add GetLatestExecution and UpdateTaskQuestion methods - executor: BlockedError type; ClaudeRunner pre-assigns --session-id, sets CLAUDOMATOR_QUESTION_FILE env var, detects question file on exit; buildArgs handles --resume mode; Pool.SubmitResume for resume path - api: handleAnswerQuestion rewritten to create resume execution - preamble: add question protocol instructions for agents - web: BLOCKED state badge (indigo), question text + option buttons or free-text input with Submit on the task card footer Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/storage/db.go')
-rw-r--r--internal/storage/db.go45
1 files changed, 35 insertions, 10 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go
index e656f98..0e4d6f1 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -88,6 +88,8 @@ func (s *DB) migrate() error {
migrations := []string{
`ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`,
`ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`,
+ `ALTER TABLE tasks ADD COLUMN question_json TEXT`,
+ `ALTER TABLE executions ADD COLUMN session_id TEXT`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -136,13 +138,13 @@ func (s *DB) CreateTask(t *task.Task) error {
// GetTask retrieves a task by ID.
func (s *DB) GetTask(id string) (*task.Task, error) {
- row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id)
return scanTask(row)
}
// ListTasks returns tasks matching the given filter.
func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
- query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1`
+ query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1`
var args []interface{}
if filter.State != "" {
@@ -174,7 +176,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
// ListSubtasks returns all tasks whose parent_task_id matches the given ID.
func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) {
- rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
+ rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
if err != nil {
return nil, err
}
@@ -308,28 +310,33 @@ type Execution struct {
ArtifactDir string
CostUSD float64
ErrorMsg string
+ SessionID string // claude --session-id; persisted for resume
+
+ // In-memory only: set when creating a resume execution, not stored in DB.
+ ResumeSessionID string
+ ResumeAnswer string
}
// CreateExecution inserts an execution record.
func (s *DB) CreateExecution(e *Execution) error {
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -346,6 +353,20 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
return execs, rows.Err()
}
+// GetLatestExecution returns the most recent execution for a task.
+func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ return scanExecution(row)
+}
+
+// UpdateTaskQuestion stores the pending question JSON on a task.
+// Pass empty string to clear the question after it has been answered.
+func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error {
+ _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`,
+ questionJSON, time.Now().UTC(), taskID)
+ return err
+}
+
// UpdateExecution updates a completed execution.
func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
@@ -374,10 +395,12 @@ func scanTask(row scanner) (*task.Task, error) {
timeoutNS int64
parentTaskID sql.NullString
rejectionComment sql.NullString
+ questionJSON sql.NullString
)
- err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment)
+ err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON)
t.ParentTaskID = parentTaskID.String
t.RejectionComment = rejectionComment.String
+ t.QuestionJSON = questionJSON.String
if err != nil {
return nil, err
}
@@ -405,11 +428,13 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) {
func scanExecution(row scanner) (*Execution, error) {
var e Execution
+ var sessionID sql.NullString
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID)
if err != nil {
return nil, err
}
+ e.SessionID = sessionID.String
return &e, nil
}