summaryrefslogtreecommitdiff
path: root/internal/storage/db.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-04 21:25:34 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-04 21:25:34 +0000
commit6511d6e0ff139495413c7848a9b4aabb9d9ee4e2 (patch)
tree95bd6a0efc0ace206a5716da62a5956491cb46e7 /internal/storage/db.go
parent3962597950421e422b6e1ce57764550f5600ded6 (diff)
Add READY state for human-in-the-loop verification
Top-level tasks now land in READY after successful execution instead of going directly to COMPLETED. Subtasks (with parent_task_id) skip the gate and remain COMPLETED. Users accept or reject via new API endpoints: POST /api/tasks/{id}/accept → READY → COMPLETED POST /api/tasks/{id}/reject → READY → PENDING (with rejection_comment) - task: add StateReady, RejectionComment field, update ValidTransition - storage: migrate rejection_comment column, add RejectTask method - executor: route top-level vs subtask to READY vs COMPLETED - api: /accept and /reject handlers with 409 on invalid state Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/storage/db.go')
-rw-r--r--internal/storage/db.go53
1 files changed, 38 insertions, 15 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go
index 0117ae7..e656f98 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -87,6 +87,7 @@ func (s *DB) migrate() error {
// Additive migrations for columns added after initial schema.
migrations := []string{
`ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`,
+ `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -135,13 +136,13 @@ func (s *DB) CreateTask(t *task.Task) error {
// GetTask retrieves a task by ID.
func (s *DB) GetTask(id string) (*task.Task, error) {
- row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at FROM tasks WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id)
return scanTask(row)
}
// ListTasks returns tasks matching the given filter.
func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
- query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at FROM tasks WHERE 1=1`
+ query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1`
var args []interface{}
if filter.State != "" {
@@ -173,7 +174,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
// ListSubtasks returns all tasks whose parent_task_id matches the given ID.
func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) {
- rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
+ rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
if err != nil {
return nil, err
}
@@ -207,6 +208,24 @@ func (s *DB) UpdateTaskState(id string, newState task.State) error {
return nil
}
+// RejectTask sets a task's state to PENDING and stores the rejection comment.
+func (s *DB) RejectTask(id, comment string) error {
+ now := time.Now().UTC()
+ result, err := s.db.Exec(`UPDATE tasks SET state = ?, rejection_comment = ?, updated_at = ? WHERE id = ?`,
+ string(task.StatePending), comment, now, id)
+ if err != nil {
+ return err
+ }
+ n, err := result.RowsAffected()
+ if err != nil {
+ return err
+ }
+ if n == 0 {
+ return fmt.Errorf("task %q not found", id)
+ }
+ return nil
+}
+
// TaskUpdate holds the fields that UpdateTask may change.
type TaskUpdate struct {
Name string
@@ -330,9 +349,11 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
// UpdateExecution updates a completed execution.
func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
- UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?
+ UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?,
+ stdout_path = ?, stderr_path = ?, artifact_dir = ?
WHERE id = ?`,
- e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.ID,
+ e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.ID,
)
return err
}
@@ -343,18 +364,20 @@ type scanner interface {
func scanTask(row scanner) (*task.Task, error) {
var (
- t task.Task
- configJSON string
- retryJSON string
- tagsJSON string
- depsJSON string
- state string
- priority string
- timeoutNS int64
- parentTaskID sql.NullString
+ t task.Task
+ configJSON string
+ retryJSON string
+ tagsJSON string
+ depsJSON string
+ state string
+ priority string
+ timeoutNS int64
+ parentTaskID sql.NullString
+ rejectionComment sql.NullString
)
- err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt)
+ err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment)
t.ParentTaskID = parentTaskID.String
+ t.RejectionComment = rejectionComment.String
if err != nil {
return nil, err
}