diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/cli/run.go | 3 | ||||
| -rw-r--r-- | internal/cli/serve.go | 3 | ||||
| -rw-r--r-- | internal/executor/executor.go | 7 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 17 | ||||
| -rw-r--r-- | internal/executor/summary.go | 95 | ||||
| -rw-r--r-- | internal/executor/summary_synth_test.go | 241 |
6 files changed, 365 insertions, 1 deletions
diff --git a/internal/cli/run.go b/internal/cli/run.go index 2da7b79..2d7c3d7 100644 --- a/internal/cli/run.go +++ b/internal/cli/run.go @@ -100,6 +100,9 @@ func runTasks(file string, parallel int, dryRun bool) error { LLM: localClient, GeminiBinaryPath: cfg.GeminiBinaryPath, } + if localClient != nil { + pool.LLM = localClient + } // Handle graceful shutdown. ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/cli/serve.go b/internal/cli/serve.go index 2263d01..5101b81 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -88,6 +88,9 @@ func serve(addr string) error { LLM: localClient, GeminiBinaryPath: cfg.GeminiBinaryPath, } + if localClient != nil { + pool.LLM = localClient + } pool.RecoverStaleRunning(context.Background()) pool.RecoverStaleQueued(context.Background()) pool.RecoverStaleBlocked() diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f5aabe1..4501a3c 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/thepeterstone/claudomator/internal/llm" "github.com/thepeterstone/claudomator/internal/retry" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" @@ -70,6 +71,9 @@ type Pool struct { doneCh chan struct{} // signals when a worker slot is freed Questions *QuestionRegistry Classifier *Classifier + // LLM, when non-nil, enables LLM-synthesized summaries for executions + // whose stdout did not include a "## Summary" heading. + LLM *llm.Client } // Result is emitted when a task execution completes. @@ -349,6 +353,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if summary == "" && exec.StdoutPath != "" { summary = extractSummary(exec.StdoutPath) } + if summary == "" && p.LLM != nil && exec.StdoutPath != "" { + summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath) + } if summary != "" { if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil { p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 878a32d..b1173cb 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -980,6 +980,7 @@ type minimalMockStore struct { executions map[string]*storage.Execution stateUpdates []struct{ id string; state task.State } questionUpdates []string + summaryUpdates []struct{ taskID, summary string } changestatCalls []struct { execID string stats *task.Changestats @@ -1035,7 +1036,21 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error m.mu.Unlock() return nil } -func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil } +func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { + m.mu.Lock() + m.summaryUpdates = append(m.summaryUpdates, struct{ taskID, summary string }{taskID, summary}) + m.mu.Unlock() + return nil +} +func (m *minimalMockStore) lastSummaryUpdate() (string, string, bool) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.summaryUpdates) == 0 { + return "", "", false + } + last := m.summaryUpdates[len(m.summaryUpdates)-1] + return last.taskID, last.summary, true +} func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { return nil } diff --git a/internal/executor/summary.go b/internal/executor/summary.go index a942de0..bcf5cfd 100644 --- a/internal/executor/summary.go +++ b/internal/executor/summary.go @@ -2,11 +2,26 @@ package executor import ( "bufio" + "context" "encoding/json" + "io" "os" "strings" + "time" + + "github.com/thepeterstone/claudomator/internal/llm" ) +// synthesizeSummaryMaxBytes caps how much of the stdout log we send to the +// LLM. Larger values cost more tokens with diminishing returns for a 2-4 +// sentence summary. +const synthesizeSummaryMaxBytes = 16 * 1024 + +// synthesizeSummaryTimeout caps the LLM call so a slow local model can't +// stall executor finalization. On timeout, we return "" (the existing +// no-summary path takes over). +const synthesizeSummaryTimeout = 6 * time.Second + // extractSummary reads a stream-json stdout log and returns the text following // the last "## Summary" heading found in any assistant text block. // Returns empty string if the file cannot be read or no summary is found. @@ -28,6 +43,86 @@ func extractSummary(stdoutPath string) string { return last } +// synthesizeSummary asks the LLM to summarize the assistant text content in +// stdoutPath when no "## Summary" heading was present. Returns "" on any +// error, an empty file, or an empty model response — preserving the +// existing "no summary" behavior so the new path is purely additive. +func synthesizeSummary(parent context.Context, c *llm.Client, stdoutPath string) string { + if c == nil || stdoutPath == "" { + return "" + } + text := readAssistantTextTail(stdoutPath, synthesizeSummaryMaxBytes) + if strings.TrimSpace(text) == "" { + return "" + } + + cctx, cancel := context.WithTimeout(parent, synthesizeSummaryTimeout) + defer cancel() + resp, err := c.Chat(cctx, llm.ChatRequest{ + Messages: []llm.Message{ + {Role: "system", Content: "You summarize what an automated coding agent did. Reply with 2-4 sentences of plain prose. No bullets, no headings, no preamble."}, + {Role: "user", Content: "Here is the agent's output. Summarize what it accomplished:\n\n" + text}, + }, + }) + if err != nil { + return "" + } + return strings.TrimSpace(resp.Content) +} + +// readAssistantTextTail returns the concatenated `text` blocks from assistant +// stream-json events in the last maxBytes of the file. Non-assistant events +// (system, result, tool_use, etc.) are skipped so the LLM sees just what the +// agent said. Returns "" on any error. +func readAssistantTextTail(stdoutPath string, maxBytes int64) string { + f, err := os.Open(stdoutPath) + if err != nil { + return "" + } + defer f.Close() + + stat, err := f.Stat() + if err != nil { + return "" + } + size := stat.Size() + if size > maxBytes { + if _, err := f.Seek(size-maxBytes, io.SeekStart); err != nil { + return "" + } + } + + var sb strings.Builder + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + first := size > maxBytes // if we seeked, drop the first (likely partial) line + for scanner.Scan() { + if first { + first = false + continue + } + var event struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(scanner.Bytes(), &event); err != nil || event.Type != "assistant" { + continue + } + for _, block := range event.Message.Content { + if block.Type == "text" && block.Text != "" { + sb.WriteString(block.Text) + sb.WriteString("\n") + } + } + } + return sb.String() +} + // summaryFromLine parses a single stream-json line and returns the text after // "## Summary" if the line is an assistant text block containing that heading. func summaryFromLine(line []byte) string { diff --git a/internal/executor/summary_synth_test.go b/internal/executor/summary_synth_test.go new file mode 100644 index 0000000..7ad396d --- /dev/null +++ b/internal/executor/summary_synth_test.go @@ -0,0 +1,241 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" +) + +func writeStreamLog(t *testing.T, lines []string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + var sb strings.Builder + for _, l := range lines { + sb.WriteString(l) + sb.WriteString("\n") + } + if err := os.WriteFile(path, []byte(sb.String()), 0600); err != nil { + t.Fatal(err) + } + return path +} + +func TestSynthesizeSummary_NilClient(t *testing.T) { + got := synthesizeSummary(context.Background(), nil, "/some/path") + if got != "" { + t.Errorf("nil client: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_EmptyPath(t *testing.T) { + c := &llm.Client{Endpoint: "http://unused", Model: "x"} + got := synthesizeSummary(context.Background(), c, "") + if got != "" { + t.Errorf("empty path: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_MissingFile(t *testing.T) { + c := &llm.Client{Endpoint: "http://unused", Model: "x"} + got := synthesizeSummary(context.Background(), c, "/nonexistent/file.log") + if got != "" { + t.Errorf("missing file: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_EmptyAssistantContent(t *testing.T) { + // Log contains only system/result events — no assistant text. The function + // should short-circuit without calling the LLM. + path := writeStreamLog(t, []string{ + `{"type":"system","subtype":"init"}`, + `{"type":"result","subtype":"success","total_cost_usd":0}`, + }) + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be returned"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "" { + t.Errorf("empty content: want empty, got %q", got) + } + if atomic.LoadInt32(&calls) != 0 { + t.Errorf("LLM should not be called for empty assistant content") + } +} + +func TestSynthesizeSummary_LLMSuccess(t *testing.T) { + path := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Ran the tests."}]}}`, + `{"type":"assistant","message":{"content":[{"type":"text","text":"Fixed the import."}]}}`, + `{"type":"result","subtype":"success"}`, + }) + + var capturedUser string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body struct { + Messages []struct { + Role, Content string + } `json:"messages"` + } + json.NewDecoder(r.Body).Decode(&body) + for _, m := range body.Messages { + if m.Role == "user" { + capturedUser = m.Content + } + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":" Agent ran tests and fixed an import. "},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "Agent ran tests and fixed an import." { + t.Errorf("summary: got %q", got) + } + if !strings.Contains(capturedUser, "Ran the tests.") { + t.Errorf("user prompt missing first assistant text; got: %s", capturedUser) + } + if !strings.Contains(capturedUser, "Fixed the import.") { + t.Errorf("user prompt missing second assistant text; got: %s", capturedUser) + } +} + +// TestPool_HandleRunResult_LLMSummaryFallback verifies the Pool falls back to +// LLM-synthesized summary when extractSummary returns empty. +func TestPool_HandleRunResult_LLMSummaryFallback(t *testing.T) { + // stdout has assistant text but no "## Summary" heading. + stdoutPath := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Did the work without writing a summary section."}]}}`, + }) + + llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"Synthesized summary."},"finish_reason":"stop"}],"usage":{}}`) + })) + defer llmSrv.Close() + + store := newMinimalMockStore() + pool := newPoolWithMockStore(store) + pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"} + + tk := makeTask("synth-summary") + store.tasks[tk.ID] = tk + exec := &storage.Execution{ID: "e-synth", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath} + + pool.handleRunResult(context.Background(), tk, exec, nil, "claude") + + id, summary, ok := store.lastSummaryUpdate() + if !ok { + t.Fatalf("expected UpdateTaskSummary to be called") + } + if id != tk.ID { + t.Errorf("summary recorded for wrong task: %q", id) + } + if summary != "Synthesized summary." { + t.Errorf("summary: got %q", summary) + } + + // Drain the result channel so the test exits cleanly. + <-pool.resultCh +} + +// TestPool_HandleRunResult_ExtractSummaryWins verifies the LLM is NOT called +// when the agent already wrote a "## Summary" section. +func TestPool_HandleRunResult_ExtractSummaryWins(t *testing.T) { + stdoutPath := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nAgent wrote its own summary."}]}}`, + }) + + var llmCalls int32 + llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&llmCalls, 1) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be used"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer llmSrv.Close() + + store := newMinimalMockStore() + pool := newPoolWithMockStore(store) + pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"} + + tk := makeTask("agent-summary") + store.tasks[tk.ID] = tk + exec := &storage.Execution{ID: "e-agent", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath} + + pool.handleRunResult(context.Background(), tk, exec, nil, "claude") + + if got := atomic.LoadInt32(&llmCalls); got != 0 { + t.Errorf("LLM should not be called when ## Summary is present; got %d calls", got) + } + _, summary, ok := store.lastSummaryUpdate() + if !ok { + t.Fatalf("expected UpdateTaskSummary") + } + if summary != "Agent wrote its own summary." { + t.Errorf("summary: got %q (want extractSummary output)", summary) + } + <-pool.resultCh +} + +func TestSynthesizeSummary_LLMFailure_ReturnsEmpty(t *testing.T) { + path := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Did something."}]}}`, + }) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "boom", http.StatusInternalServerError) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "" { + t.Errorf("LLM failure: want empty, got %q", got) + } +} + +// TestReadAssistantTextTail_TailingLargeFile verifies the seek-to-tail +// behavior drops early content but keeps later assistant text. +func TestReadAssistantTextTail_TailingLargeFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + f, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + // Write a ton of garbage assistant lines, then a final marker. + for i := 0; i < 500; i++ { + fmt.Fprintf(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"filler line that should be in the early part of a large file %04d"}]}}`+"\n", i) + } + fmt.Fprintln(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"FINAL_MARKER_LINE"}]}}`) + f.Close() + + got := readAssistantTextTail(path, 4*1024) // 4 KB cap + if !strings.Contains(got, "FINAL_MARKER_LINE") { + t.Errorf("tail should contain final line; got: %s", got) + } + if strings.Contains(got, "filler line that should be in the early part of a large file 0000") { + end := 200 + if len(got) < end { + end = len(got) + } + t.Errorf("tail should NOT contain very-early line; got first 200 chars: %s", got[:end]) + } +} |
