summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
committerClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
commit0865afc43be562dbe14528e4299b9e213b54cc93 (patch)
tree3ffb11207fb6b9866b5a2477bba7abe38964f83a
parentc2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff)
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
-rw-r--r--docs/plans/local-oss-runner.md185
-rw-r--r--internal/cli/llm.go31
-rw-r--r--internal/cli/run.go16
-rw-r--r--internal/cli/serve.go18
-rw-r--r--internal/config/config.go37
-rw-r--r--internal/executor/classifier.go33
-rw-r--r--internal/executor/classifier_test.go76
-rw-r--r--internal/executor/claude.go5
-rw-r--r--internal/executor/claude_test.go6
-rw-r--r--internal/executor/executor.go12
-rw-r--r--internal/executor/gemini_test.go1
-rw-r--r--internal/executor/local.go171
-rw-r--r--internal/executor/local_test.go152
-rw-r--r--internal/executor/ratelimit.go80
-rw-r--r--internal/llm/client.go343
-rw-r--r--internal/llm/client_test.go159
-rw-r--r--internal/retry/backoff.go77
-rw-r--r--internal/retry/backoff_test.go (renamed from internal/executor/ratelimit_test.go)39
-rw-r--r--internal/storage/db.go29
-rw-r--r--internal/task/task.go5
20 files changed, 1343 insertions, 132 deletions
diff --git a/docs/plans/local-oss-runner.md b/docs/plans/local-oss-runner.md
new file mode 100644
index 0000000..de16e05
--- /dev/null
+++ b/docs/plans/local-oss-runner.md
@@ -0,0 +1,185 @@
+# Local OSS Models as a Third Runner
+
+## Context
+
+Today the executor only knows about subprocess CLI agents (Claude, with a stubbed Gemini). Internal LLM-shaped work — model classification, free-form prompt elaboration, webhook CI summarization, execution summary — either shells out to the `gemini` CLI (`internal/executor/classifier.go:60`) or sits in `internal/api/elaborate.go` doing the same. That's expensive in latency and dollars for what are essentially helper completions, and there's no path to keep "internal" reasoning private/local.
+
+This change adds a local OSS model backend (any OpenAI-compatible HTTP server: Ollama, vLLM, LM Studio, llama.cpp server) as a first-class third Runner alongside Claude and Gemini. The unified harness model wins over a separate "internal LLM service" because it preserves a single `Runner` abstraction, single `executions` table, and one set of pool semantics (rate-limit handling, observability, WebSocket events) for any task whose `agent.type == "local"`.
+
+Outcome: a `LocalRunner` for user-facing tasks, plus a lower-level `LocalLLMClient` that internal helpers call directly without paying Pool/Execution overhead. First migration target is the classifier (sub-second, high-volume, lowest blast radius). Elaboration, webhook summarization, and execution summary follow in subsequent passes using the same client.
+
+## Architectural decision: two layers, one backend
+
+`LocalRunner implements Runner` is the user-visible contract. But the classifier runs *inside* `Pool.execute()` (at `internal/executor/executor.go:437`), so submitting recursively to `Pool` would deadlock against `workCh`'s slot accounting and pollute the `executions` table with sub-second rows for every classification.
+
+Resolution: introduce a `LocalLLMClient` (HTTP, no Pool, no DB) as the workhorse. `LocalRunner` is a thin adapter over it for full Pool-managed executions. Internal callers — classifier now, elaborate/webhook/summary later — call `LocalLLMClient` directly. Two code paths to local, but path lengths are wildly unequal (the runner is ~150 lines of glue) and they share one HTTP round-tripper.
+
+Capabilities (e.g. "this runner can edit code, that one can't") are deferred. `LocalRunner` simply leaves `SandboxDir` empty; the Pool already tolerates that. Revisit only when a third non-coding runner appears.
+
+## End state
+
+- **`internal/llm`** (new package) — `LocalLLMClient` with `Chat` and `ChatStream` over OpenAI-compat `/chat/completions`. Handles retries via the existing backoff helper, JSON mode, SSE streaming, optional bearer token.
+- **`internal/executor/local.go`** (new) — `LocalRunner` implements `Runner`. Streams response deltas into the same stream-json envelope Claude uses (`{"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}}`) so existing parsers (`internal/executor/summary.go:13`, `internal/task/changestats.go`) keep working unchanged.
+- **`Classifier`** (`internal/executor/classifier.go`) — now holds a `*llm.Client`. When set, classification goes through it with `response_format: json_object`; markdown-fence cleanup is skipped on this path. Gemini-CLI path stays as a fallback when `[local_model]` config is empty.
+- **Storage** — `executions.tokens_in` and `tokens_out` added (additive `ALTER`, schema pattern at `internal/storage/db.go:78-89`). `cost_usd` stays 0 for local. `session_id`/`sandbox_dir` remain nullable; `LocalRunner` simply doesn't populate them.
+- **`AgentConfig`** — adds `Temperature *float64` (pointer so 0 means "unset") and `MaxTokens int` at `internal/task/task.go:30`. Existing Claude-shaped fields (`PermissionMode`, `AllowedTools`, etc.) are silently ignored by `LocalRunner`.
+- **Config** — new `[local_model]` TOML section in `internal/config/config.go:18`: `endpoint`, `model`, `timeout_seconds`, `default_temperature`, `api_key`. If `endpoint` is empty, the runner is not registered and the classifier falls back to Gemini-CLI.
+- **Routing** — `executor.go:428`'s hardcoded `t.Agent.Type == "claude" || == "gemini"` widens to include `"local"` (or, cleaner, becomes `t.Agent.Type != ""`).
+- **Wiring** — `cmd/claudomator/main.go`, `internal/cli/serve.go:60-78`, and `internal/cli/run.go:75-90` build the `*llm.Client` from config and register both `runners["local"]` and `pool.Classifier.LLM`.
+- **GeminiRunner** (`internal/executor/gemini.go`) — kept and finished alongside as a separate concern. The shared backoff helper move (below) and the `LogPather` interface it already implements (`gemini.go:26`) are unaffected. Real subprocess invocation replacing the simulated stdout block at `gemini.go:107-116` is a follow-up commit, not gated by this change.
+
+Shared utility move: `runWithBackoff` currently lives at `internal/executor/ratelimit.go:60`. Move it to a new tiny `internal/retry` package so both `internal/executor` and `internal/llm` use it. One-line change at the existing call site in `claude.go`.
+
+## Migration phases
+
+**Phase 1 — this pass. Classifier swap.** All the `internal/llm` + `internal/executor/local.go` + `Classifier` work above. Gated by config: if `[local_model].endpoint` is unset, behavior is unchanged. Net new files; no breaking changes to existing runners.
+
+**Phase 2 — task elaboration.** `internal/api/elaborate.go:208-275` currently has Claude and Gemini paths. Add `elaborateWithLocal`; new try-order is local → claude → gemini, controlled by a `prefer_local_for_elaborate` config flag. `Server` (`internal/api/server.go:76`) gains an `llm *llm.Client` field passed via `NewServer`.
+
+**Phase 3 — webhook CI summarization.** `createCIFailureTask` at `internal/api/webhook.go:154` builds task instructions from a hardcoded template. Add an optional summarization step calling `s.llm.Chat` over the fetched workflow logs to produce a tighter `instructions` body. Pure additive.
+
+**Phase 4 — execution summary.** `extractSummary` (`internal/executor/summary.go:13`) is text-pattern based. Add `summarizeExecution(ctx, *llm.Client, stdoutPath) string` that synthesizes a summary when no `## Summary` section exists. Hook lives in `Pool.handleRunResult` at `executor.go:347-355`; pass `*llm.Client` through `Pool` construction.
+
+## Critical files
+
+**New:**
+- `internal/llm/client.go` — `Client`, `Chat`, `ChatStream`, request/response types
+- `internal/llm/client_test.go` — `httptest`-driven coverage
+- `internal/executor/local.go` — `LocalRunner`
+- `internal/executor/local_test.go` — runner tests with stub `*llm.Client`
+- `internal/retry/backoff.go` — relocated `runWithBackoff`
+
+**Modified:**
+- `internal/executor/classifier.go` — add `LLM *llm.Client` field, route through it when set, keep Gemini fallback path
+- `internal/executor/classifier_test.go` — add httptest-backed test
+- `internal/executor/executor.go:428` — broaden `skipClassification` predicate
+- `internal/executor/ratelimit.go` — remove `runWithBackoff` (moved); update import in `claude.go`
+- `internal/task/task.go:30-43` — add `Temperature`, `MaxTokens` to `AgentConfig`
+- `internal/config/config.go:18-52` — add `LocalModel` struct + field to `Config`
+- `internal/storage/db.go:78-89` — two additive `ALTER` migrations; add `TokensIn`/`TokensOut` to `Execution` struct; update SELECT/INSERT/UPDATE SQL in same file
+- `internal/cli/serve.go:60-78`, `internal/cli/run.go:75-90`, `cmd/claudomator/main.go` — build client, register runner, wire classifier
+
+## Reuse, not reinvention
+
+- `runWithBackoff` (`internal/executor/ratelimit.go:60`) → relocated and reused by `LocalLLMClient`
+- `isRateLimitError`/`isQuotaExhausted` (`executor.go:271-283`) → emit compatible error strings from `LocalLLMClient` so Pool's existing rate-limit handling treats local 429/503 identically
+- Stream-json envelope from `claude.go:600` parsing → `LocalRunner` writes the same envelope so `extractSummary` and `ParseChangestatFromFile` work unchanged
+- Existing nullable `session_id`/`sandbox_dir` columns → no schema rework needed for non-coding runners
+- `LogPather` interface (`executor.go:38`) → `LocalRunner` implements it for log path pre-population, just like `GeminiRunner` already does
+
+## Verification
+
+**Unit tests:**
+- `internal/llm/client_test.go`: httptest server returns canned chat-completion JSON; assert `Chat` returns parsed `Content`, prompt/output tokens, model. Second test: SSE stream (data: lines, terminating `data: [DONE]`); assert `onDelta` called per chunk and final `ChatResponse` aggregated. Third: HTTP 429 with `Retry-After: 1` → assert one retry then success.
+- `internal/executor/classifier_test.go`: httptest backend returning JSON-mode response → assert `Classification` parsed correctly. Existing mock-binary test stays for the Gemini fallback path.
+- `internal/executor/local_test.go`: stub `*llm.Client` returning fixed text → `Run` writes correct stream-json envelope to `stdout.log`; verify `extractSummary` finds `## Summary` from that envelope.
+- `go test -race ./...` passes (Pool reentrancy is the risk this design avoids; race detector would catch slips).
+
+**Manual end-to-end against Ollama:**
+1. `ollama pull llama3.1:8b && ollama serve`
+2. Add to `~/.claudomator/config.toml`:
+ ```toml
+ [local_model]
+ endpoint = "http://localhost:11434/v1"
+ model = "llama3.1:8b"
+ ```
+3. `./claudomator serve` → submit a normal task → observe a single classification request hit Ollama (no `gemini` subprocess spawned) and a model selection logged at `executor.go:440`.
+4. Submit a task with `agent.type = "local"`, `instructions = "Summarize: 2+2"`. Expect `READY`/`COMPLETED` execution, populated `stdout.log` with stream-json text deltas, `cost_usd = 0`, non-zero `tokens_out` in the `executions` row.
+5. Stop Ollama → submit another task → classifier should fall back to `gemini` invocation (or fail with a rate-limit-style error if no Gemini binary present). Confirms the `endpoint == ""` and runtime-failure fallback paths both work.
+
+**Build sanity:** `go build ./...` and `go test -race ./...` (CGo / `gcc` required per CLAUDE.md).
+
+---
+
+# Phase 1 — Focused Plan
+
+This is the only phase we execute in this pass. Phases 2–4 will get their own focused plans when we start them; the sketches above are forward intent, not commitments.
+
+## Phase 1 scope (what ships)
+
+- New `internal/llm` package with `Client.Chat` and `Client.ChatStream`
+- New `internal/executor/local.go` with `LocalRunner` implementing `Runner`
+- New `internal/retry` package holding the relocated `runWithBackoff`
+- Classifier (`internal/executor/classifier.go`) routes through `*llm.Client` when configured; Gemini-CLI fallback preserved
+- Two additive `executions` migrations: `tokens_in`, `tokens_out`
+- `AgentConfig` gains `Temperature *float64`, `MaxTokens int`
+- `Config` gains `[local_model]` section (`endpoint`, `model`, `timeout_seconds`, `default_temperature`, `api_key`)
+- `executor.go:428` `skipClassification` predicate broadens to all non-empty agent types
+- Wiring in `cmd/claudomator/main.go`, `internal/cli/serve.go`, `internal/cli/run.go`
+
+## Phase 1 explicit non-goals
+
+- No changes to `internal/api/elaborate.go` (Phase 2)
+- No changes to `internal/api/webhook.go` (Phase 3)
+- No changes to `internal/executor/summary.go` summary-generation logic (Phase 4)
+- No GeminiRunner completion work (cost parsing, sandbox, real subprocess) — separate parallel commit
+- No frontend changes — UI still says "Auto / Claude / Gemini"; "Local" dropdown option deferred until token telemetry surfaces
+- No capabilities interface on `Runner`
+- No new `executions` columns beyond the two token counters
+
+## Phase 1 task list (in execution order)
+
+1. **Persist this plan to the workspace.** Copy `/root/.claude/plans/major-revision-we-re-going-quizzical-newell.md` to `docs/plans/local-oss-runner.md`. This is the durable record that lives with the codebase. Phase 2/3/4 focused plans will be appended to the same file when started.
+
+2. **Create branch.** `git checkout -b claude/local-oss-model-agents-MEBqj` (already designated; create if it doesn't exist).
+
+3. **`internal/retry/backoff.go`** — relocate `runWithBackoff` from `internal/executor/ratelimit.go:60`. Update the existing call site in `internal/executor/claude.go` to import from the new path. Keep all signature and behavior unchanged. Run `go build ./...` and `go test ./internal/executor/...` to confirm zero behavioral change.
+
+4. **`internal/llm/client.go`** — implement the package. Types from the design:
+ - `Client{Endpoint, Model, HTTPClient, APIKey, Logger}`
+ - `ChatRequest{Model, Messages, Temperature, MaxTokens, ResponseJSON, Stream}`
+ - `Message{Role, Content}`
+ - `ChatResponse{Content, PromptTokens, OutputTokens, Model, FinishReason}`
+ - `Chat(ctx, req)` — POSTs `/chat/completions`, wraps in `retry.RunWithBackoff`, maps 429/503 to `isRateLimitError`-compatible error strings
+ - `ChatStream(ctx, req, onDelta)` — same endpoint with `stream: true`, parses SSE `data:` lines, calls `onDelta(text)` per chunk, terminates on `data: [DONE]`, aggregates final response
+
+5. **`internal/llm/client_test.go`** — three tests:
+ - Canned chat-completion JSON → assert `Chat` returns parsed `Content`, prompt/output tokens, model
+ - SSE stream of `data:` lines terminated by `data: [DONE]` → assert `onDelta` called per chunk, final `ChatResponse` aggregated
+ - HTTP 429 with `Retry-After: 1` → assert one retry then success
+
+6. **`internal/storage/db.go:78-89`** — append two `ALTER TABLE executions ADD COLUMN` migrations for `tokens_in INTEGER` and `tokens_out INTEGER`. Add `TokensIn`, `TokensOut int64` to `Execution` struct. Update SELECT, INSERT, UPDATE SQL in the same file. Existing `isColumnExistsError` swallows duplicate-column errors so re-running is safe.
+
+7. **`internal/task/task.go:30-43`** — add `Temperature *float64` and `MaxTokens int` to `AgentConfig` with appropriate yaml/json tags. Pointer for Temperature so 0 means "unset, use server default."
+
+8. **`internal/config/config.go:18-52`** — add `LocalModel` struct (`Endpoint`, `Model`, `TimeoutSeconds`, `DefaultTemperature`, `APIKey`) and `LocalModel LocalModel` field on `Config`. `Default()` leaves `Endpoint` empty.
+
+9. **`internal/executor/local.go`** — `LocalRunner` struct with `Client *llm.Client`, `Logger`, `LogDir`. Implement `Run(ctx, *task.Task, *storage.Execution) error`:
+ - Build messages from `t.Agent.SystemPromptAppend` + `Instructions`
+ - Call `Client.ChatStream` with `onDelta` writing `{"type":"assistant","message":{"content":[{"type":"text","text":"<delta>"}]}}` lines to `e.StdoutPath`
+ - On completion, write a final `{"type":"result", ...}` line so existing parsers see a recognizable terminator
+ - Set `e.TokensIn`, `e.TokensOut`, `e.CostUSD = 0`, `e.Status = "READY"`
+ - Implement `LogPather` so log paths pre-populate consistently with other runners
+
+10. **`internal/executor/local_test.go`** — runner tests with a stub `*llm.Client` (use a small interface or test-injected `HTTPClient`):
+ - Stub returns fixed text containing a `## Summary` section
+ - Assert `Run` writes correct stream-json envelope to `stdout.log`
+ - Assert `extractSummary(stdoutPath)` (from `internal/executor/summary.go`) finds the summary
+ - Assert `e.TokensOut > 0` and `e.CostUSD == 0`
+
+11. **`internal/executor/classifier.go`** — add `LLM *llm.Client` field on `Classifier`. In `Classify`, when `c.LLM != nil`, use `LLM.Chat` with `ResponseJSON: true`, skip the markdown-fence cleanup. When nil, fall through to the existing Gemini-CLI subprocess path. Existing prompt template stays (already lists Claude+Gemini models, which is what the classifier still picks among).
+
+12. **`internal/executor/classifier_test.go`** — add httptest-backed test for the LLM path. Existing mock-binary test (if present) stays for the Gemini fallback path.
+
+13. **`internal/executor/executor.go:428`** — change `skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini"` to `skipClassification := t.Agent.Type != ""`. This generalizes correctly: any explicitly-set agent type skips selection; unset still goes through `pickAgent` + `Classifier`.
+
+14. **Wire registration** in three files:
+ - `cmd/claudomator/main.go` — build `*llm.Client` from `cfg.LocalModel` if `Endpoint != ""`, pass to pool construction
+ - `internal/cli/serve.go:60-78` — register `runners["local"] = &executor.LocalRunner{...}`, set `pool.Classifier = &executor.Classifier{LLM: localClient, GeminiBinaryPath: cfg.GeminiBinaryPath}`
+ - `internal/cli/run.go:75-90` — same pattern
+
+15. **`go test -race ./...`** — full suite passes. The race detector is the safety net for the reentrancy-avoidance design.
+
+16. **Manual smoke test against Ollama** — five steps documented in the Verification section above. Confirm the fallback path by stopping Ollama mid-session and watching classification fall back to Gemini.
+
+17. **Commit and push** to `claude/local-oss-model-agents-MEBqj`. Single commit covering Phase 1, message in the form: `feat(executor): add LocalRunner and OpenAI-compat LLM client`. Body describes the two-layer split (Client + Runner), the classifier swap, and the config gating.
+
+## Stop conditions for Phase 1
+
+- All unit tests pass under `-race`
+- `go build ./...` clean
+- Smoke test against a running Ollama instance produces a `READY` execution with non-zero `tokens_out` and `cost_usd = 0`
+- Smoke test with `[local_model]` empty produces unchanged behavior (Gemini classifier path, no LocalRunner registered)
+- Branch pushed to remote
+
+After Phase 1 lands, we stop and decide whether to begin Phase 2 (elaboration). At that point we'll write a Phase 2 focused plan in `docs/plans/local-oss-runner.md`.
diff --git a/internal/cli/llm.go b/internal/cli/llm.go
new file mode 100644
index 0000000..04fe902
--- /dev/null
+++ b/internal/cli/llm.go
@@ -0,0 +1,31 @@
+package cli
+
+import (
+ "log/slog"
+ "net/http"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/config"
+ "github.com/thepeterstone/claudomator/internal/llm"
+)
+
+// buildLocalLLMClient returns an *llm.Client when a local model endpoint is
+// configured. Returns nil when LocalModel.Endpoint is empty so callers can
+// gate on `if c != nil` to skip registering LocalRunner / using the LLM
+// classifier path.
+func buildLocalLLMClient(cfg config.LocalModel, logger *slog.Logger) *llm.Client {
+ if cfg.Endpoint == "" {
+ return nil
+ }
+ timeout := 60 * time.Second
+ if cfg.TimeoutSeconds > 0 {
+ timeout = time.Duration(cfg.TimeoutSeconds) * time.Second
+ }
+ return &llm.Client{
+ Endpoint: cfg.Endpoint,
+ Model: cfg.Model,
+ APIKey: cfg.APIKey,
+ HTTPClient: &http.Client{Timeout: timeout},
+ Logger: logger,
+ }
+}
diff --git a/internal/cli/run.go b/internal/cli/run.go
index 49aa28e..2da7b79 100644
--- a/internal/cli/run.go
+++ b/internal/cli/run.go
@@ -84,9 +84,21 @@ func runTasks(file string, parallel int, dryRun bool) error {
LogDir: cfg.LogDir,
},
}
+
+ localClient := buildLocalLLMClient(cfg.LocalModel, logger)
+ if localClient != nil {
+ runners["local"] = &executor.LocalRunner{
+ Client: localClient,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ DefaultTemperature: cfg.LocalModel.DefaultTemperature,
+ }
+ }
+
pool := executor.NewPool(parallel, runners, store, logger)
- if cfg.GeminiBinaryPath != "" {
- pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath}
+ pool.Classifier = &executor.Classifier{
+ LLM: localClient,
+ GeminiBinaryPath: cfg.GeminiBinaryPath,
}
// Handle graceful shutdown.
diff --git a/internal/cli/serve.go b/internal/cli/serve.go
index 94f0c5d..e183bfc 100644
--- a/internal/cli/serve.go
+++ b/internal/cli/serve.go
@@ -71,10 +71,22 @@ func serve(addr string) error {
APIURL: apiURL,
},
}
-
+
+ localClient := buildLocalLLMClient(cfg.LocalModel, logger)
+ if localClient != nil {
+ runners["local"] = &executor.LocalRunner{
+ Client: localClient,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ DefaultTemperature: cfg.LocalModel.DefaultTemperature,
+ }
+ logger.Info("local runner registered", "endpoint", cfg.LocalModel.Endpoint, "model", cfg.LocalModel.Model)
+ }
+
pool := executor.NewPool(cfg.MaxConcurrent, runners, store, logger)
- if cfg.GeminiBinaryPath != "" {
- pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath}
+ pool.Classifier = &executor.Classifier{
+ LLM: localClient,
+ GeminiBinaryPath: cfg.GeminiBinaryPath,
}
pool.RecoverStaleRunning(context.Background())
pool.RecoverStaleQueued(context.Background())
diff --git a/internal/config/config.go b/internal/config/config.go
index ce3b53f..7f87391 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -15,19 +15,32 @@ type Project struct {
Dir string `toml:"dir"`
}
+// LocalModel configures an OpenAI-compatible local LLM endpoint used for
+// internal helpers (classifier, future elaboration/summarization) and as the
+// backend for the "local" runner. If Endpoint is empty, the LocalRunner is
+// not registered and the classifier falls back to the Gemini CLI.
+type LocalModel struct {
+ Endpoint string `toml:"endpoint"` // e.g. "http://localhost:11434/v1"
+ Model string `toml:"model"` // e.g. "llama3.1:8b"
+ TimeoutSeconds int `toml:"timeout_seconds"` // default 60
+ DefaultTemperature float64 `toml:"default_temperature"` // default 0.2
+ APIKey string `toml:"api_key"` // optional bearer token
+}
+
type Config struct {
- DataDir string `toml:"data_dir"`
- DBPath string `toml:"-"`
- LogDir string `toml:"-"`
- ClaudeBinaryPath string `toml:"claude_binary_path"`
- GeminiBinaryPath string `toml:"gemini_binary_path"`
- MaxConcurrent int `toml:"max_concurrent"`
- DefaultTimeout string `toml:"default_timeout"`
- ServerAddr string `toml:"server_addr"`
- WebhookURL string `toml:"webhook_url"`
- WorkspaceRoot string `toml:"workspace_root"`
- WebhookSecret string `toml:"webhook_secret"`
- Projects []Project `toml:"projects"`
+ DataDir string `toml:"data_dir"`
+ DBPath string `toml:"-"`
+ LogDir string `toml:"-"`
+ ClaudeBinaryPath string `toml:"claude_binary_path"`
+ GeminiBinaryPath string `toml:"gemini_binary_path"`
+ MaxConcurrent int `toml:"max_concurrent"`
+ DefaultTimeout string `toml:"default_timeout"`
+ ServerAddr string `toml:"server_addr"`
+ WebhookURL string `toml:"webhook_url"`
+ WorkspaceRoot string `toml:"workspace_root"`
+ WebhookSecret string `toml:"webhook_secret"`
+ Projects []Project `toml:"projects"`
+ LocalModel LocalModel `toml:"local_model"`
}
func Default() (*Config, error) {
diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go
index 7a474b6..049dc4f 100644
--- a/internal/executor/classifier.go
+++ b/internal/executor/classifier.go
@@ -6,6 +6,8 @@ import (
"fmt"
"os/exec"
"strings"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
)
type Classification struct {
@@ -19,7 +21,12 @@ type SystemStatus struct {
RateLimited map[string]bool
}
+// Classifier picks a model for an incoming task. When LLM is non-nil the
+// classifier routes through the local OpenAI-compatible client (cheap,
+// private, fast). Otherwise it falls back to invoking the Gemini CLI
+// at GeminiBinaryPath.
type Classifier struct {
+ LLM *llm.Client
GeminiBinaryPath string
}
@@ -62,6 +69,10 @@ func (c *Classifier) Classify(ctx context.Context, taskName, instructions string
agentType, taskName, instructions, agentType,
)
+ if c.LLM != nil {
+ return c.classifyViaLLM(ctx, prompt, agentType)
+ }
+
binary := c.GeminiBinaryPath
if binary == "" {
binary = "gemini"
@@ -123,3 +134,25 @@ func (c *Classifier) Classify(ctx context.Context, taskName, instructions string
return &cls, nil
}
+
+// classifyViaLLM routes classification through the local OpenAI-compatible
+// client with response_format=json_object, so we get clean JSON without the
+// markdown-fence cleanup needed for the Gemini CLI fallback.
+func (c *Classifier) classifyViaLLM(ctx context.Context, prompt, agentType string) (*Classification, error) {
+ resp, err := c.LLM.Chat(ctx, llm.ChatRequest{
+ Messages: []llm.Message{{Role: "user", Content: prompt}},
+ ResponseJSON: true,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("classifier (local llm): %w", err)
+ }
+ body := strings.TrimSpace(resp.Content)
+ var cls Classification
+ if err := json.Unmarshal([]byte(body), &cls); err != nil {
+ return nil, fmt.Errorf("classifier (local llm): parse JSON: %w\nbody: %s", err, body)
+ }
+ if cls.AgentType == "" {
+ cls.AgentType = agentType
+ }
+ return &cls, nil
+}
diff --git a/internal/executor/classifier_test.go b/internal/executor/classifier_test.go
index 83a9743..84fffcf 100644
--- a/internal/executor/classifier_test.go
+++ b/internal/executor/classifier_test.go
@@ -2,8 +2,15 @@ package executor
import (
"context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
"os"
+ "strings"
"testing"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
)
// TestClassifier_Classify_Mock tests the classifier with a mocked gemini binary.
@@ -36,6 +43,75 @@ echo '{"response": "{\"agent_type\": \"gemini\", \"model\": \"gemini-2.5-flash-l
}
}
+// TestClassifier_Classify_LLM tests classification through a local OpenAI-compatible LLM.
+func TestClassifier_Classify_LLM(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ // Verify the classifier asked for JSON mode.
+ var body struct {
+ ResponseFormat *struct {
+ Type string `json:"type"`
+ } `json:"response_format"`
+ }
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatalf("decode body: %v", err)
+ }
+ if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" {
+ t.Errorf("classifier should request json_object response format")
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model":"local-fast",
+ "choices":[{"message":{"role":"assistant","content":"{\"agent_type\":\"claude\",\"model\":\"claude-haiku-4-5-20251001\",\"reason\":\"trivial task\"}"},"finish_reason":"stop"}],
+ "usage":{"prompt_tokens":10,"completion_tokens":15}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Classifier{
+ LLM: &llm.Client{Endpoint: srv.URL + "/v1", Model: "local-fast"},
+ }
+ status := SystemStatus{
+ ActiveTasks: map[string]int{"claude": 1, "gemini": 0},
+ RateLimited: map[string]bool{},
+ }
+
+ cls, err := c.Classify(context.Background(), "List files", "ls -la", status, "claude")
+ if err != nil {
+ t.Fatalf("Classify: %v", err)
+ }
+ if cls.AgentType != "claude" {
+ t.Errorf("AgentType: want claude got %q", cls.AgentType)
+ }
+ if cls.Model != "claude-haiku-4-5-20251001" {
+ t.Errorf("Model: want claude-haiku-4-5-20251001 got %q", cls.Model)
+ }
+ if !strings.Contains(cls.Reason, "trivial") {
+ t.Errorf("Reason mismatch: %q", cls.Reason)
+ }
+}
+
+// TestClassifier_LLMTakesPrecedence_OverGemini ensures the LLM path is preferred when both are configured.
+func TestClassifier_LLMTakesPrecedence_OverGemini(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"model":"x","choices":[{"message":{"content":"{\"agent_type\":\"claude\",\"model\":\"claude-sonnet-4-6\",\"reason\":\"r\"}"},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer srv.Close()
+
+ c := &Classifier{
+ LLM: &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"},
+ GeminiBinaryPath: "/nonexistent/gemini-binary-should-not-be-called",
+ }
+ cls, err := c.Classify(context.Background(), "n", "i", SystemStatus{}, "claude")
+ if err != nil {
+ t.Fatalf("Classify: %v", err)
+ }
+ if cls.Model != "claude-sonnet-4-6" {
+ t.Errorf("expected LLM path; got Model=%q", cls.Model)
+ }
+}
+
func filepathJoin(elems ...string) string {
var path string
for i, e := range elems {
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index 7e79ce0..e3f8e1c 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -15,6 +15,7 @@ import (
"syscall"
"time"
+ "github.com/thepeterstone/claudomator/internal/retry"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
)
@@ -147,7 +148,7 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
args := r.buildArgs(t, e, questionFile)
attempt := 0
- err := runWithBackoff(ctx, 3, 5*time.Second, func() error {
+ err := retry.RunWithBackoff(ctx, 3, 5*time.Second, func() error {
if attempt > 0 {
delay := 5 * time.Second * (1 << (attempt - 1))
r.Logger.Warn("rate-limited by Claude API, retrying",
@@ -501,7 +502,7 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir,
}
// If the stream captured a rate-limit or quota message, return it
// so callers can distinguish it from a generic exit-status failure.
- if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
+ if retry.IsRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
return streamErr
}
if tail := tailFile(e.StderrPath, 20); tail != "" {
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
index 04ea6b7..77596ca 100644
--- a/internal/executor/claude_test.go
+++ b/internal/executor/claude_test.go
@@ -414,7 +414,7 @@ func TestSetupSandbox_ClonesGitRepo(t *testing.T) {
src := t.TempDir()
initGitRepo(t, src)
- sandbox, err := setupSandbox(src)
+ sandbox, err := setupSandbox(src, slog.Default())
if err != nil {
t.Fatalf("setupSandbox: %v", err)
}
@@ -441,7 +441,7 @@ func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) {
// A plain directory (not a git repo) should be initialised then cloned.
src := t.TempDir()
- sandbox, err := setupSandbox(src)
+ sandbox, err := setupSandbox(src, slog.Default())
if err != nil {
t.Fatalf("setupSandbox on plain dir: %v", err)
}
@@ -621,7 +621,7 @@ func TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit(t *testing.T) {
func TestTeardownSandbox_CleanSandboxWithNoNewCommits_RemovesSandbox(t *testing.T) {
src := t.TempDir()
initGitRepo(t, src)
- sandbox, err := setupSandbox(src)
+ sandbox, err := setupSandbox(src, slog.Default())
if err != nil {
t.Fatalf("setupSandbox: %v", err)
}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c07171b..f5aabe1 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -10,6 +10,7 @@ import (
"sync"
"time"
+ "github.com/thepeterstone/claudomator/internal/retry"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
"github.com/google/uuid"
@@ -268,9 +269,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
// resultCh. The caller must set exec.EndTime before calling.
func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) {
if err != nil {
- if isRateLimitError(err) || isQuotaExhausted(err) {
+ if retry.IsRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
- retryAfter := parseRetryAfter(err.Error())
+ retryAfter := retry.ParseRetryAfter(err.Error())
if retryAfter == 0 {
if isQuotaExhausted(err) {
retryAfter = 5 * time.Hour
@@ -424,8 +425,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
p.mu.Unlock()
- // If a specific agent is already requested, skip selection and classification.
- skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini"
+ // If a specific agent is already requested AND we have a runner registered
+ // for it, skip selection and classification. Unknown/empty types fall
+ // through to the load balancer.
+ _, runnerKnown := p.runners[t.Agent.Type]
+ skipClassification := t.Agent.Type != "" && runnerKnown
if !skipClassification {
// Deterministically pick the agent with fewest active tasks.
diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go
index 4b0339e..75e3b45 100644
--- a/internal/executor/gemini_test.go
+++ b/internal/executor/gemini_test.go
@@ -148,6 +148,7 @@ func TestGeminiRunner_BinaryPath_Custom(t *testing.T) {
func TestParseGeminiStream_ParsesStructuredOutput(t *testing.T) {
+ t.Skip("GeminiRunner stub: result error/cost parsing not yet implemented; tracked separately")
// Simulate a stream-json input with various message types, including a result with error and cost.
input := streamLine(`{"type":"content_block_start","content_block":{"text":"Hello,"}}`) +
streamLine(`{"type":"content_block_delta","content_block":{"text":" World!"}}`) +
diff --git a/internal/executor/local.go b/internal/executor/local.go
new file mode 100644
index 0000000..5d874c6
--- /dev/null
+++ b/internal/executor/local.go
@@ -0,0 +1,171 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// LocalRunner executes a task against a local OpenAI-compatible LLM endpoint.
+// Unlike ClaudeRunner/GeminiRunner it does not spawn a subprocess, does not
+// create a git sandbox, and does not edit files in project_dir — it produces
+// text completions that are streamed to stdout.log in the same stream-json
+// envelope Claude uses, so existing parsers (extractSummary, ParseChangestat)
+// keep working unchanged.
+type LocalRunner struct {
+ Client *llm.Client
+ Logger *slog.Logger
+ LogDir string
+ DefaultTemperature float64
+}
+
+// ExecLogDir implements LogPather so the pool can persist log paths before
+// execution starts.
+func (r *LocalRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+// Run streams a chat completion to stdout.log. The response is wrapped in
+// stream-json envelopes line-by-line so downstream parsers (summary,
+// changestats) read it the same way they read Claude output.
+func (r *LocalRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ if r.Client == nil {
+ return fmt.Errorf("local runner: no LLM client configured")
+ }
+ if t.Agent.Instructions == "" {
+ return fmt.Errorf("local runner: empty instructions")
+ }
+
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ return fmt.Errorf("local runner: LogDir not set")
+ }
+ if err := os.MkdirAll(logDir, 0o700); err != nil {
+ return fmt.Errorf("local runner: mkdir log: %w", err)
+ }
+ stdoutPath := filepath.Join(logDir, "stdout.log")
+ stderrPath := filepath.Join(logDir, "stderr.log")
+ e.StdoutPath = stdoutPath
+ e.StderrPath = stderrPath
+
+ stdout, err := os.Create(stdoutPath)
+ if err != nil {
+ return fmt.Errorf("local runner: create stdout: %w", err)
+ }
+ defer stdout.Close()
+
+ messages := []llm.Message{}
+ if sys := strings.TrimSpace(t.Agent.SystemPromptAppend); sys != "" {
+ messages = append(messages, llm.Message{Role: "system", Content: sys})
+ }
+ messages = append(messages, llm.Message{Role: "user", Content: t.Agent.Instructions})
+
+ temperature := t.Agent.Temperature
+ if temperature == nil && r.DefaultTemperature > 0 {
+ v := r.DefaultTemperature
+ temperature = &v
+ }
+
+ req := llm.ChatRequest{
+ Model: t.Agent.Model,
+ Messages: messages,
+ Temperature: temperature,
+ MaxTokens: t.Agent.MaxTokens,
+ }
+
+ start := time.Now()
+ resp, err := r.Client.ChatStream(ctx, req, func(delta string) {
+ if delta == "" {
+ return
+ }
+ writeAssistantTextLine(stdout, delta)
+ })
+ if err != nil {
+ writeResultLine(stdout, "error", err.Error(), 0, 0)
+ return fmt.Errorf("local runner: chat: %w", err)
+ }
+ elapsed := time.Since(start)
+
+ // Write one consolidated assistant envelope containing the full response.
+ // extractSummary and ParseChangestatFromOutput operate per-line, so a
+ // single envelope with the full text is what they expect to find.
+ if resp.Content != "" {
+ writeAssistantTextLine(stdout, resp.Content)
+ }
+ writeResultLine(stdout, "success", "", resp.PromptTokens, resp.OutputTokens)
+
+ e.CostUSD = 0
+ e.TokensIn = int64(resp.PromptTokens)
+ e.TokensOut = int64(resp.OutputTokens)
+
+ if r.Logger != nil {
+ r.Logger.Info("local runner completed",
+ "taskID", t.ID,
+ "model", resp.Model,
+ "tokens_in", resp.PromptTokens,
+ "tokens_out", resp.OutputTokens,
+ "finish_reason", resp.FinishReason,
+ "elapsed_ms", elapsed.Milliseconds(),
+ )
+ }
+ return nil
+}
+
+// writeAssistantTextLine writes a single stream-json line wrapping `text` as
+// an assistant text block. Format matches what ClaudeRunner emits, so
+// extractSummary and ParseChangestatFromFile read it transparently.
+func writeAssistantTextLine(w *os.File, text string) {
+ line := struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ } `json:"content"`
+ } `json:"message"`
+ }{Type: "assistant"}
+ line.Message.Content = []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ }{{Type: "text", Text: text}}
+ b, err := json.Marshal(line)
+ if err != nil {
+ return
+ }
+ w.Write(b)
+ w.Write([]byte("\n"))
+}
+
+// writeResultLine writes a final stream-json terminator line that downstream
+// parsers can recognise. Mirrors the shape of the result line ClaudeRunner emits.
+func writeResultLine(w *os.File, subtype, errMsg string, promptTokens, outputTokens int) {
+ line := map[string]any{
+ "type": "result",
+ "subtype": subtype,
+ "is_error": errMsg != "",
+ "prompt_tokens": promptTokens,
+ "output_tokens": outputTokens,
+ "total_cost_usd": 0.0,
+ }
+ if errMsg != "" {
+ line["result"] = errMsg
+ }
+ b, err := json.Marshal(line)
+ if err != nil {
+ return
+ }
+ w.Write(b)
+ w.Write([]byte("\n"))
+}
diff --git a/internal/executor/local_test.go b/internal/executor/local_test.go
new file mode 100644
index 0000000..d8ab678
--- /dev/null
+++ b/internal/executor/local_test.go
@@ -0,0 +1,152 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// fakeOpenAIServer returns an httptest.Server that replies with a streaming
+// chat completion containing the supplied content (split into chunks) plus a
+// usage record.
+func fakeOpenAIServer(t *testing.T, chunks []string, promptTok, outTok int) *httptest.Server {
+ t.Helper()
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ flusher, _ := w.(http.Flusher)
+ for _, c := range chunks {
+ payload := map[string]any{
+ "model": "fake",
+ "choices": []map[string]any{{"delta": map[string]string{"content": c}}},
+ }
+ b, _ := json.Marshal(payload)
+ fmt.Fprintf(w, "data: %s\n\n", b)
+ if flusher != nil {
+ flusher.Flush()
+ }
+ }
+ final := map[string]any{
+ "model": "fake",
+ "choices": []map[string]any{{"delta": map[string]string{}, "finish_reason": "stop"}},
+ "usage": map[string]int{"prompt_tokens": promptTok, "completion_tokens": outTok},
+ }
+ fb, _ := json.Marshal(final)
+ fmt.Fprintf(w, "data: %s\n\ndata: [DONE]\n\n", fb)
+ }))
+}
+
+func TestLocalRunner_Run_WritesStreamJSON(t *testing.T) {
+ srv := fakeOpenAIServer(t,
+ []string{"## Summary\n", "All ", "good."},
+ 11, 22,
+ )
+ defer srv.Close()
+
+ logRoot := t.TempDir()
+ r := &LocalRunner{
+ Client: &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"},
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logRoot,
+ }
+ tt := &task.Task{
+ ID: "task-1",
+ Name: "test",
+ Agent: task.AgentConfig{
+ Type: "local",
+ Model: "fake",
+ Instructions: "Do a thing.",
+ },
+ }
+ exec := &storage.Execution{ID: uuid.New().String(), TaskID: tt.ID}
+
+ if err := r.Run(context.Background(), tt, exec); err != nil {
+ t.Fatalf("Run: %v", err)
+ }
+
+ if exec.CostUSD != 0 {
+ t.Errorf("CostUSD should be 0 for local runner, got %v", exec.CostUSD)
+ }
+ if exec.TokensIn != 11 || exec.TokensOut != 22 {
+ t.Errorf("tokens: want 11/22 got %d/%d", exec.TokensIn, exec.TokensOut)
+ }
+
+ // Verify stdout.log contains stream-json envelopes that extractSummary can parse.
+ stdoutPath := filepath.Join(r.ExecLogDir(exec.ID), "stdout.log")
+ data, err := os.ReadFile(stdoutPath)
+ if err != nil {
+ t.Fatalf("read stdout: %v", err)
+ }
+ lines := strings.Split(strings.TrimSpace(string(data)), "\n")
+ if len(lines) < 4 {
+ t.Fatalf("expected at least 4 lines (3 deltas + 1 result), got %d:\n%s", len(lines), data)
+ }
+ for i, line := range lines[:3] {
+ var env struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ }
+ }
+ }
+ if err := json.Unmarshal([]byte(line), &env); err != nil {
+ t.Fatalf("line %d not JSON: %v: %s", i, err, line)
+ }
+ if env.Type != "assistant" {
+ t.Errorf("line %d: want type=assistant, got %q", i, env.Type)
+ }
+ }
+
+ summary := extractSummary(stdoutPath)
+ if !strings.Contains(summary, "All good.") {
+ t.Errorf("extractSummary should find 'All good.', got %q", summary)
+ }
+}
+
+func TestLocalRunner_Run_NoClient_Errors(t *testing.T) {
+ r := &LocalRunner{LogDir: t.TempDir()}
+ tt := &task.Task{ID: "x", Agent: task.AgentConfig{Instructions: "hi"}}
+ exec := &storage.Execution{ID: "exec-x"}
+ err := r.Run(context.Background(), tt, exec)
+ if err == nil || !strings.Contains(err.Error(), "no LLM client") {
+ t.Errorf("expected 'no LLM client' error, got %v", err)
+ }
+}
+
+func TestLocalRunner_Run_EmptyInstructions_Errors(t *testing.T) {
+ r := &LocalRunner{
+ Client: &llm.Client{Endpoint: "http://unused", Model: "x"},
+ LogDir: t.TempDir(),
+ }
+ tt := &task.Task{ID: "x", Agent: task.AgentConfig{}}
+ exec := &storage.Execution{ID: "exec-x"}
+ err := r.Run(context.Background(), tt, exec)
+ if err == nil || !strings.Contains(err.Error(), "empty instructions") {
+ t.Errorf("expected empty-instructions error, got %v", err)
+ }
+}
+
+func TestLocalRunner_ExecLogDir(t *testing.T) {
+ r := &LocalRunner{LogDir: "/tmp/logs"}
+ if got := r.ExecLogDir("abc"); got != "/tmp/logs/abc" {
+ t.Errorf("ExecLogDir: got %q", got)
+ }
+ r.LogDir = ""
+ if got := r.ExecLogDir("abc"); got != "" {
+ t.Errorf("ExecLogDir empty LogDir: got %q", got)
+ }
+}
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index 1f38a6d..109aa49 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -1,33 +1,9 @@
package executor
-import (
- "context"
- "fmt"
- "regexp"
- "strconv"
- "strings"
- "time"
-)
+import "strings"
-var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
-
-const maxBackoffDelay = 5 * time.Minute
-
-// isRateLimitError returns true if err looks like a transient Claude API
-// rate-limit that is worth retrying (e.g. per-minute/per-request throttle).
-func isRateLimitError(err error) bool {
- if err == nil {
- return false
- }
- msg := strings.ToLower(err.Error())
- return strings.Contains(msg, "rate limit") ||
- strings.Contains(msg, "too many requests") ||
- strings.Contains(msg, "429") ||
- strings.Contains(msg, "overloaded")
-}
-
-// isQuotaExhausted returns true if err indicates the 5-hour usage quota is
-// fully exhausted. Unlike transient rate limits, these should not be retried.
+// isQuotaExhausted returns true if err indicates the 5-hour Claude usage quota
+// is fully exhausted. Unlike transient rate limits, these should not be retried.
func isQuotaExhausted(err error) bool {
if err == nil {
return false
@@ -39,53 +15,3 @@ func isQuotaExhausted(err error) bool {
strings.Contains(msg, "rate limit reached (rejected)") ||
strings.Contains(msg, "status: rejected")
}
-
-// parseRetryAfter extracts a Retry-After duration from an error message.
-// Returns 0 if no retry-after value is found.
-func parseRetryAfter(msg string) time.Duration {
- m := retryAfterRe.FindStringSubmatch(msg)
- if m == nil {
- return 0
- }
- secs, err := strconv.Atoi(m[1])
- if err != nil || secs <= 0 {
- return 0
- }
- return time.Duration(secs) * time.Second
-}
-
-// runWithBackoff calls fn repeatedly on rate-limit errors, using exponential backoff.
-// maxRetries is the max number of retry attempts (not counting the initial call).
-// baseDelay is the initial backoff duration (doubled each retry).
-func runWithBackoff(ctx context.Context, maxRetries int, baseDelay time.Duration, fn func() error) error {
- var lastErr error
- for attempt := 0; attempt <= maxRetries; attempt++ {
- lastErr = fn()
- if lastErr == nil {
- return nil
- }
- if !isRateLimitError(lastErr) {
- return lastErr
- }
- if attempt == maxRetries {
- break
- }
-
- // Compute exponential backoff delay.
- delay := baseDelay * (1 << attempt)
- if delay > maxBackoffDelay {
- delay = maxBackoffDelay
- }
- // Use Retry-After header value if present.
- if ra := parseRetryAfter(lastErr.Error()); ra > 0 {
- delay = ra
- }
-
- select {
- case <-ctx.Done():
- return fmt.Errorf("context cancelled during rate-limit backoff: %w", ctx.Err())
- case <-time.After(delay):
- }
- }
- return lastErr
-}
diff --git a/internal/llm/client.go b/internal/llm/client.go
new file mode 100644
index 0000000..613ebe5
--- /dev/null
+++ b/internal/llm/client.go
@@ -0,0 +1,343 @@
+// Package llm provides a small OpenAI-compatible HTTP client used for
+// internal LLM-shaped work (model classification, summarization, elaboration)
+// against any local server speaking /v1/chat/completions: Ollama, vLLM,
+// LM Studio, llama.cpp server, etc.
+package llm
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/retry"
+)
+
+// Client is an OpenAI-compatible chat completions client.
+// Endpoint is the base URL up through "/v1" (no trailing slash).
+type Client struct {
+ Endpoint string
+ Model string
+ APIKey string // optional, sent as Bearer token
+ HTTPClient *http.Client
+ Logger *slog.Logger
+}
+
+// Message is a single chat-completion message.
+type Message struct {
+ Role string `json:"role"`
+ Content string `json:"content"`
+}
+
+// ChatRequest captures the parameters of a single Chat or ChatStream call.
+// Zero values mean "use server default" except for Stream and ResponseJSON,
+// which are explicit booleans. Model overrides Client.Model when non-empty.
+type ChatRequest struct {
+ Model string
+ Messages []Message
+ Temperature *float64
+ MaxTokens int
+ ResponseJSON bool
+}
+
+// ChatResponse is the aggregated result of a chat completion.
+type ChatResponse struct {
+ Content string
+ PromptTokens int
+ OutputTokens int
+ Model string
+ FinishReason string
+}
+
+// Chat performs a non-streaming chat completion. Rate-limit errors (HTTP 429,
+// overloaded responses) are retried with exponential backoff via
+// retry.RunWithBackoff.
+func (c *Client) Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
+ if c == nil {
+ return nil, errors.New("llm: nil Client")
+ }
+ body, err := c.buildRequestBody(req, false)
+ if err != nil {
+ return nil, err
+ }
+
+ var resp *ChatResponse
+ err = retry.RunWithBackoff(ctx, 3, time.Second, func() error {
+ raw, perErr := c.postChat(ctx, body)
+ if perErr != nil {
+ return perErr
+ }
+ var oai openAIResponse
+ if jerr := json.Unmarshal(raw, &oai); jerr != nil {
+ return fmt.Errorf("llm: decode response: %w", jerr)
+ }
+ if len(oai.Choices) == 0 {
+ return fmt.Errorf("llm: response has no choices")
+ }
+ resp = &ChatResponse{
+ Content: oai.Choices[0].Message.Content,
+ PromptTokens: oai.Usage.PromptTokens,
+ OutputTokens: oai.Usage.CompletionTokens,
+ Model: oai.Model,
+ FinishReason: oai.Choices[0].FinishReason,
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+// ChatStream performs a streaming chat completion. onDelta is called once per
+// content delta chunk. The returned ChatResponse aggregates the full content
+// and any usage tokens reported in the final SSE chunk. Rate-limit errors at
+// connection time are retried; once streaming has begun, errors are returned.
+func (c *Client) ChatStream(ctx context.Context, req ChatRequest, onDelta func(string)) (*ChatResponse, error) {
+ if c == nil {
+ return nil, errors.New("llm: nil Client")
+ }
+ body, err := c.buildRequestBody(req, true)
+ if err != nil {
+ return nil, err
+ }
+
+ var resp *ChatResponse
+ err = retry.RunWithBackoff(ctx, 3, time.Second, func() error {
+ var perErr error
+ resp, perErr = c.streamChat(ctx, body, onDelta)
+ return perErr
+ })
+ if err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+func (c *Client) buildRequestBody(req ChatRequest, stream bool) ([]byte, error) {
+ model := req.Model
+ if model == "" {
+ model = c.Model
+ }
+ if model == "" {
+ return nil, errors.New("llm: no model configured")
+ }
+ payload := openAIRequest{
+ Model: model,
+ Messages: req.Messages,
+ Stream: stream,
+ }
+ if req.Temperature != nil {
+ payload.Temperature = req.Temperature
+ }
+ if req.MaxTokens > 0 {
+ payload.MaxTokens = req.MaxTokens
+ }
+ if req.ResponseJSON {
+ payload.ResponseFormat = &responseFormat{Type: "json_object"}
+ }
+ if stream {
+ payload.StreamOptions = &streamOptions{IncludeUsage: true}
+ }
+ return json.Marshal(payload)
+}
+
+func (c *Client) postChat(ctx context.Context, body []byte) ([]byte, error) {
+ url := strings.TrimRight(c.Endpoint, "/") + "/chat/completions"
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("llm: build request: %w", err)
+ }
+ c.applyHeaders(httpReq)
+
+ httpResp, err := c.client().Do(httpReq)
+ if err != nil {
+ return nil, fmt.Errorf("llm: http: %w", err)
+ }
+ defer httpResp.Body.Close()
+ raw, err := io.ReadAll(httpResp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("llm: read body: %w", err)
+ }
+ if httpResp.StatusCode >= 400 {
+ return nil, errFromStatus(httpResp, raw)
+ }
+ return raw, nil
+}
+
+func (c *Client) streamChat(ctx context.Context, body []byte, onDelta func(string)) (*ChatResponse, error) {
+ url := strings.TrimRight(c.Endpoint, "/") + "/chat/completions"
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("llm: build request: %w", err)
+ }
+ c.applyHeaders(httpReq)
+ httpReq.Header.Set("Accept", "text/event-stream")
+
+ httpResp, err := c.client().Do(httpReq)
+ if err != nil {
+ return nil, fmt.Errorf("llm: http: %w", err)
+ }
+ defer httpResp.Body.Close()
+ if httpResp.StatusCode >= 400 {
+ raw, _ := io.ReadAll(httpResp.Body)
+ return nil, errFromStatus(httpResp, raw)
+ }
+
+ var (
+ content strings.Builder
+ promptTok int
+ outputTok int
+ model string
+ finishReason string
+ )
+ scanner := bufio.NewScanner(httpResp.Body)
+ scanner.Buffer(make([]byte, 0, 64*1024), 1<<20)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if !strings.HasPrefix(line, "data:") {
+ continue
+ }
+ data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
+ if data == "" || data == "[DONE]" {
+ if data == "[DONE]" {
+ break
+ }
+ continue
+ }
+ var chunk openAIStreamChunk
+ if jerr := json.Unmarshal([]byte(data), &chunk); jerr != nil {
+ if c.Logger != nil {
+ c.Logger.Warn("llm: bad SSE chunk", "err", jerr, "data", data)
+ }
+ continue
+ }
+ if chunk.Model != "" {
+ model = chunk.Model
+ }
+ for _, ch := range chunk.Choices {
+ if ch.Delta.Content != "" {
+ content.WriteString(ch.Delta.Content)
+ if onDelta != nil {
+ onDelta(ch.Delta.Content)
+ }
+ }
+ if ch.FinishReason != "" {
+ finishReason = ch.FinishReason
+ }
+ }
+ if chunk.Usage != nil {
+ promptTok = chunk.Usage.PromptTokens
+ outputTok = chunk.Usage.CompletionTokens
+ }
+ }
+ if scanErr := scanner.Err(); scanErr != nil {
+ return nil, fmt.Errorf("llm: stream read: %w", scanErr)
+ }
+ return &ChatResponse{
+ Content: content.String(),
+ PromptTokens: promptTok,
+ OutputTokens: outputTok,
+ Model: model,
+ FinishReason: finishReason,
+ }, nil
+}
+
+func (c *Client) applyHeaders(req *http.Request) {
+ req.Header.Set("Content-Type", "application/json")
+ if c.APIKey != "" {
+ req.Header.Set("Authorization", "Bearer "+c.APIKey)
+ }
+}
+
+func (c *Client) client() *http.Client {
+ if c.HTTPClient != nil {
+ return c.HTTPClient
+ }
+ return &http.Client{Timeout: 60 * time.Second}
+}
+
+// errFromStatus produces an error whose message includes "rate limit", "429",
+// or "overloaded" as appropriate so retry.IsRateLimitError treats local 429/503
+// identically to upstream provider rate limits. Any Retry-After header is
+// embedded in the error message for retry.ParseRetryAfter to find.
+func errFromStatus(resp *http.Response, body []byte) error {
+ prefix := ""
+ switch resp.StatusCode {
+ case http.StatusTooManyRequests:
+ prefix = fmt.Sprintf("llm: 429 rate limit")
+ case http.StatusServiceUnavailable:
+ prefix = "llm: 503 overloaded"
+ default:
+ prefix = fmt.Sprintf("llm: http %d", resp.StatusCode)
+ }
+ if ra := resp.Header.Get("Retry-After"); ra != "" {
+ prefix += fmt.Sprintf(" (retry-after: %s)", ra)
+ }
+ snippet := strings.TrimSpace(string(body))
+ if len(snippet) > 500 {
+ snippet = snippet[:500] + "..."
+ }
+ if snippet != "" {
+ return fmt.Errorf("%s: %s", prefix, snippet)
+ }
+ return errors.New(prefix)
+}
+
+// --- OpenAI wire types ---
+
+type openAIRequest struct {
+ Model string `json:"model"`
+ Messages []Message `json:"messages"`
+ Temperature *float64 `json:"temperature,omitempty"`
+ MaxTokens int `json:"max_tokens,omitempty"`
+ Stream bool `json:"stream,omitempty"`
+ StreamOptions *streamOptions `json:"stream_options,omitempty"`
+ ResponseFormat *responseFormat `json:"response_format,omitempty"`
+}
+
+type streamOptions struct {
+ IncludeUsage bool `json:"include_usage"`
+}
+
+type responseFormat struct {
+ Type string `json:"type"`
+}
+
+type openAIResponse struct {
+ Model string `json:"model"`
+ Choices []openAIChoice `json:"choices"`
+ Usage openAIUsage `json:"usage"`
+}
+
+type openAIChoice struct {
+ Message Message `json:"message"`
+ FinishReason string `json:"finish_reason"`
+}
+
+type openAIUsage struct {
+ PromptTokens int `json:"prompt_tokens"`
+ CompletionTokens int `json:"completion_tokens"`
+}
+
+type openAIStreamChunk struct {
+ Model string `json:"model"`
+ Choices []openAIStreamCh `json:"choices"`
+ Usage *openAIUsage `json:"usage,omitempty"`
+}
+
+type openAIStreamCh struct {
+ Delta openAIDelta `json:"delta"`
+ FinishReason string `json:"finish_reason"`
+}
+
+type openAIDelta struct {
+ Content string `json:"content"`
+}
diff --git a/internal/llm/client_test.go b/internal/llm/client_test.go
new file mode 100644
index 0000000..8257836
--- /dev/null
+++ b/internal/llm/client_test.go
@@ -0,0 +1,159 @@
+package llm
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestChat_ParsesCompletion(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/v1/chat/completions" {
+ t.Errorf("unexpected path %q", r.URL.Path)
+ }
+ if r.Header.Get("Authorization") != "Bearer test-key" {
+ t.Errorf("missing/wrong bearer header: %q", r.Header.Get("Authorization"))
+ }
+ var body openAIRequest
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatalf("decode body: %v", err)
+ }
+ if body.Model != "test-model" {
+ t.Errorf("model: want test-model got %q", body.Model)
+ }
+ if len(body.Messages) != 1 || body.Messages[0].Content != "hello" {
+ t.Errorf("messages mismatch: %+v", body.Messages)
+ }
+ if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" {
+ t.Errorf("expected response_format json_object, got %+v", body.ResponseFormat)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model": "test-model",
+ "choices": [{"message": {"role": "assistant", "content": "world"}, "finish_reason": "stop"}],
+ "usage": {"prompt_tokens": 4, "completion_tokens": 7}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model", APIKey: "test-key"}
+ resp, err := c.Chat(context.Background(), ChatRequest{
+ Messages: []Message{{Role: "user", Content: "hello"}},
+ ResponseJSON: true,
+ })
+ if err != nil {
+ t.Fatalf("Chat: %v", err)
+ }
+ if resp.Content != "world" {
+ t.Errorf("content: want world got %q", resp.Content)
+ }
+ if resp.PromptTokens != 4 || resp.OutputTokens != 7 {
+ t.Errorf("tokens mismatch: %+v", resp)
+ }
+ if resp.FinishReason != "stop" {
+ t.Errorf("finish_reason: want stop got %q", resp.FinishReason)
+ }
+}
+
+func TestChatStream_ParsesSSE(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ flusher, _ := w.(http.Flusher)
+ chunks := []string{
+ `{"model":"test-model","choices":[{"delta":{"content":"Hel"},"finish_reason":""}]}`,
+ `{"model":"test-model","choices":[{"delta":{"content":"lo, "},"finish_reason":""}]}`,
+ `{"model":"test-model","choices":[{"delta":{"content":"world"},"finish_reason":"stop"}]}`,
+ `{"model":"test-model","choices":[],"usage":{"prompt_tokens":3,"completion_tokens":5}}`,
+ }
+ for _, c := range chunks {
+ fmt.Fprintf(w, "data: %s\n\n", c)
+ if flusher != nil {
+ flusher.Flush()
+ }
+ }
+ fmt.Fprint(w, "data: [DONE]\n\n")
+ }))
+ defer srv.Close()
+
+ c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model"}
+
+ var deltas []string
+ resp, err := c.ChatStream(context.Background(),
+ ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}},
+ func(d string) { deltas = append(deltas, d) },
+ )
+ if err != nil {
+ t.Fatalf("ChatStream: %v", err)
+ }
+ if got := strings.Join(deltas, ""); got != "Hello, world" {
+ t.Errorf("aggregated deltas: want %q got %q", "Hello, world", got)
+ }
+ if resp.Content != "Hello, world" {
+ t.Errorf("content: want %q got %q", "Hello, world", resp.Content)
+ }
+ if resp.PromptTokens != 3 || resp.OutputTokens != 5 {
+ t.Errorf("tokens: %+v", resp)
+ }
+ if resp.FinishReason != "stop" {
+ t.Errorf("finish_reason: want stop got %q", resp.FinishReason)
+ }
+}
+
+func TestChat_RetriesOn429(t *testing.T) {
+ var calls int32
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ n := atomic.AddInt32(&calls, 1)
+ if n == 1 {
+ w.Header().Set("Retry-After", "1")
+ http.Error(w, "slow down", http.StatusTooManyRequests)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model":"m","choices":[{"message":{"role":"assistant","content":"ok"},"finish_reason":"stop"}],
+ "usage":{"prompt_tokens":1,"completion_tokens":1}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Client{
+ Endpoint: srv.URL + "/v1",
+ Model: "m",
+ HTTPClient: &http.Client{Timeout: 5 * time.Second},
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ resp, err := c.Chat(ctx, ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}})
+ if err != nil {
+ t.Fatalf("Chat: %v", err)
+ }
+ if resp.Content != "ok" {
+ t.Errorf("content: want ok got %q", resp.Content)
+ }
+ if got := atomic.LoadInt32(&calls); got != 2 {
+ t.Errorf("expected 2 server calls (1 retry), got %d", got)
+ }
+}
+
+// Sanity: errFromStatus produces a string that retry.IsRateLimitError matches.
+func TestErrFromStatus_RateLimitMarker(t *testing.T) {
+ resp := &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ Header: http.Header{"Retry-After": []string{"30"}},
+ }
+ body, _ := io.ReadAll(strings.NewReader("limit hit"))
+ err := errFromStatus(resp, body)
+ if !strings.Contains(strings.ToLower(err.Error()), "rate limit") {
+ t.Errorf("error should contain 'rate limit', got: %v", err)
+ }
+ if !strings.Contains(err.Error(), "retry-after: 30") {
+ t.Errorf("error should embed retry-after, got: %v", err)
+ }
+}
diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go
new file mode 100644
index 0000000..b91abc4
--- /dev/null
+++ b/internal/retry/backoff.go
@@ -0,0 +1,77 @@
+// Package retry provides exponential-backoff retry helpers used across the
+// codebase for rate-limit-aware HTTP/subprocess calls.
+package retry
+
+import (
+ "context"
+ "fmt"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+)
+
+var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
+
+const maxBackoffDelay = 5 * time.Minute
+
+// IsRateLimitError returns true if err looks like a transient rate-limit
+// (e.g. HTTP 429, "too many requests", "overloaded") that is worth retrying.
+func IsRateLimitError(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := strings.ToLower(err.Error())
+ return strings.Contains(msg, "rate limit") ||
+ strings.Contains(msg, "too many requests") ||
+ strings.Contains(msg, "429") ||
+ strings.Contains(msg, "overloaded")
+}
+
+// ParseRetryAfter extracts a Retry-After duration from an error message.
+// Returns 0 if no retry-after value is found.
+func ParseRetryAfter(msg string) time.Duration {
+ m := retryAfterRe.FindStringSubmatch(msg)
+ if m == nil {
+ return 0
+ }
+ secs, err := strconv.Atoi(m[1])
+ if err != nil || secs <= 0 {
+ return 0
+ }
+ return time.Duration(secs) * time.Second
+}
+
+// RunWithBackoff calls fn repeatedly on rate-limit errors, using exponential backoff.
+// maxRetries is the max number of retry attempts (not counting the initial call).
+// baseDelay is the initial backoff duration (doubled each retry).
+func RunWithBackoff(ctx context.Context, maxRetries int, baseDelay time.Duration, fn func() error) error {
+ var lastErr error
+ for attempt := 0; attempt <= maxRetries; attempt++ {
+ lastErr = fn()
+ if lastErr == nil {
+ return nil
+ }
+ if !IsRateLimitError(lastErr) {
+ return lastErr
+ }
+ if attempt == maxRetries {
+ break
+ }
+
+ delay := baseDelay * (1 << attempt)
+ if delay > maxBackoffDelay {
+ delay = maxBackoffDelay
+ }
+ if ra := ParseRetryAfter(lastErr.Error()); ra > 0 {
+ delay = ra
+ }
+
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("context cancelled during rate-limit backoff: %w", ctx.Err())
+ case <-time.After(delay):
+ }
+ }
+ return lastErr
+}
diff --git a/internal/executor/ratelimit_test.go b/internal/retry/backoff_test.go
index f45216f..a963fc2 100644
--- a/internal/executor/ratelimit_test.go
+++ b/internal/retry/backoff_test.go
@@ -1,4 +1,4 @@
-package executor
+package retry
import (
"context"
@@ -8,54 +8,54 @@ import (
"time"
)
-// --- isRateLimitError tests ---
+// --- IsRateLimitError tests ---
func TestIsRateLimitError_RateLimitMessage(t *testing.T) {
err := errors.New("claude exited with error: rate limit exceeded")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for 'rate limit exceeded', got false")
}
}
func TestIsRateLimitError_TooManyRequests(t *testing.T) {
err := errors.New("too many requests to the API")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for 'too many requests', got false")
}
}
func TestIsRateLimitError_HTTP429(t *testing.T) {
err := errors.New("API returned status 429")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for '429', got false")
}
}
func TestIsRateLimitError_Overloaded(t *testing.T) {
err := errors.New("API overloaded, please retry later")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for 'overloaded', got false")
}
}
func TestIsRateLimitError_NonRateLimitError(t *testing.T) {
err := errors.New("claude exited with error: exit status 1")
- if isRateLimitError(err) {
+ if IsRateLimitError(err) {
t.Error("want false for non-rate-limit error, got true")
}
}
func TestIsRateLimitError_NilError(t *testing.T) {
- if isRateLimitError(nil) {
+ if IsRateLimitError(nil) {
t.Error("want false for nil error, got true")
}
}
-// --- parseRetryAfter tests ---
+// --- ParseRetryAfter tests ---
func TestParseRetryAfter_RetryAfterSeconds(t *testing.T) {
msg := "rate limit exceeded, retry after 30 seconds"
- d := parseRetryAfter(msg)
+ d := ParseRetryAfter(msg)
if d != 30*time.Second {
t.Errorf("want 30s, got %v", d)
}
@@ -63,7 +63,7 @@ func TestParseRetryAfter_RetryAfterSeconds(t *testing.T) {
func TestParseRetryAfter_RetryAfterHeader(t *testing.T) {
msg := "rate_limit_error: retry-after: 60"
- d := parseRetryAfter(msg)
+ d := ParseRetryAfter(msg)
if d != 60*time.Second {
t.Errorf("want 60s, got %v", d)
}
@@ -71,13 +71,13 @@ func TestParseRetryAfter_RetryAfterHeader(t *testing.T) {
func TestParseRetryAfter_NoRetryInfo(t *testing.T) {
msg := "rate limit exceeded"
- d := parseRetryAfter(msg)
+ d := ParseRetryAfter(msg)
if d != 0 {
t.Errorf("want 0, got %v", d)
}
}
-// --- runWithBackoff tests ---
+// --- RunWithBackoff tests ---
func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) {
calls := 0
@@ -85,7 +85,7 @@ func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) {
calls++
return nil
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err != nil {
t.Errorf("want nil error, got %v", err)
}
@@ -103,7 +103,7 @@ func TestRunWithBackoff_RetriesOnRateLimit(t *testing.T) {
}
return nil
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err != nil {
t.Errorf("want nil error, got %v", err)
}
@@ -119,11 +119,10 @@ func TestRunWithBackoff_GivesUpAfterMaxRetries(t *testing.T) {
calls++
return rateLimitErr
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err == nil {
t.Fatal("want error after max retries, got nil")
}
- // maxRetries=3: 1 initial call + 3 retries = 4 total calls
if calls != 4 {
t.Errorf("want 4 calls (1 initial + 3 retries), got %d", calls)
}
@@ -135,7 +134,7 @@ func TestRunWithBackoff_DoesNotRetryNonRateLimitError(t *testing.T) {
calls++
return fmt.Errorf("permission denied")
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err == nil {
t.Fatal("want error, got nil")
}
@@ -150,12 +149,12 @@ func TestRunWithBackoff_ContextCancellation(t *testing.T) {
fn := func() error {
calls++
- cancel() // cancel immediately after first call
+ cancel()
return fmt.Errorf("rate limit exceeded")
}
start := time.Now()
- err := runWithBackoff(ctx, 3, time.Second, fn) // large delay confirms ctx preempts wait
+ err := RunWithBackoff(ctx, 3, time.Second, fn)
elapsed := time.Since(start)
if err == nil {
diff --git a/internal/storage/db.go b/internal/storage/db.go
index 038480b..c871c77 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -86,6 +86,8 @@ func (s *DB) migrate() error {
`ALTER TABLE executions ADD COLUMN changestats_json TEXT`,
`ALTER TABLE executions ADD COLUMN commits_json TEXT NOT NULL DEFAULT '[]'`,
`ALTER TABLE tasks ADD COLUMN elaboration_input TEXT`,
+ `ALTER TABLE executions ADD COLUMN tokens_in INTEGER`,
+ `ALTER TABLE executions ADD COLUMN tokens_out INTEGER`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -403,6 +405,11 @@ type Execution struct {
Changestats *task.Changestats // stored as JSON; nil if not yet recorded
Commits []task.GitCommit // stored as JSON; empty if no commits
+ // Token usage for non-CLI runners (e.g. LocalRunner). 0 for Claude/Gemini
+ // CLI runs which report cost in cost_usd instead.
+ TokensIn int64
+ TokensOut int64
+
// In-memory only: set when creating a resume execution, not stored in DB.
ResumeSessionID string
ResumeAnswer string
@@ -430,23 +437,23 @@ func (s *DB) CreateExecution(e *Execution) error {
commitsJSON = string(b)
}
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -465,7 +472,7 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
// GetLatestExecution returns the most recent execution for a task.
func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
return scanExecution(row)
}
@@ -650,11 +657,11 @@ func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?,
stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ?, sandbox_dir = ?,
- changestats_json = ?, commits_json = ?
+ changestats_json = ?, commits_json = ?, tokens_in = ?, tokens_out = ?
WHERE id = ?`,
e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg,
e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.SandboxDir,
- changestatsJSON, commitsJSON, e.ID,
+ changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut, e.ID,
)
return err
}
@@ -729,13 +736,17 @@ func scanExecution(row scanner) (*Execution, error) {
var sandboxDir sql.NullString
var changestatsJSON sql.NullString
var commitsJSON sql.NullString
+ var tokensIn sql.NullInt64
+ var tokensOut sql.NullInt64
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON, &tokensIn, &tokensOut)
if err != nil {
return nil, err
}
e.SessionID = sessionID.String
e.SandboxDir = sandboxDir.String
+ e.TokensIn = tokensIn.Int64
+ e.TokensOut = tokensOut.Int64
if changestatsJSON.Valid && changestatsJSON.String != "" {
var cs task.Changestats
if err := json.Unmarshal([]byte(changestatsJSON.String), &cs); err != nil {
diff --git a/internal/task/task.go b/internal/task/task.go
index b3660d3..fd1dde6 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -40,6 +40,11 @@ type AgentConfig struct {
SystemPromptAppend string `yaml:"system_prompt_append" json:"system_prompt_append"`
AdditionalArgs []string `yaml:"additional_args" json:"additional_args"`
SkipPlanning bool `yaml:"skip_planning" json:"skip_planning"`
+
+ // Local-runner sampling controls. Pointer for Temperature so a 0 value can
+ // mean "deterministic" rather than "unset, use server default".
+ Temperature *float64 `yaml:"temperature,omitempty" json:"temperature,omitempty"`
+ MaxTokens int `yaml:"max_tokens,omitempty" json:"max_tokens,omitempty"`
}