From 6511d6e0ff139495413c7848a9b4aabb9d9ee4e2 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Wed, 4 Mar 2026 21:25:34 +0000 Subject: Add READY state for human-in-the-loop verification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Top-level tasks now land in READY after successful execution instead of going directly to COMPLETED. Subtasks (with parent_task_id) skip the gate and remain COMPLETED. Users accept or reject via new API endpoints: POST /api/tasks/{id}/accept → READY → COMPLETED POST /api/tasks/{id}/reject → READY → PENDING (with rejection_comment) - task: add StateReady, RejectionComment field, update ValidTransition - storage: migrate rejection_comment column, add RejectTask method - executor: route top-level vs subtask to READY vs COMPLETED - api: /accept and /reject handlers with 409 on invalid state Co-Authored-By: Claude Sonnet 4.6 --- internal/storage/db.go | 53 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 15 deletions(-) (limited to 'internal/storage/db.go') diff --git a/internal/storage/db.go b/internal/storage/db.go index 0117ae7..e656f98 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -87,6 +87,7 @@ func (s *DB) migrate() error { // Additive migrations for columns added after initial schema. migrations := []string{ `ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`, + `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -135,13 +136,13 @@ func (s *DB) CreateTask(t *task.Task) error { // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { - row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at FROM tasks WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { - query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at FROM tasks WHERE 1=1` + query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { @@ -173,7 +174,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { - rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) + rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } @@ -207,6 +208,24 @@ func (s *DB) UpdateTaskState(id string, newState task.State) error { return nil } +// RejectTask sets a task's state to PENDING and stores the rejection comment. +func (s *DB) RejectTask(id, comment string) error { + now := time.Now().UTC() + result, err := s.db.Exec(`UPDATE tasks SET state = ?, rejection_comment = ?, updated_at = ? WHERE id = ?`, + string(task.StatePending), comment, now, id) + if err != nil { + return err + } + n, err := result.RowsAffected() + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("task %q not found", id) + } + return nil +} + // TaskUpdate holds the fields that UpdateTask may change. type TaskUpdate struct { Name string @@ -330,9 +349,11 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` - UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ? + UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?, + stdout_path = ?, stderr_path = ?, artifact_dir = ? WHERE id = ?`, - e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.ID, + e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.ID, ) return err } @@ -343,18 +364,20 @@ type scanner interface { func scanTask(row scanner) (*task.Task, error) { var ( - t task.Task - configJSON string - retryJSON string - tagsJSON string - depsJSON string - state string - priority string - timeoutNS int64 - parentTaskID sql.NullString + t task.Task + configJSON string + retryJSON string + tagsJSON string + depsJSON string + state string + priority string + timeoutNS int64 + parentTaskID sql.NullString + rejectionComment sql.NullString ) - err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt) + err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment) t.ParentTaskID = parentTaskID.String + t.RejectionComment = rejectionComment.String if err != nil { return nil, err } -- cgit v1.2.3