summaryrefslogtreecommitdiff
path: root/internal/llm/client_test.go
diff options
context:
space:
mode:
authorClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
committerClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
commit0865afc43be562dbe14528e4299b9e213b54cc93 (patch)
tree3ffb11207fb6b9866b5a2477bba7abe38964f83a /internal/llm/client_test.go
parentc2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff)
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
Diffstat (limited to 'internal/llm/client_test.go')
-rw-r--r--internal/llm/client_test.go159
1 files changed, 159 insertions, 0 deletions
diff --git a/internal/llm/client_test.go b/internal/llm/client_test.go
new file mode 100644
index 0000000..8257836
--- /dev/null
+++ b/internal/llm/client_test.go
@@ -0,0 +1,159 @@
+package llm
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestChat_ParsesCompletion(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/v1/chat/completions" {
+ t.Errorf("unexpected path %q", r.URL.Path)
+ }
+ if r.Header.Get("Authorization") != "Bearer test-key" {
+ t.Errorf("missing/wrong bearer header: %q", r.Header.Get("Authorization"))
+ }
+ var body openAIRequest
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatalf("decode body: %v", err)
+ }
+ if body.Model != "test-model" {
+ t.Errorf("model: want test-model got %q", body.Model)
+ }
+ if len(body.Messages) != 1 || body.Messages[0].Content != "hello" {
+ t.Errorf("messages mismatch: %+v", body.Messages)
+ }
+ if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" {
+ t.Errorf("expected response_format json_object, got %+v", body.ResponseFormat)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model": "test-model",
+ "choices": [{"message": {"role": "assistant", "content": "world"}, "finish_reason": "stop"}],
+ "usage": {"prompt_tokens": 4, "completion_tokens": 7}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model", APIKey: "test-key"}
+ resp, err := c.Chat(context.Background(), ChatRequest{
+ Messages: []Message{{Role: "user", Content: "hello"}},
+ ResponseJSON: true,
+ })
+ if err != nil {
+ t.Fatalf("Chat: %v", err)
+ }
+ if resp.Content != "world" {
+ t.Errorf("content: want world got %q", resp.Content)
+ }
+ if resp.PromptTokens != 4 || resp.OutputTokens != 7 {
+ t.Errorf("tokens mismatch: %+v", resp)
+ }
+ if resp.FinishReason != "stop" {
+ t.Errorf("finish_reason: want stop got %q", resp.FinishReason)
+ }
+}
+
+func TestChatStream_ParsesSSE(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ flusher, _ := w.(http.Flusher)
+ chunks := []string{
+ `{"model":"test-model","choices":[{"delta":{"content":"Hel"},"finish_reason":""}]}`,
+ `{"model":"test-model","choices":[{"delta":{"content":"lo, "},"finish_reason":""}]}`,
+ `{"model":"test-model","choices":[{"delta":{"content":"world"},"finish_reason":"stop"}]}`,
+ `{"model":"test-model","choices":[],"usage":{"prompt_tokens":3,"completion_tokens":5}}`,
+ }
+ for _, c := range chunks {
+ fmt.Fprintf(w, "data: %s\n\n", c)
+ if flusher != nil {
+ flusher.Flush()
+ }
+ }
+ fmt.Fprint(w, "data: [DONE]\n\n")
+ }))
+ defer srv.Close()
+
+ c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model"}
+
+ var deltas []string
+ resp, err := c.ChatStream(context.Background(),
+ ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}},
+ func(d string) { deltas = append(deltas, d) },
+ )
+ if err != nil {
+ t.Fatalf("ChatStream: %v", err)
+ }
+ if got := strings.Join(deltas, ""); got != "Hello, world" {
+ t.Errorf("aggregated deltas: want %q got %q", "Hello, world", got)
+ }
+ if resp.Content != "Hello, world" {
+ t.Errorf("content: want %q got %q", "Hello, world", resp.Content)
+ }
+ if resp.PromptTokens != 3 || resp.OutputTokens != 5 {
+ t.Errorf("tokens: %+v", resp)
+ }
+ if resp.FinishReason != "stop" {
+ t.Errorf("finish_reason: want stop got %q", resp.FinishReason)
+ }
+}
+
+func TestChat_RetriesOn429(t *testing.T) {
+ var calls int32
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ n := atomic.AddInt32(&calls, 1)
+ if n == 1 {
+ w.Header().Set("Retry-After", "1")
+ http.Error(w, "slow down", http.StatusTooManyRequests)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model":"m","choices":[{"message":{"role":"assistant","content":"ok"},"finish_reason":"stop"}],
+ "usage":{"prompt_tokens":1,"completion_tokens":1}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Client{
+ Endpoint: srv.URL + "/v1",
+ Model: "m",
+ HTTPClient: &http.Client{Timeout: 5 * time.Second},
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ resp, err := c.Chat(ctx, ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}})
+ if err != nil {
+ t.Fatalf("Chat: %v", err)
+ }
+ if resp.Content != "ok" {
+ t.Errorf("content: want ok got %q", resp.Content)
+ }
+ if got := atomic.LoadInt32(&calls); got != 2 {
+ t.Errorf("expected 2 server calls (1 retry), got %d", got)
+ }
+}
+
+// Sanity: errFromStatus produces a string that retry.IsRateLimitError matches.
+func TestErrFromStatus_RateLimitMarker(t *testing.T) {
+ resp := &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ Header: http.Header{"Retry-After": []string{"30"}},
+ }
+ body, _ := io.ReadAll(strings.NewReader("limit hit"))
+ err := errFromStatus(resp, body)
+ if !strings.Contains(strings.ToLower(err.Error()), "rate limit") {
+ t.Errorf("error should contain 'rate limit', got: %v", err)
+ }
+ if !strings.Contains(err.Error(), "retry-after: 30") {
+ t.Errorf("error should embed retry-after, got: %v", err)
+ }
+}