diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-05-13 04:02:20 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-05-13 04:02:20 +0000 |
| commit | 68399a598924775a3ec22a39c2336ae497fb07f3 (patch) | |
| tree | 29ade8224eb51eca47a1d9d03bb4d0d3653a72aa /internal/executor | |
| parent | f01231cc45f41ce2dc37072e77428e467ef3fc15 (diff) | |
| parent | d970c0730ff0dc7d714d3261197d8ba52b5d21f4 (diff) | |
Merges 12 commits from github/main (formerly master) that were developed
independently. Key additions:
- LocalRunner: OpenAI-compatible local LLM execution (Ollama, LM Studio)
- Real GeminiRunner with full sandbox parity to ClaudeRunner
- llm.Client for enriching CI failures and elaboration via local model
- retry.ParseRetryAfter moved to shared package
- tokens_in/tokens_out columns in executions table
Conflict resolutions:
- Kept local main's VAPID/push, stories, projects, agent events schema
- Merged both sets of Config fields (local + LocalModel from github/main)
- Unified activePerAgent accounting (decActiveAgent helper)
- Removed duplicate helpers from claude.go (now in helpers.go)
- Fixed double-decrement bug in handleRunResult vs decActiveAgent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/classifier.go | 33 | ||||
| -rw-r--r-- | internal/executor/classifier_test.go | 76 | ||||
| -rw-r--r-- | internal/executor/claude.go | 552 | ||||
| -rw-r--r-- | internal/executor/claude_test.go | 810 | ||||
| -rw-r--r-- | internal/executor/container_test.go | 2 | ||||
| -rw-r--r-- | internal/executor/executor.go | 101 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 17 | ||||
| -rw-r--r-- | internal/executor/gemini.go | 346 | ||||
| -rw-r--r-- | internal/executor/gemini_test.go | 447 | ||||
| -rw-r--r-- | internal/executor/helpers.go | 6 | ||||
| -rw-r--r-- | internal/executor/local.go | 171 | ||||
| -rw-r--r-- | internal/executor/local_test.go | 152 | ||||
| -rw-r--r-- | internal/executor/ratelimit.go | 80 | ||||
| -rw-r--r-- | internal/executor/ratelimit_test.go | 200 | ||||
| -rw-r--r-- | internal/executor/summary.go | 95 | ||||
| -rw-r--r-- | internal/executor/summary_synth_test.go | 241 |
16 files changed, 3000 insertions, 329 deletions
diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go index 7a474b6..049dc4f 100644 --- a/internal/executor/classifier.go +++ b/internal/executor/classifier.go @@ -6,6 +6,8 @@ import ( "fmt" "os/exec" "strings" + + "github.com/thepeterstone/claudomator/internal/llm" ) type Classification struct { @@ -19,7 +21,12 @@ type SystemStatus struct { RateLimited map[string]bool } +// Classifier picks a model for an incoming task. When LLM is non-nil the +// classifier routes through the local OpenAI-compatible client (cheap, +// private, fast). Otherwise it falls back to invoking the Gemini CLI +// at GeminiBinaryPath. type Classifier struct { + LLM *llm.Client GeminiBinaryPath string } @@ -62,6 +69,10 @@ func (c *Classifier) Classify(ctx context.Context, taskName, instructions string agentType, taskName, instructions, agentType, ) + if c.LLM != nil { + return c.classifyViaLLM(ctx, prompt, agentType) + } + binary := c.GeminiBinaryPath if binary == "" { binary = "gemini" @@ -123,3 +134,25 @@ func (c *Classifier) Classify(ctx context.Context, taskName, instructions string return &cls, nil } + +// classifyViaLLM routes classification through the local OpenAI-compatible +// client with response_format=json_object, so we get clean JSON without the +// markdown-fence cleanup needed for the Gemini CLI fallback. +func (c *Classifier) classifyViaLLM(ctx context.Context, prompt, agentType string) (*Classification, error) { + resp, err := c.LLM.Chat(ctx, llm.ChatRequest{ + Messages: []llm.Message{{Role: "user", Content: prompt}}, + ResponseJSON: true, + }) + if err != nil { + return nil, fmt.Errorf("classifier (local llm): %w", err) + } + body := strings.TrimSpace(resp.Content) + var cls Classification + if err := json.Unmarshal([]byte(body), &cls); err != nil { + return nil, fmt.Errorf("classifier (local llm): parse JSON: %w\nbody: %s", err, body) + } + if cls.AgentType == "" { + cls.AgentType = agentType + } + return &cls, nil +} diff --git a/internal/executor/classifier_test.go b/internal/executor/classifier_test.go index 83a9743..84fffcf 100644 --- a/internal/executor/classifier_test.go +++ b/internal/executor/classifier_test.go @@ -2,8 +2,15 @@ package executor import ( "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" "os" + "strings" "testing" + + "github.com/thepeterstone/claudomator/internal/llm" ) // TestClassifier_Classify_Mock tests the classifier with a mocked gemini binary. @@ -36,6 +43,75 @@ echo '{"response": "{\"agent_type\": \"gemini\", \"model\": \"gemini-2.5-flash-l } } +// TestClassifier_Classify_LLM tests classification through a local OpenAI-compatible LLM. +func TestClassifier_Classify_LLM(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the classifier asked for JSON mode. + var body struct { + ResponseFormat *struct { + Type string `json:"type"` + } `json:"response_format"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode body: %v", err) + } + if body.ResponseFormat == nil || body.ResponseFormat.Type != "json_object" { + t.Errorf("classifier should request json_object response format") + } + + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{ + "model":"local-fast", + "choices":[{"message":{"role":"assistant","content":"{\"agent_type\":\"claude\",\"model\":\"claude-haiku-4-5-20251001\",\"reason\":\"trivial task\"}"},"finish_reason":"stop"}], + "usage":{"prompt_tokens":10,"completion_tokens":15} + }`) + })) + defer srv.Close() + + c := &Classifier{ + LLM: &llm.Client{Endpoint: srv.URL + "/v1", Model: "local-fast"}, + } + status := SystemStatus{ + ActiveTasks: map[string]int{"claude": 1, "gemini": 0}, + RateLimited: map[string]bool{}, + } + + cls, err := c.Classify(context.Background(), "List files", "ls -la", status, "claude") + if err != nil { + t.Fatalf("Classify: %v", err) + } + if cls.AgentType != "claude" { + t.Errorf("AgentType: want claude got %q", cls.AgentType) + } + if cls.Model != "claude-haiku-4-5-20251001" { + t.Errorf("Model: want claude-haiku-4-5-20251001 got %q", cls.Model) + } + if !strings.Contains(cls.Reason, "trivial") { + t.Errorf("Reason mismatch: %q", cls.Reason) + } +} + +// TestClassifier_LLMTakesPrecedence_OverGemini ensures the LLM path is preferred when both are configured. +func TestClassifier_LLMTakesPrecedence_OverGemini(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"model":"x","choices":[{"message":{"content":"{\"agent_type\":\"claude\",\"model\":\"claude-sonnet-4-6\",\"reason\":\"r\"}"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + + c := &Classifier{ + LLM: &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"}, + GeminiBinaryPath: "/nonexistent/gemini-binary-should-not-be-called", + } + cls, err := c.Classify(context.Background(), "n", "i", SystemStatus{}, "claude") + if err != nil { + t.Fatalf("Classify: %v", err) + } + if cls.Model != "claude-sonnet-4-6" { + t.Errorf("expected LLM path; got Model=%q", cls.Model) + } +} + func filepathJoin(elems ...string) string { var path string for i, e := range elems { diff --git a/internal/executor/claude.go b/internal/executor/claude.go new file mode 100644 index 0000000..3c87f26 --- /dev/null +++ b/internal/executor/claude.go @@ -0,0 +1,552 @@ +package executor + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "github.com/thepeterstone/claudomator/internal/retry" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// ClaudeRunner spawns the `claude` CLI in non-interactive mode. +type ClaudeRunner struct { + BinaryPath string // defaults to "claude" + Logger *slog.Logger + LogDir string // base directory for execution logs + APIURL string // base URL of the Claudomator API, passed to subprocesses +} + +// BlockedError is returned by Run when the agent wrote a question file and exited. +// The pool transitions the task to BLOCKED and stores the question for the user. +// ExecLogDir returns the log directory for the given execution ID. +// Implements LogPather so the pool can persist paths before execution starts. +func (r *ClaudeRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +func (r *ClaudeRunner) binaryPath() string { + if r.BinaryPath != "" { + return r.BinaryPath + } + return "claude" +} + +// Run executes a claude -p invocation, streaming output to log files. +// It retries up to 3 times on rate-limit errors using exponential backoff. +// If the agent writes a question file and exits, Run returns *BlockedError. +// +// When project_dir is set and this is not a resume execution, Run clones the +// project into a temp sandbox, runs the agent there, then merges committed +// changes back to project_dir. On failure the sandbox is preserved and its +// path is included in the error. +func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + projectDir := t.Agent.ProjectDir + + // Validate project_dir exists when set. + if projectDir != "" { + if _, err := os.Stat(projectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", projectDir, err) + } + } + + // Setup log directory once; retries overwrite the log files. + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = e.ID // fallback for tests without LogDir set + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + if e.StdoutPath == "" { + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + } + + // Pre-assign session ID so we can resume after a BLOCKED state. + // For resume executions, the claude session continues under the original + // session ID (the one passed to --resume). Using the new exec's own UUID + // would cause a second block-and-resume cycle to pass the wrong --resume + // argument. + if e.SessionID == "" { + if e.ResumeSessionID != "" { + e.SessionID = e.ResumeSessionID + } else { + e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) + } + } + + // For new (non-resume) executions with a project_dir, clone into a sandbox. + // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude + // finds its session files under the same project slug. If no sandbox was + // preserved (e.g. task had no project_dir), fall back to project_dir. + var sandboxDir string + var startHEAD string + effectiveWorkingDir := projectDir + if e.ResumeSessionID != "" { + if e.SandboxDir != "" { + if _, statErr := os.Stat(e.SandboxDir); statErr == nil { + effectiveWorkingDir = e.SandboxDir + } else { + // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot). + // Clone a fresh sandbox so the task can run rather than fail immediately. + r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) + e.SandboxDir = "" + if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + + effectiveWorkingDir = sandboxDir + r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) + } + } + } + } else if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + + effectiveWorkingDir = sandboxDir + r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) + } + + if effectiveWorkingDir != "" { + // Capture the initial HEAD so we can identify new commits later. + headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() + startHEAD = strings.TrimSpace(string(headOut)) + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + + attempt := 0 + err := retry.RunWithBackoff(ctx, 3, 5*time.Second, func() error { + if attempt > 0 { + delay := 5 * time.Second * (1 << (attempt - 1)) + r.Logger.Warn("rate-limited by Claude API, retrying", + "attempt", attempt, + "delay", delay, + ) + } + attempt++ + return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e) + }) + if err != nil { + if sandboxDir != "" { + return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) + } + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) // consumed + questionJSON := strings.TrimSpace(string(data)) + // If the agent wrote a completion report instead of a real question, + // extract the text as the summary and fall through to normal completion. + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + // Preserve sandbox on BLOCKED — agent may have partial work and its + // Claude session files are stored under the sandbox's project slug. + // The resume execution must run in the same directory. + return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) // consumed + e.Summary = strings.TrimSpace(string(summaryData)) + } + + // Merge sandbox back to project_dir and clean up. + if sandboxDir != "" { + if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { + return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) + } + } + return nil +} + +// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a +// remote named "local" (a local bare repo that accepts pushes cleanly), then +// falls back to "origin", then to the working copy path itself. +func sandboxCloneSource(projectDir string) string { + for _, remote := range []string{"local", "origin"} { + out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", remote)...).Output() + if err == nil { + u := strings.TrimSpace(string(out)) + if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) { + return u + } + } + } + return projectDir +} + +// setupSandbox prepares a temporary git clone of projectDir. +// If projectDir is not a git repo it is initialised with an initial commit first. +func setupSandbox(projectDir string, logger *slog.Logger) (string, error) { + // Ensure projectDir is a git repo; initialise if not. + if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil { + cmds := [][]string{ + gitSafe("-C", projectDir, "init"), + gitSafe("-C", projectDir, "add", "-A"), + gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"), + } + for _, args := range cmds { + if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec + return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out) + } + } + } + + src := sandboxCloneSource(projectDir) + + tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*") + if err != nil { + return "", fmt.Errorf("creating sandbox dir: %w", err) + } + // git clone requires the target to not exist; remove the placeholder first. + if err := os.Remove(tempDir); err != nil { + return "", fmt.Errorf("removing temp dir placeholder: %w", err) + } + out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput() + if err != nil { + return "", fmt.Errorf("git clone: %w\n%s", err, out) + } + return tempDir, nil +} + +// teardownSandbox verifies the sandbox is clean and pushes new commits to the +// canonical bare repo. If the push is rejected because another task pushed +// concurrently, it fetches and rebases then retries once. +// +// The working copy (projectDir) is NOT updated automatically — it is the +// developer's workspace and is pulled manually. This avoids permission errors +// from mixed-owner .git/objects directories. +func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error { + // Automatically commit uncommitted changes. + out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() + if err != nil { + return fmt.Errorf("git status: %w", err) + } + if len(strings.TrimSpace(string(out))) > 0 { + logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir) + + // Run build before autocommitting. + if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil { + logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir) + if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil { + return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) + } + } else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil { + logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir) + cmd := exec.Command("./gradlew", "build") + cmd.Dir = sandboxDir + if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { + return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) + } + } else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil { + logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir) + cmd := exec.Command("go", "build", "./...") + cmd.Dir = sandboxDir + if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { + return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) + } + } + + cmds := [][]string{ + gitSafe("-C", sandboxDir, "add", "-A"), + gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"), + } + for _, args := range cmds { + if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { + return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out) + } + } + } + + // Capture commits before pushing/deleting. + // Use startHEAD..HEAD to find all commits made during this execution. + logRange := "origin/HEAD..HEAD" + if startHEAD != "" && startHEAD != "HEAD" { + logRange = startHEAD + "..HEAD" + } + + logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...) + logOut, logErr := logCmd.CombinedOutput() + if logErr == nil { + lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") + logger.Debug("captured commits", "count", len(lines), "range", logRange) + for _, line := range lines { + if line == "" { + continue + } + parts := strings.SplitN(line, "|", 2) + if len(parts) == 2 { + execRecord.Commits = append(execRecord.Commits, task.GitCommit{ + Hash: parts[0], + Message: parts[1], + }) + } + } + } else { + logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut)) + } + + // Check whether there are any new commits to push. + ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output() + if err != nil { + logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange) + } + if strings.TrimSpace(string(ahead)) == "0" { + os.RemoveAll(sandboxDir) + return nil + } + + // Push from sandbox → bare repo (sandbox's origin is the bare repo). + if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil { + // If rejected due to concurrent push, fetch+rebase and retry once. + if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") { + logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir) + if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil { + return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2) + } + // Re-capture commits after rebase (hashes might have changed) + execRecord.Commits = nil + logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output() + if logErr == nil { + lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") + for _, line := range lines { + parts := strings.SplitN(line, "|", 2) + if len(parts) == 2 { + execRecord.Commits = append(execRecord.Commits, task.GitCommit{ + Hash: parts[0], + Message: parts[1], + }) + } + } + } + + if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil { + return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3) + } + } else { + return fmt.Errorf("git push to origin: %w\n%s", err, out) + } + } + + logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir) + os.RemoveAll(sandboxDir) + return nil +} + +// execOnce runs the claude subprocess once, streaming output to e's log paths. +func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) + cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_PROJECT_DIR="+projectDir, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), + ) + // Put the subprocess in its own process group so we can SIGKILL the entire + // group (MCP servers, bash children, etc.) on cancellation. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if workingDir != "" { + cmd.Dir = workingDir + } + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + // Use os.Pipe for stdout so we own the read-end lifetime. + // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing + // cmd.Wait() to close it before our goroutine finishes reading. + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait + cmd.Stderr = stderrFile + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting claude: %w", err) + } + // Close our write-end immediately; the subprocess holds its own copy. + // The goroutine below gets EOF when the subprocess exits. + stdoutW.Close() + + // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. + // + // Safety: this goroutine cannot block indefinitely. The select has two arms: + // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel). + // The goroutine sends SIGKILL and exits immediately. + // • killDone — closed by close(killDone) below, immediately after cmd.Wait() + // returns. This fires when the process exits for any reason (natural exit, + // SIGKILL from the ctx arm, or any other signal). The goroutine exits without + // doing anything. + // + // Therefore: for a task that completes normally with a long-lived (non-cancelled) + // context, the killDone arm fires and the goroutine exits. There is no path where + // this goroutine outlives execOnce(). + killDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + // SIGKILL the entire process group to reap orphan children. + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } + }() + + // Stream stdout to the log file and parse cost/errors. + // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). + var costUSD float64 + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + costUSD, _, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + close(killDone) // stop the pgid-kill goroutine + wg.Wait() // drain remaining stdout before reading costUSD/streamErr + + e.CostUSD = costUSD + + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + e.ExitCode = exitErr.ExitCode() + } + // If the stream captured a rate-limit or quota message, return it + // so callers can distinguish it from a generic exit-status failure. + if retry.IsRateLimitError(streamErr) || isQuotaExhausted(streamErr) { + return streamErr + } + if tail := tailFile(e.StderrPath, 20); tail != "" { + return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail) + } + return fmt.Errorf("claude exited with error: %w", waitErr) + } + + e.ExitCode = 0 + if streamErr != nil { + return streamErr + } + return nil +} + +func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Resume execution: the agent already has context; just deliver the answer. + if e.ResumeSessionID != "" { + args := []string{ + "-p", e.ResumeAnswer, + "--resume", e.ResumeSessionID, + "--output-format", "stream-json", + "--verbose", + } + permMode := t.Agent.PermissionMode + if permMode == "" { + permMode = "bypassPermissions" + } + args = append(args, "--permission-mode", permMode) + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + return args + } + + instructions := t.Agent.Instructions + allowedTools := t.Agent.AllowedTools + + if !t.Agent.SkipPlanning { + instructions = withPlanningPreamble(instructions) + // Ensure Bash is available so the agent can POST subtasks and ask questions. + hasBash := false + for _, tool := range allowedTools { + if tool == "Bash" { + hasBash = true + break + } + } + if !hasBash { + allowedTools = append(allowedTools, "Bash") + } + } + + args := []string{ + "-p", instructions, + "--session-id", e.SessionID, + "--output-format", "stream-json", + "--verbose", + } + + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + if t.Agent.MaxBudgetUSD > 0 { + args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) + } + // Default to bypassPermissions — claudomator runs tasks unattended, so + // prompting for write access would always stall execution. Tasks that need + // a more restrictive mode can set permission_mode explicitly. + permMode := t.Agent.PermissionMode + if permMode == "" { + permMode = "bypassPermissions" + } + args = append(args, "--permission-mode", permMode) + if t.Agent.SystemPromptAppend != "" { + args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) + } + for _, tool := range allowedTools { + args = append(args, "--allowedTools", tool) + } + for _, tool := range t.Agent.DisallowedTools { + args = append(args, "--disallowedTools", tool) + } + for _, f := range t.Agent.ContextFiles { + args = append(args, "--add-dir", f) + } + args = append(args, t.Agent.AdditionalArgs...) + + return args +} + diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go new file mode 100644 index 0000000..c01e160 --- /dev/null +++ b/internal/executor/claude_test.go @@ -0,0 +1,810 @@ +package executor + +import ( + "context" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "fix the bug", + Model: "sonnet", + SkipPlanning: true, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + argMap := make(map[string]bool) + for _, a := range args { + argMap[a] = true + } + for _, want := range []string{"-p", "fix the bug", "--output-format", "stream-json", "--verbose", "--model", "sonnet"} { + if !argMap[want] { + t.Errorf("missing arg %q in %v", want, args) + } + } +} + +func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "implement feature", + Model: "opus", + MaxBudgetUSD: 5.0, + PermissionMode: "bypassPermissions", + SystemPromptAppend: "Follow TDD", + AllowedTools: []string{"Bash", "Edit"}, + DisallowedTools: []string{"Write"}, + ContextFiles: []string{"/src"}, + AdditionalArgs: []string{"--verbose"}, + SkipPlanning: true, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + // Check key args are present. + argMap := make(map[string]bool) + for _, a := range args { + argMap[a] = true + } + + requiredArgs := []string{ + "-p", "implement feature", "--output-format", "stream-json", + "--model", "opus", "--max-budget-usd", "5.00", + "--permission-mode", "bypassPermissions", + "--append-system-prompt", "Follow TDD", + "--allowedTools", "Bash", "Edit", + "--disallowedTools", "Write", + "--add-dir", "/src", + "--verbose", + } + for _, req := range requiredArgs { + if !argMap[req] { + t.Errorf("missing arg %q in %v", req, args) + } + } +} + +func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "do work", + SkipPlanning: true, + // PermissionMode intentionally not set + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + found := false + for i, a := range args { + if a == "--permission-mode" && i+1 < len(args) && args[i+1] == "bypassPermissions" { + found = true + } + } + if !found { + t.Errorf("expected --permission-mode bypassPermissions when PermissionMode is empty, args: %v", args) + } +} + +func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "do work", + PermissionMode: "default", + SkipPlanning: true, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + for i, a := range args { + if a == "--permission-mode" && i+1 < len(args) { + if args[i+1] != "default" { + t.Errorf("expected --permission-mode default, got %q", args[i+1]) + } + return + } + } + t.Errorf("--permission-mode flag not found in args: %v", args) +} + +func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "do something", + SkipPlanning: true, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + found := false + for _, a := range args { + if a == "--verbose" { + found = true + break + } + } + if !found { + t.Errorf("--verbose missing from args: %v", args) + } +} + +func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "fix the bug", + SkipPlanning: false, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + // The -p value should start with the preamble and end with the original instructions. + if len(args) < 2 || args[0] != "-p" { + t.Fatalf("expected -p as first arg, got: %v", args) + } + if !strings.HasPrefix(args[1], "## Runtime Environment") { + t.Errorf("instructions should start with planning preamble, got prefix: %q", args[1][:min(len(args[1]), 20)]) + } + if !strings.Contains(args[1], "$CLAUDOMATOR_PROJECT_DIR") { + t.Errorf("preamble should mention $CLAUDOMATOR_PROJECT_DIR") + } + if !strings.HasSuffix(args[1], "fix the bug") { + t.Errorf("instructions should end with original instructions") + } +} + +func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "do work", + AllowedTools: []string{"Read"}, + SkipPlanning: false, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + // Bash should be appended to allowed tools. + foundBash := false + for i, a := range args { + if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" { + foundBash = true + } + } + if !foundBash { + t.Errorf("Bash should be added to --allowedTools when preamble is active: %v", args) + } +} + +func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) { + r := &ClaudeRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "do work", + AllowedTools: []string{"Bash", "Read"}, + SkipPlanning: false, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + // Count Bash occurrences in --allowedTools values. + bashCount := 0 + for i, a := range args { + if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" { + bashCount++ + } + } + if bashCount != 1 { + t.Errorf("Bash should appear exactly once in --allowedTools, got %d: %v", bashCount, args) + } +} + +// TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession verifies that when a +// resume execution is itself blocked again, the stored SessionID is the original +// resumed session, not the new execution's own UUID. Without this, a second +// block-and-resume cycle passes the wrong --resume session ID and fails. +func TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession(t *testing.T) { + logDir := t.TempDir() + r := &ClaudeRunner{ + BinaryPath: "true", // exits 0, no output + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "continue", + SkipPlanning: true, + }, + } + exec := &storage.Execution{ + ID: "resume-exec-uuid", + TaskID: "task-1", + ResumeSessionID: "original-session-uuid", + ResumeAnswer: "yes", + } + + // Run completes successfully (binary is "true"). + _ = r.Run(context.Background(), tk, exec) + + // SessionID must be the original session (ResumeSessionID), not the new + // exec's own ID. If it were exec.ID, a second blocked-then-resumed cycle + // would use the wrong --resume argument and fail. + if exec.SessionID != "original-session-uuid" { + t.Errorf("SessionID after resume Run: want %q, got %q", "original-session-uuid", exec.SessionID) + } +} + +func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { + r := &ClaudeRunner{ + BinaryPath: "true", // would succeed if it ran + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: t.TempDir(), + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + ProjectDir: "/nonexistent/path/does/not/exist", + SkipPlanning: true, + }, + } + exec := &storage.Execution{ID: "test-exec"} + + err := r.Run(context.Background(), tk, exec) + + if err == nil { + t.Fatal("expected error for inaccessible working_dir, got nil") + } + if !strings.Contains(err.Error(), "project_dir") { + t.Errorf("expected 'project_dir' in error, got: %v", err) + } +} + +func TestClaudeRunner_BinaryPath_Default(t *testing.T) { + r := &ClaudeRunner{} + if r.binaryPath() != "claude" { + t.Errorf("want 'claude', got %q", r.binaryPath()) + } +} + +func TestClaudeRunner_BinaryPath_Custom(t *testing.T) { + r := &ClaudeRunner{BinaryPath: "/usr/local/bin/claude"} + if r.binaryPath() != "/usr/local/bin/claude" { + t.Errorf("want custom path, got %q", r.binaryPath()) + } +} + +// TestExecOnce_NoGoroutineLeak_OnNaturalExit verifies that execOnce does not +// leave behind any goroutines when the subprocess exits normally (no context +// cancellation). Both the pgid-kill goroutine and the parseStream goroutine +// must have exited before execOnce returns. +func TestExecOnce_NoGoroutineLeak_OnNaturalExit(t *testing.T) { + logDir := t.TempDir() + r := &ClaudeRunner{ + BinaryPath: "true", // exits immediately with status 0, produces no output + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + e := &storage.Execution{ + ID: "goroutine-leak-test", + TaskID: "task-id", + StdoutPath: filepath.Join(logDir, "stdout.log"), + StderrPath: filepath.Join(logDir, "stderr.log"), + ArtifactDir: logDir, + } + + // Let any goroutines from test infrastructure settle before sampling. + runtime.Gosched() + baseline := runtime.NumGoroutine() + + if err := r.execOnce(context.Background(), []string{}, "", "", e); err != nil { + t.Fatalf("execOnce failed: %v", err) + } + + // Give the scheduler a moment to let any leaked goroutines actually exit. + // In correct code the goroutines exit before execOnce returns, so this is + // just a safety buffer for the scheduler. + time.Sleep(10 * time.Millisecond) + runtime.Gosched() + + after := runtime.NumGoroutine() + if after > baseline { + t.Errorf("goroutine leak: %d goroutines before execOnce, %d after (leaked %d)", + baseline, after, after-baseline) + } +} + +// initGitRepo creates a git repo in dir with one commit so it is clonable. +func initGitRepo(t *testing.T, dir string) { + t.Helper() + cmds := [][]string{ + {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "init", "-b", "main"}, + {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "config", "user.email", "test@test"}, + {"git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "config", "user.name", "test"}, + } + for _, args := range cmds { + if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { + t.Fatalf("%v: %v\n%s", args, err, out) + } + } + if err := os.WriteFile(filepath.Join(dir, "init.txt"), []byte("init"), 0644); err != nil { + t.Fatal(err) + } + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "add", ".").CombinedOutput(); err != nil { + t.Fatalf("git add: %v\n%s", err, out) + } + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", dir, "commit", "-m", "init").CombinedOutput(); err != nil { + t.Fatalf("git commit: %v\n%s", err, out) + } +} + +func TestSandboxCloneSource_PrefersLocalRemote(t *testing.T) { + dir := t.TempDir() + initGitRepo(t, dir) + // Add a "local" remote pointing to a bare repo. + bare := t.TempDir() + exec.Command("git", "init", "--bare", bare).Run() + exec.Command("git", "-C", dir, "remote", "add", "local", bare).Run() + exec.Command("git", "-C", dir, "remote", "add", "origin", "https://example.com/repo").Run() + + got := sandboxCloneSource(dir) + if got != bare { + t.Errorf("expected bare repo path %q, got %q", bare, got) + } +} + +func TestSandboxCloneSource_FallsBackToOrigin(t *testing.T) { + dir := t.TempDir() + initGitRepo(t, dir) + // sandboxCloneSource intentionally filters to local-FS remotes (so + // `git clone <src>` doesn't go over the network). Use a local path + // for origin to verify the fallback semantics. + originURL := t.TempDir() + exec.Command("git", "-C", dir, "remote", "add", "origin", originURL).Run() + + got := sandboxCloneSource(dir) + if got != originURL { + t.Errorf("expected origin URL %q, got %q", originURL, got) + } +} + +func TestSandboxCloneSource_FallsBackToProjectDir(t *testing.T) { + dir := t.TempDir() + initGitRepo(t, dir) + // No remotes configured. + got := sandboxCloneSource(dir) + if got != dir { + t.Errorf("expected projectDir %q (no remotes), got %q", dir, got) + } +} + +func TestSetupSandbox_ClonesGitRepo(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + + sandbox, err := setupSandbox(src, slog.Default()) + if err != nil { + t.Fatalf("setupSandbox: %v", err) + } + t.Cleanup(func() { os.RemoveAll(sandbox) }) + + // Force sandbox to master if it cloned as main + exec.Command("git", gitSafe("-C", sandbox, "checkout", "master")...).Run() + + // Debug sandbox + logOut, _ := exec.Command("git", "-C", sandbox, "log", "-1").CombinedOutput() + fmt.Printf("DEBUG: sandbox log: %s\n", string(logOut)) + + // Verify sandbox is a git repo with at least one commit. + out, err := exec.Command("git", "-C", sandbox, "log", "--oneline").Output() + if err != nil { + t.Fatalf("git log in sandbox: %v", err) + } + if len(strings.TrimSpace(string(out))) == 0 { + t.Error("expected at least one commit in sandbox, got empty log") + } +} + +func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) { + // A plain directory (not a git repo) should be initialised then cloned. + src := t.TempDir() + + sandbox, err := setupSandbox(src, slog.Default()) + if err != nil { + t.Fatalf("setupSandbox on plain dir: %v", err) + } + t.Cleanup(func() { os.RemoveAll(sandbox) }) + + if _, err := os.Stat(filepath.Join(sandbox, ".git")); err != nil { + t.Errorf("sandbox should be a git repo: %v", err) + } +} + +func TestTeardownSandbox_AutocommitsChanges(t *testing.T) { + // Create a bare repo as origin so push succeeds. + bare := t.TempDir() + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { + t.Fatalf("git init bare: %v\n%s", err, out) + } + + // Create a sandbox directly. + sandbox := t.TempDir() + initGitRepo(t, sandbox) + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + t.Fatalf("git remote add: %v\n%s", err, out) + } + // Initial push to establish origin/main + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil { + t.Fatalf("git push initial: %v\n%s", err, out) + } + + // Capture startHEAD + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() + if err != nil { + t.Fatalf("rev-parse HEAD: %v", err) + } + startHEAD := strings.TrimSpace(string(headOut)) + + // Leave an uncommitted file in the sandbox. + if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("autocommit me"), 0644); err != nil { + t.Fatal(err) + } + + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) + execRecord := &storage.Execution{} + + err = teardownSandbox("", sandbox, startHEAD, logger, execRecord) + if err != nil { + t.Fatalf("expected autocommit to succeed, got error: %v", err) + } + + // Sandbox should be removed after successful autocommit and push. + if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { + t.Error("sandbox should have been removed after successful autocommit and push") + } + + // Verify the commit exists in the bare repo. + out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output() + if err != nil { + t.Fatalf("git log in bare repo: %v", err) + } + if !strings.Contains(string(out), "chore: autocommit uncommitted changes") { + t.Errorf("expected autocommit message in log, got: %q", string(out)) + } + + // Verify the commit was captured in execRecord. + if len(execRecord.Commits) == 0 { + t.Error("expected at least one commit in execRecord") + } else if !strings.Contains(execRecord.Commits[0].Message, "chore: autocommit uncommitted changes") { + t.Errorf("unexpected commit message: %q", execRecord.Commits[0].Message) + } +} + +func TestTeardownSandbox_BuildFailure_BlocksAutocommit(t *testing.T) { + bare := t.TempDir() + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { + t.Fatalf("git init bare: %v\n%s", err, out) + } + + sandbox := t.TempDir() + initGitRepo(t, sandbox) + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + t.Fatalf("git remote add: %v\n%s", err, out) + } + + // Capture startHEAD + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() + if err != nil { + t.Fatalf("rev-parse HEAD: %v", err) + } + startHEAD := strings.TrimSpace(string(headOut)) + + // Leave an uncommitted file. + if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil { + t.Fatal(err) + } + + // Add a failing Makefile. + makefile := "build:\n\t@echo 'build failed'\n\texit 1\n" + if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil { + t.Fatal(err) + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + execRecord := &storage.Execution{} + + err = teardownSandbox("", sandbox, startHEAD, logger, execRecord) + if err == nil { + t.Error("expected teardown to fail due to build failure, but it succeeded") + } else if !strings.Contains(err.Error(), "build failed before autocommit") { + t.Errorf("expected build failure error message, got: %v", err) + } + + // Sandbox should NOT be removed if teardown failed. + if _, statErr := os.Stat(sandbox); os.IsNotExist(statErr) { + t.Error("sandbox should have been preserved after build failure") + } + + // Verify no new commit in bare repo. + out, err := exec.Command("git", "-C", bare, "log", "HEAD").CombinedOutput() + if strings.Contains(string(out), "chore: autocommit uncommitted changes") { + t.Error("autocommit should not have been pushed after build failure") + } +} + +func TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit(t *testing.T) { + bare := t.TempDir() + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { + t.Fatalf("git init bare: %v\n%s", err, out) + } + + sandbox := t.TempDir() + initGitRepo(t, sandbox) + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + t.Fatalf("git remote add: %v\n%s", err, out) + } + + // Capture startHEAD + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() + if err != nil { + t.Fatalf("rev-parse HEAD: %v", err) + } + startHEAD := strings.TrimSpace(string(headOut)) + + // Leave an uncommitted file. + if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil { + t.Fatal(err) + } + + // Add a successful Makefile. + makefile := "build:\n\t@echo 'build succeeded'\n" + if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil { + t.Fatal(err) + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + execRecord := &storage.Execution{} + + err = teardownSandbox("", sandbox, startHEAD, logger, execRecord) + if err != nil { + t.Fatalf("expected teardown to succeed after build success, got error: %v", err) + } + + // Sandbox should be removed after success. + if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { + t.Error("sandbox should have been removed after successful build and autocommit") + } + + // Verify new commit in bare repo. + out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output() + if err != nil { + t.Fatalf("git log in bare repo: %v", err) + } + if !strings.Contains(string(out), "chore: autocommit uncommitted changes") { + t.Errorf("expected autocommit message in log, got: %q", string(out)) + } +} + + +func TestTeardownSandbox_CapturesExplicitCommits(t *testing.T) { + bare := t.TempDir() + if out, err := exec.Command("git", "init", "--bare", "-b", "main", bare).CombinedOutput(); err != nil { + t.Fatalf("git init bare: %v\n%s", err, out) + } + + sandbox := t.TempDir() + initGitRepo(t, sandbox) + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { + t.Fatalf("git remote add: %v\n%s", err, out) + } + if out, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil { + t.Fatalf("git push initial: %v\n%s", err, out) + } + + headOut, err := exec.Command("git", "-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "rev-parse", "HEAD").Output() + if err != nil { + t.Fatalf("rev-parse HEAD: %v", err) + } + startHEAD := strings.TrimSpace(string(headOut)) + + // Simulate Claude explicitly committing changes. + if err := os.WriteFile(filepath.Join(sandbox, "work.txt"), []byte("done"), 0644); err != nil { + t.Fatal(err) + } + for _, args := range [][]string{ + {"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "add", "-A"}, + {"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-C", sandbox, "commit", "-m", "feat: implement the feature"}, + } { + if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { + t.Fatalf("git %v: %v\n%s", args, err, out) + } + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + execRecord := &storage.Execution{} + + if err := teardownSandbox("", sandbox, startHEAD, logger, execRecord); err != nil { + t.Fatalf("teardownSandbox: %v", err) + } + + if len(execRecord.Commits) == 0 { + t.Fatal("expected commits to be captured in execRecord") + } + if !strings.Contains(execRecord.Commits[0].Message, "feat: implement the feature") { + t.Errorf("unexpected commit message: %q", execRecord.Commits[0].Message) + } + if execRecord.Commits[0].Hash == "" { + t.Error("commit hash should not be empty") + } +} + +func TestTeardownSandbox_CleanSandboxWithNoNewCommits_RemovesSandbox(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + sandbox, err := setupSandbox(src, slog.Default()) + if err != nil { + t.Fatalf("setupSandbox: %v", err) + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + execRecord := &storage.Execution{} + + headOut, _ := exec.Command("git", "-C", sandbox, "rev-parse", "HEAD").Output() + startHEAD := strings.TrimSpace(string(headOut)) + + // Sandbox has no new commits beyond origin; teardown should succeed and remove it. + if err := teardownSandbox(src, sandbox, startHEAD, logger, execRecord); err != nil { + t.Fatalf("teardownSandbox: %v", err) + } + if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { + t.Error("sandbox should have been removed after clean teardown") + os.RemoveAll(sandbox) + } +} + + +// TestClaudeRunner_Run_ResumeUsesStoredSandboxDir verifies that when a resume +// execution has SandboxDir set, the runner uses that directory (not project_dir) +// as the working directory, so Claude finds its session files there. +func TestClaudeRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) { + logDir := t.TempDir() + sandboxDir := t.TempDir() + cwdFile := filepath.Join(logDir, "cwd.txt") + + // Use a script that writes its working directory to a file in logDir (stable path). + scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") + script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" + if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &ClaudeRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + ProjectDir: sandboxDir, // must exist; resume overrides it with SandboxDir anyway + SkipPlanning: true, + }, + } + exec := &storage.Execution{ + ID: "resume-exec-uuid", + TaskID: "task-1", + ResumeSessionID: "original-session", + ResumeAnswer: "yes", + SandboxDir: sandboxDir, + } + + _ = r.Run(context.Background(), tk, exec) + + got, err := os.ReadFile(cwdFile) + if err != nil { + t.Fatalf("cwd file not written: %v", err) + } + // The runner should have executed claude in sandboxDir, not in project_dir. + if string(got) != sandboxDir { + t.Errorf("resume working dir: want %q, got %q", sandboxDir, string(got)) + } +} + +func TestClaudeRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) { + logDir := t.TempDir() + projectDir := t.TempDir() + initGitRepo(t, projectDir) + + cwdFile := filepath.Join(logDir, "cwd.txt") + scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") + script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" + if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &ClaudeRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + ProjectDir: projectDir, + SkipPlanning: true, + }, + } + // Point to a sandbox that no longer exists (e.g. /tmp was purged). + staleSandbox := filepath.Join(t.TempDir(), "gone") + e := &storage.Execution{ + ID: "resume-exec-2", + TaskID: "task-2", + ResumeSessionID: "session-abc", + ResumeAnswer: "ok", + SandboxDir: staleSandbox, + } + + if err := r.Run(context.Background(), tk, e); err != nil { + t.Fatalf("Run with stale sandbox: %v", err) + } + + got, err := os.ReadFile(cwdFile) + if err != nil { + t.Fatalf("cwd file not written: %v", err) + } + // Should have run in a fresh sandbox (not the stale path, not the raw projectDir). + // The sandbox is removed after teardown, so we only check what it wasn't. + cwd := string(got) + if cwd == staleSandbox { + t.Error("ran in stale sandbox dir that doesn't exist") + } + if cwd == projectDir { + t.Error("ran directly in project_dir; expected a fresh sandbox clone") + } + // cwd should look like a claudomator sandbox path. + if !strings.Contains(cwd, "claudomator-sandbox-") { + t.Errorf("expected sandbox path, got %q", cwd) + } +} + +func TestTailFile_MissingFile_ReturnsEmpty(t *testing.T) { + got := tailFile("/nonexistent/path/file.log", 10) + if got != "" { + t.Errorf("want empty string for missing file, got %q", got) + } +} + diff --git a/internal/executor/container_test.go b/internal/executor/container_test.go index f840f85..f0b2a3a 100644 --- a/internal/executor/container_test.go +++ b/internal/executor/container_test.go @@ -334,7 +334,7 @@ func TestDetectUncommittedChanges_CleanRepo(t *testing.T) { func TestGitSafe_PrependsSafeDirectory(t *testing.T) { got := gitSafe("-C", "/some/path", "status") - want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"} + want := []string{"-c", "safe.directory=*", "-c", "commit.gpgsign=false", "-c", "tag.gpgsign=false", "-C", "/some/path", "status"} if len(got) != len(want) { t.Fatalf("gitSafe() = %v, want %v", got, want) } diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 376d62c..09169bd 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/retry" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" "github.com/google/uuid" @@ -75,17 +77,18 @@ type Pool struct { mu sync.Mutex active int activePerAgent map[string]int - rateLimited map[string]time.Time // agentType -> until + rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel - consecutiveFailures map[string]int // agentType -> count - closed bool // set to true when Shutdown has been called + consecutiveFailures map[string]int // agentType -> count + closed bool // set to true when Shutdown has been called resultCh chan *Result - startedCh chan string // task IDs that just transitioned to RUNNING - workCh chan workItem // internal bounded queue; Submit enqueues here - doneCh chan struct{} // signals when a worker slot is freed - workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines - dispatchDone chan struct{} // closed when the dispatch goroutine exits + startedCh chan string // task IDs that just transitioned to RUNNING + workCh chan workItem // internal bounded queue; Submit enqueues here + doneCh chan struct{} // signals when a worker slot is freed + workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines + dispatchDone chan struct{} // closed when the dispatch goroutine exits Classifier *Classifier + LLM *llm.Client } // Result is emitted when a task execution completes. @@ -258,6 +261,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) { return runner, nil } +// decActiveAgent decrements the active counters for a finished task. Safe to +// call multiple times — subsequent calls are no-ops via the cleaned flag. +// Always call this before sending on resultCh so consumers observing a result +// see the accounting already settled (no zero-count map entries lingering). +func (p *Pool) decActiveAgent(agentType string, cleaned *bool) { + if *cleaned { + return + } + *cleaned = true + p.mu.Lock() + p.active-- + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() + select { + case p.doneCh <- struct{}{}: + default: + } +} + func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { agentType := t.Agent.Type if agentType == "" { @@ -268,25 +293,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.activePerAgent[agentType]++ p.mu.Unlock() - defer func() { - p.mu.Lock() - p.active-- - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -342,6 +355,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } @@ -351,9 +365,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex // resultCh. The caller must set exec.EndTime before calling. func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) { if err != nil { - if isRateLimitError(err) || isQuotaExhausted(err) { + if retry.IsRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() - retryAfter := parseRetryAfter(err.Error()) + retryAfter := retry.ParseRetryAfter(err.Error()) reason := "transient" if isQuotaExhausted(err) { reason = "quota" @@ -505,6 +519,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if summary == "" && exec.StdoutPath != "" { summary = extractSummary(exec.StdoutPath) } + if summary == "" && p.LLM != nil && exec.StdoutPath != "" { + summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath) + } if summary != "" { if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil { p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) @@ -528,12 +545,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update execution", "error", updateErr) } - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } @@ -884,8 +895,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } p.mu.Unlock() - // If a specific agent is already requested, skip selection and classification. - skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini" + // If a specific agent is already requested AND we have a runner registered + // for it, skip selection and classification. Unknown/empty types fall + // through to the load balancer. + _, runnerKnown := p.runners[t.Agent.Type] + skipClassification := t.Agent.Type != "" && runnerKnown if !skipClassification { // Deterministically pick the agent with fewest active tasks. @@ -915,16 +929,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { agentType = "claude" } - defer func() { - p.mu.Lock() - p.active-- - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() - // Check dependencies before taking the per-agent slot to avoid deadlock: // if a dependent task holds the slot while waiting for its dependency to run, // the dependency can never start (maxPerAgent=1). @@ -981,6 +985,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.activePerAgent[agentType]++ p.mu.Unlock() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) + runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner", "error", err, "taskID", t.ID) @@ -999,12 +1006,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -1074,6 +1076,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index fac7e9c..9214872 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1101,6 +1101,7 @@ type minimalMockStore struct { executions map[string]*storage.Execution stateUpdates []struct{ id string; state task.State } questionUpdates []string + summaryUpdates []struct{ taskID, summary string } changestatCalls []struct { execID string stats *task.Changestats @@ -1159,7 +1160,21 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error m.mu.Unlock() return nil } -func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil } +func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { + m.mu.Lock() + m.summaryUpdates = append(m.summaryUpdates, struct{ taskID, summary string }{taskID, summary}) + m.mu.Unlock() + return nil +} +func (m *minimalMockStore) lastSummaryUpdate() (string, string, bool) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.summaryUpdates) == 0 { + return "", "", false + } + last := m.summaryUpdates[len(m.summaryUpdates)-1] + return last.taskID, last.summary, true +} func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { return nil } diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go new file mode 100644 index 0000000..3abec05 --- /dev/null +++ b/internal/executor/gemini.go @@ -0,0 +1,346 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// GeminiRunner spawns the `gemini` CLI in non-interactive mode. +type GeminiRunner struct { + BinaryPath string // defaults to "gemini" + Logger *slog.Logger + LogDir string // base directory for execution logs + APIURL string // base URL of the Claudomator API, passed to subprocesses +} + +// ExecLogDir returns the log directory for the given execution ID. +func (r *GeminiRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +func (r *GeminiRunner) binaryPath() string { + if r.BinaryPath != "" { + return r.BinaryPath + } + return "gemini" +} + +// Run executes the gemini CLI inside a sandboxed clone of project_dir. +// When project_dir is set, claudomator first clones it into a temp sandbox +// (preferring a `local` bare remote, then `origin`, then the working tree) +// and runs the agent there. On success the sandbox is autocommitted and +// pushed back to origin/master, then removed. On failure the sandbox is +// preserved and its path is included in the returned error so the user can +// inspect partial work. If the agent writes a question file before exiting, +// Run returns *BlockedError with SandboxDir populated so a resume execution +// can pick up in the same directory. +func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + projectDir := t.Agent.ProjectDir + + if projectDir != "" { + if _, err := os.Stat(projectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", projectDir, err) + } + } + + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = e.ID + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + + if e.StdoutPath == "" { + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + } + + if e.SessionID == "" { + if e.ResumeSessionID != "" { + e.SessionID = e.ResumeSessionID + } else { + e.SessionID = e.ID + } + } + + // Sandbox setup: for new executions with a project_dir, clone into a sandbox. + // Resume executions reuse the preserved sandbox so any partial work survives. + // If the preserved sandbox is missing (e.g. /tmp was purged), clone fresh. + var sandboxDir string + var startHEAD string + effectiveWorkingDir := projectDir + if e.ResumeSessionID != "" { + if e.SandboxDir != "" { + if _, statErr := os.Stat(e.SandboxDir); statErr == nil { + effectiveWorkingDir = e.SandboxDir + } else { + r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) + e.SandboxDir = "" + if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(projectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + effectiveWorkingDir = sandboxDir + r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) + } + } + } + } else if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(projectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + effectiveWorkingDir = sandboxDir + r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) + } + + if effectiveWorkingDir != "" { + headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() + startHEAD = strings.TrimSpace(string(headOut)) + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + + if err := r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e); err != nil { + if sandboxDir != "" { + return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) + } + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) + questionJSON := strings.TrimSpace(string(data)) + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + // Preserve sandbox on BLOCKED so a resume can pick up in the same dir. + return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) + e.Summary = strings.TrimSpace(string(summaryData)) + } + + // Merge sandbox back to project_dir and clean up. + if sandboxDir != "" { + if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { + return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) + } + } + return nil +} + +func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) + cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_PROJECT_DIR="+projectDir, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if workingDir != "" { + cmd.Dir = workingDir + } + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW + cmd.Stderr = stderrFile + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting gemini: %w", err) + } + stdoutW.Close() + + killDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } + }() + + var streamCost float64 + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + streamCost, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + close(killDone) + wg.Wait() + + if streamCost > 0 { + e.CostUSD = streamCost + } + + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + e.ExitCode = exitErr.ExitCode() + } + if streamErr != nil { + return streamErr + } + if tail := tailFile(e.StderrPath, 20); tail != "" { + return fmt.Errorf("gemini exited with error: %w\nstderr:\n%s", waitErr, tail) + } + return fmt.Errorf("gemini exited with error: %w", waitErr) + } + + if streamErr != nil { + return streamErr + } + return nil +} + +// parseGeminiStream reads streaming JSON from the gemini CLI, strips markdown +// code fences if the output is wrapped in them, writes the inner stream-json +// to w, and returns (costUSD, error). If a `result` event has `is_error: true`, +// an error wrapping the result message is returned. +func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { + fullOutput, err := io.ReadAll(r) + if err != nil { + return 0, fmt.Errorf("reading full gemini output: %w", err) + } + logger.Debug("parseGeminiStream: raw output received", "output", string(fullOutput)) + + inner := stripGeminiFences(string(fullOutput), logger) + if _, writeErr := w.Write([]byte(inner)); writeErr != nil { + return 0, fmt.Errorf("writing gemini output: %w", writeErr) + } + + // Walk lines looking for a result event so we can surface errors and cost. + var ( + cost float64 + errMsg string + isError bool + ) + for _, raw := range strings.Split(inner, "\n") { + line := strings.TrimSpace(raw) + if line == "" { + continue + } + var evt struct { + Type string `json:"type"` + IsError bool `json:"is_error"` + Result string `json:"result"` + Cost float64 `json:"total_cost_usd"` + } + if err := json.Unmarshal([]byte(line), &evt); err != nil { + continue + } + if evt.Type == "result" { + if evt.Cost > 0 { + cost = evt.Cost + } + if evt.IsError { + isError = true + errMsg = evt.Result + } + } + } + if isError { + return cost, fmt.Errorf("gemini reported error: %s", errMsg) + } + return cost, nil +} + +// stripGeminiFences removes a surrounding ```json ... ``` markdown block if +// present, returning the trimmed inner content. If no markdown fence is +// found, the input is returned verbatim (no whitespace trimming) so callers +// that expect byte-exact pass-through behavior get it. +func stripGeminiFences(raw string, logger *slog.Logger) string { + trimmed := strings.TrimSpace(raw) + if start := strings.Index(trimmed, "```json"); start != -1 { + if end := strings.LastIndex(trimmed, "```"); end > start { + return strings.TrimSpace(trimmed[start+len("```json") : end]) + } + logger.Warn("malformed gemini markdown block (missing closing fence); using raw output", "len", len(trimmed)) + return trimmed + } + return raw +} + +func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Gemini CLI uses a different command structure: gemini "instructions" [flags] + + instructions := t.Agent.Instructions + if !t.Agent.SkipPlanning { + instructions = withPlanningPreamble(instructions) + } + + args := []string{ + "-p", instructions, + "--output-format", "stream-json", + "--yolo", // auto-approve all tools (equivalent to Claude's bypassPermissions) + } + + // Note: Gemini CLI flags might differ from Claude CLI. + // Assuming common flags for now, but these may need adjustment. + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + + // Gemini CLI doesn't use --session-id for the first run in the same way, + // or it might use it differently. For now we assume compatibility. + if e.SessionID != "" { + // If it's a resume, it might use different flags. + if e.ResumeSessionID != "" { + // This is a placeholder for Gemini's resume logic + } + } + + return args +} diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go new file mode 100644 index 0000000..cd11ebc --- /dev/null +++ b/internal/executor/gemini_test.go @@ -0,0 +1,447 @@ +package executor + +import ( + "bytes" + "context" + "errors" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +func TestGeminiRunner_BuildArgs_BasicTask(t *testing.T) { + r := &GeminiRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "fix the bug", + Model: "gemini-2.5-flash-lite", + SkipPlanning: true, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + // Gemini CLI: instructions passed via -p for non-interactive mode + if len(args) < 2 || args[0] != "-p" || args[1] != "fix the bug" { + t.Errorf("expected -p <instructions> as first args, got: %v", args) + } + + argMap := make(map[string]bool) + for _, a := range args { + argMap[a] = true + } + for _, want := range []string{"--output-format", "stream-json", "--model", "gemini-2.5-flash-lite"} { + if !argMap[want] { + t.Errorf("missing arg %q in %v", want, args) + } + } +} + +func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) { + r := &GeminiRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "fix the bug", + SkipPlanning: false, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + if len(args) < 2 || args[0] != "-p" { + t.Fatalf("expected -p <instructions> as first args, got: %v", args) + } + if !strings.HasPrefix(args[1], planningPreamble) { + t.Errorf("instructions should start with planning preamble") + } + if !strings.HasSuffix(args[1], "fix the bug") { + t.Errorf("instructions should end with original instructions") + } +} + +func TestGeminiRunner_BuildArgs_IncludesYolo(t *testing.T) { + r := &GeminiRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "write a doc", + SkipPlanning: true, + }, + } + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + argMap := make(map[string]bool) + for _, a := range args { + argMap[a] = true + } + if !argMap["--yolo"] { + t.Errorf("expected --yolo in gemini args (enables all tools); got: %v", args) + } +} + +func TestGeminiRunner_BuildArgs_IncludesPromptFlag(t *testing.T) { + r := &GeminiRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "do the thing", + SkipPlanning: true, + }, + } + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + // Instructions must be passed via -p/--prompt for non-interactive headless mode, + // not as a bare positional (which starts interactive mode). + found := false + for i, a := range args { + if (a == "-p" || a == "--prompt") && i+1 < len(args) && args[i+1] == "do the thing" { + found = true + break + } + } + if !found { + t.Errorf("expected instructions passed via -p/--prompt flag; got: %v", args) + } +} + +func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) { + r := &GeminiRunner{ + BinaryPath: "true", // would succeed if it ran + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: t.TempDir(), + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + ProjectDir: "/nonexistent/path/does/not/exist", + SkipPlanning: true, + }, + } + exec := &storage.Execution{ID: "test-exec"} + + err := r.Run(context.Background(), tk, exec) + + if err == nil { + t.Fatal("expected error for inaccessible project_dir, got nil") + } + if !strings.Contains(err.Error(), "project_dir") { + t.Errorf("expected 'project_dir' in error, got: %v", err) + } +} + +func TestGeminiRunner_BinaryPath_Default(t *testing.T) { + r := &GeminiRunner{} + if r.binaryPath() != "gemini" { + t.Errorf("want 'gemini', got %q", r.binaryPath()) + } +} + +func TestGeminiRunner_BinaryPath_Custom(t *testing.T) { + r := &GeminiRunner{BinaryPath: "/usr/local/bin/gemini"} + if r.binaryPath() != "/usr/local/bin/gemini" { + t.Errorf("want custom path, got %q", r.binaryPath()) + } +} + + +func TestParseGeminiStream_ParsesStructuredOutput(t *testing.T) { + // Simulate a stream-json input with various message types, including a result with error and cost. + input := streamLine(`{"type":"content_block_start","content_block":{"text":"Hello,"}}`) + + streamLine(`{"type":"content_block_delta","content_block":{"text":" World!"}}`) + + streamLine(`{"type":"content_block_end"}`) + + streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong","total_cost_usd":0.123}`) + + reader := strings.NewReader(input) + var writer bytes.Buffer // To capture what's written to the output log + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + cost, err := parseGeminiStream(reader, &writer, logger) + + if err == nil { + t.Errorf("expected an error, got nil") + } + if !strings.Contains(err.Error(), "something went wrong") { + t.Errorf("expected error message to contain 'something went wrong', got: %v", err) + } + + if cost != 0.123 { + t.Errorf("expected cost 0.123, got %f", cost) + } + + // Verify that the writer received the content (even if parseGeminiStream isn't fully parsing it yet) + expectedWriterContent := input + if writer.String() != expectedWriterContent { + t.Errorf("writer content mismatch:\nwant:\n%s\ngot:\n%s", expectedWriterContent, writer.String()) + } +} + +// TestGeminiRunner_Run_ProjectDir_RunsInSandbox verifies that when project_dir +// is set, the gemini subprocess runs inside a sandbox clone — not in +// project_dir itself. +func TestGeminiRunner_Run_ProjectDir_RunsInSandbox(t *testing.T) { + projectDir := t.TempDir() + initGitRepo(t, projectDir) + + logDir := t.TempDir() + cwdFile := filepath.Join(logDir, "gemini-cwd.txt") + + // Fake gemini binary that records its $PWD then exits 0. + scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh") + script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" + if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &GeminiRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "do work", + ProjectDir: projectDir, + SkipPlanning: true, + }, + } + e := &storage.Execution{ID: "sandbox-exec", TaskID: "task-1"} + + if err := r.Run(context.Background(), tk, e); err != nil { + t.Fatalf("Run: %v", err) + } + + got, err := os.ReadFile(cwdFile) + if err != nil { + t.Fatalf("cwd file not written: %v", err) + } + cwd := string(got) + if cwd == projectDir { + t.Errorf("ran directly in project_dir; expected sandbox clone (cwd=%q)", cwd) + } + // Sandbox should be removed after successful teardown (no edits → nothing to push). + // We can't assert the exact dir, but it should not be projectDir. +} + +// TestGeminiRunner_Run_BlockedError_IncludesSandboxDir verifies that when the +// agent writes a question file before exiting, the BlockedError carries the +// sandbox path so resume runs in the same dir. +func TestGeminiRunner_Run_BlockedError_IncludesSandboxDir(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + logDir := t.TempDir() + + scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh") + if err := os.WriteFile(scriptPath, []byte(`#!/bin/sh +if [ -n "$CLAUDOMATOR_QUESTION_FILE" ]; then + printf '{"text":"Should I continue?"}' > "$CLAUDOMATOR_QUESTION_FILE" +fi +`), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &GeminiRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "do something", + ProjectDir: src, + SkipPlanning: true, + }, + } + e := &storage.Execution{ID: "blocked-gemini-exec", TaskID: "task-1"} + + err := r.Run(context.Background(), tk, e) + + var blocked *BlockedError + if !errors.As(err, &blocked) { + t.Fatalf("expected BlockedError, got: %v", err) + } + if blocked.SandboxDir == "" { + t.Error("BlockedError.SandboxDir should be set when gemini task runs in a sandbox") + } + if _, statErr := os.Stat(blocked.SandboxDir); os.IsNotExist(statErr) { + t.Error("sandbox directory should be preserved when blocked") + } else { + os.RemoveAll(blocked.SandboxDir) + } +} + +// TestGeminiRunner_Run_ExecError_PreservesSandbox verifies that when gemini +// exits non-zero, the sandbox path is included in the wrapped error so the +// user can inspect partial work. +func TestGeminiRunner_Run_ExecError_PreservesSandbox(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + logDir := t.TempDir() + + // "false" exits 1, no output. + r := &GeminiRunner{ + BinaryPath: "false", + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "do something", + ProjectDir: src, + SkipPlanning: true, + }, + } + e := &storage.Execution{ID: "err-gemini-exec", TaskID: "task-1"} + + err := r.Run(context.Background(), tk, e) + if err == nil { + t.Fatal("expected error from failing gemini exit") + } + if !strings.Contains(err.Error(), "sandbox preserved at ") { + t.Errorf("expected error to include sandbox path; got: %v", err) + } + // Extract path and verify it exists. + idx := strings.Index(err.Error(), "sandbox preserved at ") + rest := err.Error()[idx+len("sandbox preserved at "):] + rest = strings.TrimSuffix(rest, ")") + rest = strings.TrimSpace(rest) + if _, statErr := os.Stat(rest); os.IsNotExist(statErr) { + t.Errorf("sandbox path from error should exist on disk: %q", rest) + } else { + os.RemoveAll(rest) + } +} + +// TestGeminiRunner_Run_ResumeUsesStoredSandboxDir verifies that a resume +// execution runs in the preserved SandboxDir rather than cloning fresh. +func TestGeminiRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) { + logDir := t.TempDir() + sandboxDir := t.TempDir() + initGitRepo(t, sandboxDir) + cwdFile := filepath.Join(logDir, "cwd.txt") + + scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh") + script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" + if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &GeminiRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + SkipPlanning: true, + }, + } + e := &storage.Execution{ + ID: "resume-gemini-1", + TaskID: "task-resume", + ResumeSessionID: "session-abc", + SandboxDir: sandboxDir, + } + + if err := r.Run(context.Background(), tk, e); err != nil { + t.Fatalf("Run with preserved sandbox: %v", err) + } + + got, err := os.ReadFile(cwdFile) + if err != nil { + t.Fatalf("cwd file not written: %v", err) + } + if string(got) != sandboxDir { + t.Errorf("resume should run in preserved sandbox; got cwd=%q want %q", got, sandboxDir) + } +} + +// TestGeminiRunner_Run_StaleSandboxDir_ClonesAfresh verifies that a resume +// pointing at a missing sandbox falls back to cloning a fresh sandbox from +// project_dir rather than failing outright. +func TestGeminiRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) { + logDir := t.TempDir() + projectDir := t.TempDir() + initGitRepo(t, projectDir) + + cwdFile := filepath.Join(logDir, "cwd.txt") + scriptPath := filepath.Join(t.TempDir(), "fake-gemini.sh") + script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" + if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &GeminiRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + ProjectDir: projectDir, + SkipPlanning: true, + }, + } + staleSandbox := filepath.Join(t.TempDir(), "gone") + e := &storage.Execution{ + ID: "resume-gemini-2", + TaskID: "task-stale", + ResumeSessionID: "session-xyz", + SandboxDir: staleSandbox, + } + + if err := r.Run(context.Background(), tk, e); err != nil { + t.Fatalf("Run with stale sandbox: %v", err) + } + + got, err := os.ReadFile(cwdFile) + if err != nil { + t.Fatalf("cwd file not written: %v", err) + } + cwd := string(got) + if cwd == staleSandbox { + t.Error("ran in stale (nonexistent) sandbox dir") + } + if cwd == projectDir { + t.Error("ran directly in project_dir; expected a fresh sandbox clone") + } +} + +// TestGeminiRunner_Run_NoProjectDir_SkipsSandbox verifies that a task with no +// project_dir doesn't trigger sandbox setup (matches LocalRunner/non-coding +// task semantics). +func TestGeminiRunner_Run_NoProjectDir_SkipsSandbox(t *testing.T) { + logDir := t.TempDir() + + r := &GeminiRunner{ + BinaryPath: "true", // exits 0, no output + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "summarize: 2+2", + SkipPlanning: true, + // No ProjectDir + }, + } + e := &storage.Execution{ID: "no-pd-gemini", TaskID: "task-nopd"} + + if err := r.Run(context.Background(), tk, e); err != nil { + t.Fatalf("Run without project_dir: %v", err) + } + if e.SandboxDir != "" { + t.Errorf("SandboxDir should be empty for tasks without project_dir, got %q", e.SandboxDir) + } +} diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go index bd5d9d5..76bf8b1 100644 --- a/internal/executor/helpers.go +++ b/internal/executor/helpers.go @@ -171,7 +171,11 @@ func readFileTail(path string, maxBytes int64) string { } func gitSafe(args ...string) []string { - return append([]string{"-c", "safe.directory=*"}, args...) + return append([]string{ + "-c", "safe.directory=*", + "-c", "commit.gpgsign=false", + "-c", "tag.gpgsign=false", + }, args...) } // isCompletionReport returns true when a question-file JSON looks like a diff --git a/internal/executor/local.go b/internal/executor/local.go new file mode 100644 index 0000000..5d874c6 --- /dev/null +++ b/internal/executor/local.go @@ -0,0 +1,171 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// LocalRunner executes a task against a local OpenAI-compatible LLM endpoint. +// Unlike ClaudeRunner/GeminiRunner it does not spawn a subprocess, does not +// create a git sandbox, and does not edit files in project_dir — it produces +// text completions that are streamed to stdout.log in the same stream-json +// envelope Claude uses, so existing parsers (extractSummary, ParseChangestat) +// keep working unchanged. +type LocalRunner struct { + Client *llm.Client + Logger *slog.Logger + LogDir string + DefaultTemperature float64 +} + +// ExecLogDir implements LogPather so the pool can persist log paths before +// execution starts. +func (r *LocalRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +// Run streams a chat completion to stdout.log. The response is wrapped in +// stream-json envelopes line-by-line so downstream parsers (summary, +// changestats) read it the same way they read Claude output. +func (r *LocalRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + if r.Client == nil { + return fmt.Errorf("local runner: no LLM client configured") + } + if t.Agent.Instructions == "" { + return fmt.Errorf("local runner: empty instructions") + } + + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + return fmt.Errorf("local runner: LogDir not set") + } + if err := os.MkdirAll(logDir, 0o700); err != nil { + return fmt.Errorf("local runner: mkdir log: %w", err) + } + stdoutPath := filepath.Join(logDir, "stdout.log") + stderrPath := filepath.Join(logDir, "stderr.log") + e.StdoutPath = stdoutPath + e.StderrPath = stderrPath + + stdout, err := os.Create(stdoutPath) + if err != nil { + return fmt.Errorf("local runner: create stdout: %w", err) + } + defer stdout.Close() + + messages := []llm.Message{} + if sys := strings.TrimSpace(t.Agent.SystemPromptAppend); sys != "" { + messages = append(messages, llm.Message{Role: "system", Content: sys}) + } + messages = append(messages, llm.Message{Role: "user", Content: t.Agent.Instructions}) + + temperature := t.Agent.Temperature + if temperature == nil && r.DefaultTemperature > 0 { + v := r.DefaultTemperature + temperature = &v + } + + req := llm.ChatRequest{ + Model: t.Agent.Model, + Messages: messages, + Temperature: temperature, + MaxTokens: t.Agent.MaxTokens, + } + + start := time.Now() + resp, err := r.Client.ChatStream(ctx, req, func(delta string) { + if delta == "" { + return + } + writeAssistantTextLine(stdout, delta) + }) + if err != nil { + writeResultLine(stdout, "error", err.Error(), 0, 0) + return fmt.Errorf("local runner: chat: %w", err) + } + elapsed := time.Since(start) + + // Write one consolidated assistant envelope containing the full response. + // extractSummary and ParseChangestatFromOutput operate per-line, so a + // single envelope with the full text is what they expect to find. + if resp.Content != "" { + writeAssistantTextLine(stdout, resp.Content) + } + writeResultLine(stdout, "success", "", resp.PromptTokens, resp.OutputTokens) + + e.CostUSD = 0 + e.TokensIn = int64(resp.PromptTokens) + e.TokensOut = int64(resp.OutputTokens) + + if r.Logger != nil { + r.Logger.Info("local runner completed", + "taskID", t.ID, + "model", resp.Model, + "tokens_in", resp.PromptTokens, + "tokens_out", resp.OutputTokens, + "finish_reason", resp.FinishReason, + "elapsed_ms", elapsed.Milliseconds(), + ) + } + return nil +} + +// writeAssistantTextLine writes a single stream-json line wrapping `text` as +// an assistant text block. Format matches what ClaudeRunner emits, so +// extractSummary and ParseChangestatFromFile read it transparently. +func writeAssistantTextLine(w *os.File, text string) { + line := struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + } `json:"message"` + }{Type: "assistant"} + line.Message.Content = []struct { + Type string `json:"type"` + Text string `json:"text"` + }{{Type: "text", Text: text}} + b, err := json.Marshal(line) + if err != nil { + return + } + w.Write(b) + w.Write([]byte("\n")) +} + +// writeResultLine writes a final stream-json terminator line that downstream +// parsers can recognise. Mirrors the shape of the result line ClaudeRunner emits. +func writeResultLine(w *os.File, subtype, errMsg string, promptTokens, outputTokens int) { + line := map[string]any{ + "type": "result", + "subtype": subtype, + "is_error": errMsg != "", + "prompt_tokens": promptTokens, + "output_tokens": outputTokens, + "total_cost_usd": 0.0, + } + if errMsg != "" { + line["result"] = errMsg + } + b, err := json.Marshal(line) + if err != nil { + return + } + w.Write(b) + w.Write([]byte("\n")) +} diff --git a/internal/executor/local_test.go b/internal/executor/local_test.go new file mode 100644 index 0000000..d8ab678 --- /dev/null +++ b/internal/executor/local_test.go @@ -0,0 +1,152 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/google/uuid" + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// fakeOpenAIServer returns an httptest.Server that replies with a streaming +// chat completion containing the supplied content (split into chunks) plus a +// usage record. +func fakeOpenAIServer(t *testing.T, chunks []string, promptTok, outTok int) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + flusher, _ := w.(http.Flusher) + for _, c := range chunks { + payload := map[string]any{ + "model": "fake", + "choices": []map[string]any{{"delta": map[string]string{"content": c}}}, + } + b, _ := json.Marshal(payload) + fmt.Fprintf(w, "data: %s\n\n", b) + if flusher != nil { + flusher.Flush() + } + } + final := map[string]any{ + "model": "fake", + "choices": []map[string]any{{"delta": map[string]string{}, "finish_reason": "stop"}}, + "usage": map[string]int{"prompt_tokens": promptTok, "completion_tokens": outTok}, + } + fb, _ := json.Marshal(final) + fmt.Fprintf(w, "data: %s\n\ndata: [DONE]\n\n", fb) + })) +} + +func TestLocalRunner_Run_WritesStreamJSON(t *testing.T) { + srv := fakeOpenAIServer(t, + []string{"## Summary\n", "All ", "good."}, + 11, 22, + ) + defer srv.Close() + + logRoot := t.TempDir() + r := &LocalRunner{ + Client: &llm.Client{Endpoint: srv.URL + "/v1", Model: "fake"}, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logRoot, + } + tt := &task.Task{ + ID: "task-1", + Name: "test", + Agent: task.AgentConfig{ + Type: "local", + Model: "fake", + Instructions: "Do a thing.", + }, + } + exec := &storage.Execution{ID: uuid.New().String(), TaskID: tt.ID} + + if err := r.Run(context.Background(), tt, exec); err != nil { + t.Fatalf("Run: %v", err) + } + + if exec.CostUSD != 0 { + t.Errorf("CostUSD should be 0 for local runner, got %v", exec.CostUSD) + } + if exec.TokensIn != 11 || exec.TokensOut != 22 { + t.Errorf("tokens: want 11/22 got %d/%d", exec.TokensIn, exec.TokensOut) + } + + // Verify stdout.log contains stream-json envelopes that extractSummary can parse. + stdoutPath := filepath.Join(r.ExecLogDir(exec.ID), "stdout.log") + data, err := os.ReadFile(stdoutPath) + if err != nil { + t.Fatalf("read stdout: %v", err) + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) < 4 { + t.Fatalf("expected at least 4 lines (3 deltas + 1 result), got %d:\n%s", len(lines), data) + } + for i, line := range lines[:3] { + var env struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } + } + } + if err := json.Unmarshal([]byte(line), &env); err != nil { + t.Fatalf("line %d not JSON: %v: %s", i, err, line) + } + if env.Type != "assistant" { + t.Errorf("line %d: want type=assistant, got %q", i, env.Type) + } + } + + summary := extractSummary(stdoutPath) + if !strings.Contains(summary, "All good.") { + t.Errorf("extractSummary should find 'All good.', got %q", summary) + } +} + +func TestLocalRunner_Run_NoClient_Errors(t *testing.T) { + r := &LocalRunner{LogDir: t.TempDir()} + tt := &task.Task{ID: "x", Agent: task.AgentConfig{Instructions: "hi"}} + exec := &storage.Execution{ID: "exec-x"} + err := r.Run(context.Background(), tt, exec) + if err == nil || !strings.Contains(err.Error(), "no LLM client") { + t.Errorf("expected 'no LLM client' error, got %v", err) + } +} + +func TestLocalRunner_Run_EmptyInstructions_Errors(t *testing.T) { + r := &LocalRunner{ + Client: &llm.Client{Endpoint: "http://unused", Model: "x"}, + LogDir: t.TempDir(), + } + tt := &task.Task{ID: "x", Agent: task.AgentConfig{}} + exec := &storage.Execution{ID: "exec-x"} + err := r.Run(context.Background(), tt, exec) + if err == nil || !strings.Contains(err.Error(), "empty instructions") { + t.Errorf("expected empty-instructions error, got %v", err) + } +} + +func TestLocalRunner_ExecLogDir(t *testing.T) { + r := &LocalRunner{LogDir: "/tmp/logs"} + if got := r.ExecLogDir("abc"); got != "/tmp/logs/abc" { + t.Errorf("ExecLogDir: got %q", got) + } + r.LogDir = "" + if got := r.ExecLogDir("abc"); got != "" { + t.Errorf("ExecLogDir empty LogDir: got %q", got) + } +} diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go index c916291..ee9a336 100644 --- a/internal/executor/ratelimit.go +++ b/internal/executor/ratelimit.go @@ -1,33 +1,9 @@ package executor -import ( - "context" - "fmt" - "regexp" - "strconv" - "strings" - "time" -) +import "strings" -var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`) - -const maxBackoffDelay = 5 * time.Minute - -// isRateLimitError returns true if err looks like a transient Claude API -// rate-limit that is worth retrying (e.g. per-minute/per-request throttle). -func isRateLimitError(err error) bool { - if err == nil { - return false - } - msg := strings.ToLower(err.Error()) - return strings.Contains(msg, "rate limit") || - strings.Contains(msg, "too many requests") || - strings.Contains(msg, "429") || - strings.Contains(msg, "overloaded") -} - -// isQuotaExhausted returns true if err indicates the 5-hour usage quota is -// fully exhausted. Unlike transient rate limits, these should not be retried. +// isQuotaExhausted returns true if err indicates the 5-hour Claude usage quota +// is fully exhausted. Unlike transient rate limits, these should not be retried. func isQuotaExhausted(err error) bool { if err == nil { return false @@ -43,53 +19,3 @@ func isQuotaExhausted(err error) bool { strings.Contains(msg, "exhausted your daily quota") || strings.Contains(msg, "generate_content_free_tier_requests") } - -// parseRetryAfter extracts a Retry-After duration from an error message. -// Returns 0 if no retry-after value is found. -func parseRetryAfter(msg string) time.Duration { - m := retryAfterRe.FindStringSubmatch(msg) - if m == nil { - return 0 - } - secs, err := strconv.Atoi(m[1]) - if err != nil || secs <= 0 { - return 0 - } - return time.Duration(secs) * time.Second -} - -// runWithBackoff calls fn repeatedly on rate-limit errors, using exponential backoff. -// maxRetries is the max number of retry attempts (not counting the initial call). -// baseDelay is the initial backoff duration (doubled each retry). -func runWithBackoff(ctx context.Context, maxRetries int, baseDelay time.Duration, fn func() error) error { - var lastErr error - for attempt := 0; attempt <= maxRetries; attempt++ { - lastErr = fn() - if lastErr == nil { - return nil - } - if !isRateLimitError(lastErr) { - return lastErr - } - if attempt == maxRetries { - break - } - - // Compute exponential backoff delay. - delay := baseDelay * (1 << attempt) - if delay > maxBackoffDelay { - delay = maxBackoffDelay - } - // Use Retry-After header value if present. - if ra := parseRetryAfter(lastErr.Error()); ra > 0 { - delay = ra - } - - select { - case <-ctx.Done(): - return fmt.Errorf("context cancelled during rate-limit backoff: %w", ctx.Err()) - case <-time.After(delay): - } - } - return lastErr -} diff --git a/internal/executor/ratelimit_test.go b/internal/executor/ratelimit_test.go deleted file mode 100644 index 1434810..0000000 --- a/internal/executor/ratelimit_test.go +++ /dev/null @@ -1,200 +0,0 @@ -package executor - -import ( - "context" - "errors" - "fmt" - "testing" - "time" -) - -// --- isRateLimitError tests --- - -func TestIsRateLimitError_RateLimitMessage(t *testing.T) { - err := errors.New("claude exited with error: rate limit exceeded") - if !isRateLimitError(err) { - t.Error("want true for 'rate limit exceeded', got false") - } -} - -func TestIsRateLimitError_TooManyRequests(t *testing.T) { - err := errors.New("too many requests to the API") - if !isRateLimitError(err) { - t.Error("want true for 'too many requests', got false") - } -} - -func TestIsRateLimitError_HTTP429(t *testing.T) { - err := errors.New("API returned status 429") - if !isRateLimitError(err) { - t.Error("want true for '429', got false") - } -} - -func TestIsRateLimitError_Overloaded(t *testing.T) { - err := errors.New("API overloaded, please retry later") - if !isRateLimitError(err) { - t.Error("want true for 'overloaded', got false") - } -} - -func TestIsRateLimitError_NonRateLimitError(t *testing.T) { - err := errors.New("claude exited with error: exit status 1") - if isRateLimitError(err) { - t.Error("want false for non-rate-limit error, got true") - } -} - -func TestIsRateLimitError_NilError(t *testing.T) { - if isRateLimitError(nil) { - t.Error("want false for nil error, got true") - } -} - -// --- parseRetryAfter tests --- - -func TestParseRetryAfter_RetryAfterSeconds(t *testing.T) { - msg := "rate limit exceeded, retry after 30 seconds" - d := parseRetryAfter(msg) - if d != 30*time.Second { - t.Errorf("want 30s, got %v", d) - } -} - -func TestParseRetryAfter_RetryAfterHeader(t *testing.T) { - msg := "rate_limit_error: retry-after: 60" - d := parseRetryAfter(msg) - if d != 60*time.Second { - t.Errorf("want 60s, got %v", d) - } -} - -func TestParseRetryAfter_NoRetryInfo(t *testing.T) { - msg := "rate limit exceeded" - d := parseRetryAfter(msg) - if d != 0 { - t.Errorf("want 0, got %v", d) - } -} - -// --- isQuotaExhausted tests --- - -func TestIsQuotaExhausted_GeminiDailyQuota(t *testing.T) { - err := errors.New("container execution failed: exit status 1\nTerminalQuotaError: You have exhausted your daily quota on this model.") - if !isQuotaExhausted(err) { - t.Error("want true for Gemini TerminalQuotaError, got false") - } -} - -func TestIsQuotaExhausted_GeminiExhaustedMessage(t *testing.T) { - err := errors.New("container execution failed: exit status 1\nyou have exhausted your daily quota") - if !isQuotaExhausted(err) { - t.Error("want true for 'exhausted your daily quota', got false") - } -} - -func TestIsQuotaExhausted_GeminiQuotaExceeded(t *testing.T) { - err := errors.New("container execution failed: exit status 1\nQuota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests") - if !isQuotaExhausted(err) { - t.Error("want true for Gemini free tier quota exceeded, got false") - } -} - -func TestIsQuotaExhausted_NotQuota(t *testing.T) { - err := errors.New("container execution failed: exit status 1") - if isQuotaExhausted(err) { - t.Error("want false for generic exit status 1, got true") - } -} - -// --- runWithBackoff tests --- - -func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) { - calls := 0 - fn := func() error { - calls++ - return nil - } - err := runWithBackoff(context.Background(), 3, time.Millisecond, fn) - if err != nil { - t.Errorf("want nil error, got %v", err) - } - if calls != 1 { - t.Errorf("want 1 call, got %d", calls) - } -} - -func TestRunWithBackoff_RetriesOnRateLimit(t *testing.T) { - calls := 0 - fn := func() error { - calls++ - if calls < 3 { - return fmt.Errorf("rate limit exceeded") - } - return nil - } - err := runWithBackoff(context.Background(), 3, time.Millisecond, fn) - if err != nil { - t.Errorf("want nil error, got %v", err) - } - if calls != 3 { - t.Errorf("want 3 calls, got %d", calls) - } -} - -func TestRunWithBackoff_GivesUpAfterMaxRetries(t *testing.T) { - calls := 0 - rateLimitErr := fmt.Errorf("rate limit exceeded") - fn := func() error { - calls++ - return rateLimitErr - } - err := runWithBackoff(context.Background(), 3, time.Millisecond, fn) - if err == nil { - t.Fatal("want error after max retries, got nil") - } - // maxRetries=3: 1 initial call + 3 retries = 4 total calls - if calls != 4 { - t.Errorf("want 4 calls (1 initial + 3 retries), got %d", calls) - } -} - -func TestRunWithBackoff_DoesNotRetryNonRateLimitError(t *testing.T) { - calls := 0 - fn := func() error { - calls++ - return fmt.Errorf("permission denied") - } - err := runWithBackoff(context.Background(), 3, time.Millisecond, fn) - if err == nil { - t.Fatal("want error, got nil") - } - if calls != 1 { - t.Errorf("want 1 call (no retry for non-rate-limit), got %d", calls) - } -} - -func TestRunWithBackoff_ContextCancellation(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - calls := 0 - - fn := func() error { - calls++ - cancel() // cancel immediately after first call - return fmt.Errorf("rate limit exceeded") - } - - start := time.Now() - err := runWithBackoff(ctx, 3, time.Second, fn) // large delay confirms ctx preempts wait - elapsed := time.Since(start) - - if err == nil { - t.Fatal("want error on context cancellation, got nil") - } - if elapsed > 500*time.Millisecond { - t.Errorf("context cancellation too slow: %v (want < 500ms)", elapsed) - } - if calls != 1 { - t.Errorf("want 1 call before cancellation, got %d", calls) - } -} diff --git a/internal/executor/summary.go b/internal/executor/summary.go index a942de0..bcf5cfd 100644 --- a/internal/executor/summary.go +++ b/internal/executor/summary.go @@ -2,11 +2,26 @@ package executor import ( "bufio" + "context" "encoding/json" + "io" "os" "strings" + "time" + + "github.com/thepeterstone/claudomator/internal/llm" ) +// synthesizeSummaryMaxBytes caps how much of the stdout log we send to the +// LLM. Larger values cost more tokens with diminishing returns for a 2-4 +// sentence summary. +const synthesizeSummaryMaxBytes = 16 * 1024 + +// synthesizeSummaryTimeout caps the LLM call so a slow local model can't +// stall executor finalization. On timeout, we return "" (the existing +// no-summary path takes over). +const synthesizeSummaryTimeout = 6 * time.Second + // extractSummary reads a stream-json stdout log and returns the text following // the last "## Summary" heading found in any assistant text block. // Returns empty string if the file cannot be read or no summary is found. @@ -28,6 +43,86 @@ func extractSummary(stdoutPath string) string { return last } +// synthesizeSummary asks the LLM to summarize the assistant text content in +// stdoutPath when no "## Summary" heading was present. Returns "" on any +// error, an empty file, or an empty model response — preserving the +// existing "no summary" behavior so the new path is purely additive. +func synthesizeSummary(parent context.Context, c *llm.Client, stdoutPath string) string { + if c == nil || stdoutPath == "" { + return "" + } + text := readAssistantTextTail(stdoutPath, synthesizeSummaryMaxBytes) + if strings.TrimSpace(text) == "" { + return "" + } + + cctx, cancel := context.WithTimeout(parent, synthesizeSummaryTimeout) + defer cancel() + resp, err := c.Chat(cctx, llm.ChatRequest{ + Messages: []llm.Message{ + {Role: "system", Content: "You summarize what an automated coding agent did. Reply with 2-4 sentences of plain prose. No bullets, no headings, no preamble."}, + {Role: "user", Content: "Here is the agent's output. Summarize what it accomplished:\n\n" + text}, + }, + }) + if err != nil { + return "" + } + return strings.TrimSpace(resp.Content) +} + +// readAssistantTextTail returns the concatenated `text` blocks from assistant +// stream-json events in the last maxBytes of the file. Non-assistant events +// (system, result, tool_use, etc.) are skipped so the LLM sees just what the +// agent said. Returns "" on any error. +func readAssistantTextTail(stdoutPath string, maxBytes int64) string { + f, err := os.Open(stdoutPath) + if err != nil { + return "" + } + defer f.Close() + + stat, err := f.Stat() + if err != nil { + return "" + } + size := stat.Size() + if size > maxBytes { + if _, err := f.Seek(size-maxBytes, io.SeekStart); err != nil { + return "" + } + } + + var sb strings.Builder + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + first := size > maxBytes // if we seeked, drop the first (likely partial) line + for scanner.Scan() { + if first { + first = false + continue + } + var event struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(scanner.Bytes(), &event); err != nil || event.Type != "assistant" { + continue + } + for _, block := range event.Message.Content { + if block.Type == "text" && block.Text != "" { + sb.WriteString(block.Text) + sb.WriteString("\n") + } + } + } + return sb.String() +} + // summaryFromLine parses a single stream-json line and returns the text after // "## Summary" if the line is an assistant text block containing that heading. func summaryFromLine(line []byte) string { diff --git a/internal/executor/summary_synth_test.go b/internal/executor/summary_synth_test.go new file mode 100644 index 0000000..7ad396d --- /dev/null +++ b/internal/executor/summary_synth_test.go @@ -0,0 +1,241 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/storage" +) + +func writeStreamLog(t *testing.T, lines []string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + var sb strings.Builder + for _, l := range lines { + sb.WriteString(l) + sb.WriteString("\n") + } + if err := os.WriteFile(path, []byte(sb.String()), 0600); err != nil { + t.Fatal(err) + } + return path +} + +func TestSynthesizeSummary_NilClient(t *testing.T) { + got := synthesizeSummary(context.Background(), nil, "/some/path") + if got != "" { + t.Errorf("nil client: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_EmptyPath(t *testing.T) { + c := &llm.Client{Endpoint: "http://unused", Model: "x"} + got := synthesizeSummary(context.Background(), c, "") + if got != "" { + t.Errorf("empty path: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_MissingFile(t *testing.T) { + c := &llm.Client{Endpoint: "http://unused", Model: "x"} + got := synthesizeSummary(context.Background(), c, "/nonexistent/file.log") + if got != "" { + t.Errorf("missing file: want empty, got %q", got) + } +} + +func TestSynthesizeSummary_EmptyAssistantContent(t *testing.T) { + // Log contains only system/result events — no assistant text. The function + // should short-circuit without calling the LLM. + path := writeStreamLog(t, []string{ + `{"type":"system","subtype":"init"}`, + `{"type":"result","subtype":"success","total_cost_usd":0}`, + }) + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be returned"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "" { + t.Errorf("empty content: want empty, got %q", got) + } + if atomic.LoadInt32(&calls) != 0 { + t.Errorf("LLM should not be called for empty assistant content") + } +} + +func TestSynthesizeSummary_LLMSuccess(t *testing.T) { + path := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Ran the tests."}]}}`, + `{"type":"assistant","message":{"content":[{"type":"text","text":"Fixed the import."}]}}`, + `{"type":"result","subtype":"success"}`, + }) + + var capturedUser string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body struct { + Messages []struct { + Role, Content string + } `json:"messages"` + } + json.NewDecoder(r.Body).Decode(&body) + for _, m := range body.Messages { + if m.Role == "user" { + capturedUser = m.Content + } + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":" Agent ran tests and fixed an import. "},"finish_reason":"stop"}],"usage":{}}`) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "Agent ran tests and fixed an import." { + t.Errorf("summary: got %q", got) + } + if !strings.Contains(capturedUser, "Ran the tests.") { + t.Errorf("user prompt missing first assistant text; got: %s", capturedUser) + } + if !strings.Contains(capturedUser, "Fixed the import.") { + t.Errorf("user prompt missing second assistant text; got: %s", capturedUser) + } +} + +// TestPool_HandleRunResult_LLMSummaryFallback verifies the Pool falls back to +// LLM-synthesized summary when extractSummary returns empty. +func TestPool_HandleRunResult_LLMSummaryFallback(t *testing.T) { + // stdout has assistant text but no "## Summary" heading. + stdoutPath := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Did the work without writing a summary section."}]}}`, + }) + + llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"Synthesized summary."},"finish_reason":"stop"}],"usage":{}}`) + })) + defer llmSrv.Close() + + store := newMinimalMockStore() + pool := newPoolWithMockStore(store) + pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"} + + tk := makeTask("synth-summary") + store.tasks[tk.ID] = tk + exec := &storage.Execution{ID: "e-synth", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath} + + pool.handleRunResult(context.Background(), tk, exec, nil, "claude") + + id, summary, ok := store.lastSummaryUpdate() + if !ok { + t.Fatalf("expected UpdateTaskSummary to be called") + } + if id != tk.ID { + t.Errorf("summary recorded for wrong task: %q", id) + } + if summary != "Synthesized summary." { + t.Errorf("summary: got %q", summary) + } + + // Drain the result channel so the test exits cleanly. + <-pool.resultCh +} + +// TestPool_HandleRunResult_ExtractSummaryWins verifies the LLM is NOT called +// when the agent already wrote a "## Summary" section. +func TestPool_HandleRunResult_ExtractSummaryWins(t *testing.T) { + stdoutPath := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"## Summary\nAgent wrote its own summary."}]}}`, + }) + + var llmCalls int32 + llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&llmCalls, 1) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"choices":[{"message":{"content":"should not be used"},"finish_reason":"stop"}],"usage":{}}`) + })) + defer llmSrv.Close() + + store := newMinimalMockStore() + pool := newPoolWithMockStore(store) + pool.LLM = &llm.Client{Endpoint: llmSrv.URL + "/v1", Model: "x"} + + tk := makeTask("agent-summary") + store.tasks[tk.ID] = tk + exec := &storage.Execution{ID: "e-agent", TaskID: tk.ID, Status: "RUNNING", StdoutPath: stdoutPath} + + pool.handleRunResult(context.Background(), tk, exec, nil, "claude") + + if got := atomic.LoadInt32(&llmCalls); got != 0 { + t.Errorf("LLM should not be called when ## Summary is present; got %d calls", got) + } + _, summary, ok := store.lastSummaryUpdate() + if !ok { + t.Fatalf("expected UpdateTaskSummary") + } + if summary != "Agent wrote its own summary." { + t.Errorf("summary: got %q (want extractSummary output)", summary) + } + <-pool.resultCh +} + +func TestSynthesizeSummary_LLMFailure_ReturnsEmpty(t *testing.T) { + path := writeStreamLog(t, []string{ + `{"type":"assistant","message":{"content":[{"type":"text","text":"Did something."}]}}`, + }) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "boom", http.StatusInternalServerError) + })) + defer srv.Close() + c := &llm.Client{Endpoint: srv.URL + "/v1", Model: "x"} + + got := synthesizeSummary(context.Background(), c, path) + if got != "" { + t.Errorf("LLM failure: want empty, got %q", got) + } +} + +// TestReadAssistantTextTail_TailingLargeFile verifies the seek-to-tail +// behavior drops early content but keeps later assistant text. +func TestReadAssistantTextTail_TailingLargeFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "stdout.log") + f, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + // Write a ton of garbage assistant lines, then a final marker. + for i := 0; i < 500; i++ { + fmt.Fprintf(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"filler line that should be in the early part of a large file %04d"}]}}`+"\n", i) + } + fmt.Fprintln(f, `{"type":"assistant","message":{"content":[{"type":"text","text":"FINAL_MARKER_LINE"}]}}`) + f.Close() + + got := readAssistantTextTail(path, 4*1024) // 4 KB cap + if !strings.Contains(got, "FINAL_MARKER_LINE") { + t.Errorf("tail should contain final line; got: %s", got) + } + if strings.Contains(got, "filler line that should be in the early part of a large file 0000") { + end := 200 + if len(got) < end { + end = len(got) + } + t.Errorf("tail should NOT contain very-early line; got first 200 chars: %s", got[:end]) + } +} |
