From 0865afc43be562dbe14528e4299b9e213b54cc93 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 09:24:43 +0000 Subject: feat(executor): add LocalRunner and OpenAI-compat LLM client Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J --- internal/storage/db.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) (limited to 'internal/storage/db.go') diff --git a/internal/storage/db.go b/internal/storage/db.go index 038480b..c871c77 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -86,6 +86,8 @@ func (s *DB) migrate() error { `ALTER TABLE executions ADD COLUMN changestats_json TEXT`, `ALTER TABLE executions ADD COLUMN commits_json TEXT NOT NULL DEFAULT '[]'`, `ALTER TABLE tasks ADD COLUMN elaboration_input TEXT`, + `ALTER TABLE executions ADD COLUMN tokens_in INTEGER`, + `ALTER TABLE executions ADD COLUMN tokens_out INTEGER`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -403,6 +405,11 @@ type Execution struct { Changestats *task.Changestats // stored as JSON; nil if not yet recorded Commits []task.GitCommit // stored as JSON; empty if no commits + // Token usage for non-CLI runners (e.g. LocalRunner). 0 for Claude/Gemini + // CLI runs which report cost in cost_usd instead. + TokensIn int64 + TokensOut int64 + // In-memory only: set when creating a resume execution, not stored in DB. ResumeSessionID string ResumeAnswer string @@ -430,23 +437,23 @@ func (s *DB) CreateExecution(e *Execution) error { commitsJSON = string(b) } _, err := s.db.Exec(` - INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, - e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { - row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { - rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) + rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } @@ -465,7 +472,7 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { // GetLatestExecution returns the most recent execution for a task. func (s *DB) GetLatestExecution(taskID string) (*Execution, error) { - row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) return scanExecution(row) } @@ -650,11 +657,11 @@ func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?, stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ?, sandbox_dir = ?, - changestats_json = ?, commits_json = ? + changestats_json = ?, commits_json = ?, tokens_in = ?, tokens_out = ? WHERE id = ?`, e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.SandboxDir, - changestatsJSON, commitsJSON, e.ID, + changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut, e.ID, ) return err } @@ -729,13 +736,17 @@ func scanExecution(row scanner) (*Execution, error) { var sandboxDir sql.NullString var changestatsJSON sql.NullString var commitsJSON sql.NullString + var tokensIn sql.NullInt64 + var tokensOut sql.NullInt64 err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, - &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON) + &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON, &tokensIn, &tokensOut) if err != nil { return nil, err } e.SessionID = sessionID.String e.SandboxDir = sandboxDir.String + e.TokensIn = tokensIn.Int64 + e.TokensOut = tokensOut.Int64 if changestatsJSON.Valid && changestatsJSON.String != "" { var cs task.Changestats if err := json.Unmarshal([]byte(changestatsJSON.String), &cs); err != nil { -- cgit v1.2.3 From e7b382bf177cbe518af3d86c3ee6c49344d225f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 08:00:20 +0000 Subject: chore: close deferred work — real GeminiRunner, Local UI option, db.go cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the three items left on the deferred queue after the post-epic cleanup. GeminiRunner.execOnce now actually executes the gemini binary instead of writing hardcoded stream data. Mirrors ClaudeRunner.execOnce: - exec.CommandContext with the same env vars (CLAUDOMATOR_API_URL etc.) - process group SIGKILL on context cancel - stdout piped through parseGeminiStream → stdoutFile - stderr to file - exit codes captured, stderr tail surfaced on failure Test infrastructure bug uncovered in passing: testServerWithGeminiMockRunner's mock script used double-quoted echo with literal triple-backticks, which bash interpreted as command substitution. The script always produced empty output. The bug was invisible until now because GeminiRunner ignored the script entirely. Switched to a single-quoted heredoc. Frontend: index.html dropdown gains a "Local" option. No JS branching needed — the value flows through to agent.type verbatim and downstream display reads the type string as-is. storage/db.go: removed stale debug-comment scaffolding (the "TODO: Replace with proper logger" block) that was tracking a dead `fmt.Printf` call. The path it commented on is fine without logging — unmarshal errors are returned wrapped. Test status: `go test -race ./...` green across every package, zero skips, zero excluded tests. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J --- docs/plans/local-oss-runner.md | 9 +++++++ internal/api/server_test.go | 23 ++++++++-------- internal/executor/gemini.go | 61 +++++++++++++++++++++++++++++++----------- internal/storage/db.go | 5 ---- web/index.html | 1 + 5 files changed, 67 insertions(+), 32 deletions(-) (limited to 'internal/storage/db.go') diff --git a/docs/plans/local-oss-runner.md b/docs/plans/local-oss-runner.md index 4d5cb87..4504bbb 100644 --- a/docs/plans/local-oss-runner.md +++ b/docs/plans/local-oss-runner.md @@ -222,6 +222,15 @@ Items not chased (deferred deliberately): - **Frontend "Local" agent option** — UI dropdown still says "Auto / Claude / Gemini". Pending token telemetry surface. - **`storage.db.go:706` TODO comment** — minor logger plumbing nit. Skipping unless it blocks something. +## Deferred work — DONE + +Follow-up commit closed the three deferred items above: + +- `GeminiRunner.execOnce` now invokes the actual `gemini` binary via `exec.CommandContext`, mirroring the `ClaudeRunner` pattern: pipe stdout to `parseGeminiStream`, kill the process group on context cancel, capture stderr to file, surface exit codes. Hardcoded simulation removed. +- Test infrastructure bug uncovered and fixed in passing: the mock gemini script in `testServerWithGeminiMockRunner` was using `"\``json\`"` which bash interpreted as command substitution, so the script always produced empty output. Switched to a single-quoted heredoc. The bug was masked previously because the runner ignored the script entirely. +- Frontend `index.html` dropdown gains a `Local` option. No JS branching changes needed — the value flows through to `agent.type` verbatim and downstream display reads the type string as-is. +- Stale debug-comment scaffolding around `storage.db.go:706` deleted. + --- # Phase 2 — Focused Plan (Elaboration) diff --git a/internal/api/server_test.go b/internal/api/server_test.go index 516e289..2139e36 100644 --- a/internal/api/server_test.go +++ b/internal/api/server_test.go @@ -143,20 +143,21 @@ func testServerWithGeminiMockRunner(t *testing.T) (*Server, *storage.DB) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) - // Create the mock gemini binary script. + // Create the mock gemini binary script. Use single-quoted heredoc so + // bash does not try to evaluate the literal backticks as command + // substitution. mockBinDir := t.TempDir() mockGeminiPath := filepath.Join(mockBinDir, "mock-gemini-binary.sh") mockScriptContent := `#!/bin/bash -OUTPUT_FILE=$(mktemp) -echo "` + "```json" + `" > "$OUTPUT_FILE" -echo "{\"type\":\"content_block_start\",\"content_block\":{\"text\":\"Hello, Gemini!\",\"type\":\"text\"}}" >> "$OUTPUT_FILE" -echo "{\"type\":\"content_block_delta\",\"content_block\":{\"text\":\" How are you?\"}}" >> "$OUTPUT_FILE" -echo "{\"type\":\"content_block_end\"}" >> "$OUTPUT_FILE" -echo "{\"type\":\"message_delta\",\"message\":{\"role\":\"model\"}}" >> "$OUTPUT_FILE" -echo "{\"type\":\"message_end\"}" >> "$OUTPUT_FILE" -echo "` + "```" + `" >> "$OUTPUT_FILE" -cat "$OUTPUT_FILE" -rm "$OUTPUT_FILE" +cat <<'EOF' +` + "```json" + ` +{"type":"content_block_start","content_block":{"text":"Hello, Gemini!","type":"text"}} +{"type":"content_block_delta","content_block":{"text":" How are you?"}} +{"type":"content_block_end"} +{"type":"message_delta","message":{"role":"model"}} +{"type":"message_end"} +` + "```" + ` +EOF exit 0 ` if err := os.WriteFile(mockGeminiPath, []byte(mockScriptContent), 0755); err != nil { diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go index 7f2f54f..04382ae 100644 --- a/internal/executor/gemini.go +++ b/internal/executor/gemini.go @@ -7,9 +7,11 @@ import ( "io" "log/slog" "os" + "os/exec" "path/filepath" "strings" "sync" + "syscall" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" @@ -84,8 +86,18 @@ func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi } func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { - // Temporarily bypass external command execution to debug pipe. - // We will simulate outputting to stdoutW directly. + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) + cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_PROJECT_DIR="+projectDir, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if workingDir != "" { + cmd.Dir = workingDir + } stdoutFile, err := os.Create(e.StdoutPath) if err != nil { @@ -103,22 +115,27 @@ func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, if err != nil { return fmt.Errorf("creating stdout pipe: %w", err) } + cmd.Stdout = stdoutW + cmd.Stderr = stderrFile - // Simulate writing to stdoutW + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting gemini: %w", err) + } + stdoutW.Close() + + killDone := make(chan struct{}) go func() { - defer stdoutW.Close() // Close the writer when done. - fmt.Fprintf(stdoutW, "```json\n") - fmt.Fprintf(stdoutW, "{\"type\":\"content_block_start\",\"content_block\":{\"text\":\"Hello, Gemini!\",\"type\":\"text\"}}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"content_block_delta\",\"content_block\":{\"text\":\" How are you?\"}}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"content_block_end\"}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"message_delta\",\"message\":{\"role\":\"model\"}}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"message_end\"}\n") - fmt.Fprintf(stdoutW, "```\n") + select { + case <-ctx.Done(): + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } }() - - var streamErr error var streamCost float64 + var streamErr error var wg sync.WaitGroup wg.Add(1) go func() { @@ -127,14 +144,26 @@ func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, stdoutR.Close() }() - wg.Wait() // Wait for parseGeminiStream to finish + waitErr := cmd.Wait() + close(killDone) + wg.Wait() if streamCost > 0 { e.CostUSD = streamCost } - // Set a dummy exit code for this simulated run - e.ExitCode = 0 + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + e.ExitCode = exitErr.ExitCode() + } + if streamErr != nil { + return streamErr + } + if tail := tailFile(e.StderrPath, 20); tail != "" { + return fmt.Errorf("gemini exited with error: %w\nstderr:\n%s", waitErr, tail) + } + return fmt.Errorf("gemini exited with error: %w", waitErr) + } if streamErr != nil { return streamErr diff --git a/internal/storage/db.go b/internal/storage/db.go index c871c77..ce60e2f 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -699,11 +699,6 @@ func scanTask(row scanner) (*task.Task, error) { t.State = task.State(state) t.Priority = task.Priority(priority) t.Timeout.Duration = time.Duration(timeoutNS) - // Add debug log for configJSON - // The logger is not available directly in db.go, so I'll use fmt.Printf for now. - // For production code, a logger should be injected. - // fmt.Printf("DEBUG: configJSON from DB: %s\n", configJSON) - // TODO: Replace with proper logger when available. if err := json.Unmarshal([]byte(configJSON), &t.Agent); err != nil { return nil, fmt.Errorf("unmarshaling agent config: %w", err) } diff --git a/web/index.html b/web/index.html index 1746baf..7c0b030 100644 --- a/web/index.html +++ b/web/index.html @@ -16,6 +16,7 @@ + -- cgit v1.2.3