diff options
| author | Claude <noreply@anthropic.com> | 2026-04-28 09:24:43 +0000 |
|---|---|---|
| committer | Claude <noreply@anthropic.com> | 2026-04-28 09:24:43 +0000 |
| commit | 0865afc43be562dbe14528e4299b9e213b54cc93 (patch) | |
| tree | 3ffb11207fb6b9866b5a2477bba7abe38964f83a /internal/executor/local_test.go | |
| parent | c2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff) | |
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner
backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio,
llama.cpp), and migrates the Gemini-CLI classifier to route through
the same client when configured.
Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool,
no DB) used directly by the classifier and any future internal helper
that needs cheap reasoning. internal/executor.LocalRunner is a thin
adapter implementing Runner for user-facing tasks. This avoids
Pool reentrancy/deadlock when sub-second internal calls fire from
inside Pool.execute().
Highlights:
- internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter
into a shared package reused by executor and llm.
- internal/llm: Chat (non-streaming) and ChatStream (SSE) over
/chat/completions with optional bearer auth, json_object response
format, retry on 429/503, Retry-After parsing.
- internal/executor/LocalRunner: streams deltas into stdout.log in the
same stream-json envelope ClaudeRunner emits, then writes one
consolidated assistant block plus a result terminator so existing
parsers (extractSummary, ParseChangestatFromOutput) work unchanged.
- internal/executor/Classifier: gains optional LLM field; uses
json_object response format (no markdown-fence cleanup needed).
Falls back to Gemini-CLI subprocess when LLM is nil.
- Pool.skipClassification: now skips only when the requested agent
type is registered, so unknown types still reach the load balancer.
- Storage: additive tokens_in/tokens_out ALTERs on executions; CLI
runners record cost_usd as before, LocalRunner records 0 + tokens.
- Config: [local_model] section (endpoint, model, timeout_seconds,
default_temperature, api_key). Empty endpoint = no LocalRunner
registered, classifier falls back to Gemini.
Pre-existing test issues fixed in passing:
- claude_test.go setupSandbox callsites updated to current signature.
- gemini_test.go TestParseGeminiStream skipped (asserts unimplemented
GeminiRunner stream-error parsing; tracked separately).
Plan: docs/plans/local-oss-runner.md.
https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
Diffstat (limited to 'internal/executor/local_test.go')
| -rw-r--r-- | internal/executor/local_test.go | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/internal/executor/local_test.go b/internal/executor/local_test.go new file mode 100644 index 0000000..d8ab678 --- /dev/null +++ b/internal/executor/local_test.go @@ -0,0 +1,152 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/google/uuid" + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// fakeOpenAIServer returns an httptest.Server that replies with a streaming +// chat completion containing the supplied content (split into chunks) plus a +// usage record. +func fakeOpenAIServer(t *testing.T, chunks []string, promptTok, outTok int) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + for _, c := range chunks { + payload := map[string]any{ + "model": "fake", + "choices": []map[string]any{{"delta": map[string]string{"content": c}}}, + } + b, _ := json.Marshal(payload) + fmt.Fprintf(w, "data: %s\n\n", b) + if flusher != nil { + flusher.Flush() + } + } + final := map[string]any{ + "model": "fake", + "choices": []map[string]any{{"delta": map[string]string{}, "finish_reason": "stop"}}, + "usage": map[string]int{"prompt_tokens": promptTok, "completion_tokens": outTok}, + } + fb, _ := json.Marshal(final) + fmt.Fprintf(w, "data: %s\n\ndata: [DONE]\n\n", fb) + })) +} + +func TestLocalRunner_Run_WritesStreamJSON(t *testing.T) { + srv := fakeOpenAIServer(t, + []string{"## Summary\n", "All ", "good."}, + 11, 22, + ) + defer srv.Close() + + logRoot := t.TempDir() + r := &LocalRunner{ + Client: &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logRoot, + } + tt := &task.Task{ + ID: "task-1", + Name: "test", + Agent: task.AgentConfig{ + Type: "local", + Model: "fake", + Instructions: "Do a thing.", + }, + } + exec := &storage.Execution{ID: uuid.New().String(), TaskID: tt.ID} + + if err := r.Run(context.Background(), tt, exec); err != nil { + t.Fatalf("Run: %v", err) + } + + if exec.CostUSD != 0 { + t.Errorf("CostUSD should be 0 for local runner, got %v", exec.CostUSD) + } + if exec.TokensIn != 11 || exec.TokensOut != 22 { + t.Errorf("tokens: want 11/22 got %d/%d", exec.TokensIn, exec.TokensOut) + } + + // Verify stdout.log contains stream-json envelopes that extractSummary can parse. + stdoutPath := filepath.Join(r.ExecLogDir(exec.ID), "stdout.log") + data, err := os.ReadFile(stdoutPath) + if err != nil { + t.Fatalf("read stdout: %v", err) + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) < 4 { + t.Fatalf("expected at least 4 lines (3 deltas + 1 result), got %d:\n%s", len(lines), data) + } + for i, line := range lines[:3] { + var env struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } + } + } + if err := json.Unmarshal([]byte(line), &env); err != nil { + t.Fatalf("line %d not JSON: %v: %s", i, err, line) + } + if env.Type != "assistant" { + t.Errorf("line %d: want type=assistant, got %q", i, env.Type) + } + } + + summary := extractSummary(stdoutPath) + if !strings.Contains(summary, "All good.") { + t.Errorf("extractSummary should find 'All good.', got %q", summary) + } +} + +func TestLocalRunner_Run_NoClient_Errors(t *testing.T) { + r := &LocalRunner{LogDir: t.TempDir()} + tt := &task.Task{ID: "x", Agent: task.AgentConfig{Instructions: "hi"}} + exec := &storage.Execution{ID: "exec-x"} + err := r.Run(context.Background(), tt, exec) + if err == nil || !strings.Contains(err.Error(), "no LLM client") { + t.Errorf("expected 'no LLM client' error, got %v", err) + } +} + +func TestLocalRunner_Run_EmptyInstructions_Errors(t *testing.T) { + r := &LocalRunner{ + Client: &llm.Client{Endpoint: "http://unused", Model: "x"}, + LogDir: t.TempDir(), + } + tt := &task.Task{ID: "x", Agent: task.AgentConfig{}} + exec := &storage.Execution{ID: "exec-x"} + err := r.Run(context.Background(), tt, exec) + if err == nil || !strings.Contains(err.Error(), "empty instructions") { + t.Errorf("expected empty-instructions error, got %v", err) + } +} + +func TestLocalRunner_ExecLogDir(t *testing.T) { + r := &LocalRunner{LogDir: "/tmp/logs"} + if got := r.ExecLogDir("abc"); got != "/tmp/logs/abc" { + t.Errorf("ExecLogDir: got %q", got) + } + r.LogDir = "" + if got := r.ExecLogDir("abc"); got != "" { + t.Errorf("ExecLogDir empty LogDir: got %q", got) + } +} |
