summaryrefslogtreecommitdiff
path: root/internal/executor/local.go
diff options
context:
space:
mode:
authorClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
committerClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
commit0865afc43be562dbe14528e4299b9e213b54cc93 (patch)
tree3ffb11207fb6b9866b5a2477bba7abe38964f83a /internal/executor/local.go
parentc2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff)
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
Diffstat (limited to 'internal/executor/local.go')
-rw-r--r--internal/executor/local.go171
1 files changed, 171 insertions, 0 deletions
diff --git a/internal/executor/local.go b/internal/executor/local.go
new file mode 100644
index 0000000..5d874c6
--- /dev/null
+++ b/internal/executor/local.go
@@ -0,0 +1,171 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// LocalRunner executes a task against a local OpenAI-compatible LLM endpoint.
+// Unlike ClaudeRunner/GeminiRunner it does not spawn a subprocess, does not
+// create a git sandbox, and does not edit files in project_dir — it produces
+// text completions that are streamed to stdout.log in the same stream-json
+// envelope Claude uses, so existing parsers (extractSummary, ParseChangestat)
+// keep working unchanged.
+type LocalRunner struct {
+ Client *llm.Client
+ Logger *slog.Logger
+ LogDir string
+ DefaultTemperature float64
+}
+
+// ExecLogDir implements LogPather so the pool can persist log paths before
+// execution starts.
+func (r *LocalRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+// Run streams a chat completion to stdout.log. The response is wrapped in
+// stream-json envelopes line-by-line so downstream parsers (summary,
+// changestats) read it the same way they read Claude output.
+func (r *LocalRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ if r.Client == nil {
+ return fmt.Errorf("local runner: no LLM client configured")
+ }
+ if t.Agent.Instructions == "" {
+ return fmt.Errorf("local runner: empty instructions")
+ }
+
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ return fmt.Errorf("local runner: LogDir not set")
+ }
+ if err := os.MkdirAll(logDir, 0o700); err != nil {
+ return fmt.Errorf("local runner: mkdir log: %w", err)
+ }
+ stdoutPath := filepath.Join(logDir, "stdout.log")
+ stderrPath := filepath.Join(logDir, "stderr.log")
+ e.StdoutPath = stdoutPath
+ e.StderrPath = stderrPath
+
+ stdout, err := os.Create(stdoutPath)
+ if err != nil {
+ return fmt.Errorf("local runner: create stdout: %w", err)
+ }
+ defer stdout.Close()
+
+ messages := []llm.Message{}
+ if sys := strings.TrimSpace(t.Agent.SystemPromptAppend); sys != "" {
+ messages = append(messages, llm.Message{Role: "system", Content: sys})
+ }
+ messages = append(messages, llm.Message{Role: "user", Content: t.Agent.Instructions})
+
+ temperature := t.Agent.Temperature
+ if temperature == nil && r.DefaultTemperature > 0 {
+ v := r.DefaultTemperature
+ temperature = &v
+ }
+
+ req := llm.ChatRequest{
+ Model: t.Agent.Model,
+ Messages: messages,
+ Temperature: temperature,
+ MaxTokens: t.Agent.MaxTokens,
+ }
+
+ start := time.Now()
+ resp, err := r.Client.ChatStream(ctx, req, func(delta string) {
+ if delta == "" {
+ return
+ }
+ writeAssistantTextLine(stdout, delta)
+ })
+ if err != nil {
+ writeResultLine(stdout, "error", err.Error(), 0, 0)
+ return fmt.Errorf("local runner: chat: %w", err)
+ }
+ elapsed := time.Since(start)
+
+ // Write one consolidated assistant envelope containing the full response.
+ // extractSummary and ParseChangestatFromOutput operate per-line, so a
+ // single envelope with the full text is what they expect to find.
+ if resp.Content != "" {
+ writeAssistantTextLine(stdout, resp.Content)
+ }
+ writeResultLine(stdout, "success", "", resp.PromptTokens, resp.OutputTokens)
+
+ e.CostUSD = 0
+ e.TokensIn = int64(resp.PromptTokens)
+ e.TokensOut = int64(resp.OutputTokens)
+
+ if r.Logger != nil {
+ r.Logger.Info("local runner completed",
+ "taskID", t.ID,
+ "model", resp.Model,
+ "tokens_in", resp.PromptTokens,
+ "tokens_out", resp.OutputTokens,
+ "finish_reason", resp.FinishReason,
+ "elapsed_ms", elapsed.Milliseconds(),
+ )
+ }
+ return nil
+}
+
+// writeAssistantTextLine writes a single stream-json line wrapping `text` as
+// an assistant text block. Format matches what ClaudeRunner emits, so
+// extractSummary and ParseChangestatFromFile read it transparently.
+func writeAssistantTextLine(w *os.File, text string) {
+ line := struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ } `json:"content"`
+ } `json:"message"`
+ }{Type: "assistant"}
+ line.Message.Content = []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ }{{Type: "text", Text: text}}
+ b, err := json.Marshal(line)
+ if err != nil {
+ return
+ }
+ w.Write(b)
+ w.Write([]byte("\n"))
+}
+
+// writeResultLine writes a final stream-json terminator line that downstream
+// parsers can recognise. Mirrors the shape of the result line ClaudeRunner emits.
+func writeResultLine(w *os.File, subtype, errMsg string, promptTokens, outputTokens int) {
+ line := map[string]any{
+ "type": "result",
+ "subtype": subtype,
+ "is_error": errMsg != "",
+ "prompt_tokens": promptTokens,
+ "output_tokens": outputTokens,
+ "total_cost_usd": 0.0,
+ }
+ if errMsg != "" {
+ line["result"] = errMsg
+ }
+ b, err := json.Marshal(line)
+ if err != nil {
+ return
+ }
+ w.Write(b)
+ w.Write([]byte("\n"))
+}