From 0865afc43be562dbe14528e4299b9e213b54cc93 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 09:24:43 +0000 Subject: feat(executor): add LocalRunner and OpenAI-compat LLM client Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J --- internal/executor/executor.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'internal/executor/executor.go') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c07171b..f5aabe1 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/thepeterstone/claudomator/internal/retry" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" "github.com/google/uuid" @@ -268,9 +269,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex // resultCh. The caller must set exec.EndTime before calling. func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) { if err != nil { - if isRateLimitError(err) || isQuotaExhausted(err) { + if retry.IsRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() - retryAfter := parseRetryAfter(err.Error()) + retryAfter := retry.ParseRetryAfter(err.Error()) if retryAfter == 0 { if isQuotaExhausted(err) { retryAfter = 5 * time.Hour @@ -424,8 +425,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } p.mu.Unlock() - // If a specific agent is already requested, skip selection and classification. - skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini" + // If a specific agent is already requested AND we have a runner registered + // for it, skip selection and classification. Unknown/empty types fall + // through to the load balancer. + _, runnerKnown := p.runners[t.Agent.Type] + skipClassification := t.Agent.Type != "" && runnerKnown if !skipClassification { // Deterministically pick the agent with fewest active tasks. -- cgit v1.2.3 From 50f8fe8c1ff8b82e0bd399e5776e58bda3e57d1c Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 2 May 2026 08:00:17 +0000 Subject: feat(executor): synthesize execution summary via local LLM fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4 of "local OSS models as agents" plan. Closes the epic. When an execution finishes and the agent did NOT write a "## Summary" heading in its stdout (so the existing extractSummary path returns empty), and the Pool has a local LLM configured, we now synthesize a 2-4 sentence summary from the assistant text content of the log tail. Behavior: - Primary path unchanged: if the agent wrote "## Summary", that wins byte-for-byte (TestPool_HandleRunResult_ExtractSummaryWins guards). - Fallback path: empty extractSummary + Pool.LLM != nil → synthesize. - All-empty path: when no LLM is configured, summary stays empty — identical to pre-Phase-4 behavior. Implementation: - Pool gains an LLM *llm.Client field, wired in serve.go and run.go alongside Classifier.LLM (same localClient used everywhere). - New synthesizeSummary in internal/executor/summary.go: * 6s timeout so a slow local model can't stall finalization * 16 KB tail cap on the stdout log * readAssistantTextTail seeks to the last 16 KB and skips the first (likely partial) line, parses each line as a stream-json event, joins assistant `text` blocks (skips system/result/etc). * Returns "" on any error so the caller's behavior never regresses. - handleRunResult: 3-tier summary resolution — exec.Summary set by runner → extractSummary → synthesizeSummary → empty. - minimalMockStore now records UpdateTaskSummary calls (additive; existing tests unaffected) so integration tests can assert. Tests (9 new): - synthesizeSummary nil client / empty path / missing file all return "" without HTTP calls. - empty assistant content short-circuits without LLM call. - success path returns trimmed body, with both assistant texts in the user prompt. - LLM 500 returns "" (caller handles same as no-summary). - readAssistantTextTail seeks past early content in a large file. - Pool integration: ## Summary present → LLM not called, agent text used. ## Summary absent + LLM set → LLM called, synthesized summary recorded against the right task ID. Plan: docs/plans/local-oss-runner.md. Epic complete. Post-epic deep cleanup queue captured in the same plan file for follow-up. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J --- docs/plans/local-oss-runner.md | 50 +++++++ internal/cli/run.go | 3 + internal/cli/serve.go | 3 + internal/executor/executor.go | 7 + internal/executor/executor_test.go | 17 ++- internal/executor/summary.go | 95 +++++++++++++ internal/executor/summary_synth_test.go | 241 ++++++++++++++++++++++++++++++++ 7 files changed, 415 insertions(+), 1 deletion(-) create mode 100644 internal/executor/summary_synth_test.go (limited to 'internal/executor/executor.go') diff --git a/docs/plans/local-oss-runner.md b/docs/plans/local-oss-runner.md index c065483..c3d6291 100644 --- a/docs/plans/local-oss-runner.md +++ b/docs/plans/local-oss-runner.md @@ -304,3 +304,53 @@ What the LLM can do with that: produce a tighter, project-aware investigation pr - All new tests green under `-race` - Existing webhook tests pass byte-for-byte when LLM not configured - Build clean; pushed + +--- + +# Phase 4 — Focused Plan (Execution Summary) + +## Scope + +`extractSummary` in `internal/executor/summary.go` is text-pattern based: it returns the body following the last `## Summary` heading in any assistant text block. When the agent didn't write one, summary stays empty. This is fine for Claude (which is prompted to write a summary), but not for arbitrary local-runner outputs, and not for cases where Claude exits early or hits a budget cap before the summary section. + +Phase 4 adds an LLM-based fallback: when `extractSummary` returns "" and the Pool has an LLM client, synthesize a 2-4 sentence summary from the tail of the stdout log. + +## What ships + +- New `synthesizeSummary(ctx, *llm.Client, stdoutPath string) string` in `internal/executor/summary.go`. Reads the last ~16 KB of the stdout log, strips stream-json envelopes to extract just the text content, and asks the LLM to summarize. +- New `LLM *llm.Client` field on `executor.Pool` (wired identically to `Classifier.LLM` in Phase 1). +- Hook into `Pool.handleRunResult` at the existing summary block: after `extractSummary` returns "", call `synthesizeSummary` if `p.LLM != nil`. +- Wiring in `cmd/claudomator/main.go` (none — main.go is a thin wrapper), `internal/cli/serve.go`, `internal/cli/run.go`: pass `localClient` to Pool. + +## Explicit non-goals + +- No changes to the Claude prompt or the `## Summary` extraction (that path stays primary) +- No changes to the storage schema (summary is already a `tasks.summary` TEXT column via `UpdateTaskSummary`) +- No streaming the summary — one-shot 2-4 sentence completion +- No new config knob for "prefer local for summary" — same `s.llm`/`p.LLM` gate applies; users opt out by not setting LocalModel.Endpoint +- No retroactive backfill of summaries on existing executions + +## Task list + +1. Add `LLM *llm.Client` field on `executor.Pool` (matches the `Classifier` pattern from Phase 1) +2. Implement `synthesizeSummary(ctx, *llm.Client, stdoutPath) string` in `internal/executor/summary.go`. Reads last ~16 KB, parses each line as a stream-json event, joins the assistant text content, calls `Chat` with a 6-second timeout asking for 2-4 sentences plain text. Returns "" on any error so the caller's existing empty-summary path stays unchanged. +3. Modify `Pool.handleRunResult`: after `extractSummary` returns empty, if `p.LLM != nil`, try `synthesizeSummary(ctx, p.LLM, exec.StdoutPath)`. If it returns non-empty, persist via `UpdateTaskSummary`. +4. Wire `Pool.LLM = localClient` in `internal/cli/serve.go` and `internal/cli/run.go` +5. Tests in `internal/executor/summary_test.go` (or a new file): + - `synthesizeSummary` with stub LLM: stdout.log containing stream-json text → assistant content extracted → LLM called → returned summary + - `synthesizeSummary` with no `## Summary` heading anywhere → still produces synthesized summary + - `synthesizeSummary` LLM failure → returns "" + - `synthesizeSummary` empty stdout file → returns "" + - Pool integration test: LocalRunner produces a stdout with no `## Summary` section, Pool's LLM is set, after handleRunResult the task's summary is non-empty +6. `go build ./... && go test -race ./...` +7. Commit as Phase 4 on the branch +8. Push + +## Stop conditions + +- New tests green under `-race` +- Existing tests unchanged (the extractSummary primary path keeps winning whenever a `## Summary` heading exists) +- Build clean; pushed +- Epic complete: `## Local OSS Models as a Third Runner` shipped end-to-end + +After Phase 4 lands, execute the post-epic deep cleanup using the queue at the top of this section. diff --git a/internal/cli/run.go b/internal/cli/run.go index 2da7b79..2d7c3d7 100644 --- a/internal/cli/run.go +++ b/internal/cli/run.go @@ -100,6 +100,9 @@ func runTasks(file string, parallel int, dryRun bool) error { LLM: localClient, GeminiBinaryPath: cfg.GeminiBinaryPath, } + if localClient != nil { + pool.LLM = localClient + } // Handle graceful shutdown. ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/cli/serve.go b/internal/cli/serve.go index 2263d01..5101b81 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -88,6 +88,9 @@ func serve(addr string) error { LLM: localClient, GeminiBinaryPath: cfg.GeminiBinaryPath, } + if localClient != nil { + pool.LLM = localClient + } pool.RecoverStaleRunning(context.Background()) pool.RecoverStaleQueued(context.Background()) pool.RecoverStaleBlocked() diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f5aabe1..4501a3c 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/thepeterstone/claudomator/internal/llm" "github.com/thepeterstone/claudomator/internal/retry" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" @@ -70,6 +71,9 @@ type Pool struct { doneCh chan struct{} // signals when a worker slot is freed Questions *QuestionRegistry Classifier *Classifier + // LLM, when non-nil, enables LLM-synthesized summaries for executions + // whose stdout did not include a "## Summary" heading. + LLM *llm.Client } // Result is emitted when a task execution completes. @@ -349,6 +353,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if summary == "" && exec.StdoutPath != "" { summary = extractSummary(exec.StdoutPath) } + if summary == "" && p.LLM != nil && exec.StdoutPath != "" { + summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath) + } if summary != "" { if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil { p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 878a32d..b1173cb 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -980,6 +980,7 @@ type minimalMockStore struct { executions map[string]*storage.Execution stateUpdates []struct{ id string; state task.State } questionUpdates []string + summaryUpdates []struct{ taskID, summary string } changestatCalls []struct { execID string stats *task.Changestats @@ -1035,7 +1036,21 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error m.mu.Unlock() return nil } -func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil } +func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { + m.mu.Lock() + m.summaryUpdates = append(m.summaryUpdates, struct{ taskID, summary string }{taskID, summary}) + m.mu.Unlock() + return nil +} +func (m *minimalMockStore) lastSummaryUpdate() (string, string, bool) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.summaryUpdates) == 0 { + return "", "", false + } + last := m.summaryUpdates[len(m.summaryUpdates)-1] + return last.taskID, last.summary, true +} func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { return nil } diff --git a/internal/executor/summary.go b/internal/executor/summary.go index a942de0..bcf5cfd 100644 --- a/internal/executor/summary.go +++ b/internal/executor/summary.go @@ -2,11 +2,26 @@ package executor import ( "bufio" + "context" "encoding/json" + "io" "os" "strings" + "time" + + "github.com/thepeterstone/claudomator/internal/llm" ) +// synthesizeSummaryMaxBytes caps how much of the stdout log we send to the +// LLM. Larger values cost more tokens with diminishing returns for a 2-4 +// sentence summary. +const synthesizeSummaryMaxBytes = 16 * 1024 + +// synthesizeSummaryTimeout caps the LLM call so a slow local model can't +// stall executor finalization. On timeout, we return "" (the existing +// no-summary path takes over). +const synthesizeSummaryTimeout = 6 * time.Second + // extractSummary reads a stream-json stdout log and returns the text following // the last "## Summary" heading found in any assistant text block. // Returns empty string if the file cannot be read or no summary is found. @@ -28,6 +43,86 @@ func extractSummary(stdoutPath string) string { return last } +// synthesizeSummary asks the LLM to summarize the assistant text content in +// stdoutPath when no "## Summary" heading was present. Returns "" on any +// error, an empty file, or an empty model response — preserving the +// existing "no summary" behavior so the new path is purely additive. +func synthesizeSummary(parent context.Context, c *llm.Client, stdoutPath string) string { + if c == nil || stdoutPath == "" { + return "" + } + text := readAssistantTextTail(stdoutPath, synthesizeSummaryMaxBytes) + if strings.TrimSpace(text) == "" { + return "" + } + + cctx, cancel := context.WithTimeout(parent, synthesizeSummaryTimeout) + defer cancel() + resp, err := c.Chat(cctx, llm.ChatRequest{ + Messages: []llm.Message{ + {Role: "system", Content: "You summarize what an automated coding agent did. Reply with 2-4 sentences of plain prose. No bullets, no headings, no preamble."}, + {Role: "user", Content: "Here is the agent's output. Summarize what it accomplished:\n\n" + text}, + }, + }) + if err != nil { + return "" + } + return strings.TrimSpace(resp.Content) +} + +// readAssistantTextTail returns the concatenated `text` blocks from assistant +// stream-json events in the last maxBytes of the file. Non-assistant events +// (system, result, tool_use, etc.) are skipped so the LLM sees just what the +// agent said. Returns "" on any error. +func readAssistantTextTail(stdoutPath string, maxBytes int64) string { + f, err := os.Open(stdoutPath) + if err != nil { + return "" + } + defer f.Close() + + stat, err := f.Stat() + if err != nil { + return "" + } + size := stat.Size() + if size > maxBytes { + if _, err := f.Seek(size-maxBytes, io.SeekStart); err != nil { + return "" + } + } + + var sb strings.Builder + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + first := size > maxBytes // if we seeked, drop the first (likely partial) line + for scanner.Scan() { + if first { + first = false + continue + } + var event struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(scanner.Bytes(), &event); err != nil || event.Type != "assistant" { + continue + } + for _, block := range event.Message.Content { + if block.Type == "text" && block.Text != "" { + sb.WriteString(block.Text) + sb.WriteString("\n") + } + } + } + return sb.String() +} + // summaryFromLine parses a single stream-json line and returns the text after // "## Summary" if the line is an assistant text block containing that heading. func summaryFromLine(line []byte) string { diff --git a/internal/executor/summary_synth_test.go b/internal/executor/summary_synth_test.go new file mode 100644 index 0000000..7ad396d --- /dev/null +++ b/internal/executor/summary_synth_test.go @@ -0,0 +1,241 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" +) + +func writeStreamLog(t *testing.T, lines []string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + var sb strings.Builder + for _, l := range lines { + sb.WriteString(l) + sb.WriteString("\n") + } + if err := os.WriteFile(path, []byte(sb.String()), 0600); err != nil { + t.Fatal(err) + } + return path +} + +func TestSynthesizeSummary_NilClient(t *testing.T) { + got := synthesizeSummary(context.Background(), nil, "/some/path") + if got != "" { + t.Errorf("nil client: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_EmptyPath(t *testing.T) { + c := &llm.Client{Endpoint: "http://unused", Model: "x"} + got := synthesizeSummary(context.Background(), c, "") + if got != "" { + t.Errorf("empty path: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_MissingFile(t *testing.T) { + c := &llm.Client{Endpoint: "http://unused", Model: "x"} + got := synthesizeSummary(context.Background(), c, "/nonexistent/file.log") + if got != "" { + t.Errorf("missing file: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_EmptyAssistantContent(t *testing.T) { + // Log contains only system/result events — no assistant text. The function + // should short-circuit without calling the LLM. + path := writeStreamLog(t, []string{ + `{"type":"system","subtype":"init"}`, + `{"type":"result","subtype":"success","total_cost_usd":0}`, + }) + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be returned"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "" { + t.Errorf("empty content: want empty, got %q", got) + } + if atomic.LoadInt32(&calls) != 0 { + t.Errorf("LLM should not be called for empty assistant content") + } +} + +func TestSynthesizeSummary_LLMSuccess(t *testing.T) { + path := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Ran the tests."}]}}`, + `{"type":"assistant","message":{"content":[{"type":"text","text":"Fixed the import."}]}}`, + `{"type":"result","subtype":"success"}`, + }) + + var capturedUser string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body struct { + Messages []struct { + Role, Content string + } `json:"messages"` + } + json.NewDecoder(r.Body).Decode(&body) + for _, m := range body.Messages { + if m.Role == "user" { + capturedUser = m.Content + } + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":" Agent ran tests and fixed an import. "},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "Agent ran tests and fixed an import." { + t.Errorf("summary: got %q", got) + } + if !strings.Contains(capturedUser, "Ran the tests.") { + t.Errorf("user prompt missing first assistant text; got: %s", capturedUser) + } + if !strings.Contains(capturedUser, "Fixed the import.") { + t.Errorf("user prompt missing second assistant text; got: %s", capturedUser) + } +} + +// TestPool_HandleRunResult_LLMSummaryFallback verifies the Pool falls back to +// LLM-synthesized summary when extractSummary returns empty. +func TestPool_HandleRunResult_LLMSummaryFallback(t *testing.T) { + // stdout has assistant text but no "## Summary" heading. + stdoutPath := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Did the work without writing a summary section."}]}}`, + }) + + llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"Synthesized summary."},"finish_reason":"stop"}],"usage":{}}`) + })) + defer llmSrv.Close() + + store := newMinimalMockStore() + pool := newPoolWithMockStore(store) + pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"} + + tk := makeTask("synth-summary") + store.tasks[tk.ID] = tk + exec := &storage.Execution{ID: "e-synth", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath} + + pool.handleRunResult(context.Background(), tk, exec, nil, "claude") + + id, summary, ok := store.lastSummaryUpdate() + if !ok { + t.Fatalf("expected UpdateTaskSummary to be called") + } + if id != tk.ID { + t.Errorf("summary recorded for wrong task: %q", id) + } + if summary != "Synthesized summary." { + t.Errorf("summary: got %q", summary) + } + + // Drain the result channel so the test exits cleanly. + <-pool.resultCh +} + +// TestPool_HandleRunResult_ExtractSummaryWins verifies the LLM is NOT called +// when the agent already wrote a "## Summary" section. +func TestPool_HandleRunResult_ExtractSummaryWins(t *testing.T) { + stdoutPath := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nAgent wrote its own summary."}]}}`, + }) + + var llmCalls int32 + llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&llmCalls, 1) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be used"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer llmSrv.Close() + + store := newMinimalMockStore() + pool := newPoolWithMockStore(store) + pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"} + + tk := makeTask("agent-summary") + store.tasks[tk.ID] = tk + exec := &storage.Execution{ID: "e-agent", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath} + + pool.handleRunResult(context.Background(), tk, exec, nil, "claude") + + if got := atomic.LoadInt32(&llmCalls); got != 0 { + t.Errorf("LLM should not be called when ## Summary is present; got %d calls", got) + } + _, summary, ok := store.lastSummaryUpdate() + if !ok { + t.Fatalf("expected UpdateTaskSummary") + } + if summary != "Agent wrote its own summary." { + t.Errorf("summary: got %q (want extractSummary output)", summary) + } + <-pool.resultCh +} + +func TestSynthesizeSummary_LLMFailure_ReturnsEmpty(t *testing.T) { + path := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Did something."}]}}`, + }) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "boom", http.StatusInternalServerError) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "" { + t.Errorf("LLM failure: want empty, got %q", got) + } +} + +// TestReadAssistantTextTail_TailingLargeFile verifies the seek-to-tail +// behavior drops early content but keeps later assistant text. +func TestReadAssistantTextTail_TailingLargeFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + f, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + // Write a ton of garbage assistant lines, then a final marker. + for i := 0; i < 500; i++ { + fmt.Fprintf(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"filler line that should be in the early part of a large file %04d"}]}}`+"\n", i) + } + fmt.Fprintln(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"FINAL_MARKER_LINE"}]}}`) + f.Close() + + got := readAssistantTextTail(path, 4*1024) // 4 KB cap + if !strings.Contains(got, "FINAL_MARKER_LINE") { + t.Errorf("tail should contain final line; got: %s", got) + } + if strings.Contains(got, "filler line that should be in the early part of a large file 0000") { + end := 200 + if len(got) < end { + end = len(got) + } + t.Errorf("tail should NOT contain very-early line; got first 200 chars: %s", got[:end]) + } +} -- cgit v1.2.3 From 85c3bf4d28b0903a2005356339e6ea56855b8c80 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 03:58:19 +0000 Subject: chore: post-epic cleanup — green test suite, no skips MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the cleanup queue captured in docs/plans/local-oss-runner.md after the local-OSS-models epic landed. After this commit `go test -race ./...` is green across every package with zero `t.Skip` calls and no excluded tests. Real bugs fixed: - claude.go setupSandbox callsites used `sandboxDir, err := ...` which shadowed the outer variable, so BlockedError.SandboxDir was always empty. Resume-after-block was broken for both new and stale-sandbox paths. TestBlockedError_IncludesSandboxDir now exercises the right invariant. - TestPool_ActivePerAgent_DeletesZeroEntries flake under -race: the cleanup defer in execute()/executeResume() runs AFTER handleRunResult sends on resultCh, so consumers observing a result could see a still-counted activePerAgent entry. Extracted decActiveAgent(agentType, *cleaned) helper; called explicitly before every resultCh send, defer becomes a no-op via the cleaned flag. Verified clean over `go test -race -count=10`. Test infrastructure made hermetic: - gitSafe now also passes -c commit.gpgsign=false / -c tag.gpgsign=false so sandbox tests pass on hosts whose global config requires signing. - Bare repos in tests initialized with `-b main` (HEAD symbolic ref matched to the branch we push) so `git log` after push works. - TestSandboxCloneSource_FallsBackToOrigin uses a local-FS origin URL, matching sandboxCloneSource's intentional filter against network URLs. - TestGeminiLogs_ParsedCorrectly URL fixed to the actual log route (/api/executions/{id}/log). GeminiRunner gap closed (partial): - parseGeminiStream now walks lines for `result` events, surfacing is_error as an error and total_cost_usd as the float return value. - GeminiRunner.Run propagates parsed cost to Execution.CostUSD. - TestParseGeminiStream_ParsesStructuredOutput unskipped. Notes: - GeminiRunner is still simulated end-to-end (Run writes hardcoded stream data instead of execing the binary). The result/cost parser now exists; finishing the runner is a smaller, contained follow-up. Kept on the deferred queue. - Frontend "Local" agent option and a minor storage.db.go logger TODO remain on the deferred queue, both intentionally — neither blocks anything in flight. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J --- docs/plans/local-oss-runner.md | 20 +++++++++ internal/api/server_test.go | 2 +- internal/executor/claude.go | 23 ++++++++--- internal/executor/claude_test.go | 37 +++++++++-------- internal/executor/executor.go | 57 ++++++++++++++------------ internal/executor/gemini.go | 87 ++++++++++++++++++++++++++++------------ internal/executor/gemini_test.go | 1 - 7 files changed, 151 insertions(+), 76 deletions(-) (limited to 'internal/executor/executor.go') diff --git a/docs/plans/local-oss-runner.md b/docs/plans/local-oss-runner.md index c3d6291..4d5cb87 100644 --- a/docs/plans/local-oss-runner.md +++ b/docs/plans/local-oss-runner.md @@ -202,6 +202,26 @@ After all four phases land, plan and execute a deep cleanup pass. Things noticed Goal: clean `go test -race ./...` with zero skips and zero environmental failures on whatever platform CI runs on. +## Cleanup pass — DONE + +All eight items in the cleanup queue above have been addressed in the post-epic cleanup commit. Summary of fixes: + +- `gitSafe` now disables `commit.gpgsign` and `tag.gpgsign` so sandbox tests pass on hosts with surprise signing config; matching `safe.directory=*` literals in test helpers updated for parity. +- Real bug found and fixed: `setupSandbox(...)` callsites in `claude.go` used `sandboxDir, err := ...` which shadowed the outer variable. `BlockedError.SandboxDir` was always empty as a result; `TestBlockedError_IncludesSandboxDir` now passes for the right reason. +- `parseGeminiStream` now parses `result` events for `is_error`/`total_cost_usd` and returns errors/cost accordingly; `TestParseGeminiStream_ParsesStructuredOutput` is unskipped. +- `GeminiRunner.Run` propagates parsed cost to `Execution.CostUSD`. +- `TestGeminiLogs_ParsedCorrectly` test URL fixed (`/api/tasks/{id}/executions/{exec-id}/log` → `/api/executions/{id}/log`, matching the actual route). +- `TestPool_ActivePerAgent_DeletesZeroEntries` flake root-caused: `handleRunResult` was sending on `resultCh` before `execute()`'s deferred cleanup ran, so consumers could observe a zero-count map entry. Extracted `decActiveAgent(agentType, *cleaned)` helper, called explicitly before each `resultCh` send, defer becomes no-op via the cleaned flag. Verified clean over `-count=10` under `-race`. +- `TestSandboxCloneSource_FallsBackToOrigin` updated to use a local-FS origin URL, matching `sandboxCloneSource`'s actual semantics (it filters non-local URLs to avoid network clones). +- All bare repos in tests created with `git init --bare -b main` so `HEAD` symbolically points at `main` (not the default `master`), unblocking the `git log` queries the tests perform after pushing. + +Test-suite state after cleanup: `go test -race ./...` is green across all packages with zero `t.Skip` calls and zero excluded tests. + +Items not chased (deferred deliberately): +- **GeminiRunner is still simulated** (`gemini.go` `Run` writes hardcoded stream data instead of executing the binary). The result/cost parsing now exists, so finishing the runner is a smaller, contained change. Kept on the queue but doesn't block anything else. +- **Frontend "Local" agent option** — UI dropdown still says "Auto / Claude / Gemini". Pending token telemetry surface. +- **`storage.db.go:706` TODO comment** — minor logger plumbing nit. Skipping unless it blocks something. + --- # Phase 2 — Focused Plan (Elaboration) diff --git a/internal/api/server_test.go b/internal/api/server_test.go index 5c0deba..516e289 100644 --- a/internal/api/server_test.go +++ b/internal/api/server_test.go @@ -255,7 +255,7 @@ func TestGeminiLogs_ParsedCorrectly(t *testing.T) { } // 6. Verify the content retrieved via the API endpoint. - req = httptest.NewRequest("GET", "/api/tasks/"+tk.ID+"/executions/"+exec.ID+"/log", nil) + req = httptest.NewRequest("GET", "/api/executions/"+exec.ID+"/log", nil) w = httptest.NewRecorder() srv.Handler().ServeHTTP(w, req) diff --git a/internal/executor/claude.go b/internal/executor/claude.go index e3f8e1c..fa68382 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -117,7 +117,7 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi e.SandboxDir = "" if projectDir != "" { var err error - sandboxDir, err := setupSandbox(t.Agent.ProjectDir, r.Logger) + sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) if err != nil { return fmt.Errorf("setting up sandbox: %w", err) } @@ -129,7 +129,7 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi } } else if projectDir != "" { var err error - sandboxDir, err := setupSandbox(t.Agent.ProjectDir, r.Logger) + sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) if err != nil { return fmt.Errorf("setting up sandbox: %w", err) } @@ -226,11 +226,22 @@ func extractQuestionText(questionJSON string) string { return strings.TrimSpace(q.Text) } -// gitSafe returns git arguments that prepend "-c safe.directory=*" so that -// commands succeed regardless of the repository owner. This is needed when -// claudomator operates on project directories owned by a different OS user. +// gitSafe returns git arguments that prepend safety overrides so that +// commands succeed regardless of the repository owner or the host's global +// git configuration. Specifically: +// +// - "-c safe.directory=*" lets us operate on directories owned by a +// different OS user. +// - "-c commit.gpgsign=false" / "-c tag.gpgsign=false" stop git from +// trying to sign commits via the host's signing tooling. Sandbox commits +// are internal and don't need to be signed; an unconfigured or broken +// signing setup on the host should never block a sandbox merge. func gitSafe(args ...string) []string { - return append([]string{"-c", "safe.directory=*"}, args...) + return append([]string{ + "-c", "safe.directory=*", + "-c", "commit.gpgsign=false", + "-c", "tag.gpgsign=false", + }, args...) } // sandboxCloneSource returns the URL to clone the sandbox from. It prefers a diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go index 77596ca..b40c4ae 100644 --- a/internal/executor/claude_test.go +++ b/internal/executor/claude_test.go @@ -353,9 +353,9 @@ func TestExecOnce_NoGoroutineLeak_OnNaturalExit(t *testing.T) { func initGitRepo(t *testing.T, dir string) { t.Helper() cmds := [][]string{ - {"git", "-c", "safe.directory=*", "-C", dir, "init", "-b", "main"}, - {"git", "-c", "safe.directory=*", "-C", dir, "config", "user.email", "test@test"}, - {"git", "-c", "safe.directory=*", "-C", dir, "config", "user.name", "test"}, + {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "init", "-b", "main"}, + {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "config", "user.email", "test@test"}, + {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "config", "user.name", "test"}, } for _, args := range cmds { if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { @@ -365,10 +365,10 @@ func initGitRepo(t *testing.T, dir string) { if err := os.WriteFile(filepath.Join(dir, "init.txt"), []byte("init"), 0644); err != nil { t.Fatal(err) } - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", dir, "add", ".").CombinedOutput(); err != nil { + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "add", ".").CombinedOutput(); err != nil { t.Fatalf("git add: %v\n%s", err, out) } - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", dir, "commit", "-m", "init").CombinedOutput(); err != nil { + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "commit", "-m", "init").CombinedOutput(); err != nil { t.Fatalf("git commit: %v\n%s", err, out) } } @@ -391,7 +391,10 @@ func TestSandboxCloneSource_PrefersLocalRemote(t *testing.T) { func TestSandboxCloneSource_FallsBackToOrigin(t *testing.T) { dir := t.TempDir() initGitRepo(t, dir) - originURL := "https://example.com/origin-repo" + // sandboxCloneSource intentionally filters to local-FS remotes (so + // `git clone ` doesn't go over the network). Use a local path + // for origin to verify the fallback semantics. + originURL := t.TempDir() exec.Command("git", "-C", dir, "remote", "add", "origin", originURL).Run() got := sandboxCloneSource(dir) @@ -455,23 +458,23 @@ func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) { func TestTeardownSandbox_AutocommitsChanges(t *testing.T) { // Create a bare repo as origin so push succeeds. bare := t.TempDir() - if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil { + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { t.Fatalf("git init bare: %v\n%s", err, out) } // Create a sandbox directly. sandbox := t.TempDir() initGitRepo(t, sandbox) - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { t.Fatalf("git remote add: %v\n%s", err, out) } // Initial push to establish origin/main - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil { + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil { t.Fatalf("git push initial: %v\n%s", err, out) } // Capture startHEAD - headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output() + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() if err != nil { t.Fatalf("rev-parse HEAD: %v", err) } @@ -514,18 +517,18 @@ func TestTeardownSandbox_AutocommitsChanges(t *testing.T) { func TestTeardownSandbox_BuildFailure_BlocksAutocommit(t *testing.T) { bare := t.TempDir() - if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil { + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { t.Fatalf("git init bare: %v\n%s", err, out) } sandbox := t.TempDir() initGitRepo(t, sandbox) - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { t.Fatalf("git remote add: %v\n%s", err, out) } // Capture startHEAD - headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output() + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() if err != nil { t.Fatalf("rev-parse HEAD: %v", err) } @@ -566,18 +569,18 @@ func TestTeardownSandbox_BuildFailure_BlocksAutocommit(t *testing.T) { func TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit(t *testing.T) { bare := t.TempDir() - if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil { + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { t.Fatalf("git init bare: %v\n%s", err, out) } sandbox := t.TempDir() initGitRepo(t, sandbox) - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { t.Fatalf("git remote add: %v\n%s", err, out) } // Capture startHEAD - headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output() + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() if err != nil { t.Fatalf("rev-parse HEAD: %v", err) } @@ -870,7 +873,7 @@ func TestTailFile_MissingFile_ReturnsEmpty(t *testing.T) { func TestGitSafe_PrependsSafeDirectory(t *testing.T) { got := gitSafe("-C", "/some/path", "status") - want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"} + want := []string{"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-c", "tag.gpgsign=false", "-C", "/some/path", "status"} if len(got) != len(want) { t.Fatalf("gitSafe() = %v, want %v", got, want) } diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 4501a3c..315030d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -196,6 +196,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) { return runner, nil } +// decActiveAgent decrements the active counters for a finished task. Safe to +// call multiple times — subsequent calls are no-ops via the cleaned flag. +// Always call this before sending on resultCh so consumers observing a result +// see the accounting already settled (no zero-count map entries lingering). +func (p *Pool) decActiveAgent(agentType string, cleaned *bool) { + if *cleaned { + return + } + *cleaned = true + p.mu.Lock() + p.active-- + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() + select { + case p.doneCh <- struct{}{}: + default: + } +} + func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { agentType := t.Agent.Type if agentType == "" { @@ -206,23 +228,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.activePerAgent[agentType]++ p.mu.Unlock() - defer func() { - p.mu.Lock() - p.active-- - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -264,6 +276,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } @@ -473,19 +486,8 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.activePerAgent[agentType]++ p.mu.Unlock() - defer func() { - p.mu.Lock() - p.active-- - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) runner, err := p.getRunner(t) if err != nil { @@ -505,6 +507,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -527,6 +530,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -583,6 +587,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go index d79c47d..7f2f54f 100644 --- a/internal/executor/gemini.go +++ b/internal/executor/gemini.go @@ -2,6 +2,7 @@ package executor import ( "context" + "encoding/json" "fmt" "io" "log/slog" @@ -117,16 +118,21 @@ func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, var streamErr error + var streamCost float64 var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - _, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger) + streamCost, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger) stdoutR.Close() }() wg.Wait() // Wait for parseGeminiStream to finish + if streamCost > 0 { + e.CostUSD = streamCost + } + // Set a dummy exit code for this simulated run e.ExitCode = 0 @@ -136,9 +142,10 @@ func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, return nil } -// parseGeminiStream reads streaming JSON from the gemini CLI, unwraps markdown -// code blocks, writes the inner JSON to w, and returns (costUSD, error). -// For now, it focuses on unwrapping and writing, not detailed parsing of cost/errors. +// parseGeminiStream reads streaming JSON from the gemini CLI, strips markdown +// code fences if the output is wrapped in them, writes the inner stream-json +// to w, and returns (costUSD, error). If a `result` event has `is_error: true`, +// an error wrapping the result message is returned. func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { fullOutput, err := io.ReadAll(r) if err != nil { @@ -146,31 +153,61 @@ func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, } logger.Debug("parseGeminiStream: raw output received", "output", string(fullOutput)) - outputStr := strings.TrimSpace(string(fullOutput)) // Trim leading/trailing whitespace/newlines from the whole output - - jsonContent := outputStr // Default to raw output if no markdown block is found or malformed - jsonStartIdx := strings.Index(outputStr, "```json") - if jsonStartIdx != -1 { - // Found "```json", now look for the closing "```" - jsonEndIdx := strings.LastIndex(outputStr, "```") - if jsonEndIdx != -1 && jsonEndIdx > jsonStartIdx { - // Extract content between the markdown fences. - jsonContent = outputStr[jsonStartIdx+len("```json"):jsonEndIdx] - jsonContent = strings.TrimSpace(jsonContent) // Trim again after extraction, to remove potential inner newlines - } else { - logger.Warn("Malformed markdown JSON block from Gemini (missing closing ``` or invalid structure), falling back to raw output.", "outputLength", len(outputStr)) + inner := stripGeminiFences(string(fullOutput), logger) + if _, writeErr := w.Write([]byte(inner)); writeErr != nil { + return 0, fmt.Errorf("writing gemini output: %w", writeErr) + } + + // Walk lines looking for a result event so we can surface errors and cost. + var ( + cost float64 + errMsg string + isError bool + ) + for _, raw := range strings.Split(inner, "\n") { + line := strings.TrimSpace(raw) + if line == "" { + continue + } + var evt struct { + Type string `json:"type"` + IsError bool `json:"is_error"` + Result string `json:"result"` + Cost float64 `json:"total_cost_usd"` + } + if err := json.Unmarshal([]byte(line), &evt); err != nil { + continue + } + if evt.Type == "result" { + if evt.Cost > 0 { + cost = evt.Cost + } + if evt.IsError { + isError = true + errMsg = evt.Result + } } - } else { - logger.Warn("No markdown JSON block found from Gemini, falling back to raw output.", "outputLength", len(outputStr)) } - - // Write the (possibly extracted and trimmed) JSON content to the writer. - _, writeErr := w.Write([]byte(jsonContent)) - if writeErr != nil { - return 0, fmt.Errorf("writing extracted gemini json: %w", writeErr) + if isError { + return cost, fmt.Errorf("gemini reported error: %s", errMsg) } + return cost, nil +} - return 0, nil // For now, no cost/error parsing for Gemini stream +// stripGeminiFences removes a surrounding ```json ... ``` markdown block if +// present, returning the trimmed inner content. If no markdown fence is +// found, the input is returned verbatim (no whitespace trimming) so callers +// that expect byte-exact pass-through behavior get it. +func stripGeminiFences(raw string, logger *slog.Logger) string { + trimmed := strings.TrimSpace(raw) + if start := strings.Index(trimmed, "```json"); start != -1 { + if end := strings.LastIndex(trimmed, "```"); end > start { + return strings.TrimSpace(trimmed[start+len("```json") : end]) + } + logger.Warn("malformed gemini markdown block (missing closing fence); using raw output", "len", len(trimmed)) + return trimmed + } + return raw } func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go index 75e3b45..4b0339e 100644 --- a/internal/executor/gemini_test.go +++ b/internal/executor/gemini_test.go @@ -148,7 +148,6 @@ func TestGeminiRunner_BinaryPath_Custom(t *testing.T) { func TestParseGeminiStream_ParsesStructuredOutput(t *testing.T) { - t.Skip("GeminiRunner stub: result error/cost parsing not yet implemented; tracked separately") // Simulate a stream-json input with various message types, including a result with error and cost. input := streamLine(`{"type":"content_block_start","content_block":{"text":"Hello,"}}`) + streamLine(`{"type":"content_block_delta","content_block":{"text":" World!"}}`) + -- cgit v1.2.3