From 5303a68d67e435da863353cdce09fa2e3a8c2ccd Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Fri, 13 Mar 2026 03:14:40 +0000 Subject: feat: resume support, summary extraction, and task state improvements - Extend Resume to CANCELLED, FAILED, and BUDGET_EXCEEDED tasks - Add summary extraction from agent stdout stream-json output - Fix storage: persist stdout/stderr/artifact_dir paths in UpdateExecution - Clear question_json on ResetTaskForRetry - Resume BLOCKED tasks in preserved sandbox so Claude finds its session - Add planning preamble: CLAUDOMATOR_SUMMARY_FILE env var + summary step - Update ADR-002 with new state transitions - UI style improvements Co-Authored-By: Claude Sonnet 4.6 --- internal/api/docs/RAW_NARRATIVE.md | 117 +++++++++++++++++++++++++++++++++++++ internal/executor/claude.go | 8 +++ internal/executor/preamble.go | 11 ++++ internal/executor/summary.go | 57 ++++++++++++++++++ internal/executor/summary_test.go | 49 ++++++++++++++++ internal/storage/db.go | 66 +++++++++++++++++++-- internal/task/task.go | 51 +++++++++------- 7 files changed, 332 insertions(+), 27 deletions(-) create mode 100644 internal/api/docs/RAW_NARRATIVE.md create mode 100644 internal/executor/summary.go create mode 100644 internal/executor/summary_test.go (limited to 'internal') diff --git a/internal/api/docs/RAW_NARRATIVE.md b/internal/api/docs/RAW_NARRATIVE.md new file mode 100644 index 0000000..7944463 --- /dev/null +++ b/internal/api/docs/RAW_NARRATIVE.md @@ -0,0 +1,117 @@ + +--- 2026-03-10T09:33:34Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T09:33:34Z --- +do something + +--- 2026-03-10T09:33:34Z --- +do something + +--- 2026-03-10T16:46:39Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T16:46:39Z --- +do something + +--- 2026-03-10T16:46:39Z --- +do something + +--- 2026-03-10T17:16:31Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T17:16:31Z --- +do something + +--- 2026-03-10T17:16:31Z --- +do something + +--- 2026-03-10T17:25:16Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T17:25:16Z --- +do something + +--- 2026-03-10T17:25:16Z --- +do something + +--- 2026-03-10T23:54:53Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T23:54:53Z --- +do something + +--- 2026-03-10T23:54:53Z --- +do something + +--- 2026-03-10T23:55:54Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T23:55:54Z --- +do something + +--- 2026-03-10T23:55:54Z --- +do something + +--- 2026-03-10T23:56:06Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T23:56:06Z --- +do something + +--- 2026-03-10T23:56:06Z --- +do something + +--- 2026-03-10T23:57:26Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-10T23:57:26Z --- +do something + +--- 2026-03-10T23:57:26Z --- +do something + +--- 2026-03-11T07:40:17Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-11T07:40:17Z --- +do something + +--- 2026-03-11T07:40:17Z --- +do something + +--- 2026-03-11T08:25:03Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-11T08:25:04Z --- +do something + +--- 2026-03-11T08:25:04Z --- +do something + +--- 2026-03-12T21:00:28Z --- +generate a report + +--- 2026-03-12T21:00:33Z --- +generate a report + +--- 2026-03-12T21:00:34Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-12T21:00:34Z --- +do something + +--- 2026-03-12T21:00:34Z --- +do something + +--- 2026-03-13T02:27:38Z --- +generate a report + +--- 2026-03-13T02:27:38Z --- +run the Go test suite with race detector and fail if coverage < 80% + +--- 2026-03-13T02:27:38Z --- +do something + +--- 2026-03-13T02:27:38Z --- +do something diff --git a/internal/executor/claude.go b/internal/executor/claude.go index 0e29f7f..a58f1ad 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -150,6 +150,13 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID, SandboxDir: sandboxDir} } + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) // consumed + e.Summary = strings.TrimSpace(string(summaryData)) + } + // Merge sandbox back to project_dir and clean up. if sandboxDir != "" { if mergeErr := teardownSandbox(projectDir, sandboxDir, r.Logger); mergeErr != nil { @@ -261,6 +268,7 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s "CLAUDOMATOR_API_URL="+r.APIURL, "CLAUDOMATOR_TASK_ID="+e.TaskID, "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), ) // Put the subprocess in its own process group so we can SIGKILL the entire // group (MCP servers, bash children, etc.) on cancellation. diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go index e50c16f..bc5c32c 100644 --- a/internal/executor/preamble.go +++ b/internal/executor/preamble.go @@ -46,6 +46,17 @@ The sandbox is rejected if there are any uncommitted modifications. --- +## Final Summary (mandatory) + +Before exiting, write a brief summary paragraph (2–5 sentences) describing what you did +and the outcome. Write it to the path in $CLAUDOMATOR_SUMMARY_FILE: + + echo "Your summary here." > "$CLAUDOMATOR_SUMMARY_FILE" + +This summary is displayed in the task UI so the user knows what happened. + +--- + ` func withPlanningPreamble(instructions string) string { diff --git a/internal/executor/summary.go b/internal/executor/summary.go new file mode 100644 index 0000000..a942de0 --- /dev/null +++ b/internal/executor/summary.go @@ -0,0 +1,57 @@ +package executor + +import ( + "bufio" + "encoding/json" + "os" + "strings" +) + +// extractSummary reads a stream-json stdout log and returns the text following +// the last "## Summary" heading found in any assistant text block. +// Returns empty string if the file cannot be read or no summary is found. +func extractSummary(stdoutPath string) string { + f, err := os.Open(stdoutPath) + if err != nil { + return "" + } + defer f.Close() + + var last string + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + for scanner.Scan() { + if text := summaryFromLine(scanner.Bytes()); text != "" { + last = text + } + } + return last +} + +// summaryFromLine parses a single stream-json line and returns the text after +// "## Summary" if the line is an assistant text block containing that heading. +func summaryFromLine(line []byte) string { + var event struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(line, &event); err != nil || event.Type != "assistant" { + return "" + } + for _, block := range event.Message.Content { + if block.Type != "text" { + continue + } + idx := strings.Index(block.Text, "## Summary") + if idx == -1 { + continue + } + return strings.TrimSpace(block.Text[idx+len("## Summary"):]) + } + return "" +} diff --git a/internal/executor/summary_test.go b/internal/executor/summary_test.go new file mode 100644 index 0000000..4a73711 --- /dev/null +++ b/internal/executor/summary_test.go @@ -0,0 +1,49 @@ +package executor + +import ( + "os" + "path/filepath" + "testing" +) + +func TestExtractSummary_WithSummarySection(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + content := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nThe task was completed successfully."}]}}`) + if err := os.WriteFile(path, []byte(content), 0600); err != nil { + t.Fatal(err) + } + got := extractSummary(path) + want := "The task was completed successfully." + if got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestExtractSummary_NoSummary(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + content := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"All done, no summary heading."}]}}`) + if err := os.WriteFile(path, []byte(content), 0600); err != nil { + t.Fatal(err) + } + got := extractSummary(path) + if got != "" { + t.Errorf("expected empty string, got %q", got) + } +} + +func TestExtractSummary_MultipleSections_PicksLast(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + content := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nFirst summary."}]}}`) + + streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nFinal summary."}]}}`) + if err := os.WriteFile(path, []byte(content), 0600); err != nil { + t.Fatal(err) + } + got := extractSummary(path) + want := "Final summary." + if got != want { + t.Errorf("got %q, want %q", got, want) + } +} diff --git a/internal/storage/db.go b/internal/storage/db.go index aaf1e09..b8a7085 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -81,6 +81,8 @@ func (s *DB) migrate() error { `ALTER TABLE tasks ADD COLUMN question_json TEXT`, `ALTER TABLE executions ADD COLUMN session_id TEXT`, `ALTER TABLE executions ADD COLUMN sandbox_dir TEXT`, + `ALTER TABLE tasks ADD COLUMN summary TEXT`, + `ALTER TABLE tasks ADD COLUMN interactions_json TEXT NOT NULL DEFAULT '[]'`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -129,13 +131,13 @@ func (s *DB) CreateTask(t *task.Task) error { // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { - row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { - query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1` + query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { @@ -167,7 +169,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { - rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) + rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } @@ -220,7 +222,7 @@ func (s *DB) ResetTaskForRetry(id string) (*task.Task, error) { } defer tx.Rollback() //nolint:errcheck - t, err := scanTask(tx.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id)) + t, err := scanTask(tx.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id)) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("task %q not found", id) @@ -355,6 +357,8 @@ type Execution struct { // In-memory only: set when creating a resume execution, not stored in DB. ResumeSessionID string ResumeAnswer string + // In-memory only: populated by the runner after successful execution. + Summary string } // CreateExecution inserts an execution record. @@ -516,6 +520,48 @@ func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { return err } +// UpdateTaskSummary stores the agent's final summary paragraph on a task. +func (s *DB) UpdateTaskSummary(taskID, summary string) error { + _, err := s.db.Exec(`UPDATE tasks SET summary = ?, updated_at = ? WHERE id = ?`, + summary, time.Now().UTC(), taskID) + return err +} + +// AppendTaskInteraction appends a Q&A interaction to the task's interaction history. +func (s *DB) AppendTaskInteraction(taskID string, interaction task.Interaction) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + var raw sql.NullString + if err := tx.QueryRow(`SELECT interactions_json FROM tasks WHERE id = ?`, taskID).Scan(&raw); err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("task %q not found", taskID) + } + return err + } + existing := raw.String + if existing == "" { + existing = "[]" + } + var interactions []task.Interaction + if err := json.Unmarshal([]byte(existing), &interactions); err != nil { + return fmt.Errorf("unmarshaling interactions: %w", err) + } + interactions = append(interactions, interaction) + updated, err := json.Marshal(interactions) + if err != nil { + return fmt.Errorf("marshaling interactions: %w", err) + } + if _, err := tx.Exec(`UPDATE tasks SET interactions_json = ?, updated_at = ? WHERE id = ?`, + string(updated), time.Now().UTC(), taskID); err != nil { + return err + } + return tx.Commit() +} + // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` @@ -545,11 +591,14 @@ func scanTask(row scanner) (*task.Task, error) { parentTaskID sql.NullString rejectionComment sql.NullString questionJSON sql.NullString + summary sql.NullString + interactionsJSON sql.NullString ) - err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON) + err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON, &summary, &interactionsJSON) t.ParentTaskID = parentTaskID.String t.RejectionComment = rejectionComment.String t.QuestionJSON = questionJSON.String + t.Summary = summary.String if err != nil { return nil, err } @@ -568,6 +617,13 @@ func scanTask(row scanner) (*task.Task, error) { if err := json.Unmarshal([]byte(depsJSON), &t.DependsOn); err != nil { return nil, fmt.Errorf("unmarshaling depends_on: %w", err) } + raw := interactionsJSON.String + if raw == "" { + raw = "[]" + } + if err := json.Unmarshal([]byte(raw), &t.Interactions); err != nil { + return nil, fmt.Errorf("unmarshaling interactions: %w", err) + } return &t, nil } diff --git a/internal/task/task.go b/internal/task/task.go index 9968b15..2c57922 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -48,6 +48,14 @@ type RetryConfig struct { Backoff string `yaml:"backoff" json:"backoff"` // "linear", "exponential" } +// Interaction records a single question/answer exchange between an agent and the user. +type Interaction struct { + QuestionText string `json:"question_text"` + Options []string `json:"options,omitempty"` + Answer string `json:"answer,omitempty"` + AskedAt time.Time `json:"asked_at"` +} + type Task struct { ID string `yaml:"id" json:"id"` ParentTaskID string `yaml:"parent_task_id" json:"parent_task_id"` @@ -59,11 +67,13 @@ type Task struct { Priority Priority `yaml:"priority" json:"priority"` Tags []string `yaml:"tags" json:"tags"` DependsOn []string `yaml:"depends_on" json:"depends_on"` - State State `yaml:"-" json:"state"` - RejectionComment string `yaml:"-" json:"rejection_comment,omitempty"` - QuestionJSON string `yaml:"-" json:"question,omitempty"` - CreatedAt time.Time `yaml:"-" json:"created_at"` - UpdatedAt time.Time `yaml:"-" json:"updated_at"` + State State `yaml:"-" json:"state"` + RejectionComment string `yaml:"-" json:"rejection_comment,omitempty"` + QuestionJSON string `yaml:"-" json:"question,omitempty"` + Summary string `yaml:"-" json:"summary,omitempty"` + Interactions []Interaction `yaml:"-" json:"interactions,omitempty"` + CreatedAt time.Time `yaml:"-" json:"created_at"` + UpdatedAt time.Time `yaml:"-" json:"updated_at"` } // Duration wraps time.Duration for YAML unmarshaling from strings like "30m". @@ -94,27 +104,24 @@ type BatchFile struct { } // validTransitions maps each state to the set of states it may transition into. -// Terminal state COMPLETED has no outgoing edges. +// COMPLETED is the only true terminal state (no outgoing edges). // CANCELLED, FAILED, TIMED_OUT, and BUDGET_EXCEEDED all allow re-entry at QUEUED // (restart or retry). -var validTransitions = map[State][]State{ - StatePending: {StateQueued, StateCancelled}, - StateQueued: {StateRunning, StateCancelled}, - StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded, StateBlocked}, - StateReady: {StateCompleted, StatePending}, - StateFailed: {StateQueued}, // retry - StateTimedOut: {StateQueued}, // retry or resume - StateCancelled: {StateQueued}, // restart - StateBudgetExceeded: {StateQueued}, // retry - StateBlocked: {StateQueued, StateReady}, +// READY may go back to PENDING on user rejection. +// BLOCKED may advance to READY when all subtasks complete, or back to QUEUED on user answer. +var validTransitions = map[State]map[State]bool{ + StatePending: {StateQueued: true, StateCancelled: true}, + StateQueued: {StateRunning: true, StateCancelled: true}, + StateRunning: {StateReady: true, StateCompleted: true, StateFailed: true, StateTimedOut: true, StateCancelled: true, StateBudgetExceeded: true, StateBlocked: true}, + StateReady: {StateCompleted: true, StatePending: true}, + StateFailed: {StateQueued: true}, // retry + StateTimedOut: {StateQueued: true}, // retry or resume + StateCancelled: {StateQueued: true}, // restart + StateBudgetExceeded: {StateQueued: true}, // retry + StateBlocked: {StateQueued: true, StateReady: true}, } // ValidTransition returns true if moving from the current state to next is allowed. func ValidTransition(from, to State) bool { - for _, allowed := range validTransitions[from] { - if allowed == to { - return true - } - } - return false + return validTransitions[from][to] } -- cgit v1.2.3