summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/claude.go8
-rw-r--r--internal/executor/executor.go43
-rw-r--r--internal/executor/executor_test.go58
-rw-r--r--internal/executor/preamble.go11
-rw-r--r--internal/executor/summary.go57
-rw-r--r--internal/executor/summary_test.go49
6 files changed, 217 insertions, 9 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index 0e29f7f..a58f1ad 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -150,6 +150,13 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID, SandboxDir: sandboxDir}
}
+ // Read agent summary if written.
+ summaryFile := filepath.Join(logDir, "summary.txt")
+ if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
+ os.Remove(summaryFile) // consumed
+ e.Summary = strings.TrimSpace(string(summaryData))
+ }
+
// Merge sandbox back to project_dir and clean up.
if sandboxDir != "" {
if mergeErr := teardownSandbox(projectDir, sandboxDir, r.Logger); mergeErr != nil {
@@ -261,6 +268,7 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s
"CLAUDOMATOR_API_URL="+r.APIURL,
"CLAUDOMATOR_TASK_ID="+e.TaskID,
"CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
+ "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"),
)
// Put the subprocess in its own process group so we can SIGKILL the entire
// group (MCP servers, bash children, etc.) on cancellation.
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 76c8ac7..7ae4e2d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -26,6 +26,8 @@ type Store interface {
UpdateExecution(e *storage.Execution) error
UpdateTaskState(id string, newState task.State) error
UpdateTaskQuestion(taskID, questionJSON string) error
+ UpdateTaskSummary(taskID, summary string) error
+ AppendTaskInteraction(taskID string, interaction task.Interaction) error
}
// LogPather is an optional interface runners can implement to provide the log
@@ -149,11 +151,20 @@ func (p *Pool) Cancel(taskID string) bool {
return true
}
-// SubmitResume re-queues a blocked task using the provided resume execution.
+// resumablePoolStates are the task states that may be submitted for session resume.
+var resumablePoolStates = map[task.State]bool{
+ task.StateBlocked: true,
+ task.StateTimedOut: true,
+ task.StateCancelled: true,
+ task.StateFailed: true,
+ task.StateBudgetExceeded: true,
+}
+
+// SubmitResume re-queues a blocked or interrupted task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
- if t.State != task.StateBlocked && t.State != task.StateTimedOut {
- return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State)
+ if !resumablePoolStates[t.State] {
+ return fmt.Errorf("task %s must be in a resumable state to resume (current: %s)", t.ID, t.State)
}
if exec.ResumeSessionID == "" {
return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
@@ -331,6 +342,15 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
}
}
+ summary := exec.Summary
+ if summary == "" && exec.StdoutPath != "" {
+ summary = extractSummary(exec.StdoutPath)
+ }
+ if summary != "" {
+ if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil {
+ p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
+ }
+ }
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
@@ -566,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() {
}
}
+// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from
+// a previous server instance. Call this once on server startup, after
+// RecoverStaleRunning.
+func (p *Pool) RecoverStaleQueued(ctx context.Context) {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued})
+ if err != nil {
+ p.logger.Error("RecoverStaleQueued: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name)
+ if err := p.Submit(ctx, t); err != nil {
+ p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID)
+ }
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 9448816..7e676eb 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -614,6 +614,60 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
}
}
+func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger)
+
+ // Create a task already in QUEUED state (persisted from before a server restart).
+ tk := makeTask("stale-queued-1")
+ tk.State = task.StateQueued
+ store.CreateTask(tk)
+
+ pool.RecoverStaleQueued(context.Background())
+
+ // Wait for the pool to pick it up and complete it.
+ select {
+ case result := <-pool.Results():
+ if result.TaskID != tk.ID {
+ t.Errorf("unexpected task in results: %s", result.TaskID)
+ }
+ case <-time.After(2 * time.Second):
+ t.Fatal("timed out waiting for stale QUEUED task to complete")
+ }
+
+ got, err := store.GetTask(tk.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if got.State != task.StateCompleted && got.State != task.StateReady {
+ t.Errorf("state: want COMPLETED or READY, got %q", got.State)
+ }
+ if runner.callCount() != 1 {
+ t.Errorf("runner call count: want 1, got %d", runner.callCount())
+ }
+}
+
+func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger)
+
+ // PENDING task should NOT be resubmitted.
+ tk := makeTask("pending-1")
+ tk.State = task.StatePending
+ store.CreateTask(tk)
+
+ pool.RecoverStaleQueued(context.Background())
+ time.Sleep(50 * time.Millisecond)
+
+ if runner.callCount() != 0 {
+ t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount())
+ }
+}
+
func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
@@ -906,6 +960,10 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error
m.mu.Unlock()
return nil
}
+func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil }
+func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error {
+ return nil
+}
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()
diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go
index 1993361..5e57852 100644
--- a/internal/executor/preamble.go
+++ b/internal/executor/preamble.go
@@ -48,15 +48,14 @@ The sandbox is rejected if there are any uncommitted modifications.
## Final Summary (mandatory)
-Before exiting, write a final summary paragraph (2-5 sentences) as your last output. Start it with "## Summary" on its own line. Describe:
-- What was accomplished
-- Key decisions made
-- Any issues or follow-ups needed
+Before exiting, write a brief summary paragraph (2–5 sentences) describing what you did
+and the outcome. Write it to the path in $CLAUDOMATOR_SUMMARY_FILE:
-This summary will be extracted and displayed in the task UI.
+ echo "Your summary here." > "$CLAUDOMATOR_SUMMARY_FILE"
----
+This summary is displayed in the task UI so the user knows what happened.
+---
`
func withPlanningPreamble(instructions string) string {
diff --git a/internal/executor/summary.go b/internal/executor/summary.go
new file mode 100644
index 0000000..a942de0
--- /dev/null
+++ b/internal/executor/summary.go
@@ -0,0 +1,57 @@
+package executor
+
+import (
+ "bufio"
+ "encoding/json"
+ "os"
+ "strings"
+)
+
+// extractSummary reads a stream-json stdout log and returns the text following
+// the last "## Summary" heading found in any assistant text block.
+// Returns empty string if the file cannot be read or no summary is found.
+func extractSummary(stdoutPath string) string {
+ f, err := os.Open(stdoutPath)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+
+ var last string
+ scanner := bufio.NewScanner(f)
+ scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
+ for scanner.Scan() {
+ if text := summaryFromLine(scanner.Bytes()); text != "" {
+ last = text
+ }
+ }
+ return last
+}
+
+// summaryFromLine parses a single stream-json line and returns the text after
+// "## Summary" if the line is an assistant text block containing that heading.
+func summaryFromLine(line []byte) string {
+ var event struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ } `json:"content"`
+ } `json:"message"`
+ }
+ if err := json.Unmarshal(line, &event); err != nil || event.Type != "assistant" {
+ return ""
+ }
+ for _, block := range event.Message.Content {
+ if block.Type != "text" {
+ continue
+ }
+ idx := strings.Index(block.Text, "## Summary")
+ if idx == -1 {
+ continue
+ }
+ return strings.TrimSpace(block.Text[idx+len("## Summary"):])
+ }
+ return ""
+}
diff --git a/internal/executor/summary_test.go b/internal/executor/summary_test.go
new file mode 100644
index 0000000..4a73711
--- /dev/null
+++ b/internal/executor/summary_test.go
@@ -0,0 +1,49 @@
+package executor
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+func TestExtractSummary_WithSummarySection(t *testing.T) {
+ dir := t.TempDir()
+ path := filepath.Join(dir, "stdout.log")
+ content := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nThe task was completed successfully."}]}}`)
+ if err := os.WriteFile(path, []byte(content), 0600); err != nil {
+ t.Fatal(err)
+ }
+ got := extractSummary(path)
+ want := "The task was completed successfully."
+ if got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+}
+
+func TestExtractSummary_NoSummary(t *testing.T) {
+ dir := t.TempDir()
+ path := filepath.Join(dir, "stdout.log")
+ content := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"All done, no summary heading."}]}}`)
+ if err := os.WriteFile(path, []byte(content), 0600); err != nil {
+ t.Fatal(err)
+ }
+ got := extractSummary(path)
+ if got != "" {
+ t.Errorf("expected empty string, got %q", got)
+ }
+}
+
+func TestExtractSummary_MultipleSections_PicksLast(t *testing.T) {
+ dir := t.TempDir()
+ path := filepath.Join(dir, "stdout.log")
+ content := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nFirst summary."}]}}`) +
+ streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nFinal summary."}]}}`)
+ if err := os.WriteFile(path, []byte(content), 0600); err != nil {
+ t.Fatal(err)
+ }
+ got := extractSummary(path)
+ want := "Final summary."
+ if got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+}