summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/api/elaborate.go60
-rw-r--r--internal/api/elaborate_local_test.go214
-rw-r--r--internal/api/server.go10
-rw-r--r--internal/api/webhook.go15
-rw-r--r--internal/api/webhook_llm.go127
-rw-r--r--internal/api/webhook_llm_test.go228
-rw-r--r--internal/cli/llm.go31
-rw-r--r--internal/cli/run.go19
-rw-r--r--internal/cli/serve.go24
-rw-r--r--internal/config/config.go68
-rw-r--r--internal/config/config_test.go30
-rw-r--r--internal/executor/classifier.go33
-rw-r--r--internal/executor/classifier_test.go76
-rw-r--r--internal/executor/claude.go552
-rw-r--r--internal/executor/claude_test.go810
-rw-r--r--internal/executor/container_test.go2
-rw-r--r--internal/executor/executor.go101
-rw-r--r--internal/executor/executor_test.go17
-rw-r--r--internal/executor/gemini.go346
-rw-r--r--internal/executor/gemini_test.go447
-rw-r--r--internal/executor/helpers.go6
-rw-r--r--internal/executor/local.go171
-rw-r--r--internal/executor/local_test.go152
-rw-r--r--internal/executor/ratelimit.go80
-rw-r--r--internal/executor/summary.go95
-rw-r--r--internal/executor/summary_synth_test.go241
-rw-r--r--internal/llm/client.go343
-rw-r--r--internal/llm/client_test.go159
-rw-r--r--internal/retry/backoff.go77
-rw-r--r--internal/retry/backoff_test.go (renamed from internal/executor/ratelimit_test.go)69
-rw-r--r--internal/storage/db.go34
-rw-r--r--internal/task/task.go7
32 files changed, 4416 insertions, 228 deletions
diff --git a/internal/api/elaborate.go b/internal/api/elaborate.go
index 0cb298d..8676b36 100644
--- a/internal/api/elaborate.go
+++ b/internal/api/elaborate.go
@@ -12,6 +12,8 @@ import (
"sort"
"strings"
"time"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
)
const elaborateTimeout = 30 * time.Second
@@ -245,6 +247,33 @@ func (s *Server) elaborateWithClaude(ctx context.Context, workDir, fullPrompt st
return &result, nil
}
+// elaborateWithLocal runs elaboration through an OpenAI-compatible local LLM.
+// It uses the same prompt template as the Claude/Gemini paths and requests
+// json_object response format so we can decode directly without the
+// markdown-fence cleanup needed for the CLI paths.
+func elaborateWithLocal(ctx context.Context, c *llm.Client, workDir, fullPrompt string) (*elaboratedTask, error) {
+ if c == nil {
+ return nil, fmt.Errorf("local llm: no client configured")
+ }
+ systemPrompt := buildElaboratePrompt(workDir)
+ resp, err := c.Chat(ctx, llm.ChatRequest{
+ Messages: []llm.Message{
+ {Role: "system", Content: systemPrompt},
+ {Role: "user", Content: fullPrompt},
+ },
+ ResponseJSON: true,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("local llm: %w", err)
+ }
+ body := strings.TrimSpace(resp.Content)
+ var result elaboratedTask
+ if jerr := json.Unmarshal([]byte(extractJSON(body)), &result); jerr != nil {
+ return nil, fmt.Errorf("local llm: parse JSON: %w (response: %s)", jerr, body)
+ }
+ return &result, nil
+}
+
func (s *Server) elaborateWithGemini(ctx context.Context, workDir, fullPrompt string) (*elaboratedTask, error) {
combinedPrompt := fmt.Sprintf("%s\n\n%s", buildElaboratePrompt(workDir), fullPrompt)
cmd := exec.CommandContext(ctx, s.geminiBinaryPath(),
@@ -511,18 +540,27 @@ func (s *Server) handleElaborateTask(w http.ResponseWriter, r *http.Request) {
var result *elaboratedTask
var err error
- // Try Claude first.
- result, err = s.elaborateWithClaude(ctx, workDir, fullPrompt)
- if err != nil {
- s.logger.Warn("elaborate: claude failed, falling back to gemini", "error", err)
- // Fallback to Gemini.
- result, err = s.elaborateWithGemini(ctx, workDir, fullPrompt)
+ // Try local LLM first when configured. Falls back to Claude → Gemini on
+ // hard failure of each prior attempt.
+ if s.llm != nil {
+ result, err = elaborateWithLocal(ctx, s.llm, workDir, fullPrompt)
if err != nil {
- s.logger.Error("elaborate: fallback gemini also failed", "error", err)
- writeJSON(w, http.StatusBadGateway, map[string]string{
- "error": fmt.Sprintf("elaboration failed: %v", err),
- })
- return
+ s.logger.Warn("elaborate: local llm failed, falling back to claude", "error", err)
+ result = nil
+ }
+ }
+ if result == nil {
+ result, err = s.elaborateWithClaude(ctx, workDir, fullPrompt)
+ if err != nil {
+ s.logger.Warn("elaborate: claude failed, falling back to gemini", "error", err)
+ result, err = s.elaborateWithGemini(ctx, workDir, fullPrompt)
+ if err != nil {
+ s.logger.Error("elaborate: gemini also failed", "error", err)
+ writeJSON(w, http.StatusBadGateway, map[string]string{
+ "error": fmt.Sprintf("elaboration failed: %v", err),
+ })
+ return
+ }
}
}
diff --git a/internal/api/elaborate_local_test.go b/internal/api/elaborate_local_test.go
new file mode 100644
index 0000000..09a8f9e
--- /dev/null
+++ b/internal/api/elaborate_local_test.go
@@ -0,0 +1,214 @@
+package api
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
+)
+
+// fakeChatCompletionsServer returns an httptest server that responds to a
+// /chat/completions POST with the given assistant content (which should be a
+// JSON-encoded elaboratedTask). Returns the server and a counter of calls
+// received so tests can assert dispatch ordering.
+func fakeChatCompletionsServer(t *testing.T, assistantContent string) (*httptest.Server, *int32) {
+ t.Helper()
+ var calls int32
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ atomic.AddInt32(&calls, 1)
+ w.Header().Set("Content-Type", "application/json")
+ // The assistant content has to be JSON-encoded inside the wire format.
+ escaped, _ := json.Marshal(assistantContent)
+ fmt.Fprintf(w, `{
+ "model":"local",
+ "choices":[{"message":{"role":"assistant","content":%s},"finish_reason":"stop"}],
+ "usage":{"prompt_tokens":10,"completion_tokens":50}
+ }`, string(escaped))
+ }))
+ t.Cleanup(srv.Close)
+ return srv, &calls
+}
+
+func TestElaborateWithLocal_ParsesValidResponse(t *testing.T) {
+ taskBody, _ := json.Marshal(elaboratedTask{
+ Name: "Test elaborated task",
+ Description: "From local llm",
+ Agent: elaboratedAgent{
+ Type: "claude",
+ Model: "sonnet",
+ Instructions: "Run go build.",
+ MaxBudgetUSD: 0.25,
+ AllowedTools: []string{"Bash"},
+ },
+ Timeout: "10m",
+ Priority: "normal",
+ Tags: []string{"build"},
+ })
+ srv, calls := fakeChatCompletionsServer(t, string(taskBody))
+
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}
+ result, err := elaborateWithLocal(context.Background(), c, "/some/dir", "build the project")
+ if err != nil {
+ t.Fatalf("elaborateWithLocal: %v", err)
+ }
+ if result.Name != "Test elaborated task" {
+ t.Errorf("Name: %q", result.Name)
+ }
+ if result.Agent.Instructions != "Run go build." {
+ t.Errorf("Instructions: %q", result.Agent.Instructions)
+ }
+ if got := atomic.LoadInt32(calls); got != 1 {
+ t.Errorf("expected 1 call, got %d", got)
+ }
+}
+
+func TestElaborateWithLocal_NilClient(t *testing.T) {
+ _, err := elaborateWithLocal(context.Background(), nil, "", "p")
+ if err == nil || !strings.Contains(err.Error(), "no client") {
+ t.Errorf("expected nil-client error, got %v", err)
+ }
+}
+
+func TestElaborateWithLocal_BadJSON(t *testing.T) {
+ srv, _ := fakeChatCompletionsServer(t, "this is not JSON at all")
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}
+ _, err := elaborateWithLocal(context.Background(), c, "", "p")
+ if err == nil || !strings.Contains(err.Error(), "parse JSON") {
+ t.Errorf("expected parse error, got %v", err)
+ }
+}
+
+// TestElaborateTask_LocalLLMPreferred verifies the dispatcher uses local LLM
+// when SetLLM is configured, and does not invoke claude.
+func TestElaborateTask_LocalLLMPreferred(t *testing.T) {
+ srv, _ := testServer(t)
+
+ taskBody, _ := json.Marshal(elaboratedTask{
+ Name: "Local-elaborated",
+ Description: "From local",
+ Agent: elaboratedAgent{
+ Type: "claude",
+ Model: "sonnet",
+ Instructions: "Do work. Tests pass when complete.",
+ MaxBudgetUSD: 0.25,
+ AllowedTools: []string{"Bash"},
+ },
+ Timeout: "10m",
+ Priority: "normal",
+ })
+ llmSrv, _ := fakeChatCompletionsServer(t, string(taskBody))
+ srv.SetLLM(&llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "fake"})
+ // Point Claude binary at a path that would fail if called.
+ srv.elaborateCmdPath = "/nonexistent/claude-should-not-run"
+
+ body := `{"prompt":"do work"}`
+ req := httptest.NewRequest("POST", "/api/tasks/elaborate", bytes.NewBufferString(body))
+ req.Header.Set("Content-Type", "application/json")
+ w := httptest.NewRecorder()
+ srv.Handler().ServeHTTP(w, req)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("status: want 200, got %d; body: %s", w.Code, w.Body.String())
+ }
+ var got elaboratedTask
+ if err := json.NewDecoder(w.Body).Decode(&got); err != nil {
+ t.Fatalf("decode response: %v", err)
+ }
+ if got.Name != "Local-elaborated" {
+ t.Errorf("Name: want Local-elaborated got %q", got.Name)
+ }
+}
+
+// TestElaborateTask_LocalFails_FallsBackToClaude verifies the dispatcher
+// falls back to the Claude path when the local LLM returns an error.
+func TestElaborateTask_LocalFails_FallsBackToClaude(t *testing.T) {
+ srv, _ := testServer(t)
+
+ // Local LLM server that always 500s.
+ failSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ http.Error(w, "boom", http.StatusInternalServerError)
+ }))
+ t.Cleanup(failSrv.Close)
+ srv.SetLLM(&llm.Client{Endpoint: failSrv.URL + "/v1", Model: "fake"})
+
+ // Configure a working fake Claude binary.
+ taskBody, _ := json.Marshal(elaboratedTask{
+ Name: "Claude-fallback",
+ Description: "From claude after local failed",
+ Agent: elaboratedAgent{
+ Type: "claude",
+ Model: "sonnet",
+ Instructions: "Run tests.",
+ MaxBudgetUSD: 0.25,
+ AllowedTools: []string{"Bash"},
+ },
+ Timeout: "10m",
+ Priority: "normal",
+ })
+ wrapper, _ := json.Marshal(map[string]string{"result": string(taskBody)})
+ srv.elaborateCmdPath = createFakeClaude(t, string(wrapper), 0)
+
+ body := `{"prompt":"run tests"}`
+ req := httptest.NewRequest("POST", "/api/tasks/elaborate", bytes.NewBufferString(body))
+ req.Header.Set("Content-Type", "application/json")
+ w := httptest.NewRecorder()
+ srv.Handler().ServeHTTP(w, req)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("status: want 200, got %d; body: %s", w.Code, w.Body.String())
+ }
+ var got elaboratedTask
+ if err := json.NewDecoder(w.Body).Decode(&got); err != nil {
+ t.Fatalf("decode response: %v", err)
+ }
+ if got.Name != "Claude-fallback" {
+ t.Errorf("Name: want Claude-fallback (fallback path) got %q", got.Name)
+ }
+}
+
+// TestElaborateTask_NoLocalLLM_UsesClaude verifies that when SetLLM is not
+// called, behavior is unchanged (Claude path still primary).
+func TestElaborateTask_NoLocalLLM_UsesClaude(t *testing.T) {
+ srv, _ := testServer(t)
+
+ taskBody, _ := json.Marshal(elaboratedTask{
+ Name: "Claude-only",
+ Description: "no local llm configured",
+ Agent: elaboratedAgent{
+ Type: "claude",
+ Model: "sonnet",
+ Instructions: "Do work.",
+ MaxBudgetUSD: 0.25,
+ AllowedTools: []string{"Bash"},
+ },
+ Timeout: "10m",
+ Priority: "normal",
+ })
+ wrapper, _ := json.Marshal(map[string]string{"result": string(taskBody)})
+ srv.elaborateCmdPath = createFakeClaude(t, string(wrapper), 0)
+
+ body := `{"prompt":"do work"}`
+ req := httptest.NewRequest("POST", "/api/tasks/elaborate", bytes.NewBufferString(body))
+ req.Header.Set("Content-Type", "application/json")
+ w := httptest.NewRecorder()
+ srv.Handler().ServeHTTP(w, req)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("status: want 200, got %d; body: %s", w.Code, w.Body.String())
+ }
+ var got elaboratedTask
+ if err := json.NewDecoder(w.Body).Decode(&got); err != nil {
+ t.Fatalf("decode response: %v", err)
+ }
+ if got.Name != "Claude-only" {
+ t.Errorf("Name: %q", got.Name)
+ }
+}
+
diff --git a/internal/api/server.go b/internal/api/server.go
index e7756d1..28cfe4a 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -12,6 +12,7 @@ import (
"github.com/thepeterstone/claudomator/internal/config"
"github.com/thepeterstone/claudomator/internal/executor"
+ "github.com/thepeterstone/claudomator/internal/llm"
"github.com/thepeterstone/claudomator/internal/notify"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
@@ -57,6 +58,7 @@ type Server struct {
vapidEmail string
pushStore pushSubscriptionStore
dropsDir string
+ llm *llm.Client
}
// SetAPIToken configures a bearer token that must be supplied to access the API.
@@ -89,6 +91,14 @@ func (s *Server) SetWorkspaceRoot(path string) {
// Pool returns the executor pool, for graceful shutdown by the caller.
func (s *Server) Pool() *executor.Pool { return s.pool }
+// SetLLM wires a local OpenAI-compatible LLM client for use by elaboration
+// (and future internal helpers). When non-nil, elaboration will prefer it
+// over the Claude CLI; on failure it falls back to claude → gemini.
+func (s *Server) SetLLM(c *llm.Client) {
+ s.llm = c
+}
+
+
func NewServer(store *storage.DB, pool *executor.Pool, logger *slog.Logger, claudeBinPath, geminiBinPath string) *Server {
wd, _ := os.Getwd()
s := &Server{
diff --git a/internal/api/webhook.go b/internal/api/webhook.go
index 141224f..3af4cc8 100644
--- a/internal/api/webhook.go
+++ b/internal/api/webhook.go
@@ -1,6 +1,7 @@
package api
import (
+ "context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
@@ -184,7 +185,7 @@ func (s *Server) createCIFailureTask(w http.ResponseWriter, repoName, fullName,
htmlURL = fmt.Sprintf("https://github.com/%s/commit/%s", fullName, sha)
}
- instructions := fmt.Sprintf(
+ fallback := fmt.Sprintf(
"A CI failure has been detected and requires investigation.\n\n"+
"Repository: %s\n"+
"Branch: %s\n"+
@@ -199,6 +200,18 @@ func (s *Server) createCIFailureTask(w http.ResponseWriter, repoName, fullName,
fullName, branch, sha, checkName, htmlURL,
)
+ tctx := ciTriageContext{
+ Repo: fullName,
+ Branch: branch,
+ SHA: sha,
+ CheckName: checkName,
+ URL: htmlURL,
+ }
+ if project != nil {
+ tctx.ProjectDir = project.Dir
+ }
+ instructions := enrichCIInstructions(context.Background(), s.llm, tctx, fallback)
+
now := time.Now().UTC()
t := &task.Task{
ID: uuid.New().String(),
diff --git a/internal/api/webhook_llm.go b/internal/api/webhook_llm.go
new file mode 100644
index 0000000..1cbca17
--- /dev/null
+++ b/internal/api/webhook_llm.go
@@ -0,0 +1,127 @@
+package api
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
+)
+
+// ciTriagePromptTimeout caps the LLM enrichment call so a slow local model
+// can't stall webhook handling. On timeout the original template is used.
+const ciTriagePromptTimeout = 10 * time.Second
+
+// ciTriageContext holds everything we know at webhook time, plus best-effort
+// project-side signals (recent git log, CLAUDE.md content) when project_dir
+// is available.
+type ciTriageContext struct {
+ Repo string
+ Branch string
+ SHA string
+ CheckName string
+ URL string
+ ProjectDir string
+ RecentCommits string // multi-line, may be ""
+ ProjectDoc string // first ~4 KB of CLAUDE.md, may be ""
+}
+
+// enrichCIInstructions asks the local LLM to produce a tighter, project-aware
+// investigation plan than the hardcoded template. On any error (no client,
+// timeout, parse failure) it returns fallback unchanged so the webhook flow
+// is never worse off for trying.
+func enrichCIInstructions(parent context.Context, c *llm.Client, ctx ciTriageContext, fallback string) string {
+ if c == nil {
+ return fallback
+ }
+
+ // Pull project-side signals best-effort. Errors are silently swallowed —
+ // the LLM still gets the metadata it does have.
+ if ctx.ProjectDir != "" {
+ ctx.RecentCommits = readRecentCommits(ctx.ProjectDir, 5)
+ ctx.ProjectDoc = readProjectDoc(ctx.ProjectDir)
+ }
+
+ cctx, cancel := context.WithTimeout(parent, ciTriagePromptTimeout)
+ defer cancel()
+
+ prompt := buildCITriagePrompt(ctx)
+ resp, err := c.Chat(cctx, llm.ChatRequest{
+ Messages: []llm.Message{
+ {Role: "system", Content: "You produce concise, actionable CI failure investigation plans. Respond with plain text only — no markdown fences, no JSON, no preamble."},
+ {Role: "user", Content: prompt},
+ },
+ })
+ if err != nil {
+ return fallback
+ }
+ body := strings.TrimSpace(resp.Content)
+ if body == "" {
+ return fallback
+ }
+ // Always preserve the metadata header from the fallback so investigators
+ // can see repo/branch/SHA/URL even if the LLM body is terse.
+ return ciInstructionsHeader(ctx) + "\n\n" + body
+}
+
+func buildCITriagePrompt(ctx ciTriageContext) string {
+ var sb strings.Builder
+ fmt.Fprintf(&sb, "CI just failed.\n\nRepository: %s\nBranch: %s\nCommit SHA: %s\nCheck/Workflow: %s\nRun URL: %s\n",
+ ctx.Repo, ctx.Branch, ctx.SHA, ctx.CheckName, ctx.URL)
+ if ctx.RecentCommits != "" {
+ fmt.Fprintf(&sb, "\nRecent commits on this branch (newest first):\n%s\n", ctx.RecentCommits)
+ }
+ if ctx.ProjectDoc != "" {
+ fmt.Fprintf(&sb, "\nProject context (CLAUDE.md, truncated):\n%s\n", ctx.ProjectDoc)
+ }
+ sb.WriteString("\nProduce 6–12 lines of investigation steps. Name suspect commits or files when you can; otherwise give concrete starting actions (which logs to read, which tests to re-run locally). End with an explicit 'Acceptance Criteria' section listing what 'fixed' looks like.")
+ return sb.String()
+}
+
+func ciInstructionsHeader(ctx ciTriageContext) string {
+ return fmt.Sprintf(
+ "A CI failure has been detected and requires investigation.\n\n"+
+ "Repository: %s\n"+
+ "Branch: %s\n"+
+ "Commit SHA: %s\n"+
+ "Check/Workflow: %s\n"+
+ "Run URL: %s",
+ ctx.Repo, ctx.Branch, ctx.SHA, ctx.CheckName, ctx.URL,
+ )
+}
+
+// readRecentCommits returns the last n commits as a `git log --oneline`-style
+// string, or "" on any error.
+func readRecentCommits(projectDir string, n int) string {
+ if projectDir == "" {
+ return ""
+ }
+ cctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ cmd := exec.CommandContext(cctx, "git", "-C", projectDir, "log", "--oneline", fmt.Sprintf("-n%d", n))
+ out, err := cmd.Output()
+ if err != nil {
+ return ""
+ }
+ return strings.TrimSpace(string(out))
+}
+
+// readProjectDoc returns CLAUDE.md content (capped at 4KB) or "".
+func readProjectDoc(projectDir string) string {
+ if projectDir == "" {
+ return ""
+ }
+ data, err := os.ReadFile(filepath.Join(projectDir, "CLAUDE.md"))
+ if err != nil {
+ return ""
+ }
+ const cap = 4096
+ if len(data) > cap {
+ data = data[:cap]
+ }
+ return strings.TrimSpace(string(data))
+}
diff --git a/internal/api/webhook_llm_test.go b/internal/api/webhook_llm_test.go
new file mode 100644
index 0000000..f2381a1
--- /dev/null
+++ b/internal/api/webhook_llm_test.go
@@ -0,0 +1,228 @@
+package api
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/thepeterstone/claudomator/internal/config"
+ "github.com/thepeterstone/claudomator/internal/llm"
+)
+
+// initGitRepo creates a fresh git repo with two commits and returns its path.
+// Used to verify enrichCIInstructions picks up recent commits.
+func initGitRepo(t *testing.T) string {
+ t.Helper()
+ dir := t.TempDir()
+ run := func(args ...string) {
+ cmd := exec.Command("git", append([]string{"-C", dir}, args...)...)
+ cmd.Env = append(os.Environ(),
+ "GIT_AUTHOR_NAME=test", "GIT_AUTHOR_EMAIL=test@example.com",
+ "GIT_COMMITTER_NAME=test", "GIT_COMMITTER_EMAIL=test@example.com",
+ // Disable signing in case the host has a global pre-commit signer.
+ "GIT_CONFIG_GLOBAL=/dev/null",
+ )
+ if out, err := cmd.CombinedOutput(); err != nil {
+ t.Fatalf("git %v: %v\n%s", args, err, out)
+ }
+ }
+ run("init", "-q")
+ run("config", "commit.gpgsign", "false")
+ run("config", "tag.gpgsign", "false")
+ if err := os.WriteFile(filepath.Join(dir, "README"), []byte("v1\n"), 0644); err != nil {
+ t.Fatal(err)
+ }
+ run("add", "README")
+ run("commit", "-q", "-m", "first commit", "--no-gpg-sign")
+ if err := os.WriteFile(filepath.Join(dir, "README"), []byte("v2\n"), 0644); err != nil {
+ t.Fatal(err)
+ }
+ run("add", "README")
+ run("commit", "-q", "-m", "fix: bump readme", "--no-gpg-sign")
+ return dir
+}
+
+func TestEnrichCIInstructions_NilClient_ReturnsFallback(t *testing.T) {
+ got := enrichCIInstructions(context.Background(), nil, ciTriageContext{}, "FALLBACK")
+ if got != "FALLBACK" {
+ t.Errorf("nil client: want FALLBACK, got %q", got)
+ }
+}
+
+func TestEnrichCIInstructions_LLMFailure_ReturnsFallback(t *testing.T) {
+ // Server that always 500s.
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ http.Error(w, "boom", http.StatusInternalServerError)
+ }))
+ defer srv.Close()
+
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}
+ got := enrichCIInstructions(context.Background(), c,
+ ciTriageContext{Repo: "x", Branch: "main"}, "FALLBACK")
+ if got != "FALLBACK" {
+ t.Errorf("llm failure: want FALLBACK, got %q", got)
+ }
+}
+
+func TestEnrichCIInstructions_EmptyLLMBody_ReturnsFallback(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"model":"x","choices":[{"message":{"content":""},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer srv.Close()
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}
+ got := enrichCIInstructions(context.Background(), c, ciTriageContext{}, "FALLBACK-2")
+ if got != "FALLBACK-2" {
+ t.Errorf("empty body: want fallback, got %q", got)
+ }
+}
+
+func TestEnrichCIInstructions_LLMSuccess_ReturnsEnriched(t *testing.T) {
+ expected := "1. Look at commit abc123\n2. Re-run build locally\n3. Check unit tests"
+
+ var capturedPrompt string
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var body struct {
+ Messages []struct {
+ Role string `json:"role"`
+ Content string `json:"content"`
+ } `json:"messages"`
+ }
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatal(err)
+ }
+ // Capture the user message so we can assert metadata is in the prompt.
+ for _, m := range body.Messages {
+ if m.Role == "user" {
+ capturedPrompt = m.Content
+ }
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintf(w, `{"model":"x","choices":[{"message":{"content":%q},"finish_reason":"stop"}],"usage":{}}`, expected)
+ }))
+ defer srv.Close()
+
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}
+ tctx := ciTriageContext{
+ Repo: "owner/myrepo",
+ Branch: "main",
+ SHA: "abc123",
+ CheckName: "CI Build",
+ URL: "https://github.com/owner/myrepo/runs/1",
+ }
+ got := enrichCIInstructions(context.Background(), c, tctx, "FALLBACK")
+
+ if !strings.Contains(got, expected) {
+ t.Errorf("enriched body missing LLM content; got: %s", got)
+ }
+ if !strings.Contains(got, "Repository: owner/myrepo") {
+ t.Errorf("enriched body missing metadata header; got: %s", got)
+ }
+ for _, want := range []string{"owner/myrepo", "main", "abc123", "CI Build"} {
+ if !strings.Contains(capturedPrompt, want) {
+ t.Errorf("prompt missing %q; got: %s", want, capturedPrompt)
+ }
+ }
+}
+
+func TestEnrichCIInstructions_IncludesRecentCommits(t *testing.T) {
+ repo := initGitRepo(t)
+
+ var capturedPrompt string
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var body struct {
+ Messages []struct {
+ Role string `json:"role"`
+ Content string `json:"content"`
+ } `json:"messages"`
+ }
+ json.NewDecoder(r.Body).Decode(&body)
+ for _, m := range body.Messages {
+ if m.Role == "user" {
+ capturedPrompt = m.Content
+ }
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"model":"x","choices":[{"message":{"content":"plan"},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer srv.Close()
+
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}
+ enrichCIInstructions(context.Background(), c,
+ ciTriageContext{Repo: "x", Branch: "y", ProjectDir: repo}, "FALLBACK")
+
+ if !strings.Contains(capturedPrompt, "Recent commits") {
+ t.Errorf("expected prompt to include recent commits section; got:\n%s", capturedPrompt)
+ }
+ if !strings.Contains(capturedPrompt, "fix: bump readme") {
+ t.Errorf("expected most recent commit message in prompt; got:\n%s", capturedPrompt)
+ }
+}
+
+// TestWebhook_NoLLM_InstructionsPreserved is the regression guard: when no
+// LLM is configured, webhook task instructions match the historical template
+// exactly.
+func TestWebhook_NoLLM_InstructionsPreserved(t *testing.T) {
+ srv, store := testServer(t)
+ srv.projects = []config.Project{{Name: "myrepo", Dir: "/workspace/myrepo"}}
+
+ w := webhookPost(t, srv, "check_run", checkRunFailurePayload, "")
+ if w.Code != http.StatusOK {
+ t.Fatalf("status: %d", w.Code)
+ }
+ var resp map[string]string
+ json.NewDecoder(w.Body).Decode(&resp)
+ tk, err := store.GetTask(resp["task_id"])
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, want := range []string{
+ "A CI failure has been detected",
+ "Please investigate the failure by:",
+ "1. Reviewing recent commits on the branch",
+ "4. Fixing the root cause and ensuring the build passes",
+ } {
+ if !strings.Contains(tk.Agent.Instructions, want) {
+ t.Errorf("instructions missing %q (regression: LLM path leaked into no-LLM case)", want)
+ }
+ }
+}
+
+// TestWebhook_WithLLM_InstructionsEnriched verifies the LLM body appears in
+// the created task's instructions when SetLLM is configured.
+func TestWebhook_WithLLM_InstructionsEnriched(t *testing.T) {
+ srv, store := testServer(t)
+ srv.projects = []config.Project{{Name: "myrepo", Dir: "/workspace/myrepo"}}
+
+ llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"model":"x","choices":[{"message":{"content":"LLM-GENERATED-PLAN"},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer llmSrv.Close()
+ srv.SetLLM(&llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "fake"})
+
+ w := webhookPost(t, srv, "check_run", checkRunFailurePayload, "")
+ if w.Code != http.StatusOK {
+ t.Fatalf("status: %d body: %s", w.Code, w.Body.String())
+ }
+ var resp map[string]string
+ json.NewDecoder(w.Body).Decode(&resp)
+ tk, err := store.GetTask(resp["task_id"])
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !strings.Contains(tk.Agent.Instructions, "LLM-GENERATED-PLAN") {
+ t.Errorf("instructions missing LLM body; got:\n%s", tk.Agent.Instructions)
+ }
+ if !strings.Contains(tk.Agent.Instructions, "Repository: owner/myrepo") {
+ t.Errorf("instructions missing metadata header; got:\n%s", tk.Agent.Instructions)
+ }
+}
diff --git a/internal/cli/llm.go b/internal/cli/llm.go
new file mode 100644
index 0000000..04fe902
--- /dev/null
+++ b/internal/cli/llm.go
@@ -0,0 +1,31 @@
+package cli
+
+import (
+ "log/slog"
+ "net/http"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/config"
+ "github.com/thepeterstone/claudomator/internal/llm"
+)
+
+// buildLocalLLMClient returns an *llm.Client when a local model endpoint is
+// configured. Returns nil when LocalModel.Endpoint is empty so callers can
+// gate on `if c != nil` to skip registering LocalRunner / using the LLM
+// classifier path.
+func buildLocalLLMClient(cfg config.LocalModel, logger *slog.Logger) *llm.Client {
+ if cfg.Endpoint == "" {
+ return nil
+ }
+ timeout := 60 * time.Second
+ if cfg.TimeoutSeconds > 0 {
+ timeout = time.Duration(cfg.TimeoutSeconds) * time.Second
+ }
+ return &llm.Client{
+ Endpoint: cfg.Endpoint,
+ Model: cfg.Model,
+ APIKey: cfg.APIKey,
+ HTTPClient: &http.Client{Timeout: timeout},
+ Logger: logger,
+ }
+}
diff --git a/internal/cli/run.go b/internal/cli/run.go
index cfac893..48f34b7 100644
--- a/internal/cli/run.go
+++ b/internal/cli/run.go
@@ -100,9 +100,24 @@ func runTasks(file string, parallel int, dryRun bool) error {
},
}
+ localClient := buildLocalLLMClient(cfg.LocalModel, logger)
+ if localClient != nil {
+ runners["local"] = &executor.LocalRunner{
+ Client: localClient,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ DefaultTemperature: cfg.LocalModel.DefaultTemperature,
+ }
+ }
+
+
pool := executor.NewPool(parallel, runners, store, logger)
- if cfg.GeminiBinaryPath != "" {
- pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath}
+ pool.Classifier = &executor.Classifier{
+ LLM: localClient,
+ GeminiBinaryPath: cfg.GeminiBinaryPath,
+ }
+ if localClient != nil {
+ pool.LLM = localClient
}
// Handle graceful shutdown.
diff --git a/internal/cli/serve.go b/internal/cli/serve.go
index 581a064..459c35b 100644
--- a/internal/cli/serve.go
+++ b/internal/cli/serve.go
@@ -117,9 +117,25 @@ func serve(addr string) error {
},
}
+ localClient := buildLocalLLMClient(cfg.LocalModel, logger)
+ if localClient != nil {
+ runners["local"] = &executor.LocalRunner{
+ Client: localClient,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ DefaultTemperature: cfg.LocalModel.DefaultTemperature,
+ }
+ logger.Info("local runner registered", "endpoint", cfg.LocalModel.Endpoint, "model", cfg.LocalModel.Model)
+ }
+
+
pool := executor.NewPool(cfg.MaxConcurrent, runners, store, logger)
- if cfg.GeminiBinaryPath != "" {
- pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath}
+ pool.Classifier = &executor.Classifier{
+ LLM: localClient,
+ GeminiBinaryPath: cfg.GeminiBinaryPath,
+ }
+ if localClient != nil {
+ pool.LLM = localClient
}
if err := store.SeedProjects(); err != nil {
@@ -154,6 +170,10 @@ func serve(addr string) error {
if cfg.WorkspaceRoot != "" {
srv.SetWorkspaceRoot(cfg.WorkspaceRoot)
}
+ if cfg.LocalModel.UseForElaborate() {
+ srv.SetLLM(localClient)
+ logger.Info("elaboration prefers local llm", "endpoint", cfg.LocalModel.Endpoint)
+ }
srv.SetGitHubWebhookConfig(cfg.WebhookSecret, cfg.Projects)
// Register scripts.
diff --git a/internal/config/config.go b/internal/config/config.go
index 71258c1..25187cf 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -16,28 +16,58 @@ type Project struct {
Dir string `toml:"dir"`
}
+// LocalModel configures an OpenAI-compatible local LLM endpoint used for
+// internal helpers (classifier, elaboration, future summarization) and as
+// the backend for the "local" runner. If Endpoint is empty, the LocalRunner
+// is not registered and the classifier falls back to the Gemini CLI.
+//
+// PreferForElaborate gates whether the API server's elaboration handler
+// uses this client. It defaults to true when Endpoint is set; users with a
+// slow or low-quality local model can disable it.
+type LocalModel struct {
+ Endpoint string `toml:"endpoint"` // e.g. "http://localhost:11434/v1"
+ Model string `toml:"model"` // e.g. "llama3.1:8b"
+ TimeoutSeconds int `toml:"timeout_seconds"` // default 60
+ DefaultTemperature float64 `toml:"default_temperature"` // default 0.2
+ APIKey string `toml:"api_key"` // optional bearer token
+ PreferForElaborate *bool `toml:"prefer_for_elaborate"` // pointer so default-true survives parse
+}
+
+// UseForElaborate returns true when elaboration should try this local model
+// before falling back to Claude/Gemini. Default is true when Endpoint is set.
+func (m LocalModel) UseForElaborate() bool {
+ if m.Endpoint == "" {
+ return false
+ }
+ if m.PreferForElaborate == nil {
+ return true
+ }
+ return *m.PreferForElaborate
+}
+
type Config struct {
- DataDir string `toml:"data_dir"`
- DBPath string `toml:"-"`
- LogDir string `toml:"-"`
- DropsDir string `toml:"-"`
- SSHAuthSock string `toml:"ssh_auth_sock"`
- ClaudeBinaryPath string `toml:"claude_binary_path"`
- GeminiBinaryPath string `toml:"gemini_binary_path"`
- ClaudeImage string `toml:"claude_image"`
- GeminiImage string `toml:"gemini_image"`
+ DataDir string `toml:"data_dir"`
+ DBPath string `toml:"-"`
+ LogDir string `toml:"-"`
+ DropsDir string `toml:"-"`
+ SSHAuthSock string `toml:"ssh_auth_sock"`
+ ClaudeBinaryPath string `toml:"claude_binary_path"`
+ GeminiBinaryPath string `toml:"gemini_binary_path"`
+ ClaudeImage string `toml:"claude_image"`
+ GeminiImage string `toml:"gemini_image"`
MaxConcurrent int `toml:"max_concurrent"`
ShutdownTimeout time.Duration `toml:"shutdown_timeout"`
- DefaultTimeout string `toml:"default_timeout"`
- ServerAddr string `toml:"server_addr"`
- WebhookURL string `toml:"webhook_url"`
- WorkspaceRoot string `toml:"workspace_root"`
- WebhookSecret string `toml:"webhook_secret"`
- Projects []Project `toml:"projects"`
- VAPIDPublicKey string `toml:"vapid_public_key"`
- VAPIDPrivateKey string `toml:"vapid_private_key"`
- VAPIDEmail string `toml:"vapid_email"`
- ClaudeConfigDir string `toml:"claude_config_dir"`
+ DefaultTimeout string `toml:"default_timeout"`
+ ServerAddr string `toml:"server_addr"`
+ WebhookURL string `toml:"webhook_url"`
+ WorkspaceRoot string `toml:"workspace_root"`
+ WebhookSecret string `toml:"webhook_secret"`
+ Projects []Project `toml:"projects"`
+ VAPIDPublicKey string `toml:"vapid_public_key"`
+ VAPIDPrivateKey string `toml:"vapid_private_key"`
+ VAPIDEmail string `toml:"vapid_email"`
+ ClaudeConfigDir string `toml:"claude_config_dir"`
+ LocalModel LocalModel `toml:"local_model"`
}
func Default() (*Config, error) {
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
index 2bba2c4..e4f1a5d 100644
--- a/internal/config/config_test.go
+++ b/internal/config/config_test.go
@@ -53,3 +53,33 @@ func TestLoadFile_MissingFile_ReturnsError(t *testing.T) {
t.Fatal("expected error for missing file, got nil")
}
}
+
+func TestLocalModel_UseForElaborate_EmptyEndpoint(t *testing.T) {
+ m := LocalModel{}
+ if m.UseForElaborate() {
+ t.Error("empty endpoint should never opt into elaborate")
+ }
+}
+
+func TestLocalModel_UseForElaborate_DefaultTrue(t *testing.T) {
+ m := LocalModel{Endpoint: "http://localhost:11434/v1"}
+ if !m.UseForElaborate() {
+ t.Error("endpoint set + default flag should opt in")
+ }
+}
+
+func TestLocalModel_UseForElaborate_ExplicitFalse(t *testing.T) {
+ f := false
+ m := LocalModel{Endpoint: "http://localhost:11434/v1", PreferForElaborate: &f}
+ if m.UseForElaborate() {
+ t.Error("explicit false should opt out")
+ }
+}
+
+func TestLocalModel_UseForElaborate_ExplicitTrue(t *testing.T) {
+ tr := true
+ m := LocalModel{Endpoint: "http://localhost:11434/v1", PreferForElaborate: &tr}
+ if !m.UseForElaborate() {
+ t.Error("explicit true should opt in")
+ }
+}
diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go
index 7a474b6..049dc4f 100644
--- a/internal/executor/classifier.go
+++ b/internal/executor/classifier.go
@@ -6,6 +6,8 @@ import (
"fmt"
"os/exec"
"strings"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
)
type Classification struct {
@@ -19,7 +21,12 @@ type SystemStatus struct {
RateLimited map[string]bool
}
+// Classifier picks a model for an incoming task. When LLM is non-nil the
+// classifier routes through the local OpenAI-compatible client (cheap,
+// private, fast). Otherwise it falls back to invoking the Gemini CLI
+// at GeminiBinaryPath.
type Classifier struct {
+ LLM *llm.Client
GeminiBinaryPath string
}
@@ -62,6 +69,10 @@ func (c *Classifier) Classify(ctx context.Context, taskName, instructions string
agentType, taskName, instructions, agentType,
)
+ if c.LLM != nil {
+ return c.classifyViaLLM(ctx, prompt, agentType)
+ }
+
binary := c.GeminiBinaryPath
if binary == "" {
binary = "gemini"
@@ -123,3 +134,25 @@ func (c *Classifier) Classify(ctx context.Context, taskName, instructions string
return &cls, nil
}
+
+// classifyViaLLM routes classification through the local OpenAI-compatible
+// client with response_format=json_object, so we get clean JSON without the
+// markdown-fence cleanup needed for the Gemini CLI fallback.
+func (c *Classifier) classifyViaLLM(ctx context.Context, prompt, agentType string) (*Classification, error) {
+ resp, err := c.LLM.Chat(ctx, llm.ChatRequest{
+ Messages: []llm.Message{{Role: "user", Content: prompt}},
+ ResponseJSON: true,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("classifier (local llm): %w", err)
+ }
+ body := strings.TrimSpace(resp.Content)
+ var cls Classification
+ if err := json.Unmarshal([]byte(body), &cls); err != nil {
+ return nil, fmt.Errorf("classifier (local llm): parse JSON: %w\nbody: %s", err, body)
+ }
+ if cls.AgentType == "" {
+ cls.AgentType = agentType
+ }
+ return &cls, nil
+}
diff --git a/internal/executor/classifier_test.go b/internal/executor/classifier_test.go
index 83a9743..84fffcf 100644
--- a/internal/executor/classifier_test.go
+++ b/internal/executor/classifier_test.go
@@ -2,8 +2,15 @@ package executor
import (
"context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
"os"
+ "strings"
"testing"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
)
// TestClassifier_Classify_Mock tests the classifier with a mocked gemini binary.
@@ -36,6 +43,75 @@ echo '{"response": "{\"agent_type\": \"gemini\", \"model\": \"gemini-2.5-flash-l
}
}
+// TestClassifier_Classify_LLM tests classification through a local OpenAI-compatible LLM.
+func TestClassifier_Classify_LLM(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ // Verify the classifier asked for JSON mode.
+ var body struct {
+ ResponseFormat *struct {
+ Type string `json:"type"`
+ } `json:"response_format"`
+ }
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatalf("decode body: %v", err)
+ }
+ if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" {
+ t.Errorf("classifier should request json_object response format")
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model":"local-fast",
+ "choices":[{"message":{"role":"assistant","content":"{\"agent_type\":\"claude\",\"model\":\"claude-haiku-4-5-20251001\",\"reason\":\"trivial task\"}"},"finish_reason":"stop"}],
+ "usage":{"prompt_tokens":10,"completion_tokens":15}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Classifier{
+ LLM: &llm.Client{Endpoint: srv.URL + "/v1", Model: "local-fast"},
+ }
+ status := SystemStatus{
+ ActiveTasks: map[string]int{"claude": 1, "gemini": 0},
+ RateLimited: map[string]bool{},
+ }
+
+ cls, err := c.Classify(context.Background(), "List files", "ls -la", status, "claude")
+ if err != nil {
+ t.Fatalf("Classify: %v", err)
+ }
+ if cls.AgentType != "claude" {
+ t.Errorf("AgentType: want claude got %q", cls.AgentType)
+ }
+ if cls.Model != "claude-haiku-4-5-20251001" {
+ t.Errorf("Model: want claude-haiku-4-5-20251001 got %q", cls.Model)
+ }
+ if !strings.Contains(cls.Reason, "trivial") {
+ t.Errorf("Reason mismatch: %q", cls.Reason)
+ }
+}
+
+// TestClassifier_LLMTakesPrecedence_OverGemini ensures the LLM path is preferred when both are configured.
+func TestClassifier_LLMTakesPrecedence_OverGemini(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"model":"x","choices":[{"message":{"content":"{\"agent_type\":\"claude\",\"model\":\"claude-sonnet-4-6\",\"reason\":\"r\"}"},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer srv.Close()
+
+ c := &Classifier{
+ LLM: &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"},
+ GeminiBinaryPath: "/nonexistent/gemini-binary-should-not-be-called",
+ }
+ cls, err := c.Classify(context.Background(), "n", "i", SystemStatus{}, "claude")
+ if err != nil {
+ t.Fatalf("Classify: %v", err)
+ }
+ if cls.Model != "claude-sonnet-4-6" {
+ t.Errorf("expected LLM path; got Model=%q", cls.Model)
+ }
+}
+
func filepathJoin(elems ...string) string {
var path string
for i, e := range elems {
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
new file mode 100644
index 0000000..3c87f26
--- /dev/null
+++ b/internal/executor/claude.go
@@ -0,0 +1,552 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/retry"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
+type ClaudeRunner struct {
+ BinaryPath string // defaults to "claude"
+ Logger *slog.Logger
+ LogDir string // base directory for execution logs
+ APIURL string // base URL of the Claudomator API, passed to subprocesses
+}
+
+// BlockedError is returned by Run when the agent wrote a question file and exited.
+// The pool transitions the task to BLOCKED and stores the question for the user.
+// ExecLogDir returns the log directory for the given execution ID.
+// Implements LogPather so the pool can persist paths before execution starts.
+func (r *ClaudeRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+func (r *ClaudeRunner) binaryPath() string {
+ if r.BinaryPath != "" {
+ return r.BinaryPath
+ }
+ return "claude"
+}
+
+// Run executes a claude -p invocation, streaming output to log files.
+// It retries up to 3 times on rate-limit errors using exponential backoff.
+// If the agent writes a question file and exits, Run returns *BlockedError.
+//
+// When project_dir is set and this is not a resume execution, Run clones the
+// project into a temp sandbox, runs the agent there, then merges committed
+// changes back to project_dir. On failure the sandbox is preserved and its
+// path is included in the error.
+func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ projectDir := t.Agent.ProjectDir
+
+ // Validate project_dir exists when set.
+ if projectDir != "" {
+ if _, err := os.Stat(projectDir); err != nil {
+ return fmt.Errorf("project_dir %q: %w", projectDir, err)
+ }
+ }
+
+ // Setup log directory once; retries overwrite the log files.
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ logDir = e.ID // fallback for tests without LogDir set
+ }
+ if err := os.MkdirAll(logDir, 0700); err != nil {
+ return fmt.Errorf("creating log dir: %w", err)
+ }
+ if e.StdoutPath == "" {
+ e.StdoutPath = filepath.Join(logDir, "stdout.log")
+ e.StderrPath = filepath.Join(logDir, "stderr.log")
+ e.ArtifactDir = logDir
+ }
+
+ // Pre-assign session ID so we can resume after a BLOCKED state.
+ // For resume executions, the claude session continues under the original
+ // session ID (the one passed to --resume). Using the new exec's own UUID
+ // would cause a second block-and-resume cycle to pass the wrong --resume
+ // argument.
+ if e.SessionID == "" {
+ if e.ResumeSessionID != "" {
+ e.SessionID = e.ResumeSessionID
+ } else {
+ e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
+ }
+ }
+
+ // For new (non-resume) executions with a project_dir, clone into a sandbox.
+ // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude
+ // finds its session files under the same project slug. If no sandbox was
+ // preserved (e.g. task had no project_dir), fall back to project_dir.
+ var sandboxDir string
+ var startHEAD string
+ effectiveWorkingDir := projectDir
+ if e.ResumeSessionID != "" {
+ if e.SandboxDir != "" {
+ if _, statErr := os.Stat(e.SandboxDir); statErr == nil {
+ effectiveWorkingDir = e.SandboxDir
+ } else {
+ // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot).
+ // Clone a fresh sandbox so the task can run rather than fail immediately.
+ r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir)
+ e.SandboxDir = ""
+ if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+ }
+ }
+ } else if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+
+ if effectiveWorkingDir != "" {
+ // Capture the initial HEAD so we can identify new commits later.
+ headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output()
+ startHEAD = strings.TrimSpace(string(headOut))
+ }
+
+ questionFile := filepath.Join(logDir, "question.json")
+ args := r.buildArgs(t, e, questionFile)
+
+ attempt := 0
+ err := retry.RunWithBackoff(ctx, 3, 5*time.Second, func() error {
+ if attempt > 0 {
+ delay := 5 * time.Second * (1 << (attempt - 1))
+ r.Logger.Warn("rate-limited by Claude API, retrying",
+ "attempt", attempt,
+ "delay", delay,
+ )
+ }
+ attempt++
+ return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e)
+ })
+ if err != nil {
+ if sandboxDir != "" {
+ return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
+ }
+ return err
+ }
+
+ // Check whether the agent left a question before exiting.
+ data, readErr := os.ReadFile(questionFile)
+ if readErr == nil {
+ os.Remove(questionFile) // consumed
+ questionJSON := strings.TrimSpace(string(data))
+ // If the agent wrote a completion report instead of a real question,
+ // extract the text as the summary and fall through to normal completion.
+ if isCompletionReport(questionJSON) {
+ r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
+ e.Summary = extractQuestionText(questionJSON)
+ } else {
+ // Preserve sandbox on BLOCKED — agent may have partial work and its
+ // Claude session files are stored under the sandbox's project slug.
+ // The resume execution must run in the same directory.
+ return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir}
+ }
+ }
+
+ // Read agent summary if written.
+ summaryFile := filepath.Join(logDir, "summary.txt")
+ if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
+ os.Remove(summaryFile) // consumed
+ e.Summary = strings.TrimSpace(string(summaryData))
+ }
+
+ // Merge sandbox back to project_dir and clean up.
+ if sandboxDir != "" {
+ if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil {
+ return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
+ }
+ }
+ return nil
+}
+
+// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a
+// remote named "local" (a local bare repo that accepts pushes cleanly), then
+// falls back to "origin", then to the working copy path itself.
+func sandboxCloneSource(projectDir string) string {
+ for _, remote := range []string{"local", "origin"} {
+ out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", remote)...).Output()
+ if err == nil {
+ u := strings.TrimSpace(string(out))
+ if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) {
+ return u
+ }
+ }
+ }
+ return projectDir
+}
+
+// setupSandbox prepares a temporary git clone of projectDir.
+// If projectDir is not a git repo it is initialised with an initial commit first.
+func setupSandbox(projectDir string, logger *slog.Logger) (string, error) {
+ // Ensure projectDir is a git repo; initialise if not.
+ if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil {
+ cmds := [][]string{
+ gitSafe("-C", projectDir, "init"),
+ gitSafe("-C", projectDir, "add", "-A"),
+ gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"),
+ }
+ for _, args := range cmds {
+ if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec
+ return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out)
+ }
+ }
+ }
+
+ src := sandboxCloneSource(projectDir)
+
+ tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*")
+ if err != nil {
+ return "", fmt.Errorf("creating sandbox dir: %w", err)
+ }
+ // git clone requires the target to not exist; remove the placeholder first.
+ if err := os.Remove(tempDir); err != nil {
+ return "", fmt.Errorf("removing temp dir placeholder: %w", err)
+ }
+ out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput()
+ if err != nil {
+ return "", fmt.Errorf("git clone: %w\n%s", err, out)
+ }
+ return tempDir, nil
+}
+
+// teardownSandbox verifies the sandbox is clean and pushes new commits to the
+// canonical bare repo. If the push is rejected because another task pushed
+// concurrently, it fetches and rebases then retries once.
+//
+// The working copy (projectDir) is NOT updated automatically — it is the
+// developer's workspace and is pulled manually. This avoids permission errors
+// from mixed-owner .git/objects directories.
+func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error {
+ // Automatically commit uncommitted changes.
+ out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output()
+ if err != nil {
+ return fmt.Errorf("git status: %w", err)
+ }
+ if len(strings.TrimSpace(string(out))) > 0 {
+ logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir)
+
+ // Run build before autocommitting.
+ if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil {
+ logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir)
+ if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil {
+ return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
+ }
+ } else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil {
+ logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir)
+ cmd := exec.Command("./gradlew", "build")
+ cmd.Dir = sandboxDir
+ if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil {
+ return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
+ }
+ } else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil {
+ logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir)
+ cmd := exec.Command("go", "build", "./...")
+ cmd.Dir = sandboxDir
+ if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil {
+ return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
+ }
+ }
+
+ cmds := [][]string{
+ gitSafe("-C", sandboxDir, "add", "-A"),
+ gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"),
+ }
+ for _, args := range cmds {
+ if out, err := exec.Command("git", args...).CombinedOutput(); err != nil {
+ return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out)
+ }
+ }
+ }
+
+ // Capture commits before pushing/deleting.
+ // Use startHEAD..HEAD to find all commits made during this execution.
+ logRange := "origin/HEAD..HEAD"
+ if startHEAD != "" && startHEAD != "HEAD" {
+ logRange = startHEAD + "..HEAD"
+ }
+
+ logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...)
+ logOut, logErr := logCmd.CombinedOutput()
+ if logErr == nil {
+ lines := strings.Split(strings.TrimSpace(string(logOut)), "\n")
+ logger.Debug("captured commits", "count", len(lines), "range", logRange)
+ for _, line := range lines {
+ if line == "" {
+ continue
+ }
+ parts := strings.SplitN(line, "|", 2)
+ if len(parts) == 2 {
+ execRecord.Commits = append(execRecord.Commits, task.GitCommit{
+ Hash: parts[0],
+ Message: parts[1],
+ })
+ }
+ }
+ } else {
+ logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut))
+ }
+
+ // Check whether there are any new commits to push.
+ ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output()
+ if err != nil {
+ logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange)
+ }
+ if strings.TrimSpace(string(ahead)) == "0" {
+ os.RemoveAll(sandboxDir)
+ return nil
+ }
+
+ // Push from sandbox → bare repo (sandbox's origin is the bare repo).
+ if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil {
+ // If rejected due to concurrent push, fetch+rebase and retry once.
+ if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") {
+ logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir)
+ if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil {
+ return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2)
+ }
+ // Re-capture commits after rebase (hashes might have changed)
+ execRecord.Commits = nil
+ logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output()
+ if logErr == nil {
+ lines := strings.Split(strings.TrimSpace(string(logOut)), "\n")
+ for _, line := range lines {
+ parts := strings.SplitN(line, "|", 2)
+ if len(parts) == 2 {
+ execRecord.Commits = append(execRecord.Commits, task.GitCommit{
+ Hash: parts[0],
+ Message: parts[1],
+ })
+ }
+ }
+ }
+
+ if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil {
+ return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3)
+ }
+ } else {
+ return fmt.Errorf("git push to origin: %w\n%s", err, out)
+ }
+ }
+
+ logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir)
+ os.RemoveAll(sandboxDir)
+ return nil
+}
+
+// execOnce runs the claude subprocess once, streaming output to e's log paths.
+func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error {
+ cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
+ cmd.Env = append(os.Environ(),
+ "CLAUDOMATOR_API_URL="+r.APIURL,
+ "CLAUDOMATOR_TASK_ID="+e.TaskID,
+ "CLAUDOMATOR_PROJECT_DIR="+projectDir,
+ "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
+ "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"),
+ )
+ // Put the subprocess in its own process group so we can SIGKILL the entire
+ // group (MCP servers, bash children, etc.) on cancellation.
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ if workingDir != "" {
+ cmd.Dir = workingDir
+ }
+
+ stdoutFile, err := os.Create(e.StdoutPath)
+ if err != nil {
+ return fmt.Errorf("creating stdout log: %w", err)
+ }
+ defer stdoutFile.Close()
+
+ stderrFile, err := os.Create(e.StderrPath)
+ if err != nil {
+ return fmt.Errorf("creating stderr log: %w", err)
+ }
+ defer stderrFile.Close()
+
+ // Use os.Pipe for stdout so we own the read-end lifetime.
+ // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
+ // cmd.Wait() to close it before our goroutine finishes reading.
+ stdoutR, stdoutW, err := os.Pipe()
+ if err != nil {
+ return fmt.Errorf("creating stdout pipe: %w", err)
+ }
+ cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
+ cmd.Stderr = stderrFile
+
+ if err := cmd.Start(); err != nil {
+ stdoutW.Close()
+ stdoutR.Close()
+ return fmt.Errorf("starting claude: %w", err)
+ }
+ // Close our write-end immediately; the subprocess holds its own copy.
+ // The goroutine below gets EOF when the subprocess exits.
+ stdoutW.Close()
+
+ // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
+ //
+ // Safety: this goroutine cannot block indefinitely. The select has two arms:
+ // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel).
+ // The goroutine sends SIGKILL and exits immediately.
+ // • killDone — closed by close(killDone) below, immediately after cmd.Wait()
+ // returns. This fires when the process exits for any reason (natural exit,
+ // SIGKILL from the ctx arm, or any other signal). The goroutine exits without
+ // doing anything.
+ //
+ // Therefore: for a task that completes normally with a long-lived (non-cancelled)
+ // context, the killDone arm fires and the goroutine exits. There is no path where
+ // this goroutine outlives execOnce().
+ killDone := make(chan struct{})
+ go func() {
+ select {
+ case <-ctx.Done():
+ // SIGKILL the entire process group to reap orphan children.
+ syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+ case <-killDone:
+ }
+ }()
+
+ // Stream stdout to the log file and parse cost/errors.
+ // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait().
+ var costUSD float64
+ var streamErr error
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ costUSD, _, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
+ stdoutR.Close()
+ }()
+
+ waitErr := cmd.Wait()
+ close(killDone) // stop the pgid-kill goroutine
+ wg.Wait() // drain remaining stdout before reading costUSD/streamErr
+
+ e.CostUSD = costUSD
+
+ if waitErr != nil {
+ if exitErr, ok := waitErr.(*exec.ExitError); ok {
+ e.ExitCode = exitErr.ExitCode()
+ }
+ // If the stream captured a rate-limit or quota message, return it
+ // so callers can distinguish it from a generic exit-status failure.
+ if retry.IsRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
+ return streamErr
+ }
+ if tail := tailFile(e.StderrPath, 20); tail != "" {
+ return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail)
+ }
+ return fmt.Errorf("claude exited with error: %w", waitErr)
+ }
+
+ e.ExitCode = 0
+ if streamErr != nil {
+ return streamErr
+ }
+ return nil
+}
+
+func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
+ // Resume execution: the agent already has context; just deliver the answer.
+ if e.ResumeSessionID != "" {
+ args := []string{
+ "-p", e.ResumeAnswer,
+ "--resume", e.ResumeSessionID,
+ "--output-format", "stream-json",
+ "--verbose",
+ }
+ permMode := t.Agent.PermissionMode
+ if permMode == "" {
+ permMode = "bypassPermissions"
+ }
+ args = append(args, "--permission-mode", permMode)
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
+ }
+ return args
+ }
+
+ instructions := t.Agent.Instructions
+ allowedTools := t.Agent.AllowedTools
+
+ if !t.Agent.SkipPlanning {
+ instructions = withPlanningPreamble(instructions)
+ // Ensure Bash is available so the agent can POST subtasks and ask questions.
+ hasBash := false
+ for _, tool := range allowedTools {
+ if tool == "Bash" {
+ hasBash = true
+ break
+ }
+ }
+ if !hasBash {
+ allowedTools = append(allowedTools, "Bash")
+ }
+ }
+
+ args := []string{
+ "-p", instructions,
+ "--session-id", e.SessionID,
+ "--output-format", "stream-json",
+ "--verbose",
+ }
+
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
+ }
+ if t.Agent.MaxBudgetUSD > 0 {
+ args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD))
+ }
+ // Default to bypassPermissions — claudomator runs tasks unattended, so
+ // prompting for write access would always stall execution. Tasks that need
+ // a more restrictive mode can set permission_mode explicitly.
+ permMode := t.Agent.PermissionMode
+ if permMode == "" {
+ permMode = "bypassPermissions"
+ }
+ args = append(args, "--permission-mode", permMode)
+ if t.Agent.SystemPromptAppend != "" {
+ args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend)
+ }
+ for _, tool := range allowedTools {
+ args = append(args, "--allowedTools", tool)
+ }
+ for _, tool := range t.Agent.DisallowedTools {
+ args = append(args, "--disallowedTools", tool)
+ }
+ for _, f := range t.Agent.ContextFiles {
+ args = append(args, "--add-dir", f)
+ }
+ args = append(args, t.Agent.AdditionalArgs...)
+
+ return args
+}
+
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
new file mode 100644
index 0000000..c01e160
--- /dev/null
+++ b/internal/executor/claude_test.go
@@ -0,0 +1,810 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "runtime"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "fix the bug",
+ Model: "sonnet",
+ SkipPlanning: true,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ argMap := make(map[string]bool)
+ for _, a := range args {
+ argMap[a] = true
+ }
+ for _, want := range []string{"-p", "fix the bug", "--output-format", "stream-json", "--verbose", "--model", "sonnet"} {
+ if !argMap[want] {
+ t.Errorf("missing arg %q in %v", want, args)
+ }
+ }
+}
+
+func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "implement feature",
+ Model: "opus",
+ MaxBudgetUSD: 5.0,
+ PermissionMode: "bypassPermissions",
+ SystemPromptAppend: "Follow TDD",
+ AllowedTools: []string{"Bash", "Edit"},
+ DisallowedTools: []string{"Write"},
+ ContextFiles: []string{"/src"},
+ AdditionalArgs: []string{"--verbose"},
+ SkipPlanning: true,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ // Check key args are present.
+ argMap := make(map[string]bool)
+ for _, a := range args {
+ argMap[a] = true
+ }
+
+ requiredArgs := []string{
+ "-p", "implement feature", "--output-format", "stream-json",
+ "--model", "opus", "--max-budget-usd", "5.00",
+ "--permission-mode", "bypassPermissions",
+ "--append-system-prompt", "Follow TDD",
+ "--allowedTools", "Bash", "Edit",
+ "--disallowedTools", "Write",
+ "--add-dir", "/src",
+ "--verbose",
+ }
+ for _, req := range requiredArgs {
+ if !argMap[req] {
+ t.Errorf("missing arg %q in %v", req, args)
+ }
+ }
+}
+
+func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "do work",
+ SkipPlanning: true,
+ // PermissionMode intentionally not set
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ found := false
+ for i, a := range args {
+ if a == "--permission-mode" && i+1 < len(args) && args[i+1] == "bypassPermissions" {
+ found = true
+ }
+ }
+ if !found {
+ t.Errorf("expected --permission-mode bypassPermissions when PermissionMode is empty, args: %v", args)
+ }
+}
+
+func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "do work",
+ PermissionMode: "default",
+ SkipPlanning: true,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ for i, a := range args {
+ if a == "--permission-mode" && i+1 < len(args) {
+ if args[i+1] != "default" {
+ t.Errorf("expected --permission-mode default, got %q", args[i+1])
+ }
+ return
+ }
+ }
+ t.Errorf("--permission-mode flag not found in args: %v", args)
+}
+
+func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "do something",
+ SkipPlanning: true,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ found := false
+ for _, a := range args {
+ if a == "--verbose" {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("--verbose missing from args: %v", args)
+ }
+}
+
+func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "fix the bug",
+ SkipPlanning: false,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ // The -p value should start with the preamble and end with the original instructions.
+ if len(args) < 2 || args[0] != "-p" {
+ t.Fatalf("expected -p as first arg, got: %v", args)
+ }
+ if !strings.HasPrefix(args[1], "## Runtime Environment") {
+ t.Errorf("instructions should start with planning preamble, got prefix: %q", args[1][:min(len(args[1]), 20)])
+ }
+ if !strings.Contains(args[1], "$CLAUDOMATOR_PROJECT_DIR") {
+ t.Errorf("preamble should mention $CLAUDOMATOR_PROJECT_DIR")
+ }
+ if !strings.HasSuffix(args[1], "fix the bug") {
+ t.Errorf("instructions should end with original instructions")
+ }
+}
+
+func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "do work",
+ AllowedTools: []string{"Read"},
+ SkipPlanning: false,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ // Bash should be appended to allowed tools.
+ foundBash := false
+ for i, a := range args {
+ if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" {
+ foundBash = true
+ }
+ }
+ if !foundBash {
+ t.Errorf("Bash should be added to --allowedTools when preamble is active: %v", args)
+ }
+}
+
+func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) {
+ r := &ClaudeRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "do work",
+ AllowedTools: []string{"Bash", "Read"},
+ SkipPlanning: false,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ // Count Bash occurrences in --allowedTools values.
+ bashCount := 0
+ for i, a := range args {
+ if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" {
+ bashCount++
+ }
+ }
+ if bashCount != 1 {
+ t.Errorf("Bash should appear exactly once in --allowedTools, got %d: %v", bashCount, args)
+ }
+}
+
+// TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession verifies that when a
+// resume execution is itself blocked again, the stored SessionID is the original
+// resumed session, not the new execution's own UUID. Without this, a second
+// block-and-resume cycle passes the wrong --resume session ID and fails.
+func TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession(t *testing.T) {
+ logDir := t.TempDir()
+ r := &ClaudeRunner{
+ BinaryPath: "true", // exits 0, no output
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: "continue",
+ SkipPlanning: true,
+ },
+ }
+ exec := &storage.Execution{
+ ID: "resume-exec-uuid",
+ TaskID: "task-1",
+ ResumeSessionID: "original-session-uuid",
+ ResumeAnswer: "yes",
+ }
+
+ // Run completes successfully (binary is "true").
+ _ = r.Run(context.Background(), tk, exec)
+
+ // SessionID must be the original session (ResumeSessionID), not the new
+ // exec's own ID. If it were exec.ID, a second blocked-then-resumed cycle
+ // would use the wrong --resume argument and fail.
+ if exec.SessionID != "original-session-uuid" {
+ t.Errorf("SessionID after resume Run: want %q, got %q", "original-session-uuid", exec.SessionID)
+ }
+}
+
+func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
+ r := &ClaudeRunner{
+ BinaryPath: "true", // would succeed if it ran
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: t.TempDir(),
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ ProjectDir: "/nonexistent/path/does/not/exist",
+ SkipPlanning: true,
+ },
+ }
+ exec := &storage.Execution{ID: "test-exec"}
+
+ err := r.Run(context.Background(), tk, exec)
+
+ if err == nil {
+ t.Fatal("expected error for inaccessible working_dir, got nil")
+ }
+ if !strings.Contains(err.Error(), "project_dir") {
+ t.Errorf("expected 'project_dir' in error, got: %v", err)
+ }
+}
+
+func TestClaudeRunner_BinaryPath_Default(t *testing.T) {
+ r := &ClaudeRunner{}
+ if r.binaryPath() != "claude" {
+ t.Errorf("want 'claude', got %q", r.binaryPath())
+ }
+}
+
+func TestClaudeRunner_BinaryPath_Custom(t *testing.T) {
+ r := &ClaudeRunner{BinaryPath: "/usr/local/bin/claude"}
+ if r.binaryPath() != "/usr/local/bin/claude" {
+ t.Errorf("want custom path, got %q", r.binaryPath())
+ }
+}
+
+// TestExecOnce_NoGoroutineLeak_OnNaturalExit verifies that execOnce does not
+// leave behind any goroutines when the subprocess exits normally (no context
+// cancellation). Both the pgid-kill goroutine and the parseStream goroutine
+// must have exited before execOnce returns.
+func TestExecOnce_NoGoroutineLeak_OnNaturalExit(t *testing.T) {
+ logDir := t.TempDir()
+ r := &ClaudeRunner{
+ BinaryPath: "true", // exits immediately with status 0, produces no output
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ e := &storage.Execution{
+ ID: "goroutine-leak-test",
+ TaskID: "task-id",
+ StdoutPath: filepath.Join(logDir, "stdout.log"),
+ StderrPath: filepath.Join(logDir, "stderr.log"),
+ ArtifactDir: logDir,
+ }
+
+ // Let any goroutines from test infrastructure settle before sampling.
+ runtime.Gosched()
+ baseline := runtime.NumGoroutine()
+
+ if err := r.execOnce(context.Background(), []string{}, "", "", e); err != nil {
+ t.Fatalf("execOnce failed: %v", err)
+ }
+
+ // Give the scheduler a moment to let any leaked goroutines actually exit.
+ // In correct code the goroutines exit before execOnce returns, so this is
+ // just a safety buffer for the scheduler.
+ time.Sleep(10 * time.Millisecond)
+ runtime.Gosched()
+
+ after := runtime.NumGoroutine()
+ if after > baseline {
+ t.Errorf("goroutine leak: %d goroutines before execOnce, %d after (leaked %d)",
+ baseline, after, after-baseline)
+ }
+}
+
+// initGitRepo creates a git repo in dir with one commit so it is clonable.
+func initGitRepo(t *testing.T, dir string) {
+ t.Helper()
+ cmds := [][]string{
+ {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "init", "-b", "main"},
+ {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "config", "user.email", "test@test"},
+ {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "config", "user.name", "test"},
+ }
+ for _, args := range cmds {
+ if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil {
+ t.Fatalf("%v: %v\n%s", args, err, out)
+ }
+ }
+ if err := os.WriteFile(filepath.Join(dir, "init.txt"), []byte("init"), 0644); err != nil {
+ t.Fatal(err)
+ }
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "add", ".").CombinedOutput(); err != nil {
+ t.Fatalf("git add: %v\n%s", err, out)
+ }
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "commit", "-m", "init").CombinedOutput(); err != nil {
+ t.Fatalf("git commit: %v\n%s", err, out)
+ }
+}
+
+func TestSandboxCloneSource_PrefersLocalRemote(t *testing.T) {
+ dir := t.TempDir()
+ initGitRepo(t, dir)
+ // Add a "local" remote pointing to a bare repo.
+ bare := t.TempDir()
+ exec.Command("git", "init", "--bare", bare).Run()
+ exec.Command("git", "-C", dir, "remote", "add", "local", bare).Run()
+ exec.Command("git", "-C", dir, "remote", "add", "origin", "https://example.com/repo").Run()
+
+ got := sandboxCloneSource(dir)
+ if got != bare {
+ t.Errorf("expected bare repo path %q, got %q", bare, got)
+ }
+}
+
+func TestSandboxCloneSource_FallsBackToOrigin(t *testing.T) {
+ dir := t.TempDir()
+ initGitRepo(t, dir)
+ // sandboxCloneSource intentionally filters to local-FS remotes (so
+ // `git clone <src>` doesn't go over the network). Use a local path
+ // for origin to verify the fallback semantics.
+ originURL := t.TempDir()
+ exec.Command("git", "-C", dir, "remote", "add", "origin", originURL).Run()
+
+ got := sandboxCloneSource(dir)
+ if got != originURL {
+ t.Errorf("expected origin URL %q, got %q", originURL, got)
+ }
+}
+
+func TestSandboxCloneSource_FallsBackToProjectDir(t *testing.T) {
+ dir := t.TempDir()
+ initGitRepo(t, dir)
+ // No remotes configured.
+ got := sandboxCloneSource(dir)
+ if got != dir {
+ t.Errorf("expected projectDir %q (no remotes), got %q", dir, got)
+ }
+}
+
+func TestSetupSandbox_ClonesGitRepo(t *testing.T) {
+ src := t.TempDir()
+ initGitRepo(t, src)
+
+ sandbox, err := setupSandbox(src, slog.Default())
+ if err != nil {
+ t.Fatalf("setupSandbox: %v", err)
+ }
+ t.Cleanup(func() { os.RemoveAll(sandbox) })
+
+ // Force sandbox to master if it cloned as main
+ exec.Command("git", gitSafe("-C", sandbox, "checkout", "master")...).Run()
+
+ // Debug sandbox
+ logOut, _ := exec.Command("git", "-C", sandbox, "log", "-1").CombinedOutput()
+ fmt.Printf("DEBUG: sandbox log: %s\n", string(logOut))
+
+ // Verify sandbox is a git repo with at least one commit.
+ out, err := exec.Command("git", "-C", sandbox, "log", "--oneline").Output()
+ if err != nil {
+ t.Fatalf("git log in sandbox: %v", err)
+ }
+ if len(strings.TrimSpace(string(out))) == 0 {
+ t.Error("expected at least one commit in sandbox, got empty log")
+ }
+}
+
+func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) {
+ // A plain directory (not a git repo) should be initialised then cloned.
+ src := t.TempDir()
+
+ sandbox, err := setupSandbox(src, slog.Default())
+ if err != nil {
+ t.Fatalf("setupSandbox on plain dir: %v", err)
+ }
+ t.Cleanup(func() { os.RemoveAll(sandbox) })
+
+ if _, err := os.Stat(filepath.Join(sandbox, ".git")); err != nil {
+ t.Errorf("sandbox should be a git repo: %v", err)
+ }
+}
+
+func TestTeardownSandbox_AutocommitsChanges(t *testing.T) {
+ // Create a bare repo as origin so push succeeds.
+ bare := t.TempDir()
+ if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git init bare: %v\n%s", err, out)
+ }
+
+ // Create a sandbox directly.
+ sandbox := t.TempDir()
+ initGitRepo(t, sandbox)
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git remote add: %v\n%s", err, out)
+ }
+ // Initial push to establish origin/main
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil {
+ t.Fatalf("git push initial: %v\n%s", err, out)
+ }
+
+ // Capture startHEAD
+ headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output()
+ if err != nil {
+ t.Fatalf("rev-parse HEAD: %v", err)
+ }
+ startHEAD := strings.TrimSpace(string(headOut))
+
+ // Leave an uncommitted file in the sandbox.
+ if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("autocommit me"), 0644); err != nil {
+ t.Fatal(err)
+ }
+
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
+ execRecord := &storage.Execution{}
+
+ err = teardownSandbox("", sandbox, startHEAD, logger, execRecord)
+ if err != nil {
+ t.Fatalf("expected autocommit to succeed, got error: %v", err)
+ }
+
+ // Sandbox should be removed after successful autocommit and push.
+ if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) {
+ t.Error("sandbox should have been removed after successful autocommit and push")
+ }
+
+ // Verify the commit exists in the bare repo.
+ out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output()
+ if err != nil {
+ t.Fatalf("git log in bare repo: %v", err)
+ }
+ if !strings.Contains(string(out), "chore: autocommit uncommitted changes") {
+ t.Errorf("expected autocommit message in log, got: %q", string(out))
+ }
+
+ // Verify the commit was captured in execRecord.
+ if len(execRecord.Commits) == 0 {
+ t.Error("expected at least one commit in execRecord")
+ } else if !strings.Contains(execRecord.Commits[0].Message, "chore: autocommit uncommitted changes") {
+ t.Errorf("unexpected commit message: %q", execRecord.Commits[0].Message)
+ }
+}
+
+func TestTeardownSandbox_BuildFailure_BlocksAutocommit(t *testing.T) {
+ bare := t.TempDir()
+ if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git init bare: %v\n%s", err, out)
+ }
+
+ sandbox := t.TempDir()
+ initGitRepo(t, sandbox)
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git remote add: %v\n%s", err, out)
+ }
+
+ // Capture startHEAD
+ headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output()
+ if err != nil {
+ t.Fatalf("rev-parse HEAD: %v", err)
+ }
+ startHEAD := strings.TrimSpace(string(headOut))
+
+ // Leave an uncommitted file.
+ if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil {
+ t.Fatal(err)
+ }
+
+ // Add a failing Makefile.
+ makefile := "build:\n\t@echo 'build failed'\n\texit 1\n"
+ if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil {
+ t.Fatal(err)
+ }
+
+ logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+ execRecord := &storage.Execution{}
+
+ err = teardownSandbox("", sandbox, startHEAD, logger, execRecord)
+ if err == nil {
+ t.Error("expected teardown to fail due to build failure, but it succeeded")
+ } else if !strings.Contains(err.Error(), "build failed before autocommit") {
+ t.Errorf("expected build failure error message, got: %v", err)
+ }
+
+ // Sandbox should NOT be removed if teardown failed.
+ if _, statErr := os.Stat(sandbox); os.IsNotExist(statErr) {
+ t.Error("sandbox should have been preserved after build failure")
+ }
+
+ // Verify no new commit in bare repo.
+ out, err := exec.Command("git", "-C", bare, "log", "HEAD").CombinedOutput()
+ if strings.Contains(string(out), "chore: autocommit uncommitted changes") {
+ t.Error("autocommit should not have been pushed after build failure")
+ }
+}
+
+func TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit(t *testing.T) {
+ bare := t.TempDir()
+ if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git init bare: %v\n%s", err, out)
+ }
+
+ sandbox := t.TempDir()
+ initGitRepo(t, sandbox)
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git remote add: %v\n%s", err, out)
+ }
+
+ // Capture startHEAD
+ headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output()
+ if err != nil {
+ t.Fatalf("rev-parse HEAD: %v", err)
+ }
+ startHEAD := strings.TrimSpace(string(headOut))
+
+ // Leave an uncommitted file.
+ if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil {
+ t.Fatal(err)
+ }
+
+ // Add a successful Makefile.
+ makefile := "build:\n\t@echo 'build succeeded'\n"
+ if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil {
+ t.Fatal(err)
+ }
+
+ logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+ execRecord := &storage.Execution{}
+
+ err = teardownSandbox("", sandbox, startHEAD, logger, execRecord)
+ if err != nil {
+ t.Fatalf("expected teardown to succeed after build success, got error: %v", err)
+ }
+
+ // Sandbox should be removed after success.
+ if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) {
+ t.Error("sandbox should have been removed after successful build and autocommit")
+ }
+
+ // Verify new commit in bare repo.
+ out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output()
+ if err != nil {
+ t.Fatalf("git log in bare repo: %v", err)
+ }
+ if !strings.Contains(string(out), "chore: autocommit uncommitted changes") {
+ t.Errorf("expected autocommit message in log, got: %q", string(out))
+ }
+}
+
+
+func TestTeardownSandbox_CapturesExplicitCommits(t *testing.T) {
+ bare := t.TempDir()
+ if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git init bare: %v\n%s", err, out)
+ }
+
+ sandbox := t.TempDir()
+ initGitRepo(t, sandbox)
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
+ t.Fatalf("git remote add: %v\n%s", err, out)
+ }
+ if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil {
+ t.Fatalf("git push initial: %v\n%s", err, out)
+ }
+
+ headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output()
+ if err != nil {
+ t.Fatalf("rev-parse HEAD: %v", err)
+ }
+ startHEAD := strings.TrimSpace(string(headOut))
+
+ // Simulate Claude explicitly committing changes.
+ if err := os.WriteFile(filepath.Join(sandbox, "work.txt"), []byte("done"), 0644); err != nil {
+ t.Fatal(err)
+ }
+ for _, args := range [][]string{
+ {"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "add", "-A"},
+ {"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "commit", "-m", "feat: implement the feature"},
+ } {
+ if out, err := exec.Command("git", args...).CombinedOutput(); err != nil {
+ t.Fatalf("git %v: %v\n%s", args, err, out)
+ }
+ }
+
+ logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+ execRecord := &storage.Execution{}
+
+ if err := teardownSandbox("", sandbox, startHEAD, logger, execRecord); err != nil {
+ t.Fatalf("teardownSandbox: %v", err)
+ }
+
+ if len(execRecord.Commits) == 0 {
+ t.Fatal("expected commits to be captured in execRecord")
+ }
+ if !strings.Contains(execRecord.Commits[0].Message, "feat: implement the feature") {
+ t.Errorf("unexpected commit message: %q", execRecord.Commits[0].Message)
+ }
+ if execRecord.Commits[0].Hash == "" {
+ t.Error("commit hash should not be empty")
+ }
+}
+
+func TestTeardownSandbox_CleanSandboxWithNoNewCommits_RemovesSandbox(t *testing.T) {
+ src := t.TempDir()
+ initGitRepo(t, src)
+ sandbox, err := setupSandbox(src, slog.Default())
+ if err != nil {
+ t.Fatalf("setupSandbox: %v", err)
+ }
+
+ logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+ execRecord := &storage.Execution{}
+
+ headOut, _ := exec.Command("git", "-C", sandbox, "rev-parse", "HEAD").Output()
+ startHEAD := strings.TrimSpace(string(headOut))
+
+ // Sandbox has no new commits beyond origin; teardown should succeed and remove it.
+ if err := teardownSandbox(src, sandbox, startHEAD, logger, execRecord); err != nil {
+ t.Fatalf("teardownSandbox: %v", err)
+ }
+ if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) {
+ t.Error("sandbox should have been removed after clean teardown")
+ os.RemoveAll(sandbox)
+ }
+}
+
+
+// TestClaudeRunner_Run_ResumeUsesStoredSandboxDir verifies that when a resume
+// execution has SandboxDir set, the runner uses that directory (not project_dir)
+// as the working directory, so Claude finds its session files there.
+func TestClaudeRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) {
+ logDir := t.TempDir()
+ sandboxDir := t.TempDir()
+ cwdFile := filepath.Join(logDir, "cwd.txt")
+
+ // Use a script that writes its working directory to a file in logDir (stable path).
+ scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh")
+ script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &ClaudeRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ ProjectDir: sandboxDir, // must exist; resume overrides it with SandboxDir anyway
+ SkipPlanning: true,
+ },
+ }
+ exec := &storage.Execution{
+ ID: "resume-exec-uuid",
+ TaskID: "task-1",
+ ResumeSessionID: "original-session",
+ ResumeAnswer: "yes",
+ SandboxDir: sandboxDir,
+ }
+
+ _ = r.Run(context.Background(), tk, exec)
+
+ got, err := os.ReadFile(cwdFile)
+ if err != nil {
+ t.Fatalf("cwd file not written: %v", err)
+ }
+ // The runner should have executed claude in sandboxDir, not in project_dir.
+ if string(got) != sandboxDir {
+ t.Errorf("resume working dir: want %q, got %q", sandboxDir, string(got))
+ }
+}
+
+func TestClaudeRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) {
+ logDir := t.TempDir()
+ projectDir := t.TempDir()
+ initGitRepo(t, projectDir)
+
+ cwdFile := filepath.Join(logDir, "cwd.txt")
+ scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh")
+ script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &ClaudeRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ ProjectDir: projectDir,
+ SkipPlanning: true,
+ },
+ }
+ // Point to a sandbox that no longer exists (e.g. /tmp was purged).
+ staleSandbox := filepath.Join(t.TempDir(), "gone")
+ e := &storage.Execution{
+ ID: "resume-exec-2",
+ TaskID: "task-2",
+ ResumeSessionID: "session-abc",
+ ResumeAnswer: "ok",
+ SandboxDir: staleSandbox,
+ }
+
+ if err := r.Run(context.Background(), tk, e); err != nil {
+ t.Fatalf("Run with stale sandbox: %v", err)
+ }
+
+ got, err := os.ReadFile(cwdFile)
+ if err != nil {
+ t.Fatalf("cwd file not written: %v", err)
+ }
+ // Should have run in a fresh sandbox (not the stale path, not the raw projectDir).
+ // The sandbox is removed after teardown, so we only check what it wasn't.
+ cwd := string(got)
+ if cwd == staleSandbox {
+ t.Error("ran in stale sandbox dir that doesn't exist")
+ }
+ if cwd == projectDir {
+ t.Error("ran directly in project_dir; expected a fresh sandbox clone")
+ }
+ // cwd should look like a claudomator sandbox path.
+ if !strings.Contains(cwd, "claudomator-sandbox-") {
+ t.Errorf("expected sandbox path, got %q", cwd)
+ }
+}
+
+func TestTailFile_MissingFile_ReturnsEmpty(t *testing.T) {
+ got := tailFile("/nonexistent/path/file.log", 10)
+ if got != "" {
+ t.Errorf("want empty string for missing file, got %q", got)
+ }
+}
+
diff --git a/internal/executor/container_test.go b/internal/executor/container_test.go
index f840f85..f0b2a3a 100644
--- a/internal/executor/container_test.go
+++ b/internal/executor/container_test.go
@@ -334,7 +334,7 @@ func TestDetectUncommittedChanges_CleanRepo(t *testing.T) {
func TestGitSafe_PrependsSafeDirectory(t *testing.T) {
got := gitSafe("-C", "/some/path", "status")
- want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"}
+ want := []string{"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-c", "tag.gpgsign=false", "-C", "/some/path", "status"}
if len(got) != len(want) {
t.Fatalf("gitSafe() = %v, want %v", got, want)
}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 376d62c..09169bd 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -12,6 +12,8 @@ import (
"sync"
"time"
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/retry"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
"github.com/google/uuid"
@@ -75,17 +77,18 @@ type Pool struct {
mu sync.Mutex
active int
activePerAgent map[string]int
- rateLimited map[string]time.Time // agentType -> until
+ rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
- consecutiveFailures map[string]int // agentType -> count
- closed bool // set to true when Shutdown has been called
+ consecutiveFailures map[string]int // agentType -> count
+ closed bool // set to true when Shutdown has been called
resultCh chan *Result
- startedCh chan string // task IDs that just transitioned to RUNNING
- workCh chan workItem // internal bounded queue; Submit enqueues here
- doneCh chan struct{} // signals when a worker slot is freed
- workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
- dispatchDone chan struct{} // closed when the dispatch goroutine exits
+ startedCh chan string // task IDs that just transitioned to RUNNING
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
+ workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
+ dispatchDone chan struct{} // closed when the dispatch goroutine exits
Classifier *Classifier
+ LLM *llm.Client
}
// Result is emitted when a task execution completes.
@@ -258,6 +261,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) {
return runner, nil
}
+// decActiveAgent decrements the active counters for a finished task. Safe to
+// call multiple times — subsequent calls are no-ops via the cleaned flag.
+// Always call this before sending on resultCh so consumers observing a result
+// see the accounting already settled (no zero-count map entries lingering).
+func (p *Pool) decActiveAgent(agentType string, cleaned *bool) {
+ if *cleaned {
+ return
+ }
+ *cleaned = true
+ p.mu.Lock()
+ p.active--
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
+}
+
func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
agentType := t.Agent.Type
if agentType == "" {
@@ -268,25 +293,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
p.activePerAgent[agentType]++
p.mu.Unlock()
- defer func() {
- p.mu.Lock()
- p.active--
- p.mu.Unlock()
- select {
- case p.doneCh <- struct{}{}:
- default:
- }
- }()
+ var cleaned bool
+ defer p.decActiveAgent(agentType, &cleaned)
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -342,6 +355,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.decActiveAgent(agentType, &cleaned)
p.handleRunResult(ctx, t, exec, err, agentType)
}
@@ -351,9 +365,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
// resultCh. The caller must set exec.EndTime before calling.
func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) {
if err != nil {
- if isRateLimitError(err) || isQuotaExhausted(err) {
+ if retry.IsRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
- retryAfter := parseRetryAfter(err.Error())
+ retryAfter := retry.ParseRetryAfter(err.Error())
reason := "transient"
if isQuotaExhausted(err) {
reason = "quota"
@@ -505,6 +519,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if summary == "" && exec.StdoutPath != "" {
summary = extractSummary(exec.StdoutPath)
}
+ if summary == "" && p.LLM != nil && exec.StdoutPath != "" {
+ summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath)
+ }
if summary != "" {
if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil {
p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
@@ -528,12 +545,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
@@ -884,8 +895,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
p.mu.Unlock()
- // If a specific agent is already requested, skip selection and classification.
- skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini"
+ // If a specific agent is already requested AND we have a runner registered
+ // for it, skip selection and classification. Unknown/empty types fall
+ // through to the load balancer.
+ _, runnerKnown := p.runners[t.Agent.Type]
+ skipClassification := t.Agent.Type != "" && runnerKnown
if !skipClassification {
// Deterministically pick the agent with fewest active tasks.
@@ -915,16 +929,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
agentType = "claude"
}
- defer func() {
- p.mu.Lock()
- p.active--
- p.mu.Unlock()
- select {
- case p.doneCh <- struct{}{}:
- default:
- }
- }()
-
// Check dependencies before taking the per-agent slot to avoid deadlock:
// if a dependent task holds the slot while waiting for its dependency to run,
// the dependency can never start (maxPerAgent=1).
@@ -981,6 +985,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.activePerAgent[agentType]++
p.mu.Unlock()
+ var cleaned bool
+ defer p.decActiveAgent(agentType, &cleaned)
+
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner", "error", err, "taskID", t.ID)
@@ -999,12 +1006,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -1074,6 +1076,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.decActiveAgent(agentType, &cleaned)
p.handleRunResult(ctx, t, exec, err, agentType)
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index fac7e9c..9214872 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1101,6 +1101,7 @@ type minimalMockStore struct {
executions map[string]*storage.Execution
stateUpdates []struct{ id string; state task.State }
questionUpdates []string
+ summaryUpdates []struct{ taskID, summary string }
changestatCalls []struct {
execID string
stats *task.Changestats
@@ -1159,7 +1160,21 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error
m.mu.Unlock()
return nil
}
-func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil }
+func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error {
+ m.mu.Lock()
+ m.summaryUpdates = append(m.summaryUpdates, struct{ taskID, summary string }{taskID, summary})
+ m.mu.Unlock()
+ return nil
+}
+func (m *minimalMockStore) lastSummaryUpdate() (string, string, bool) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if len(m.summaryUpdates) == 0 {
+ return "", "", false
+ }
+ last := m.summaryUpdates[len(m.summaryUpdates)-1]
+ return last.taskID, last.summary, true
+}
func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error {
return nil
}
diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go
new file mode 100644
index 0000000..3abec05
--- /dev/null
+++ b/internal/executor/gemini.go
@@ -0,0 +1,346 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// GeminiRunner spawns the `gemini` CLI in non-interactive mode.
+type GeminiRunner struct {
+ BinaryPath string // defaults to "gemini"
+ Logger *slog.Logger
+ LogDir string // base directory for execution logs
+ APIURL string // base URL of the Claudomator API, passed to subprocesses
+}
+
+// ExecLogDir returns the log directory for the given execution ID.
+func (r *GeminiRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+func (r *GeminiRunner) binaryPath() string {
+ if r.BinaryPath != "" {
+ return r.BinaryPath
+ }
+ return "gemini"
+}
+
+// Run executes the gemini CLI inside a sandboxed clone of project_dir.
+// When project_dir is set, claudomator first clones it into a temp sandbox
+// (preferring a `local` bare remote, then `origin`, then the working tree)
+// and runs the agent there. On success the sandbox is autocommitted and
+// pushed back to origin/master, then removed. On failure the sandbox is
+// preserved and its path is included in the returned error so the user can
+// inspect partial work. If the agent writes a question file before exiting,
+// Run returns *BlockedError with SandboxDir populated so a resume execution
+// can pick up in the same directory.
+func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ projectDir := t.Agent.ProjectDir
+
+ if projectDir != "" {
+ if _, err := os.Stat(projectDir); err != nil {
+ return fmt.Errorf("project_dir %q: %w", projectDir, err)
+ }
+ }
+
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ logDir = e.ID
+ }
+ if err := os.MkdirAll(logDir, 0700); err != nil {
+ return fmt.Errorf("creating log dir: %w", err)
+ }
+
+ if e.StdoutPath == "" {
+ e.StdoutPath = filepath.Join(logDir, "stdout.log")
+ e.StderrPath = filepath.Join(logDir, "stderr.log")
+ e.ArtifactDir = logDir
+ }
+
+ if e.SessionID == "" {
+ if e.ResumeSessionID != "" {
+ e.SessionID = e.ResumeSessionID
+ } else {
+ e.SessionID = e.ID
+ }
+ }
+
+ // Sandbox setup: for new executions with a project_dir, clone into a sandbox.
+ // Resume executions reuse the preserved sandbox so any partial work survives.
+ // If the preserved sandbox is missing (e.g. /tmp was purged), clone fresh.
+ var sandboxDir string
+ var startHEAD string
+ effectiveWorkingDir := projectDir
+ if e.ResumeSessionID != "" {
+ if e.SandboxDir != "" {
+ if _, statErr := os.Stat(e.SandboxDir); statErr == nil {
+ effectiveWorkingDir = e.SandboxDir
+ } else {
+ r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir)
+ e.SandboxDir = ""
+ if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(projectDir, r.Logger)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+ }
+ }
+ } else if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(projectDir, r.Logger)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+
+ if effectiveWorkingDir != "" {
+ headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output()
+ startHEAD = strings.TrimSpace(string(headOut))
+ }
+
+ questionFile := filepath.Join(logDir, "question.json")
+ args := r.buildArgs(t, e, questionFile)
+
+ if err := r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e); err != nil {
+ if sandboxDir != "" {
+ return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
+ }
+ return err
+ }
+
+ // Check whether the agent left a question before exiting.
+ data, readErr := os.ReadFile(questionFile)
+ if readErr == nil {
+ os.Remove(questionFile)
+ questionJSON := strings.TrimSpace(string(data))
+ if isCompletionReport(questionJSON) {
+ r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
+ e.Summary = extractQuestionText(questionJSON)
+ } else {
+ // Preserve sandbox on BLOCKED so a resume can pick up in the same dir.
+ return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir}
+ }
+ }
+
+ // Read agent summary if written.
+ summaryFile := filepath.Join(logDir, "summary.txt")
+ if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
+ os.Remove(summaryFile)
+ e.Summary = strings.TrimSpace(string(summaryData))
+ }
+
+ // Merge sandbox back to project_dir and clean up.
+ if sandboxDir != "" {
+ if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil {
+ return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
+ }
+ }
+ return nil
+}
+
+func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error {
+ cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
+ cmd.Env = append(os.Environ(),
+ "CLAUDOMATOR_API_URL="+r.APIURL,
+ "CLAUDOMATOR_TASK_ID="+e.TaskID,
+ "CLAUDOMATOR_PROJECT_DIR="+projectDir,
+ "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
+ "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"),
+ )
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ if workingDir != "" {
+ cmd.Dir = workingDir
+ }
+
+ stdoutFile, err := os.Create(e.StdoutPath)
+ if err != nil {
+ return fmt.Errorf("creating stdout log: %w", err)
+ }
+ defer stdoutFile.Close()
+
+ stderrFile, err := os.Create(e.StderrPath)
+ if err != nil {
+ return fmt.Errorf("creating stderr log: %w", err)
+ }
+ defer stderrFile.Close()
+
+ stdoutR, stdoutW, err := os.Pipe()
+ if err != nil {
+ return fmt.Errorf("creating stdout pipe: %w", err)
+ }
+ cmd.Stdout = stdoutW
+ cmd.Stderr = stderrFile
+
+ if err := cmd.Start(); err != nil {
+ stdoutW.Close()
+ stdoutR.Close()
+ return fmt.Errorf("starting gemini: %w", err)
+ }
+ stdoutW.Close()
+
+ killDone := make(chan struct{})
+ go func() {
+ select {
+ case <-ctx.Done():
+ syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+ case <-killDone:
+ }
+ }()
+
+ var streamCost float64
+ var streamErr error
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ streamCost, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger)
+ stdoutR.Close()
+ }()
+
+ waitErr := cmd.Wait()
+ close(killDone)
+ wg.Wait()
+
+ if streamCost > 0 {
+ e.CostUSD = streamCost
+ }
+
+ if waitErr != nil {
+ if exitErr, ok := waitErr.(*exec.ExitError); ok {
+ e.ExitCode = exitErr.ExitCode()
+ }
+ if streamErr != nil {
+ return streamErr
+ }
+ if tail := tailFile(e.StderrPath, 20); tail != "" {
+ return fmt.Errorf("gemini exited with error: %w\nstderr:\n%s", waitErr, tail)
+ }
+ return fmt.Errorf("gemini exited with error: %w", waitErr)
+ }
+
+ if streamErr != nil {
+ return streamErr
+ }
+ return nil
+}
+
+// parseGeminiStream reads streaming JSON from the gemini CLI, strips markdown
+// code fences if the output is wrapped in them, writes the inner stream-json
+// to w, and returns (costUSD, error). If a `result` event has `is_error: true`,
+// an error wrapping the result message is returned.
+func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
+ fullOutput, err := io.ReadAll(r)
+ if err != nil {
+ return 0, fmt.Errorf("reading full gemini output: %w", err)
+ }
+ logger.Debug("parseGeminiStream: raw output received", "output", string(fullOutput))
+
+ inner := stripGeminiFences(string(fullOutput), logger)
+ if _, writeErr := w.Write([]byte(inner)); writeErr != nil {
+ return 0, fmt.Errorf("writing gemini output: %w", writeErr)
+ }
+
+ // Walk lines looking for a result event so we can surface errors and cost.
+ var (
+ cost float64
+ errMsg string
+ isError bool
+ )
+ for _, raw := range strings.Split(inner, "\n") {
+ line := strings.TrimSpace(raw)
+ if line == "" {
+ continue
+ }
+ var evt struct {
+ Type string `json:"type"`
+ IsError bool `json:"is_error"`
+ Result string `json:"result"`
+ Cost float64 `json:"total_cost_usd"`
+ }
+ if err := json.Unmarshal([]byte(line), &evt); err != nil {
+ continue
+ }
+ if evt.Type == "result" {
+ if evt.Cost > 0 {
+ cost = evt.Cost
+ }
+ if evt.IsError {
+ isError = true
+ errMsg = evt.Result
+ }
+ }
+ }
+ if isError {
+ return cost, fmt.Errorf("gemini reported error: %s", errMsg)
+ }
+ return cost, nil
+}
+
+// stripGeminiFences removes a surrounding ```json ... ``` markdown block if
+// present, returning the trimmed inner content. If no markdown fence is
+// found, the input is returned verbatim (no whitespace trimming) so callers
+// that expect byte-exact pass-through behavior get it.
+func stripGeminiFences(raw string, logger *slog.Logger) string {
+ trimmed := strings.TrimSpace(raw)
+ if start := strings.Index(trimmed, "```json"); start != -1 {
+ if end := strings.LastIndex(trimmed, "```"); end > start {
+ return strings.TrimSpace(trimmed[start+len("```json") : end])
+ }
+ logger.Warn("malformed gemini markdown block (missing closing fence); using raw output", "len", len(trimmed))
+ return trimmed
+ }
+ return raw
+}
+
+func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
+ // Gemini CLI uses a different command structure: gemini "instructions" [flags]
+
+ instructions := t.Agent.Instructions
+ if !t.Agent.SkipPlanning {
+ instructions = withPlanningPreamble(instructions)
+ }
+
+ args := []string{
+ "-p", instructions,
+ "--output-format", "stream-json",
+ "--yolo", // auto-approve all tools (equivalent to Claude's bypassPermissions)
+ }
+
+ // Note: Gemini CLI flags might differ from Claude CLI.
+ // Assuming common flags for now, but these may need adjustment.
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
+ }
+
+ // Gemini CLI doesn't use --session-id for the first run in the same way,
+ // or it might use it differently. For now we assume compatibility.
+ if e.SessionID != "" {
+ // If it's a resume, it might use different flags.
+ if e.ResumeSessionID != "" {
+ // This is a placeholder for Gemini's resume logic
+ }
+ }
+
+ return args
+}
diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go
new file mode 100644
index 0000000..cd11ebc
--- /dev/null
+++ b/internal/executor/gemini_test.go
@@ -0,0 +1,447 @@
+package executor
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io"
+ "log/slog"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+func TestGeminiRunner_BuildArgs_BasicTask(t *testing.T) {
+ r := &GeminiRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "fix the bug",
+ Model: "gemini-2.5-flash-lite",
+ SkipPlanning: true,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ // Gemini CLI: instructions passed via -p for non-interactive mode
+ if len(args) < 2 || args[0] != "-p" || args[1] != "fix the bug" {
+ t.Errorf("expected -p <instructions> as first args, got: %v", args)
+ }
+
+ argMap := make(map[string]bool)
+ for _, a := range args {
+ argMap[a] = true
+ }
+ for _, want := range []string{"--output-format", "stream-json", "--model", "gemini-2.5-flash-lite"} {
+ if !argMap[want] {
+ t.Errorf("missing arg %q in %v", want, args)
+ }
+ }
+}
+
+func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) {
+ r := &GeminiRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "fix the bug",
+ SkipPlanning: false,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ if len(args) < 2 || args[0] != "-p" {
+ t.Fatalf("expected -p <instructions> as first args, got: %v", args)
+ }
+ if !strings.HasPrefix(args[1], planningPreamble) {
+ t.Errorf("instructions should start with planning preamble")
+ }
+ if !strings.HasSuffix(args[1], "fix the bug") {
+ t.Errorf("instructions should end with original instructions")
+ }
+}
+
+func TestGeminiRunner_BuildArgs_IncludesYolo(t *testing.T) {
+ r := &GeminiRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "write a doc",
+ SkipPlanning: true,
+ },
+ }
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+ argMap := make(map[string]bool)
+ for _, a := range args {
+ argMap[a] = true
+ }
+ if !argMap["--yolo"] {
+ t.Errorf("expected --yolo in gemini args (enables all tools); got: %v", args)
+ }
+}
+
+func TestGeminiRunner_BuildArgs_IncludesPromptFlag(t *testing.T) {
+ r := &GeminiRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "do the thing",
+ SkipPlanning: true,
+ },
+ }
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+ // Instructions must be passed via -p/--prompt for non-interactive headless mode,
+ // not as a bare positional (which starts interactive mode).
+ found := false
+ for i, a := range args {
+ if (a == "-p" || a == "--prompt") && i+1 < len(args) && args[i+1] == "do the thing" {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("expected instructions passed via -p/--prompt flag; got: %v", args)
+ }
+}
+
+func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) {
+ r := &GeminiRunner{
+ BinaryPath: "true", // would succeed if it ran
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: t.TempDir(),
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ ProjectDir: "/nonexistent/path/does/not/exist",
+ SkipPlanning: true,
+ },
+ }
+ exec := &storage.Execution{ID: "test-exec"}
+
+ err := r.Run(context.Background(), tk, exec)
+
+ if err == nil {
+ t.Fatal("expected error for inaccessible project_dir, got nil")
+ }
+ if !strings.Contains(err.Error(), "project_dir") {
+ t.Errorf("expected 'project_dir' in error, got: %v", err)
+ }
+}
+
+func TestGeminiRunner_BinaryPath_Default(t *testing.T) {
+ r := &GeminiRunner{}
+ if r.binaryPath() != "gemini" {
+ t.Errorf("want 'gemini', got %q", r.binaryPath())
+ }
+}
+
+func TestGeminiRunner_BinaryPath_Custom(t *testing.T) {
+ r := &GeminiRunner{BinaryPath: "/usr/local/bin/gemini"}
+ if r.binaryPath() != "/usr/local/bin/gemini" {
+ t.Errorf("want custom path, got %q", r.binaryPath())
+ }
+}
+
+
+func TestParseGeminiStream_ParsesStructuredOutput(t *testing.T) {
+ // Simulate a stream-json input with various message types, including a result with error and cost.
+ input := streamLine(`{"type":"content_block_start","content_block":{"text":"Hello,"}}`) +
+ streamLine(`{"type":"content_block_delta","content_block":{"text":" World!"}}`) +
+ streamLine(`{"type":"content_block_end"}`) +
+ streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong","total_cost_usd":0.123}`)
+
+ reader := strings.NewReader(input)
+ var writer bytes.Buffer // To capture what's written to the output log
+ logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+
+ cost, err := parseGeminiStream(reader, &writer, logger)
+
+ if err == nil {
+ t.Errorf("expected an error, got nil")
+ }
+ if !strings.Contains(err.Error(), "something went wrong") {
+ t.Errorf("expected error message to contain 'something went wrong', got: %v", err)
+ }
+
+ if cost != 0.123 {
+ t.Errorf("expected cost 0.123, got %f", cost)
+ }
+
+ // Verify that the writer received the content (even if parseGeminiStream isn't fully parsing it yet)
+ expectedWriterContent := input
+ if writer.String() != expectedWriterContent {
+ t.Errorf("writer content mismatch:\nwant:\n%s\ngot:\n%s", expectedWriterContent, writer.String())
+ }
+}
+
+// TestGeminiRunner_Run_ProjectDir_RunsInSandbox verifies that when project_dir
+// is set, the gemini subprocess runs inside a sandbox clone — not in
+// project_dir itself.
+func TestGeminiRunner_Run_ProjectDir_RunsInSandbox(t *testing.T) {
+ projectDir := t.TempDir()
+ initGitRepo(t, projectDir)
+
+ logDir := t.TempDir()
+ cwdFile := filepath.Join(logDir, "gemini-cwd.txt")
+
+ // Fake gemini binary that records its $PWD then exits 0.
+ scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh")
+ script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &GeminiRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "do work",
+ ProjectDir: projectDir,
+ SkipPlanning: true,
+ },
+ }
+ e := &storage.Execution{ID: "sandbox-exec", TaskID: "task-1"}
+
+ if err := r.Run(context.Background(), tk, e); err != nil {
+ t.Fatalf("Run: %v", err)
+ }
+
+ got, err := os.ReadFile(cwdFile)
+ if err != nil {
+ t.Fatalf("cwd file not written: %v", err)
+ }
+ cwd := string(got)
+ if cwd == projectDir {
+ t.Errorf("ran directly in project_dir; expected sandbox clone (cwd=%q)", cwd)
+ }
+ // Sandbox should be removed after successful teardown (no edits → nothing to push).
+ // We can't assert the exact dir, but it should not be projectDir.
+}
+
+// TestGeminiRunner_Run_BlockedError_IncludesSandboxDir verifies that when the
+// agent writes a question file before exiting, the BlockedError carries the
+// sandbox path so resume runs in the same dir.
+func TestGeminiRunner_Run_BlockedError_IncludesSandboxDir(t *testing.T) {
+ src := t.TempDir()
+ initGitRepo(t, src)
+ logDir := t.TempDir()
+
+ scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh")
+ if err := os.WriteFile(scriptPath, []byte(`#!/bin/sh
+if [ -n "$CLAUDOMATOR_QUESTION_FILE" ]; then
+ printf '{"text":"Should I continue?"}' > "$CLAUDOMATOR_QUESTION_FILE"
+fi
+`), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &GeminiRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "do something",
+ ProjectDir: src,
+ SkipPlanning: true,
+ },
+ }
+ e := &storage.Execution{ID: "blocked-gemini-exec", TaskID: "task-1"}
+
+ err := r.Run(context.Background(), tk, e)
+
+ var blocked *BlockedError
+ if !errors.As(err, &blocked) {
+ t.Fatalf("expected BlockedError, got: %v", err)
+ }
+ if blocked.SandboxDir == "" {
+ t.Error("BlockedError.SandboxDir should be set when gemini task runs in a sandbox")
+ }
+ if _, statErr := os.Stat(blocked.SandboxDir); os.IsNotExist(statErr) {
+ t.Error("sandbox directory should be preserved when blocked")
+ } else {
+ os.RemoveAll(blocked.SandboxDir)
+ }
+}
+
+// TestGeminiRunner_Run_ExecError_PreservesSandbox verifies that when gemini
+// exits non-zero, the sandbox path is included in the wrapped error so the
+// user can inspect partial work.
+func TestGeminiRunner_Run_ExecError_PreservesSandbox(t *testing.T) {
+ src := t.TempDir()
+ initGitRepo(t, src)
+ logDir := t.TempDir()
+
+ // "false" exits 1, no output.
+ r := &GeminiRunner{
+ BinaryPath: "false",
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "do something",
+ ProjectDir: src,
+ SkipPlanning: true,
+ },
+ }
+ e := &storage.Execution{ID: "err-gemini-exec", TaskID: "task-1"}
+
+ err := r.Run(context.Background(), tk, e)
+ if err == nil {
+ t.Fatal("expected error from failing gemini exit")
+ }
+ if !strings.Contains(err.Error(), "sandbox preserved at ") {
+ t.Errorf("expected error to include sandbox path; got: %v", err)
+ }
+ // Extract path and verify it exists.
+ idx := strings.Index(err.Error(), "sandbox preserved at ")
+ rest := err.Error()[idx+len("sandbox preserved at "):]
+ rest = strings.TrimSuffix(rest, ")")
+ rest = strings.TrimSpace(rest)
+ if _, statErr := os.Stat(rest); os.IsNotExist(statErr) {
+ t.Errorf("sandbox path from error should exist on disk: %q", rest)
+ } else {
+ os.RemoveAll(rest)
+ }
+}
+
+// TestGeminiRunner_Run_ResumeUsesStoredSandboxDir verifies that a resume
+// execution runs in the preserved SandboxDir rather than cloning fresh.
+func TestGeminiRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) {
+ logDir := t.TempDir()
+ sandboxDir := t.TempDir()
+ initGitRepo(t, sandboxDir)
+ cwdFile := filepath.Join(logDir, "cwd.txt")
+
+ scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh")
+ script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &GeminiRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ SkipPlanning: true,
+ },
+ }
+ e := &storage.Execution{
+ ID: "resume-gemini-1",
+ TaskID: "task-resume",
+ ResumeSessionID: "session-abc",
+ SandboxDir: sandboxDir,
+ }
+
+ if err := r.Run(context.Background(), tk, e); err != nil {
+ t.Fatalf("Run with preserved sandbox: %v", err)
+ }
+
+ got, err := os.ReadFile(cwdFile)
+ if err != nil {
+ t.Fatalf("cwd file not written: %v", err)
+ }
+ if string(got) != sandboxDir {
+ t.Errorf("resume should run in preserved sandbox; got cwd=%q want %q", got, sandboxDir)
+ }
+}
+
+// TestGeminiRunner_Run_StaleSandboxDir_ClonesAfresh verifies that a resume
+// pointing at a missing sandbox falls back to cloning a fresh sandbox from
+// project_dir rather than failing outright.
+func TestGeminiRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) {
+ logDir := t.TempDir()
+ projectDir := t.TempDir()
+ initGitRepo(t, projectDir)
+
+ cwdFile := filepath.Join(logDir, "cwd.txt")
+ scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh")
+ script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &GeminiRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ ProjectDir: projectDir,
+ SkipPlanning: true,
+ },
+ }
+ staleSandbox := filepath.Join(t.TempDir(), "gone")
+ e := &storage.Execution{
+ ID: "resume-gemini-2",
+ TaskID: "task-stale",
+ ResumeSessionID: "session-xyz",
+ SandboxDir: staleSandbox,
+ }
+
+ if err := r.Run(context.Background(), tk, e); err != nil {
+ t.Fatalf("Run with stale sandbox: %v", err)
+ }
+
+ got, err := os.ReadFile(cwdFile)
+ if err != nil {
+ t.Fatalf("cwd file not written: %v", err)
+ }
+ cwd := string(got)
+ if cwd == staleSandbox {
+ t.Error("ran in stale (nonexistent) sandbox dir")
+ }
+ if cwd == projectDir {
+ t.Error("ran directly in project_dir; expected a fresh sandbox clone")
+ }
+}
+
+// TestGeminiRunner_Run_NoProjectDir_SkipsSandbox verifies that a task with no
+// project_dir doesn't trigger sandbox setup (matches LocalRunner/non-coding
+// task semantics).
+func TestGeminiRunner_Run_NoProjectDir_SkipsSandbox(t *testing.T) {
+ logDir := t.TempDir()
+
+ r := &GeminiRunner{
+ BinaryPath: "true", // exits 0, no output
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "summarize: 2+2",
+ SkipPlanning: true,
+ // No ProjectDir
+ },
+ }
+ e := &storage.Execution{ID: "no-pd-gemini", TaskID: "task-nopd"}
+
+ if err := r.Run(context.Background(), tk, e); err != nil {
+ t.Fatalf("Run without project_dir: %v", err)
+ }
+ if e.SandboxDir != "" {
+ t.Errorf("SandboxDir should be empty for tasks without project_dir, got %q", e.SandboxDir)
+ }
+}
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
index bd5d9d5..76bf8b1 100644
--- a/internal/executor/helpers.go
+++ b/internal/executor/helpers.go
@@ -171,7 +171,11 @@ func readFileTail(path string, maxBytes int64) string {
}
func gitSafe(args ...string) []string {
- return append([]string{"-c", "safe.directory=*"}, args...)
+ return append([]string{
+ "-c", "safe.directory=*",
+ "-c", "commit.gpgsign=false",
+ "-c", "tag.gpgsign=false",
+ }, args...)
}
// isCompletionReport returns true when a question-file JSON looks like a
diff --git a/internal/executor/local.go b/internal/executor/local.go
new file mode 100644
index 0000000..5d874c6
--- /dev/null
+++ b/internal/executor/local.go
@@ -0,0 +1,171 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// LocalRunner executes a task against a local OpenAI-compatible LLM endpoint.
+// Unlike ClaudeRunner/GeminiRunner it does not spawn a subprocess, does not
+// create a git sandbox, and does not edit files in project_dir — it produces
+// text completions that are streamed to stdout.log in the same stream-json
+// envelope Claude uses, so existing parsers (extractSummary, ParseChangestat)
+// keep working unchanged.
+type LocalRunner struct {
+ Client *llm.Client
+ Logger *slog.Logger
+ LogDir string
+ DefaultTemperature float64
+}
+
+// ExecLogDir implements LogPather so the pool can persist log paths before
+// execution starts.
+func (r *LocalRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+// Run streams a chat completion to stdout.log. The response is wrapped in
+// stream-json envelopes line-by-line so downstream parsers (summary,
+// changestats) read it the same way they read Claude output.
+func (r *LocalRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ if r.Client == nil {
+ return fmt.Errorf("local runner: no LLM client configured")
+ }
+ if t.Agent.Instructions == "" {
+ return fmt.Errorf("local runner: empty instructions")
+ }
+
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ return fmt.Errorf("local runner: LogDir not set")
+ }
+ if err := os.MkdirAll(logDir, 0o700); err != nil {
+ return fmt.Errorf("local runner: mkdir log: %w", err)
+ }
+ stdoutPath := filepath.Join(logDir, "stdout.log")
+ stderrPath := filepath.Join(logDir, "stderr.log")
+ e.StdoutPath = stdoutPath
+ e.StderrPath = stderrPath
+
+ stdout, err := os.Create(stdoutPath)
+ if err != nil {
+ return fmt.Errorf("local runner: create stdout: %w", err)
+ }
+ defer stdout.Close()
+
+ messages := []llm.Message{}
+ if sys := strings.TrimSpace(t.Agent.SystemPromptAppend); sys != "" {
+ messages = append(messages, llm.Message{Role: "system", Content: sys})
+ }
+ messages = append(messages, llm.Message{Role: "user", Content: t.Agent.Instructions})
+
+ temperature := t.Agent.Temperature
+ if temperature == nil && r.DefaultTemperature > 0 {
+ v := r.DefaultTemperature
+ temperature = &v
+ }
+
+ req := llm.ChatRequest{
+ Model: t.Agent.Model,
+ Messages: messages,
+ Temperature: temperature,
+ MaxTokens: t.Agent.MaxTokens,
+ }
+
+ start := time.Now()
+ resp, err := r.Client.ChatStream(ctx, req, func(delta string) {
+ if delta == "" {
+ return
+ }
+ writeAssistantTextLine(stdout, delta)
+ })
+ if err != nil {
+ writeResultLine(stdout, "error", err.Error(), 0, 0)
+ return fmt.Errorf("local runner: chat: %w", err)
+ }
+ elapsed := time.Since(start)
+
+ // Write one consolidated assistant envelope containing the full response.
+ // extractSummary and ParseChangestatFromOutput operate per-line, so a
+ // single envelope with the full text is what they expect to find.
+ if resp.Content != "" {
+ writeAssistantTextLine(stdout, resp.Content)
+ }
+ writeResultLine(stdout, "success", "", resp.PromptTokens, resp.OutputTokens)
+
+ e.CostUSD = 0
+ e.TokensIn = int64(resp.PromptTokens)
+ e.TokensOut = int64(resp.OutputTokens)
+
+ if r.Logger != nil {
+ r.Logger.Info("local runner completed",
+ "taskID", t.ID,
+ "model", resp.Model,
+ "tokens_in", resp.PromptTokens,
+ "tokens_out", resp.OutputTokens,
+ "finish_reason", resp.FinishReason,
+ "elapsed_ms", elapsed.Milliseconds(),
+ )
+ }
+ return nil
+}
+
+// writeAssistantTextLine writes a single stream-json line wrapping `text` as
+// an assistant text block. Format matches what ClaudeRunner emits, so
+// extractSummary and ParseChangestatFromFile read it transparently.
+func writeAssistantTextLine(w *os.File, text string) {
+ line := struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ } `json:"content"`
+ } `json:"message"`
+ }{Type: "assistant"}
+ line.Message.Content = []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ }{{Type: "text", Text: text}}
+ b, err := json.Marshal(line)
+ if err != nil {
+ return
+ }
+ w.Write(b)
+ w.Write([]byte("\n"))
+}
+
+// writeResultLine writes a final stream-json terminator line that downstream
+// parsers can recognise. Mirrors the shape of the result line ClaudeRunner emits.
+func writeResultLine(w *os.File, subtype, errMsg string, promptTokens, outputTokens int) {
+ line := map[string]any{
+ "type": "result",
+ "subtype": subtype,
+ "is_error": errMsg != "",
+ "prompt_tokens": promptTokens,
+ "output_tokens": outputTokens,
+ "total_cost_usd": 0.0,
+ }
+ if errMsg != "" {
+ line["result"] = errMsg
+ }
+ b, err := json.Marshal(line)
+ if err != nil {
+ return
+ }
+ w.Write(b)
+ w.Write([]byte("\n"))
+}
diff --git a/internal/executor/local_test.go b/internal/executor/local_test.go
new file mode 100644
index 0000000..d8ab678
--- /dev/null
+++ b/internal/executor/local_test.go
@@ -0,0 +1,152 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// fakeOpenAIServer returns an httptest.Server that replies with a streaming
+// chat completion containing the supplied content (split into chunks) plus a
+// usage record.
+func fakeOpenAIServer(t *testing.T, chunks []string, promptTok, outTok int) *httptest.Server {
+ t.Helper()
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ flusher, _ := w.(http.Flusher)
+ for _, c := range chunks {
+ payload := map[string]any{
+ "model": "fake",
+ "choices": []map[string]any{{"delta": map[string]string{"content": c}}},
+ }
+ b, _ := json.Marshal(payload)
+ fmt.Fprintf(w, "data: %s\n\n", b)
+ if flusher != nil {
+ flusher.Flush()
+ }
+ }
+ final := map[string]any{
+ "model": "fake",
+ "choices": []map[string]any{{"delta": map[string]string{}, "finish_reason": "stop"}},
+ "usage": map[string]int{"prompt_tokens": promptTok, "completion_tokens": outTok},
+ }
+ fb, _ := json.Marshal(final)
+ fmt.Fprintf(w, "data: %s\n\ndata: [DONE]\n\n", fb)
+ }))
+}
+
+func TestLocalRunner_Run_WritesStreamJSON(t *testing.T) {
+ srv := fakeOpenAIServer(t,
+ []string{"## Summary\n", "All ", "good."},
+ 11, 22,
+ )
+ defer srv.Close()
+
+ logRoot := t.TempDir()
+ r := &LocalRunner{
+ Client: &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"},
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logRoot,
+ }
+ tt := &task.Task{
+ ID: "task-1",
+ Name: "test",
+ Agent: task.AgentConfig{
+ Type: "local",
+ Model: "fake",
+ Instructions: "Do a thing.",
+ },
+ }
+ exec := &storage.Execution{ID: uuid.New().String(), TaskID: tt.ID}
+
+ if err := r.Run(context.Background(), tt, exec); err != nil {
+ t.Fatalf("Run: %v", err)
+ }
+
+ if exec.CostUSD != 0 {
+ t.Errorf("CostUSD should be 0 for local runner, got %v", exec.CostUSD)
+ }
+ if exec.TokensIn != 11 || exec.TokensOut != 22 {
+ t.Errorf("tokens: want 11/22 got %d/%d", exec.TokensIn, exec.TokensOut)
+ }
+
+ // Verify stdout.log contains stream-json envelopes that extractSummary can parse.
+ stdoutPath := filepath.Join(r.ExecLogDir(exec.ID), "stdout.log")
+ data, err := os.ReadFile(stdoutPath)
+ if err != nil {
+ t.Fatalf("read stdout: %v", err)
+ }
+ lines := strings.Split(strings.TrimSpace(string(data)), "\n")
+ if len(lines) < 4 {
+ t.Fatalf("expected at least 4 lines (3 deltas + 1 result), got %d:\n%s", len(lines), data)
+ }
+ for i, line := range lines[:3] {
+ var env struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ }
+ }
+ }
+ if err := json.Unmarshal([]byte(line), &env); err != nil {
+ t.Fatalf("line %d not JSON: %v: %s", i, err, line)
+ }
+ if env.Type != "assistant" {
+ t.Errorf("line %d: want type=assistant, got %q", i, env.Type)
+ }
+ }
+
+ summary := extractSummary(stdoutPath)
+ if !strings.Contains(summary, "All good.") {
+ t.Errorf("extractSummary should find 'All good.', got %q", summary)
+ }
+}
+
+func TestLocalRunner_Run_NoClient_Errors(t *testing.T) {
+ r := &LocalRunner{LogDir: t.TempDir()}
+ tt := &task.Task{ID: "x", Agent: task.AgentConfig{Instructions: "hi"}}
+ exec := &storage.Execution{ID: "exec-x"}
+ err := r.Run(context.Background(), tt, exec)
+ if err == nil || !strings.Contains(err.Error(), "no LLM client") {
+ t.Errorf("expected 'no LLM client' error, got %v", err)
+ }
+}
+
+func TestLocalRunner_Run_EmptyInstructions_Errors(t *testing.T) {
+ r := &LocalRunner{
+ Client: &llm.Client{Endpoint: "http://unused", Model: "x"},
+ LogDir: t.TempDir(),
+ }
+ tt := &task.Task{ID: "x", Agent: task.AgentConfig{}}
+ exec := &storage.Execution{ID: "exec-x"}
+ err := r.Run(context.Background(), tt, exec)
+ if err == nil || !strings.Contains(err.Error(), "empty instructions") {
+ t.Errorf("expected empty-instructions error, got %v", err)
+ }
+}
+
+func TestLocalRunner_ExecLogDir(t *testing.T) {
+ r := &LocalRunner{LogDir: "/tmp/logs"}
+ if got := r.ExecLogDir("abc"); got != "/tmp/logs/abc" {
+ t.Errorf("ExecLogDir: got %q", got)
+ }
+ r.LogDir = ""
+ if got := r.ExecLogDir("abc"); got != "" {
+ t.Errorf("ExecLogDir empty LogDir: got %q", got)
+ }
+}
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index c916291..ee9a336 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -1,33 +1,9 @@
package executor
-import (
- "context"
- "fmt"
- "regexp"
- "strconv"
- "strings"
- "time"
-)
+import "strings"
-var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
-
-const maxBackoffDelay = 5 * time.Minute
-
-// isRateLimitError returns true if err looks like a transient Claude API
-// rate-limit that is worth retrying (e.g. per-minute/per-request throttle).
-func isRateLimitError(err error) bool {
- if err == nil {
- return false
- }
- msg := strings.ToLower(err.Error())
- return strings.Contains(msg, "rate limit") ||
- strings.Contains(msg, "too many requests") ||
- strings.Contains(msg, "429") ||
- strings.Contains(msg, "overloaded")
-}
-
-// isQuotaExhausted returns true if err indicates the 5-hour usage quota is
-// fully exhausted. Unlike transient rate limits, these should not be retried.
+// isQuotaExhausted returns true if err indicates the 5-hour Claude usage quota
+// is fully exhausted. Unlike transient rate limits, these should not be retried.
func isQuotaExhausted(err error) bool {
if err == nil {
return false
@@ -43,53 +19,3 @@ func isQuotaExhausted(err error) bool {
strings.Contains(msg, "exhausted your daily quota") ||
strings.Contains(msg, "generate_content_free_tier_requests")
}
-
-// parseRetryAfter extracts a Retry-After duration from an error message.
-// Returns 0 if no retry-after value is found.
-func parseRetryAfter(msg string) time.Duration {
- m := retryAfterRe.FindStringSubmatch(msg)
- if m == nil {
- return 0
- }
- secs, err := strconv.Atoi(m[1])
- if err != nil || secs <= 0 {
- return 0
- }
- return time.Duration(secs) * time.Second
-}
-
-// runWithBackoff calls fn repeatedly on rate-limit errors, using exponential backoff.
-// maxRetries is the max number of retry attempts (not counting the initial call).
-// baseDelay is the initial backoff duration (doubled each retry).
-func runWithBackoff(ctx context.Context, maxRetries int, baseDelay time.Duration, fn func() error) error {
- var lastErr error
- for attempt := 0; attempt <= maxRetries; attempt++ {
- lastErr = fn()
- if lastErr == nil {
- return nil
- }
- if !isRateLimitError(lastErr) {
- return lastErr
- }
- if attempt == maxRetries {
- break
- }
-
- // Compute exponential backoff delay.
- delay := baseDelay * (1 << attempt)
- if delay > maxBackoffDelay {
- delay = maxBackoffDelay
- }
- // Use Retry-After header value if present.
- if ra := parseRetryAfter(lastErr.Error()); ra > 0 {
- delay = ra
- }
-
- select {
- case <-ctx.Done():
- return fmt.Errorf("context cancelled during rate-limit backoff: %w", ctx.Err())
- case <-time.After(delay):
- }
- }
- return lastErr
-}
diff --git a/internal/executor/summary.go b/internal/executor/summary.go
index a942de0..bcf5cfd 100644
--- a/internal/executor/summary.go
+++ b/internal/executor/summary.go
@@ -2,11 +2,26 @@ package executor
import (
"bufio"
+ "context"
"encoding/json"
+ "io"
"os"
"strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
)
+// synthesizeSummaryMaxBytes caps how much of the stdout log we send to the
+// LLM. Larger values cost more tokens with diminishing returns for a 2-4
+// sentence summary.
+const synthesizeSummaryMaxBytes = 16 * 1024
+
+// synthesizeSummaryTimeout caps the LLM call so a slow local model can't
+// stall executor finalization. On timeout, we return "" (the existing
+// no-summary path takes over).
+const synthesizeSummaryTimeout = 6 * time.Second
+
// extractSummary reads a stream-json stdout log and returns the text following
// the last "## Summary" heading found in any assistant text block.
// Returns empty string if the file cannot be read or no summary is found.
@@ -28,6 +43,86 @@ func extractSummary(stdoutPath string) string {
return last
}
+// synthesizeSummary asks the LLM to summarize the assistant text content in
+// stdoutPath when no "## Summary" heading was present. Returns "" on any
+// error, an empty file, or an empty model response — preserving the
+// existing "no summary" behavior so the new path is purely additive.
+func synthesizeSummary(parent context.Context, c *llm.Client, stdoutPath string) string {
+ if c == nil || stdoutPath == "" {
+ return ""
+ }
+ text := readAssistantTextTail(stdoutPath, synthesizeSummaryMaxBytes)
+ if strings.TrimSpace(text) == "" {
+ return ""
+ }
+
+ cctx, cancel := context.WithTimeout(parent, synthesizeSummaryTimeout)
+ defer cancel()
+ resp, err := c.Chat(cctx, llm.ChatRequest{
+ Messages: []llm.Message{
+ {Role: "system", Content: "You summarize what an automated coding agent did. Reply with 2-4 sentences of plain prose. No bullets, no headings, no preamble."},
+ {Role: "user", Content: "Here is the agent's output. Summarize what it accomplished:\n\n" + text},
+ },
+ })
+ if err != nil {
+ return ""
+ }
+ return strings.TrimSpace(resp.Content)
+}
+
+// readAssistantTextTail returns the concatenated `text` blocks from assistant
+// stream-json events in the last maxBytes of the file. Non-assistant events
+// (system, result, tool_use, etc.) are skipped so the LLM sees just what the
+// agent said. Returns "" on any error.
+func readAssistantTextTail(stdoutPath string, maxBytes int64) string {
+ f, err := os.Open(stdoutPath)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+
+ stat, err := f.Stat()
+ if err != nil {
+ return ""
+ }
+ size := stat.Size()
+ if size > maxBytes {
+ if _, err := f.Seek(size-maxBytes, io.SeekStart); err != nil {
+ return ""
+ }
+ }
+
+ var sb strings.Builder
+ scanner := bufio.NewScanner(f)
+ scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
+ first := size > maxBytes // if we seeked, drop the first (likely partial) line
+ for scanner.Scan() {
+ if first {
+ first = false
+ continue
+ }
+ var event struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ } `json:"content"`
+ } `json:"message"`
+ }
+ if err := json.Unmarshal(scanner.Bytes(), &event); err != nil || event.Type != "assistant" {
+ continue
+ }
+ for _, block := range event.Message.Content {
+ if block.Type == "text" && block.Text != "" {
+ sb.WriteString(block.Text)
+ sb.WriteString("\n")
+ }
+ }
+ }
+ return sb.String()
+}
+
// summaryFromLine parses a single stream-json line and returns the text after
// "## Summary" if the line is an assistant text block containing that heading.
func summaryFromLine(line []byte) string {
diff --git a/internal/executor/summary_synth_test.go b/internal/executor/summary_synth_test.go
new file mode 100644
index 0000000..7ad396d
--- /dev/null
+++ b/internal/executor/summary_synth_test.go
@@ -0,0 +1,241 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync/atomic"
+ "testing"
+
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/storage"
+)
+
+func writeStreamLog(t *testing.T, lines []string) string {
+ t.Helper()
+ dir := t.TempDir()
+ path := filepath.Join(dir, "stdout.log")
+ var sb strings.Builder
+ for _, l := range lines {
+ sb.WriteString(l)
+ sb.WriteString("\n")
+ }
+ if err := os.WriteFile(path, []byte(sb.String()), 0600); err != nil {
+ t.Fatal(err)
+ }
+ return path
+}
+
+func TestSynthesizeSummary_NilClient(t *testing.T) {
+ got := synthesizeSummary(context.Background(), nil, "/some/path")
+ if got != "" {
+ t.Errorf("nil client: want empty, got %q", got)
+ }
+}
+
+func TestSynthesizeSummary_EmptyPath(t *testing.T) {
+ c := &llm.Client{Endpoint: "http://unused", Model: "x"}
+ got := synthesizeSummary(context.Background(), c, "")
+ if got != "" {
+ t.Errorf("empty path: want empty, got %q", got)
+ }
+}
+
+func TestSynthesizeSummary_MissingFile(t *testing.T) {
+ c := &llm.Client{Endpoint: "http://unused", Model: "x"}
+ got := synthesizeSummary(context.Background(), c, "/nonexistent/file.log")
+ if got != "" {
+ t.Errorf("missing file: want empty, got %q", got)
+ }
+}
+
+func TestSynthesizeSummary_EmptyAssistantContent(t *testing.T) {
+ // Log contains only system/result events — no assistant text. The function
+ // should short-circuit without calling the LLM.
+ path := writeStreamLog(t, []string{
+ `{"type":"system","subtype":"init"}`,
+ `{"type":"result","subtype":"success","total_cost_usd":0}`,
+ })
+
+ var calls int32
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ atomic.AddInt32(&calls, 1)
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be returned"},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer srv.Close()
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"}
+
+ got := synthesizeSummary(context.Background(), c, path)
+ if got != "" {
+ t.Errorf("empty content: want empty, got %q", got)
+ }
+ if atomic.LoadInt32(&calls) != 0 {
+ t.Errorf("LLM should not be called for empty assistant content")
+ }
+}
+
+func TestSynthesizeSummary_LLMSuccess(t *testing.T) {
+ path := writeStreamLog(t, []string{
+ `{"type":"assistant","message":{"content":[{"type":"text","text":"Ran the tests."}]}}`,
+ `{"type":"assistant","message":{"content":[{"type":"text","text":"Fixed the import."}]}}`,
+ `{"type":"result","subtype":"success"}`,
+ })
+
+ var capturedUser string
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var body struct {
+ Messages []struct {
+ Role, Content string
+ } `json:"messages"`
+ }
+ json.NewDecoder(r.Body).Decode(&body)
+ for _, m := range body.Messages {
+ if m.Role == "user" {
+ capturedUser = m.Content
+ }
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"choices":[{"message":{"content":" Agent ran tests and fixed an import. "},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer srv.Close()
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"}
+
+ got := synthesizeSummary(context.Background(), c, path)
+ if got != "Agent ran tests and fixed an import." {
+ t.Errorf("summary: got %q", got)
+ }
+ if !strings.Contains(capturedUser, "Ran the tests.") {
+ t.Errorf("user prompt missing first assistant text; got: %s", capturedUser)
+ }
+ if !strings.Contains(capturedUser, "Fixed the import.") {
+ t.Errorf("user prompt missing second assistant text; got: %s", capturedUser)
+ }
+}
+
+// TestPool_HandleRunResult_LLMSummaryFallback verifies the Pool falls back to
+// LLM-synthesized summary when extractSummary returns empty.
+func TestPool_HandleRunResult_LLMSummaryFallback(t *testing.T) {
+ // stdout has assistant text but no "## Summary" heading.
+ stdoutPath := writeStreamLog(t, []string{
+ `{"type":"assistant","message":{"content":[{"type":"text","text":"Did the work without writing a summary section."}]}}`,
+ })
+
+ llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"choices":[{"message":{"content":"Synthesized summary."},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer llmSrv.Close()
+
+ store := newMinimalMockStore()
+ pool := newPoolWithMockStore(store)
+ pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"}
+
+ tk := makeTask("synth-summary")
+ store.tasks[tk.ID] = tk
+ exec := &storage.Execution{ID: "e-synth", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath}
+
+ pool.handleRunResult(context.Background(), tk, exec, nil, "claude")
+
+ id, summary, ok := store.lastSummaryUpdate()
+ if !ok {
+ t.Fatalf("expected UpdateTaskSummary to be called")
+ }
+ if id != tk.ID {
+ t.Errorf("summary recorded for wrong task: %q", id)
+ }
+ if summary != "Synthesized summary." {
+ t.Errorf("summary: got %q", summary)
+ }
+
+ // Drain the result channel so the test exits cleanly.
+ <-pool.resultCh
+}
+
+// TestPool_HandleRunResult_ExtractSummaryWins verifies the LLM is NOT called
+// when the agent already wrote a "## Summary" section.
+func TestPool_HandleRunResult_ExtractSummaryWins(t *testing.T) {
+ stdoutPath := writeStreamLog(t, []string{
+ `{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nAgent wrote its own summary."}]}}`,
+ })
+
+ var llmCalls int32
+ llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ atomic.AddInt32(&llmCalls, 1)
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be used"},"finish_reason":"stop"}],"usage":{}}`)
+ }))
+ defer llmSrv.Close()
+
+ store := newMinimalMockStore()
+ pool := newPoolWithMockStore(store)
+ pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"}
+
+ tk := makeTask("agent-summary")
+ store.tasks[tk.ID] = tk
+ exec := &storage.Execution{ID: "e-agent", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath}
+
+ pool.handleRunResult(context.Background(), tk, exec, nil, "claude")
+
+ if got := atomic.LoadInt32(&llmCalls); got != 0 {
+ t.Errorf("LLM should not be called when ## Summary is present; got %d calls", got)
+ }
+ _, summary, ok := store.lastSummaryUpdate()
+ if !ok {
+ t.Fatalf("expected UpdateTaskSummary")
+ }
+ if summary != "Agent wrote its own summary." {
+ t.Errorf("summary: got %q (want extractSummary output)", summary)
+ }
+ <-pool.resultCh
+}
+
+func TestSynthesizeSummary_LLMFailure_ReturnsEmpty(t *testing.T) {
+ path := writeStreamLog(t, []string{
+ `{"type":"assistant","message":{"content":[{"type":"text","text":"Did something."}]}}`,
+ })
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ http.Error(w, "boom", http.StatusInternalServerError)
+ }))
+ defer srv.Close()
+ c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"}
+
+ got := synthesizeSummary(context.Background(), c, path)
+ if got != "" {
+ t.Errorf("LLM failure: want empty, got %q", got)
+ }
+}
+
+// TestReadAssistantTextTail_TailingLargeFile verifies the seek-to-tail
+// behavior drops early content but keeps later assistant text.
+func TestReadAssistantTextTail_TailingLargeFile(t *testing.T) {
+ dir := t.TempDir()
+ path := filepath.Join(dir, "stdout.log")
+ f, err := os.Create(path)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Write a ton of garbage assistant lines, then a final marker.
+ for i := 0; i < 500; i++ {
+ fmt.Fprintf(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"filler line that should be in the early part of a large file %04d"}]}}`+"\n", i)
+ }
+ fmt.Fprintln(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"FINAL_MARKER_LINE"}]}}`)
+ f.Close()
+
+ got := readAssistantTextTail(path, 4*1024) // 4 KB cap
+ if !strings.Contains(got, "FINAL_MARKER_LINE") {
+ t.Errorf("tail should contain final line; got: %s", got)
+ }
+ if strings.Contains(got, "filler line that should be in the early part of a large file 0000") {
+ end := 200
+ if len(got) < end {
+ end = len(got)
+ }
+ t.Errorf("tail should NOT contain very-early line; got first 200 chars: %s", got[:end])
+ }
+}
diff --git a/internal/llm/client.go b/internal/llm/client.go
new file mode 100644
index 0000000..613ebe5
--- /dev/null
+++ b/internal/llm/client.go
@@ -0,0 +1,343 @@
+// Package llm provides a small OpenAI-compatible HTTP client used for
+// internal LLM-shaped work (model classification, summarization, elaboration)
+// against any local server speaking /v1/chat/completions: Ollama, vLLM,
+// LM Studio, llama.cpp server, etc.
+package llm
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/retry"
+)
+
+// Client is an OpenAI-compatible chat completions client.
+// Endpoint is the base URL up through "/v1" (no trailing slash).
+type Client struct {
+ Endpoint string
+ Model string
+ APIKey string // optional, sent as Bearer token
+ HTTPClient *http.Client
+ Logger *slog.Logger
+}
+
+// Message is a single chat-completion message.
+type Message struct {
+ Role string `json:"role"`
+ Content string `json:"content"`
+}
+
+// ChatRequest captures the parameters of a single Chat or ChatStream call.
+// Zero values mean "use server default" except for Stream and ResponseJSON,
+// which are explicit booleans. Model overrides Client.Model when non-empty.
+type ChatRequest struct {
+ Model string
+ Messages []Message
+ Temperature *float64
+ MaxTokens int
+ ResponseJSON bool
+}
+
+// ChatResponse is the aggregated result of a chat completion.
+type ChatResponse struct {
+ Content string
+ PromptTokens int
+ OutputTokens int
+ Model string
+ FinishReason string
+}
+
+// Chat performs a non-streaming chat completion. Rate-limit errors (HTTP 429,
+// overloaded responses) are retried with exponential backoff via
+// retry.RunWithBackoff.
+func (c *Client) Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
+ if c == nil {
+ return nil, errors.New("llm: nil Client")
+ }
+ body, err := c.buildRequestBody(req, false)
+ if err != nil {
+ return nil, err
+ }
+
+ var resp *ChatResponse
+ err = retry.RunWithBackoff(ctx, 3, time.Second, func() error {
+ raw, perErr := c.postChat(ctx, body)
+ if perErr != nil {
+ return perErr
+ }
+ var oai openAIResponse
+ if jerr := json.Unmarshal(raw, &oai); jerr != nil {
+ return fmt.Errorf("llm: decode response: %w", jerr)
+ }
+ if len(oai.Choices) == 0 {
+ return fmt.Errorf("llm: response has no choices")
+ }
+ resp = &ChatResponse{
+ Content: oai.Choices[0].Message.Content,
+ PromptTokens: oai.Usage.PromptTokens,
+ OutputTokens: oai.Usage.CompletionTokens,
+ Model: oai.Model,
+ FinishReason: oai.Choices[0].FinishReason,
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+// ChatStream performs a streaming chat completion. onDelta is called once per
+// content delta chunk. The returned ChatResponse aggregates the full content
+// and any usage tokens reported in the final SSE chunk. Rate-limit errors at
+// connection time are retried; once streaming has begun, errors are returned.
+func (c *Client) ChatStream(ctx context.Context, req ChatRequest, onDelta func(string)) (*ChatResponse, error) {
+ if c == nil {
+ return nil, errors.New("llm: nil Client")
+ }
+ body, err := c.buildRequestBody(req, true)
+ if err != nil {
+ return nil, err
+ }
+
+ var resp *ChatResponse
+ err = retry.RunWithBackoff(ctx, 3, time.Second, func() error {
+ var perErr error
+ resp, perErr = c.streamChat(ctx, body, onDelta)
+ return perErr
+ })
+ if err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+func (c *Client) buildRequestBody(req ChatRequest, stream bool) ([]byte, error) {
+ model := req.Model
+ if model == "" {
+ model = c.Model
+ }
+ if model == "" {
+ return nil, errors.New("llm: no model configured")
+ }
+ payload := openAIRequest{
+ Model: model,
+ Messages: req.Messages,
+ Stream: stream,
+ }
+ if req.Temperature != nil {
+ payload.Temperature = req.Temperature
+ }
+ if req.MaxTokens > 0 {
+ payload.MaxTokens = req.MaxTokens
+ }
+ if req.ResponseJSON {
+ payload.ResponseFormat = &responseFormat{Type: "json_object"}
+ }
+ if stream {
+ payload.StreamOptions = &streamOptions{IncludeUsage: true}
+ }
+ return json.Marshal(payload)
+}
+
+func (c *Client) postChat(ctx context.Context, body []byte) ([]byte, error) {
+ url := strings.TrimRight(c.Endpoint, "/") + "/chat/completions"
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("llm: build request: %w", err)
+ }
+ c.applyHeaders(httpReq)
+
+ httpResp, err := c.client().Do(httpReq)
+ if err != nil {
+ return nil, fmt.Errorf("llm: http: %w", err)
+ }
+ defer httpResp.Body.Close()
+ raw, err := io.ReadAll(httpResp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("llm: read body: %w", err)
+ }
+ if httpResp.StatusCode >= 400 {
+ return nil, errFromStatus(httpResp, raw)
+ }
+ return raw, nil
+}
+
+func (c *Client) streamChat(ctx context.Context, body []byte, onDelta func(string)) (*ChatResponse, error) {
+ url := strings.TrimRight(c.Endpoint, "/") + "/chat/completions"
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("llm: build request: %w", err)
+ }
+ c.applyHeaders(httpReq)
+ httpReq.Header.Set("Accept", "text/event-stream")
+
+ httpResp, err := c.client().Do(httpReq)
+ if err != nil {
+ return nil, fmt.Errorf("llm: http: %w", err)
+ }
+ defer httpResp.Body.Close()
+ if httpResp.StatusCode >= 400 {
+ raw, _ := io.ReadAll(httpResp.Body)
+ return nil, errFromStatus(httpResp, raw)
+ }
+
+ var (
+ content strings.Builder
+ promptTok int
+ outputTok int
+ model string
+ finishReason string
+ )
+ scanner := bufio.NewScanner(httpResp.Body)
+ scanner.Buffer(make([]byte, 0, 64*1024), 1<<20)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if !strings.HasPrefix(line, "data:") {
+ continue
+ }
+ data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
+ if data == "" || data == "[DONE]" {
+ if data == "[DONE]" {
+ break
+ }
+ continue
+ }
+ var chunk openAIStreamChunk
+ if jerr := json.Unmarshal([]byte(data), &chunk); jerr != nil {
+ if c.Logger != nil {
+ c.Logger.Warn("llm: bad SSE chunk", "err", jerr, "data", data)
+ }
+ continue
+ }
+ if chunk.Model != "" {
+ model = chunk.Model
+ }
+ for _, ch := range chunk.Choices {
+ if ch.Delta.Content != "" {
+ content.WriteString(ch.Delta.Content)
+ if onDelta != nil {
+ onDelta(ch.Delta.Content)
+ }
+ }
+ if ch.FinishReason != "" {
+ finishReason = ch.FinishReason
+ }
+ }
+ if chunk.Usage != nil {
+ promptTok = chunk.Usage.PromptTokens
+ outputTok = chunk.Usage.CompletionTokens
+ }
+ }
+ if scanErr := scanner.Err(); scanErr != nil {
+ return nil, fmt.Errorf("llm: stream read: %w", scanErr)
+ }
+ return &ChatResponse{
+ Content: content.String(),
+ PromptTokens: promptTok,
+ OutputTokens: outputTok,
+ Model: model,
+ FinishReason: finishReason,
+ }, nil
+}
+
+func (c *Client) applyHeaders(req *http.Request) {
+ req.Header.Set("Content-Type", "application/json")
+ if c.APIKey != "" {
+ req.Header.Set("Authorization", "Bearer "+c.APIKey)
+ }
+}
+
+func (c *Client) client() *http.Client {
+ if c.HTTPClient != nil {
+ return c.HTTPClient
+ }
+ return &http.Client{Timeout: 60 * time.Second}
+}
+
+// errFromStatus produces an error whose message includes "rate limit", "429",
+// or "overloaded" as appropriate so retry.IsRateLimitError treats local 429/503
+// identically to upstream provider rate limits. Any Retry-After header is
+// embedded in the error message for retry.ParseRetryAfter to find.
+func errFromStatus(resp *http.Response, body []byte) error {
+ prefix := ""
+ switch resp.StatusCode {
+ case http.StatusTooManyRequests:
+ prefix = fmt.Sprintf("llm: 429 rate limit")
+ case http.StatusServiceUnavailable:
+ prefix = "llm: 503 overloaded"
+ default:
+ prefix = fmt.Sprintf("llm: http %d", resp.StatusCode)
+ }
+ if ra := resp.Header.Get("Retry-After"); ra != "" {
+ prefix += fmt.Sprintf(" (retry-after: %s)", ra)
+ }
+ snippet := strings.TrimSpace(string(body))
+ if len(snippet) > 500 {
+ snippet = snippet[:500] + "..."
+ }
+ if snippet != "" {
+ return fmt.Errorf("%s: %s", prefix, snippet)
+ }
+ return errors.New(prefix)
+}
+
+// --- OpenAI wire types ---
+
+type openAIRequest struct {
+ Model string `json:"model"`
+ Messages []Message `json:"messages"`
+ Temperature *float64 `json:"temperature,omitempty"`
+ MaxTokens int `json:"max_tokens,omitempty"`
+ Stream bool `json:"stream,omitempty"`
+ StreamOptions *streamOptions `json:"stream_options,omitempty"`
+ ResponseFormat *responseFormat `json:"response_format,omitempty"`
+}
+
+type streamOptions struct {
+ IncludeUsage bool `json:"include_usage"`
+}
+
+type responseFormat struct {
+ Type string `json:"type"`
+}
+
+type openAIResponse struct {
+ Model string `json:"model"`
+ Choices []openAIChoice `json:"choices"`
+ Usage openAIUsage `json:"usage"`
+}
+
+type openAIChoice struct {
+ Message Message `json:"message"`
+ FinishReason string `json:"finish_reason"`
+}
+
+type openAIUsage struct {
+ PromptTokens int `json:"prompt_tokens"`
+ CompletionTokens int `json:"completion_tokens"`
+}
+
+type openAIStreamChunk struct {
+ Model string `json:"model"`
+ Choices []openAIStreamCh `json:"choices"`
+ Usage *openAIUsage `json:"usage,omitempty"`
+}
+
+type openAIStreamCh struct {
+ Delta openAIDelta `json:"delta"`
+ FinishReason string `json:"finish_reason"`
+}
+
+type openAIDelta struct {
+ Content string `json:"content"`
+}
diff --git a/internal/llm/client_test.go b/internal/llm/client_test.go
new file mode 100644
index 0000000..8257836
--- /dev/null
+++ b/internal/llm/client_test.go
@@ -0,0 +1,159 @@
+package llm
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestChat_ParsesCompletion(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/v1/chat/completions" {
+ t.Errorf("unexpected path %q", r.URL.Path)
+ }
+ if r.Header.Get("Authorization") != "Bearer test-key" {
+ t.Errorf("missing/wrong bearer header: %q", r.Header.Get("Authorization"))
+ }
+ var body openAIRequest
+ if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+ t.Fatalf("decode body: %v", err)
+ }
+ if body.Model != "test-model" {
+ t.Errorf("model: want test-model got %q", body.Model)
+ }
+ if len(body.Messages) != 1 || body.Messages[0].Content != "hello" {
+ t.Errorf("messages mismatch: %+v", body.Messages)
+ }
+ if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" {
+ t.Errorf("expected response_format json_object, got %+v", body.ResponseFormat)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model": "test-model",
+ "choices": [{"message": {"role": "assistant", "content": "world"}, "finish_reason": "stop"}],
+ "usage": {"prompt_tokens": 4, "completion_tokens": 7}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model", APIKey: "test-key"}
+ resp, err := c.Chat(context.Background(), ChatRequest{
+ Messages: []Message{{Role: "user", Content: "hello"}},
+ ResponseJSON: true,
+ })
+ if err != nil {
+ t.Fatalf("Chat: %v", err)
+ }
+ if resp.Content != "world" {
+ t.Errorf("content: want world got %q", resp.Content)
+ }
+ if resp.PromptTokens != 4 || resp.OutputTokens != 7 {
+ t.Errorf("tokens mismatch: %+v", resp)
+ }
+ if resp.FinishReason != "stop" {
+ t.Errorf("finish_reason: want stop got %q", resp.FinishReason)
+ }
+}
+
+func TestChatStream_ParsesSSE(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ flusher, _ := w.(http.Flusher)
+ chunks := []string{
+ `{"model":"test-model","choices":[{"delta":{"content":"Hel"},"finish_reason":""}]}`,
+ `{"model":"test-model","choices":[{"delta":{"content":"lo, "},"finish_reason":""}]}`,
+ `{"model":"test-model","choices":[{"delta":{"content":"world"},"finish_reason":"stop"}]}`,
+ `{"model":"test-model","choices":[],"usage":{"prompt_tokens":3,"completion_tokens":5}}`,
+ }
+ for _, c := range chunks {
+ fmt.Fprintf(w, "data: %s\n\n", c)
+ if flusher != nil {
+ flusher.Flush()
+ }
+ }
+ fmt.Fprint(w, "data: [DONE]\n\n")
+ }))
+ defer srv.Close()
+
+ c := &Client{Endpoint: srv.URL + "/v1", Model: "test-model"}
+
+ var deltas []string
+ resp, err := c.ChatStream(context.Background(),
+ ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}},
+ func(d string) { deltas = append(deltas, d) },
+ )
+ if err != nil {
+ t.Fatalf("ChatStream: %v", err)
+ }
+ if got := strings.Join(deltas, ""); got != "Hello, world" {
+ t.Errorf("aggregated deltas: want %q got %q", "Hello, world", got)
+ }
+ if resp.Content != "Hello, world" {
+ t.Errorf("content: want %q got %q", "Hello, world", resp.Content)
+ }
+ if resp.PromptTokens != 3 || resp.OutputTokens != 5 {
+ t.Errorf("tokens: %+v", resp)
+ }
+ if resp.FinishReason != "stop" {
+ t.Errorf("finish_reason: want stop got %q", resp.FinishReason)
+ }
+}
+
+func TestChat_RetriesOn429(t *testing.T) {
+ var calls int32
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ n := atomic.AddInt32(&calls, 1)
+ if n == 1 {
+ w.Header().Set("Retry-After", "1")
+ http.Error(w, "slow down", http.StatusTooManyRequests)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintln(w, `{
+ "model":"m","choices":[{"message":{"role":"assistant","content":"ok"},"finish_reason":"stop"}],
+ "usage":{"prompt_tokens":1,"completion_tokens":1}
+ }`)
+ }))
+ defer srv.Close()
+
+ c := &Client{
+ Endpoint: srv.URL + "/v1",
+ Model: "m",
+ HTTPClient: &http.Client{Timeout: 5 * time.Second},
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ resp, err := c.Chat(ctx, ChatRequest{Messages: []Message{{Role: "user", Content: "hi"}}})
+ if err != nil {
+ t.Fatalf("Chat: %v", err)
+ }
+ if resp.Content != "ok" {
+ t.Errorf("content: want ok got %q", resp.Content)
+ }
+ if got := atomic.LoadInt32(&calls); got != 2 {
+ t.Errorf("expected 2 server calls (1 retry), got %d", got)
+ }
+}
+
+// Sanity: errFromStatus produces a string that retry.IsRateLimitError matches.
+func TestErrFromStatus_RateLimitMarker(t *testing.T) {
+ resp := &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ Header: http.Header{"Retry-After": []string{"30"}},
+ }
+ body, _ := io.ReadAll(strings.NewReader("limit hit"))
+ err := errFromStatus(resp, body)
+ if !strings.Contains(strings.ToLower(err.Error()), "rate limit") {
+ t.Errorf("error should contain 'rate limit', got: %v", err)
+ }
+ if !strings.Contains(err.Error(), "retry-after: 30") {
+ t.Errorf("error should embed retry-after, got: %v", err)
+ }
+}
diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go
new file mode 100644
index 0000000..b91abc4
--- /dev/null
+++ b/internal/retry/backoff.go
@@ -0,0 +1,77 @@
+// Package retry provides exponential-backoff retry helpers used across the
+// codebase for rate-limit-aware HTTP/subprocess calls.
+package retry
+
+import (
+ "context"
+ "fmt"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+)
+
+var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
+
+const maxBackoffDelay = 5 * time.Minute
+
+// IsRateLimitError returns true if err looks like a transient rate-limit
+// (e.g. HTTP 429, "too many requests", "overloaded") that is worth retrying.
+func IsRateLimitError(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := strings.ToLower(err.Error())
+ return strings.Contains(msg, "rate limit") ||
+ strings.Contains(msg, "too many requests") ||
+ strings.Contains(msg, "429") ||
+ strings.Contains(msg, "overloaded")
+}
+
+// ParseRetryAfter extracts a Retry-After duration from an error message.
+// Returns 0 if no retry-after value is found.
+func ParseRetryAfter(msg string) time.Duration {
+ m := retryAfterRe.FindStringSubmatch(msg)
+ if m == nil {
+ return 0
+ }
+ secs, err := strconv.Atoi(m[1])
+ if err != nil || secs <= 0 {
+ return 0
+ }
+ return time.Duration(secs) * time.Second
+}
+
+// RunWithBackoff calls fn repeatedly on rate-limit errors, using exponential backoff.
+// maxRetries is the max number of retry attempts (not counting the initial call).
+// baseDelay is the initial backoff duration (doubled each retry).
+func RunWithBackoff(ctx context.Context, maxRetries int, baseDelay time.Duration, fn func() error) error {
+ var lastErr error
+ for attempt := 0; attempt <= maxRetries; attempt++ {
+ lastErr = fn()
+ if lastErr == nil {
+ return nil
+ }
+ if !IsRateLimitError(lastErr) {
+ return lastErr
+ }
+ if attempt == maxRetries {
+ break
+ }
+
+ delay := baseDelay * (1 << attempt)
+ if delay > maxBackoffDelay {
+ delay = maxBackoffDelay
+ }
+ if ra := ParseRetryAfter(lastErr.Error()); ra > 0 {
+ delay = ra
+ }
+
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("context cancelled during rate-limit backoff: %w", ctx.Err())
+ case <-time.After(delay):
+ }
+ }
+ return lastErr
+}
diff --git a/internal/executor/ratelimit_test.go b/internal/retry/backoff_test.go
index 1434810..a963fc2 100644
--- a/internal/executor/ratelimit_test.go
+++ b/internal/retry/backoff_test.go
@@ -1,4 +1,4 @@
-package executor
+package retry
import (
"context"
@@ -8,54 +8,54 @@ import (
"time"
)
-// --- isRateLimitError tests ---
+// --- IsRateLimitError tests ---
func TestIsRateLimitError_RateLimitMessage(t *testing.T) {
err := errors.New("claude exited with error: rate limit exceeded")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for 'rate limit exceeded', got false")
}
}
func TestIsRateLimitError_TooManyRequests(t *testing.T) {
err := errors.New("too many requests to the API")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for 'too many requests', got false")
}
}
func TestIsRateLimitError_HTTP429(t *testing.T) {
err := errors.New("API returned status 429")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for '429', got false")
}
}
func TestIsRateLimitError_Overloaded(t *testing.T) {
err := errors.New("API overloaded, please retry later")
- if !isRateLimitError(err) {
+ if !IsRateLimitError(err) {
t.Error("want true for 'overloaded', got false")
}
}
func TestIsRateLimitError_NonRateLimitError(t *testing.T) {
err := errors.New("claude exited with error: exit status 1")
- if isRateLimitError(err) {
+ if IsRateLimitError(err) {
t.Error("want false for non-rate-limit error, got true")
}
}
func TestIsRateLimitError_NilError(t *testing.T) {
- if isRateLimitError(nil) {
+ if IsRateLimitError(nil) {
t.Error("want false for nil error, got true")
}
}
-// --- parseRetryAfter tests ---
+// --- ParseRetryAfter tests ---
func TestParseRetryAfter_RetryAfterSeconds(t *testing.T) {
msg := "rate limit exceeded, retry after 30 seconds"
- d := parseRetryAfter(msg)
+ d := ParseRetryAfter(msg)
if d != 30*time.Second {
t.Errorf("want 30s, got %v", d)
}
@@ -63,7 +63,7 @@ func TestParseRetryAfter_RetryAfterSeconds(t *testing.T) {
func TestParseRetryAfter_RetryAfterHeader(t *testing.T) {
msg := "rate_limit_error: retry-after: 60"
- d := parseRetryAfter(msg)
+ d := ParseRetryAfter(msg)
if d != 60*time.Second {
t.Errorf("want 60s, got %v", d)
}
@@ -71,43 +71,13 @@ func TestParseRetryAfter_RetryAfterHeader(t *testing.T) {
func TestParseRetryAfter_NoRetryInfo(t *testing.T) {
msg := "rate limit exceeded"
- d := parseRetryAfter(msg)
+ d := ParseRetryAfter(msg)
if d != 0 {
t.Errorf("want 0, got %v", d)
}
}
-// --- isQuotaExhausted tests ---
-
-func TestIsQuotaExhausted_GeminiDailyQuota(t *testing.T) {
- err := errors.New("container execution failed: exit status 1\nTerminalQuotaError: You have exhausted your daily quota on this model.")
- if !isQuotaExhausted(err) {
- t.Error("want true for Gemini TerminalQuotaError, got false")
- }
-}
-
-func TestIsQuotaExhausted_GeminiExhaustedMessage(t *testing.T) {
- err := errors.New("container execution failed: exit status 1\nyou have exhausted your daily quota")
- if !isQuotaExhausted(err) {
- t.Error("want true for 'exhausted your daily quota', got false")
- }
-}
-
-func TestIsQuotaExhausted_GeminiQuotaExceeded(t *testing.T) {
- err := errors.New("container execution failed: exit status 1\nQuota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests")
- if !isQuotaExhausted(err) {
- t.Error("want true for Gemini free tier quota exceeded, got false")
- }
-}
-
-func TestIsQuotaExhausted_NotQuota(t *testing.T) {
- err := errors.New("container execution failed: exit status 1")
- if isQuotaExhausted(err) {
- t.Error("want false for generic exit status 1, got true")
- }
-}
-
-// --- runWithBackoff tests ---
+// --- RunWithBackoff tests ---
func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) {
calls := 0
@@ -115,7 +85,7 @@ func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) {
calls++
return nil
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err != nil {
t.Errorf("want nil error, got %v", err)
}
@@ -133,7 +103,7 @@ func TestRunWithBackoff_RetriesOnRateLimit(t *testing.T) {
}
return nil
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err != nil {
t.Errorf("want nil error, got %v", err)
}
@@ -149,11 +119,10 @@ func TestRunWithBackoff_GivesUpAfterMaxRetries(t *testing.T) {
calls++
return rateLimitErr
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err == nil {
t.Fatal("want error after max retries, got nil")
}
- // maxRetries=3: 1 initial call + 3 retries = 4 total calls
if calls != 4 {
t.Errorf("want 4 calls (1 initial + 3 retries), got %d", calls)
}
@@ -165,7 +134,7 @@ func TestRunWithBackoff_DoesNotRetryNonRateLimitError(t *testing.T) {
calls++
return fmt.Errorf("permission denied")
}
- err := runWithBackoff(context.Background(), 3, time.Millisecond, fn)
+ err := RunWithBackoff(context.Background(), 3, time.Millisecond, fn)
if err == nil {
t.Fatal("want error, got nil")
}
@@ -180,12 +149,12 @@ func TestRunWithBackoff_ContextCancellation(t *testing.T) {
fn := func() error {
calls++
- cancel() // cancel immediately after first call
+ cancel()
return fmt.Errorf("rate limit exceeded")
}
start := time.Now()
- err := runWithBackoff(ctx, 3, time.Second, fn) // large delay confirms ctx preempts wait
+ err := RunWithBackoff(ctx, 3, time.Second, fn)
elapsed := time.Since(start)
if err == nil {
diff --git a/internal/storage/db.go b/internal/storage/db.go
index 3a3e6b2..4adc1ba 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -137,6 +137,8 @@ func (s *DB) migrate() error {
`ALTER TABLE tasks ADD COLUMN acceptance_criteria TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE tasks ADD COLUMN checker_for_task_id TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE tasks ADD COLUMN checker_report TEXT NOT NULL DEFAULT ''`,
+ `ALTER TABLE executions ADD COLUMN tokens_in INTEGER`,
+ `ALTER TABLE executions ADD COLUMN tokens_out INTEGER`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -456,6 +458,11 @@ type Execution struct {
Changestats *task.Changestats // stored as JSON; nil if not yet recorded
Commits []task.GitCommit // stored as JSON; empty if no commits
+ // Token usage for non-CLI runners (e.g. LocalRunner). 0 for Claude/Gemini
+ // CLI runs which report cost in cost_usd instead.
+ TokensIn int64
+ TokensOut int64
+
// In-memory only: set when creating a resume execution, not stored in DB.
ResumeSessionID string
ResumeAnswer string
@@ -532,23 +539,23 @@ func (s *DB) CreateExecution(e *Execution) error {
commitsJSON = string(b)
}
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -567,7 +574,7 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
// GetLatestExecution returns the most recent execution for a task.
func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
return scanExecution(row)
}
@@ -905,11 +912,11 @@ func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?,
stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ?, sandbox_dir = ?,
- changestats_json = ?, commits_json = ?
+ changestats_json = ?, commits_json = ?, tokens_in = ?, tokens_out = ?
WHERE id = ?`,
e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg,
e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.SandboxDir,
- changestatsJSON, commitsJSON, e.ID,
+ changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut, e.ID,
)
return err
}
@@ -965,11 +972,6 @@ func scanTask(row scanner) (*task.Task, error) {
t.State = task.State(state)
t.Priority = task.Priority(priority)
t.Timeout.Duration = time.Duration(timeoutNS)
- // Add debug log for configJSON
- // The logger is not available directly in db.go, so I'll use fmt.Printf for now.
- // For production code, a logger should be injected.
- // fmt.Printf("DEBUG: configJSON from DB: %s\n", configJSON)
- // TODO: Replace with proper logger when available.
if err := json.Unmarshal([]byte(configJSON), &t.Agent); err != nil {
return nil, fmt.Errorf("unmarshaling agent config: %w", err)
}
@@ -1002,13 +1004,17 @@ func scanExecution(row scanner) (*Execution, error) {
var sandboxDir sql.NullString
var changestatsJSON sql.NullString
var commitsJSON sql.NullString
+ var tokensIn sql.NullInt64
+ var tokensOut sql.NullInt64
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON, &tokensIn, &tokensOut)
if err != nil {
return nil, err
}
e.SessionID = sessionID.String
e.SandboxDir = sandboxDir.String
+ e.TokensIn = tokensIn.Int64
+ e.TokensOut = tokensOut.Int64
if changestatsJSON.Valid && changestatsJSON.String != "" {
var cs task.Changestats
if err := json.Unmarshal([]byte(changestatsJSON.String), &cs); err != nil {
diff --git a/internal/task/task.go b/internal/task/task.go
index 9f1f92f..935a238 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -39,6 +39,13 @@ type AgentConfig struct {
DisallowedTools []string `yaml:"disallowed_tools" json:"disallowed_tools"`
SystemPromptAppend string `yaml:"system_prompt_append" json:"system_prompt_append"`
AdditionalArgs []string `yaml:"additional_args" json:"additional_args"`
+ ProjectDir string `yaml:"project_dir" json:"project_dir,omitempty"`
+ SkipPlanning bool `yaml:"skip_planning" json:"skip_planning"`
+
+ // Local-runner sampling controls. Pointer for Temperature so a 0 value can
+ // mean "deterministic" rather than "unset, use server default".
+ Temperature *float64 `yaml:"temperature,omitempty" json:"temperature,omitempty"`
+ MaxTokens int `yaml:"max_tokens,omitempty" json:"max_tokens,omitempty"`
}