From 0865afc43be562dbe14528e4299b9e213b54cc93 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 09:24:43 +0000 Subject: feat(executor): add LocalRunner and OpenAI-compat LLM client Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J --- internal/retry/backoff.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 internal/retry/backoff.go (limited to 'internal/retry/backoff.go') diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go new file mode 100644 index 0000000..b91abc4 --- /dev/null +++ b/internal/retry/backoff.go @@ -0,0 +1,77 @@ +// Package retry provides exponential-backoff retry helpers used across the +// codebase for rate-limit-aware HTTP/subprocess calls. +package retry + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`) + +const maxBackoffDelay = 5 * time.Minute + +// IsRateLimitError returns true if err looks like a transient rate-limit +// (e.g. HTTP 429, "too many requests", "overloaded") that is worth retrying. +func IsRateLimitError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "rate limit") || + strings.Contains(msg, "too many requests") || + strings.Contains(msg, "429") || + strings.Contains(msg, "overloaded") +} + +// ParseRetryAfter extracts a Retry-After duration from an error message. +// Returns 0 if no retry-after value is found. +func ParseRetryAfter(msg string) time.Duration { + m := retryAfterRe.FindStringSubmatch(msg) + if m == nil { + return 0 + } + secs, err := strconv.Atoi(m[1]) + if err != nil || secs <= 0 { + return 0 + } + return time.Duration(secs) * time.Second +} + +// RunWithBackoff calls fn repeatedly on rate-limit errors, using exponential backoff. +// maxRetries is the max number of retry attempts (not counting the initial call). +// baseDelay is the initial backoff duration (doubled each retry). +func RunWithBackoff(ctx context.Context, maxRetries int, baseDelay time.Duration, fn func() error) error { + var lastErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + lastErr = fn() + if lastErr == nil { + return nil + } + if !IsRateLimitError(lastErr) { + return lastErr + } + if attempt == maxRetries { + break + } + + delay := baseDelay * (1 << attempt) + if delay > maxBackoffDelay { + delay = maxBackoffDelay + } + if ra := ParseRetryAfter(lastErr.Error()); ra > 0 { + delay = ra + } + + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled during rate-limit backoff: %w", ctx.Err()) + case <-time.After(delay): + } + } + return lastErr +} -- cgit v1.2.3