diff options
| author | Claude <noreply@anthropic.com> | 2026-04-28 09:24:43 +0000 |
|---|---|---|
| committer | Claude <noreply@anthropic.com> | 2026-04-28 09:24:43 +0000 |
| commit | 0865afc43be562dbe14528e4299b9e213b54cc93 (patch) | |
| tree | 3ffb11207fb6b9866b5a2477bba7abe38964f83a /internal/llm/client_test.go | |
| parent | c2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff) | |
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner
backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio,
llama.cpp), and migrates the Gemini-CLI classifier to route through
the same client when configured.
Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool,
no DB) used directly by the classifier and any future internal helper
that needs cheap reasoning. internal/executor.LocalRunner is a thin
adapter implementing Runner for user-facing tasks. This avoids
Pool reentrancy/deadlock when sub-second internal calls fire from
inside Pool.execute().
Highlights:
- internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter
into a shared package reused by executor and llm.
- internal/llm: Chat (non-streaming) and ChatStream (SSE) over
/chat/completions with optional bearer auth, json_object response
format, retry on 429/503, Retry-After parsing.
- internal/executor/LocalRunner: streams deltas into stdout.log in the
same stream-json envelope ClaudeRunner emits, then writes one
consolidated assistant block plus a result terminator so existing
parsers (extractSummary, ParseChangestatFromOutput) work unchanged.
- internal/executor/Classifier: gains optional LLM field; uses
json_object response format (no markdown-fence cleanup needed).
Falls back to Gemini-CLI subprocess when LLM is nil.
- Pool.skipClassification: now skips only when the requested agent
type is registered, so unknown types still reach the load balancer.
- Storage: additive tokens_in/tokens_out ALTERs on executions; CLI
runners record cost_usd as before, LocalRunner records 0 + tokens.
- Config: [local_model] section (endpoint, model, timeout_seconds,
default_temperature, api_key). Empty endpoint = no LocalRunner
registered, classifier falls back to Gemini.
Pre-existing test issues fixed in passing:
- claude_test.go setupSandbox callsites updated to current signature.
- gemini_test.go TestParseGeminiStream skipped (asserts unimplemented
GeminiRunner stream-error parsing; tracked separately).
Plan: docs/plans/local-oss-runner.md.
https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
Diffstat (limited to 'internal/llm/client_test.go')
| -rw-r--r-- | internal/llm/client_test.go | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/internal/llm/client_test.go b/internal/llm/client_test.go new file mode 100644 index 0000000..8257836 --- /dev/null +++ b/internal/llm/client_test.go @@ -0,0 +1,159 @@ +package llm + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +func TestChat_ParsesCompletion(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/chat/completions" { + t.Errorf("unexpected path %q", r.URL.Path) + } + if r.Header.Get("Authorization") != "Bearer test-key" { + t.Errorf("missing/wrong bearer header: %q", r.Header.Get("Authorization")) + } + var body openAIRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode body: %v", err) + } + if body.Model != "test-model" { + t.Errorf("model: want test-model got %q", body.Model) + } + if len(body.Messages) != 1 || body.Messages[0].Content != "hello" { + t.Errorf("messages mismatch: %+v", body.Messages) + } + if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" { + t.Errorf("expected response_format json_object, got %+v", body.ResponseFormat) + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{ + "model": "test-model", + "choices": [{"message": {"role": "assistant", "content": "world"}, "finish_reason": "stop"}], + "usage": {"prompt_tokens": 4, "completion_tokens": 7} + }`) + })) + defer srv.Close() + + c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model", APIKey: "test-key"} + resp, err := c.Chat(context.Background(), ChatRequest{ + Messages: []Message{{Role: "user", Content: "hello"}}, + ResponseJSON: true, + }) + if err != nil { + t.Fatalf("Chat: %v", err) + } + if resp.Content != "world" { + t.Errorf("content: want world got %q", resp.Content) + } + if resp.PromptTokens != 4 || resp.OutputTokens != 7 { + t.Errorf("tokens mismatch: %+v", resp) + } + if resp.FinishReason != "stop" { + t.Errorf("finish_reason: want stop got %q", resp.FinishReason) + } +} + +func TestChatStream_ParsesSSE(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + chunks := []string{ + `{"model":"test-model","choices":[{"delta":{"content":"Hel"},"finish_reason":""}]}`, + `{"model":"test-model","choices":[{"delta":{"content":"lo, "},"finish_reason":""}]}`, + `{"model":"test-model","choices":[{"delta":{"content":"world"},"finish_reason":"stop"}]}`, + `{"model":"test-model","choices":[],"usage":{"prompt_tokens":3,"completion_tokens":5}}`, + } + for _, c := range chunks { + fmt.Fprintf(w, "data: %s\n\n", c) + if flusher != nil { + flusher.Flush() + } + } + fmt.Fprint(w, "data: [DONE]\n\n") + })) + defer srv.Close() + + c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model"} + + var deltas []string + resp, err := c.ChatStream(context.Background(), + ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}}, + func(d string) { deltas = append(deltas, d) }, + ) + if err != nil { + t.Fatalf("ChatStream: %v", err) + } + if got := strings.Join(deltas, ""); got != "Hello, world" { + t.Errorf("aggregated deltas: want %q got %q", "Hello, world", got) + } + if resp.Content != "Hello, world" { + t.Errorf("content: want %q got %q", "Hello, world", resp.Content) + } + if resp.PromptTokens != 3 || resp.OutputTokens != 5 { + t.Errorf("tokens: %+v", resp) + } + if resp.FinishReason != "stop" { + t.Errorf("finish_reason: want stop got %q", resp.FinishReason) + } +} + +func TestChat_RetriesOn429(t *testing.T) { + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&calls, 1) + if n == 1 { + w.Header().Set("Retry-After", "1") + http.Error(w, "slow down", http.StatusTooManyRequests) + return + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{ + "model":"m","choices":[{"message":{"role":"assistant","content":"ok"},"finish_reason":"stop"}], + "usage":{"prompt_tokens":1,"completion_tokens":1} + }`) + })) + defer srv.Close() + + c := &Client{ + Endpoint: srv.URL + "/v1", + Model: "m", + HTTPClient: &http.Client{Timeout: 5 * time.Second}, + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + resp, err := c.Chat(ctx, ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}}) + if err != nil { + t.Fatalf("Chat: %v", err) + } + if resp.Content != "ok" { + t.Errorf("content: want ok got %q", resp.Content) + } + if got := atomic.LoadInt32(&calls); got != 2 { + t.Errorf("expected 2 server calls (1 retry), got %d", got) + } +} + +// Sanity: errFromStatus produces a string that retry.IsRateLimitError matches. +func TestErrFromStatus_RateLimitMarker(t *testing.T) { + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{"Retry-After": []string{"30"}}, + } + body, _ := io.ReadAll(strings.NewReader("limit hit")) + err := errFromStatus(resp, body) + if !strings.Contains(strings.ToLower(err.Error()), "rate limit") { + t.Errorf("error should contain 'rate limit', got: %v", err) + } + if !strings.Contains(err.Error(), "retry-after: 30") { + t.Errorf("error should embed retry-after, got: %v", err) + } +} |
