diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-13 03:14:40 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-13 03:14:40 +0000 |
| commit | 5303a68d67e435da863353cdce09fa2e3a8c2ccd (patch) | |
| tree | 2e16b9c17c11cbb3b7c9395e1b3fb119b73ef2ca /internal/storage | |
| parent | f28c22352aa1a8ede7552ee0277f7d60552d9094 (diff) | |
feat: resume support, summary extraction, and task state improvements
- Extend Resume to CANCELLED, FAILED, and BUDGET_EXCEEDED tasks
- Add summary extraction from agent stdout stream-json output
- Fix storage: persist stdout/stderr/artifact_dir paths in UpdateExecution
- Clear question_json on ResetTaskForRetry
- Resume BLOCKED tasks in preserved sandbox so Claude finds its session
- Add planning preamble: CLAUDOMATOR_SUMMARY_FILE env var + summary step
- Update ADR-002 with new state transitions
- UI style improvements
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/storage')
| -rw-r--r-- | internal/storage/db.go | 66 |
1 files changed, 61 insertions, 5 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go index aaf1e09..b8a7085 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -81,6 +81,8 @@ func (s *DB) migrate() error { `ALTER TABLE tasks ADD COLUMN question_json TEXT`, `ALTER TABLE executions ADD COLUMN session_id TEXT`, `ALTER TABLE executions ADD COLUMN sandbox_dir TEXT`, + `ALTER TABLE tasks ADD COLUMN summary TEXT`, + `ALTER TABLE tasks ADD COLUMN interactions_json TEXT NOT NULL DEFAULT '[]'`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -129,13 +131,13 @@ func (s *DB) CreateTask(t *task.Task) error { // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { - row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { - query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1` + query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { @@ -167,7 +169,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { - rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) + rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } @@ -220,7 +222,7 @@ func (s *DB) ResetTaskForRetry(id string) (*task.Task, error) { } defer tx.Rollback() //nolint:errcheck - t, err := scanTask(tx.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id)) + t, err := scanTask(tx.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id)) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("task %q not found", id) @@ -355,6 +357,8 @@ type Execution struct { // In-memory only: set when creating a resume execution, not stored in DB. ResumeSessionID string ResumeAnswer string + // In-memory only: populated by the runner after successful execution. + Summary string } // CreateExecution inserts an execution record. @@ -516,6 +520,48 @@ func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { return err } +// UpdateTaskSummary stores the agent's final summary paragraph on a task. +func (s *DB) UpdateTaskSummary(taskID, summary string) error { + _, err := s.db.Exec(`UPDATE tasks SET summary = ?, updated_at = ? WHERE id = ?`, + summary, time.Now().UTC(), taskID) + return err +} + +// AppendTaskInteraction appends a Q&A interaction to the task's interaction history. +func (s *DB) AppendTaskInteraction(taskID string, interaction task.Interaction) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + var raw sql.NullString + if err := tx.QueryRow(`SELECT interactions_json FROM tasks WHERE id = ?`, taskID).Scan(&raw); err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("task %q not found", taskID) + } + return err + } + existing := raw.String + if existing == "" { + existing = "[]" + } + var interactions []task.Interaction + if err := json.Unmarshal([]byte(existing), &interactions); err != nil { + return fmt.Errorf("unmarshaling interactions: %w", err) + } + interactions = append(interactions, interaction) + updated, err := json.Marshal(interactions) + if err != nil { + return fmt.Errorf("marshaling interactions: %w", err) + } + if _, err := tx.Exec(`UPDATE tasks SET interactions_json = ?, updated_at = ? WHERE id = ?`, + string(updated), time.Now().UTC(), taskID); err != nil { + return err + } + return tx.Commit() +} + // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` @@ -545,11 +591,14 @@ func scanTask(row scanner) (*task.Task, error) { parentTaskID sql.NullString rejectionComment sql.NullString questionJSON sql.NullString + summary sql.NullString + interactionsJSON sql.NullString ) - err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON) + err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON, &summary, &interactionsJSON) t.ParentTaskID = parentTaskID.String t.RejectionComment = rejectionComment.String t.QuestionJSON = questionJSON.String + t.Summary = summary.String if err != nil { return nil, err } @@ -568,6 +617,13 @@ func scanTask(row scanner) (*task.Task, error) { if err := json.Unmarshal([]byte(depsJSON), &t.DependsOn); err != nil { return nil, fmt.Errorf("unmarshaling depends_on: %w", err) } + raw := interactionsJSON.String + if raw == "" { + raw = "[]" + } + if err := json.Unmarshal([]byte(raw), &t.Interactions); err != nil { + return nil, fmt.Errorf("unmarshaling interactions: %w", err) + } return &t, nil } |
