diff options
| author | Claude <noreply@anthropic.com> | 2026-04-28 09:24:43 +0000 |
|---|---|---|
| committer | Claude <noreply@anthropic.com> | 2026-04-28 09:24:43 +0000 |
| commit | 0865afc43be562dbe14528e4299b9e213b54cc93 (patch) | |
| tree | 3ffb11207fb6b9866b5a2477bba7abe38964f83a /internal/executor/local.go | |
| parent | c2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff) | |
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner
backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio,
llama.cpp), and migrates the Gemini-CLI classifier to route through
the same client when configured.
Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool,
no DB) used directly by the classifier and any future internal helper
that needs cheap reasoning. internal/executor.LocalRunner is a thin
adapter implementing Runner for user-facing tasks. This avoids
Pool reentrancy/deadlock when sub-second internal calls fire from
inside Pool.execute().
Highlights:
- internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter
into a shared package reused by executor and llm.
- internal/llm: Chat (non-streaming) and ChatStream (SSE) over
/chat/completions with optional bearer auth, json_object response
format, retry on 429/503, Retry-After parsing.
- internal/executor/LocalRunner: streams deltas into stdout.log in the
same stream-json envelope ClaudeRunner emits, then writes one
consolidated assistant block plus a result terminator so existing
parsers (extractSummary, ParseChangestatFromOutput) work unchanged.
- internal/executor/Classifier: gains optional LLM field; uses
json_object response format (no markdown-fence cleanup needed).
Falls back to Gemini-CLI subprocess when LLM is nil.
- Pool.skipClassification: now skips only when the requested agent
type is registered, so unknown types still reach the load balancer.
- Storage: additive tokens_in/tokens_out ALTERs on executions; CLI
runners record cost_usd as before, LocalRunner records 0 + tokens.
- Config: [local_model] section (endpoint, model, timeout_seconds,
default_temperature, api_key). Empty endpoint = no LocalRunner
registered, classifier falls back to Gemini.
Pre-existing test issues fixed in passing:
- claude_test.go setupSandbox callsites updated to current signature.
- gemini_test.go TestParseGeminiStream skipped (asserts unimplemented
GeminiRunner stream-error parsing; tracked separately).
Plan: docs/plans/local-oss-runner.md.
https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
Diffstat (limited to 'internal/executor/local.go')
| -rw-r--r-- | internal/executor/local.go | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/internal/executor/local.go b/internal/executor/local.go new file mode 100644 index 0000000..5d874c6 --- /dev/null +++ b/internal/executor/local.go @@ -0,0 +1,171 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// LocalRunner executes a task against a local OpenAI-compatible LLM endpoint. +// Unlike ClaudeRunner/GeminiRunner it does not spawn a subprocess, does not +// create a git sandbox, and does not edit files in project_dir — it produces +// text completions that are streamed to stdout.log in the same stream-json +// envelope Claude uses, so existing parsers (extractSummary, ParseChangestat) +// keep working unchanged. +type LocalRunner struct { + Client *llm.Client + Logger *slog.Logger + LogDir string + DefaultTemperature float64 +} + +// ExecLogDir implements LogPather so the pool can persist log paths before +// execution starts. +func (r *LocalRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +// Run streams a chat completion to stdout.log. The response is wrapped in +// stream-json envelopes line-by-line so downstream parsers (summary, +// changestats) read it the same way they read Claude output. +func (r *LocalRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + if r.Client == nil { + return fmt.Errorf("local runner: no LLM client configured") + } + if t.Agent.Instructions == "" { + return fmt.Errorf("local runner: empty instructions") + } + + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + return fmt.Errorf("local runner: LogDir not set") + } + if err := os.MkdirAll(logDir, 0o700); err != nil { + return fmt.Errorf("local runner: mkdir log: %w", err) + } + stdoutPath := filepath.Join(logDir, "stdout.log") + stderrPath := filepath.Join(logDir, "stderr.log") + e.StdoutPath = stdoutPath + e.StderrPath = stderrPath + + stdout, err := os.Create(stdoutPath) + if err != nil { + return fmt.Errorf("local runner: create stdout: %w", err) + } + defer stdout.Close() + + messages := []llm.Message{} + if sys := strings.TrimSpace(t.Agent.SystemPromptAppend); sys != "" { + messages = append(messages, llm.Message{Role: "system", Content: sys}) + } + messages = append(messages, llm.Message{Role: "user", Content: t.Agent.Instructions}) + + temperature := t.Agent.Temperature + if temperature == nil && r.DefaultTemperature > 0 { + v := r.DefaultTemperature + temperature = &v + } + + req := llm.ChatRequest{ + Model: t.Agent.Model, + Messages: messages, + Temperature: temperature, + MaxTokens: t.Agent.MaxTokens, + } + + start := time.Now() + resp, err := r.Client.ChatStream(ctx, req, func(delta string) { + if delta == "" { + return + } + writeAssistantTextLine(stdout, delta) + }) + if err != nil { + writeResultLine(stdout, "error", err.Error(), 0, 0) + return fmt.Errorf("local runner: chat: %w", err) + } + elapsed := time.Since(start) + + // Write one consolidated assistant envelope containing the full response. + // extractSummary and ParseChangestatFromOutput operate per-line, so a + // single envelope with the full text is what they expect to find. + if resp.Content != "" { + writeAssistantTextLine(stdout, resp.Content) + } + writeResultLine(stdout, "success", "", resp.PromptTokens, resp.OutputTokens) + + e.CostUSD = 0 + e.TokensIn = int64(resp.PromptTokens) + e.TokensOut = int64(resp.OutputTokens) + + if r.Logger != nil { + r.Logger.Info("local runner completed", + "taskID", t.ID, + "model", resp.Model, + "tokens_in", resp.PromptTokens, + "tokens_out", resp.OutputTokens, + "finish_reason", resp.FinishReason, + "elapsed_ms", elapsed.Milliseconds(), + ) + } + return nil +} + +// writeAssistantTextLine writes a single stream-json line wrapping `text` as +// an assistant text block. Format matches what ClaudeRunner emits, so +// extractSummary and ParseChangestatFromFile read it transparently. +func writeAssistantTextLine(w *os.File, text string) { + line := struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + } `json:"message"` + }{Type: "assistant"} + line.Message.Content = []struct { + Type string `json:"type"` + Text string `json:"text"` + }{{Type: "text", Text: text}} + b, err := json.Marshal(line) + if err != nil { + return + } + w.Write(b) + w.Write([]byte("\n")) +} + +// writeResultLine writes a final stream-json terminator line that downstream +// parsers can recognise. Mirrors the shape of the result line ClaudeRunner emits. +func writeResultLine(w *os.File, subtype, errMsg string, promptTokens, outputTokens int) { + line := map[string]any{ + "type": "result", + "subtype": subtype, + "is_error": errMsg != "", + "prompt_tokens": promptTokens, + "output_tokens": outputTokens, + "total_cost_usd": 0.0, + } + if errMsg != "" { + line["result"] = errMsg + } + b, err := json.Marshal(line) + if err != nil { + return + } + w.Write(b) + w.Write([]byte("\n")) +} |
