summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/claude.go714
-rw-r--r--internal/executor/claude_test.go882
-rw-r--r--internal/executor/container.go380
-rw-r--r--internal/executor/container_test.go244
-rw-r--r--internal/executor/executor_test.go11
-rw-r--r--internal/executor/gemini.go228
-rw-r--r--internal/executor/gemini_test.go179
-rw-r--r--internal/executor/helpers.go174
-rw-r--r--internal/executor/stream_test.go25
9 files changed, 826 insertions, 2011 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
deleted file mode 100644
index 6346aa8..0000000
--- a/internal/executor/claude.go
+++ /dev/null
@@ -1,714 +0,0 @@
-package executor
-
-import (
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log/slog"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/thepeterstone/claudomator/internal/storage"
- "github.com/thepeterstone/claudomator/internal/task"
-)
-
// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
//
// Logger must be non-nil: Run logs through r.Logger without a nil check.
// BinaryPath and LogDir may be empty (see binaryPath and ExecLogDir for the
// fallbacks); APIURL and DropsDir are forwarded to the subprocess environment.
type ClaudeRunner struct {
	BinaryPath string // path to the claude binary; defaults to "claude" when empty
	Logger     *slog.Logger
	LogDir     string // base directory for execution logs; empty means "no configured log root"
	APIURL     string // base URL of the Claudomator API, passed to subprocesses via CLAUDOMATOR_API_URL
	DropsDir   string // path to the drops directory, passed to subprocesses via CLAUDOMATOR_DROP_DIR
}
-
// BlockedError is returned by Run when the agent wrote a question file and exited.
// The pool transitions the task to BLOCKED and stores the question for the user.
type BlockedError struct {
	QuestionJSON string // raw JSON from the question file
	SessionID    string // claude session to resume once the user answers
	SandboxDir   string // preserved sandbox path; resume must run here so Claude finds its session files
}

// Error implements the error interface, embedding the raw question JSON.
func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }
-
-// ExecLogDir returns the log directory for the given execution ID.
-// Implements LogPather so the pool can persist paths before execution starts.
-func (r *ClaudeRunner) ExecLogDir(execID string) string {
- if r.LogDir == "" {
- return ""
- }
- return filepath.Join(r.LogDir, execID)
-}
-
-func (r *ClaudeRunner) binaryPath() string {
- if r.BinaryPath != "" {
- return r.BinaryPath
- }
- return "claude"
-}
-
// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
// If the agent writes a question file and exits, Run returns *BlockedError.
//
// When project_dir is set and this is not a resume execution, Run clones the
// project into a temp sandbox, runs the agent there, then merges committed
// changes back to project_dir. On failure the sandbox is preserved and its
// path is included in the error.
//
// Side effects on e: log paths, ArtifactDir, SessionID, Summary, CostUSD,
// ExitCode and Commits may all be written during the call.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	projectDir := t.Agent.ProjectDir

	// Validate project_dir exists when set.
	if projectDir != "" {
		if _, err := os.Stat(projectDir); err != nil {
			return fmt.Errorf("project_dir %q: %w", projectDir, err)
		}
	}

	// Setup log directory once; retries overwrite the log files.
	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		logDir = e.ID // fallback for tests without LogDir set
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	if e.StdoutPath == "" {
		e.StdoutPath = filepath.Join(logDir, "stdout.log")
		e.StderrPath = filepath.Join(logDir, "stderr.log")
		e.ArtifactDir = logDir
	}

	// Pre-assign session ID so we can resume after a BLOCKED state.
	// For resume executions, the claude session continues under the original
	// session ID (the one passed to --resume). Using the new exec's own UUID
	// would cause a second block-and-resume cycle to pass the wrong --resume
	// argument.
	if e.SessionID == "" {
		if e.ResumeSessionID != "" {
			e.SessionID = e.ResumeSessionID
		} else {
			e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
		}
	}

	// For new (non-resume) executions with a project_dir, clone into a sandbox.
	// Resume executions run in the preserved sandbox (e.SandboxDir) so Claude
	// finds its session files under the same project slug. If no sandbox was
	// preserved (e.g. task had no project_dir), fall back to project_dir.
	var sandboxDir string
	var startHEAD string
	effectiveWorkingDir := projectDir
	if e.ResumeSessionID != "" {
		if e.SandboxDir != "" {
			if _, statErr := os.Stat(e.SandboxDir); statErr == nil {
				effectiveWorkingDir = e.SandboxDir
			} else {
				// Preserved sandbox was cleaned up (e.g. /tmp purge after reboot).
				// Clone a fresh sandbox so the task can run rather than fail immediately.
				r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir)
				e.SandboxDir = ""
				if projectDir != "" {
					var err error
					sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger)
					if err != nil {
						return fmt.Errorf("setting up sandbox: %w", err)
					}

					effectiveWorkingDir = sandboxDir
					r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir)
				}
			}
		}
	} else if projectDir != "" {
		var err error
		sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger)
		if err != nil {
			return fmt.Errorf("setting up sandbox: %w", err)
		}

		effectiveWorkingDir = sandboxDir
		r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
	}

	if effectiveWorkingDir != "" {
		// Capture the initial HEAD so we can identify new commits later.
		// Errors are deliberately ignored: startHEAD stays "" and
		// teardownSandbox falls back to its default log range.
		headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output()
		startHEAD = strings.TrimSpace(string(headOut))
	}

	questionFile := filepath.Join(logDir, "question.json")
	args := r.buildArgs(t, e, questionFile)

	attempt := 0
	err := runWithBackoff(ctx, 3, 5*time.Second, func() error {
		if attempt > 0 {
			delay := 5 * time.Second * (1 << (attempt - 1))
			r.Logger.Warn("rate-limited by Claude API, retrying",
				"attempt", attempt,
				"delay", delay,
			)
		}
		attempt++
		return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e)
	})
	if err != nil {
		if sandboxDir != "" {
			return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
		}
		return err
	}

	// Check whether the agent left a question before exiting.
	data, readErr := os.ReadFile(questionFile)
	if readErr == nil {
		os.Remove(questionFile) // consumed
		questionJSON := strings.TrimSpace(string(data))
		// If the agent wrote a completion report instead of a real question,
		// extract the text as the summary and fall through to normal completion.
		if isCompletionReport(questionJSON) {
			r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
			e.Summary = extractQuestionText(questionJSON)
		} else {
			// Preserve sandbox on BLOCKED — agent may have partial work and its
			// Claude session files are stored under the sandbox's project slug.
			// The resume execution must run in the same directory.
			//
			// NOTE(review): for a resume execution running inside a preserved
			// sandbox, the local sandboxDir is "" (only freshly cloned sandboxes
			// set it), so this BlockedError carries an empty SandboxDir — confirm
			// the pool falls back to the previously stored e.SandboxDir in that case.
			return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir}
		}
	}

	// Read agent summary if written.
	summaryFile := filepath.Join(logDir, "summary.txt")
	if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
		os.Remove(summaryFile) // consumed
		e.Summary = strings.TrimSpace(string(summaryData))
	}

	// Merge sandbox back to project_dir and clean up.
	if sandboxDir != "" {
		if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil {
			return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
		}
	}
	return nil
}
-
// isCompletionReport returns true when a question-file JSON looks like a
// completion report rather than a real user question. Heuristic: no options
// (or empty options) and no "?" anywhere in the text. Unparseable JSON is
// treated as a question (returns false).
func isCompletionReport(questionJSON string) bool {
	var payload struct {
		Text    string   `json:"text"`
		Options []string `json:"options"`
	}
	if json.Unmarshal([]byte(questionJSON), &payload) != nil {
		return false
	}
	if len(payload.Options) > 0 {
		return false
	}
	return !strings.Contains(payload.Text, "?")
}
-
// extractQuestionText returns the trimmed "text" field from a question-file
// JSON document. If the input is not valid JSON the raw string is returned
// unchanged so the caller still has something to show the user.
func extractQuestionText(questionJSON string) string {
	var payload struct {
		Text string `json:"text"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &payload); err != nil {
		return questionJSON
	}
	return strings.TrimSpace(payload.Text)
}
-
// gitSafe prefixes git arguments with "-c safe.directory=*" so commands
// succeed regardless of the repository owner. This is needed when
// claudomator operates on project directories owned by a different OS user.
func gitSafe(args ...string) []string {
	out := make([]string, 0, len(args)+2)
	out = append(out, "-c", "safe.directory=*")
	return append(out, args...)
}
-
// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a
// remote named "local" (a local bare repo that accepts pushes cleanly), then
// falls back to "origin", then to the working copy path itself.
//
// The preference order matters: the sandbox's "origin" becomes whatever URL
// is returned here, and teardownSandbox later pushes to that origin.
func sandboxCloneSource(projectDir string) string {
	// Prefer "local" remote, but only if it points to a local path (accepts pushes).
	// A non-local "local" remote (e.g. https://) is skipped rather than used.
	if out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", "local")...).Output(); err == nil {
		u := strings.TrimSpace(string(out))
		if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) {
			return u
		}
	}
	// Fall back to "origin" — any URL scheme is acceptable for cloning.
	if out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", "origin")...).Output(); err == nil {
		if u := strings.TrimSpace(string(out)); u != "" {
			return u
		}
	}
	// Last resort: clone directly from the working copy.
	return projectDir
}
-
// setupSandbox prepares a temporary git clone of projectDir and returns its
// path. If projectDir is not a git repo it is initialised with an initial
// commit first (note: `git add -A` stages every file in the directory).
// On success the caller owns the returned directory and is responsible for
// removing it (see teardownSandbox). The logger parameter is currently unused.
func setupSandbox(projectDir string, logger *slog.Logger) (string, error) {
	// Ensure projectDir is a git repo; initialise if not.
	if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil {
		cmds := [][]string{
			gitSafe("-C", projectDir, "init"),
			gitSafe("-C", projectDir, "add", "-A"),
			gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"),
		}
		for _, args := range cmds {
			if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec
				return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out)
			}
		}
	}

	src := sandboxCloneSource(projectDir)

	// MkdirTemp is used only to reserve a unique name; git clone requires the
	// target to not exist, so the placeholder directory is removed first.
	tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*")
	if err != nil {
		return "", fmt.Errorf("creating sandbox dir: %w", err)
	}
	// git clone requires the target to not exist; remove the placeholder first.
	if err := os.Remove(tempDir); err != nil {
		return "", fmt.Errorf("removing temp dir placeholder: %w", err)
	}
	// --no-hardlinks keeps the clone's object store independent of the source.
	out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("git clone: %w\n%s", err, out)
	}
	return tempDir, nil
}
-
// teardownSandbox verifies the sandbox is clean and pushes new commits to the
// canonical bare repo. If the push is rejected because another task pushed
// concurrently, it fetches and rebases then retries once.
//
// The working copy (projectDir) is NOT updated automatically — it is the
// developer's workspace and is pulled manually. This avoids permission errors
// from mixed-owner .git/objects directories.
//
// NOTE(review): the projectDir parameter is not referenced anywhere in this
// body — all git commands run against sandboxDir. It is kept for the caller's
// signature; consider documenting or dropping it.
// Side effect: execRecord.Commits is (re)populated with the commits pushed.
// On success the sandbox directory is deleted; on error it is preserved.
func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error {
	// Automatically commit uncommitted changes.
	out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output()
	if err != nil {
		return fmt.Errorf("git status: %w", err)
	}
	if len(strings.TrimSpace(string(out))) > 0 {
		logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir)

		// Run build before autocommitting. The first matching build system
		// wins: Makefile, then gradlew, then go.mod. A build failure aborts
		// the teardown (and thus preserves the sandbox).
		if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil {
			logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir)
			if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil {
				return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
			}
		} else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil {
			logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir)
			cmd := exec.Command("./gradlew", "build")
			cmd.Dir = sandboxDir
			if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil {
				return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
			}
		} else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil {
			logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir)
			cmd := exec.Command("go", "build", "./...")
			cmd.Dir = sandboxDir
			if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil {
				return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
			}
		}

		cmds := [][]string{
			gitSafe("-C", sandboxDir, "add", "-A"),
			gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"),
		}
		for _, args := range cmds {
			if out, err := exec.Command("git", args...).CombinedOutput(); err != nil {
				return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out)
			}
		}
	}

	// Capture commits before pushing/deleting.
	// Use startHEAD..HEAD to find all commits made during this execution.
	// NOTE(review): the fallback "origin/HEAD..HEAD" relies on origin/HEAD
	// being set in the clone — verify this holds for all clone sources.
	logRange := "origin/HEAD..HEAD"
	if startHEAD != "" && startHEAD != "HEAD" {
		logRange = startHEAD + "..HEAD"
	}

	logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...)
	logOut, logErr := logCmd.CombinedOutput()
	if logErr == nil {
		lines := strings.Split(strings.TrimSpace(string(logOut)), "\n")
		logger.Debug("captured commits", "count", len(lines), "range", logRange)
		for _, line := range lines {
			if line == "" {
				continue
			}
			parts := strings.SplitN(line, "|", 2)
			if len(parts) == 2 {
				execRecord.Commits = append(execRecord.Commits, task.GitCommit{
					Hash:    parts[0],
					Message: parts[1],
				})
			}
		}
	} else {
		logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut))
	}

	// Check whether there are any new commits to push. If rev-list fails we
	// proceed with the push anyway (ahead is "" which is != "0").
	ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output()
	if err != nil {
		logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange)
	}
	if strings.TrimSpace(string(ahead)) == "0" {
		os.RemoveAll(sandboxDir)
		return nil
	}

	// Push from sandbox → bare repo (sandbox's origin is the bare repo).
	if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil {
		// If rejected due to concurrent push, fetch+rebase and retry once.
		if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") {
			logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir)
			// NOTE(review): the rebase hardcodes branch "master" — confirm this
			// matches the bare repo's default branch (fails on main-only repos).
			if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil {
				return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2)
			}
			// Re-capture commits after rebase (hashes might have changed).
			// Empty lines are harmless here: SplitN("") yields one part, which
			// fails the len==2 check below.
			execRecord.Commits = nil
			logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output()
			if logErr == nil {
				lines := strings.Split(strings.TrimSpace(string(logOut)), "\n")
				for _, line := range lines {
					parts := strings.SplitN(line, "|", 2)
					if len(parts) == 2 {
						execRecord.Commits = append(execRecord.Commits, task.GitCommit{
							Hash:    parts[0],
							Message: parts[1],
						})
					}
				}
			}

			if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil {
				return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3)
			}
		} else {
			return fmt.Errorf("git push to origin: %w\n%s", err, out)
		}
	}

	logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir)
	os.RemoveAll(sandboxDir)
	return nil
}
-
// execOnce runs the claude subprocess once, streaming output to e's log paths.
//
// It wires the Claudomator environment (API URL, task ID, question/summary
// file paths, drops dir) into the subprocess, runs it in its own process
// group so cancellation can kill all descendants, and records ExitCode and
// CostUSD on e. Rate-limit/quota errors detected in the stream are returned
// as-is so the caller's backoff loop can classify them.
func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error {
	cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
	cmd.Env = append(os.Environ(),
		"CLAUDOMATOR_API_URL="+r.APIURL,
		"CLAUDOMATOR_TASK_ID="+e.TaskID,
		"CLAUDOMATOR_PROJECT_DIR="+projectDir,
		"CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
		"CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"),
		"CLAUDOMATOR_DROP_DIR="+r.DropsDir,
	)
	// Put the subprocess in its own process group so we can SIGKILL the entire
	// group (MCP servers, bash children, etc.) on cancellation.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if workingDir != "" {
		cmd.Dir = workingDir
	}

	// os.Create truncates, so a retry overwrites the previous attempt's logs.
	stdoutFile, err := os.Create(e.StdoutPath)
	if err != nil {
		return fmt.Errorf("creating stdout log: %w", err)
	}
	defer stdoutFile.Close()

	stderrFile, err := os.Create(e.StderrPath)
	if err != nil {
		return fmt.Errorf("creating stderr log: %w", err)
	}
	defer stderrFile.Close()

	// Use os.Pipe for stdout so we own the read-end lifetime.
	// cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
	// cmd.Wait() to close it before our goroutine finishes reading.
	stdoutR, stdoutW, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
	cmd.Stderr = stderrFile

	if err := cmd.Start(); err != nil {
		stdoutW.Close()
		stdoutR.Close()
		return fmt.Errorf("starting claude: %w", err)
	}
	// Close our write-end immediately; the subprocess holds its own copy.
	// The goroutine below gets EOF when the subprocess exits.
	stdoutW.Close()

	// killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
	//
	// Safety: this goroutine cannot block indefinitely. The select has two arms:
	//   • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel).
	//     The goroutine sends SIGKILL and exits immediately.
	//   • killDone — closed by close(killDone) below, immediately after cmd.Wait()
	//     returns. This fires when the process exits for any reason (natural exit,
	//     SIGKILL from the ctx arm, or any other signal). The goroutine exits without
	//     doing anything.
	//
	// Therefore: for a task that completes normally with a long-lived (non-cancelled)
	// context, the killDone arm fires and the goroutine exits. There is no path where
	// this goroutine outlives execOnce().
	killDone := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			// SIGKILL the entire process group to reap orphan children.
			syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		case <-killDone:
		}
	}()

	// Stream stdout to the log file and parse cost/errors.
	// wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait().
	var costUSD float64
	var streamErr error
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
		stdoutR.Close()
	}()

	waitErr := cmd.Wait()
	close(killDone) // stop the pgid-kill goroutine
	wg.Wait()       // drain remaining stdout before reading costUSD/streamErr

	e.CostUSD = costUSD

	if waitErr != nil {
		if exitErr, ok := waitErr.(*exec.ExitError); ok {
			e.ExitCode = exitErr.ExitCode()
		}
		// If the stream captured a rate-limit or quota message, return it
		// so callers can distinguish it from a generic exit-status failure.
		if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
			return streamErr
		}
		// Surface the tail of stderr to make subprocess failures diagnosable.
		if tail := tailFile(e.StderrPath, 20); tail != "" {
			return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail)
		}
		return fmt.Errorf("claude exited with error: %w", waitErr)
	}

	e.ExitCode = 0
	// The process can exit 0 while the stream reported a failure (e.g. a
	// result message with is_error:true); propagate that as the task error.
	if streamErr != nil {
		return streamErr
	}
	return nil
}
-
// buildArgs assembles the claude CLI argument list for an execution.
//
// Resume executions (e.ResumeSessionID set) get a minimal argument set — the
// user's answer as the prompt plus --resume — since the agent already has
// context. New executions get the full instruction set, session ID, budget,
// permission mode, tool lists, context dirs and any additional args.
//
// NOTE(review): questionFile is accepted but never referenced here; execOnce
// passes the question-file path to the subprocess via CLAUDOMATOR_QUESTION_FILE
// instead. Consider dropping the parameter or documenting why it is kept.
func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
	// Resume execution: the agent already has context; just deliver the answer.
	if e.ResumeSessionID != "" {
		args := []string{
			"-p", e.ResumeAnswer,
			"--resume", e.ResumeSessionID,
			"--output-format", "stream-json",
			"--verbose",
		}
		permMode := t.Agent.PermissionMode
		if permMode == "" {
			permMode = "bypassPermissions"
		}
		args = append(args, "--permission-mode", permMode)
		if t.Agent.Model != "" {
			args = append(args, "--model", t.Agent.Model)
		}
		return args
	}

	instructions := t.Agent.Instructions
	allowedTools := t.Agent.AllowedTools

	if !t.Agent.SkipPlanning {
		instructions = withPlanningPreamble(instructions)
		// Ensure Bash is available so the agent can POST subtasks and ask questions.
		hasBash := false
		for _, tool := range allowedTools {
			if tool == "Bash" {
				hasBash = true
				break
			}
		}
		if !hasBash {
			// NOTE(review): this append may share the backing array with
			// t.Agent.AllowedTools — verify no caller relies on that slice
			// staying unmodified, or clone before appending.
			allowedTools = append(allowedTools, "Bash")
		}
	}

	args := []string{
		"-p", instructions,
		"--session-id", e.SessionID,
		"--output-format", "stream-json",
		"--verbose",
	}

	if t.Agent.Model != "" {
		args = append(args, "--model", t.Agent.Model)
	}
	if t.Agent.MaxBudgetUSD > 0 {
		args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD))
	}
	// Default to bypassPermissions — claudomator runs tasks unattended, so
	// prompting for write access would always stall execution. Tasks that need
	// a more restrictive mode can set permission_mode explicitly.
	permMode := t.Agent.PermissionMode
	if permMode == "" {
		permMode = "bypassPermissions"
	}
	args = append(args, "--permission-mode", permMode)
	if t.Agent.SystemPromptAppend != "" {
		args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend)
	}
	for _, tool := range allowedTools {
		args = append(args, "--allowedTools", tool)
	}
	for _, tool := range t.Agent.DisallowedTools {
		args = append(args, "--disallowedTools", tool)
	}
	for _, f := range t.Agent.ContextFiles {
		args = append(args, "--add-dir", f)
	}
	args = append(args, t.Agent.AdditionalArgs...)

	return args
}
-
-// parseStream reads streaming JSON from claude, writes to w, and returns
-// (costUSD, error). error is non-nil if the stream signals task failure:
-// - result message has is_error:true
-// - a tool_result was denied due to missing permissions
-func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
- tee := io.TeeReader(r, w)
- scanner := bufio.NewScanner(tee)
- scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines
-
- var totalCost float64
- var streamErr error
-
- for scanner.Scan() {
- line := scanner.Bytes()
- var msg map[string]interface{}
- if err := json.Unmarshal(line, &msg); err != nil {
- continue
- }
-
- msgType, _ := msg["type"].(string)
- switch msgType {
- case "rate_limit_event":
- if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
- status, _ := info["status"].(string)
- if status == "rejected" {
- streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
- // Immediately break since we can't continue anyway
- break
- }
- }
- case "assistant":
- if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
- streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
- }
- case "result":
- if isErr, _ := msg["is_error"].(bool); isErr {
- result, _ := msg["result"].(string)
- if result != "" {
- streamErr = fmt.Errorf("claude task failed: %s", result)
- } else {
- streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
- }
- }
- // Prefer total_cost_usd from result message; fall through to legacy check below.
- if cost, ok := msg["total_cost_usd"].(float64); ok {
- totalCost = cost
- }
- case "user":
- // Detect permission-denial tool_results. These occur when permission_mode
- // is not bypassPermissions and claude exits 0 without completing its task.
- if err := permissionDenialError(msg); err != nil && streamErr == nil {
- streamErr = err
- }
- }
-
- // Legacy cost field used by older claude versions.
- if cost, ok := msg["cost_usd"].(float64); ok {
- totalCost = cost
- }
- }
-
- return totalCost, streamErr
-}
-
// permissionDenialError inspects a "user" stream message for tool_result
// entries that were denied due to missing permissions, returning a non-nil
// error for the first such entry. Messages without the expected shape are
// ignored (nil return).
func permissionDenialError(msg map[string]interface{}) error {
	message, ok := msg["message"].(map[string]interface{})
	if !ok {
		return nil
	}
	items, ok := message["content"].([]interface{})
	if !ok {
		return nil
	}
	for _, raw := range items {
		entry, ok := raw.(map[string]interface{})
		if !ok || entry["type"] != "tool_result" {
			continue
		}
		isErr, _ := entry["is_error"].(bool)
		if !isErr {
			continue
		}
		body, _ := entry["content"].(string)
		if strings.Contains(body, "requested permissions") || strings.Contains(body, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", body)
		}
	}
	return nil
}
-
-// tailFile returns the last n lines of the file at path, or empty string if
-// the file cannot be read. Used to surface subprocess stderr on failure.
-func tailFile(path string, n int) string {
- f, err := os.Open(path)
- if err != nil {
- return ""
- }
- defer f.Close()
-
- var lines []string
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- lines = append(lines, scanner.Text())
- if len(lines) > n {
- lines = lines[1:]
- }
- }
- return strings.Join(lines, "\n")
-}
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
deleted file mode 100644
index e76fbf2..0000000
--- a/internal/executor/claude_test.go
+++ /dev/null
@@ -1,882 +0,0 @@
-package executor
-
-import (
- "context"
- "errors"
- "fmt"
- "io"
- "log/slog"
- "os"
- "os/exec"
- "path/filepath"
- "runtime"
- "strings"
- "testing"
- "time"
-
- "github.com/thepeterstone/claudomator/internal/storage"
- "github.com/thepeterstone/claudomator/internal/task"
-)
-
// TestClaudeRunner_BuildArgs_BasicTask checks that a minimal task yields the
// core claude CLI tokens: prompt, stream-json output, --verbose and --model.
func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "fix the bug",
			Model:        "sonnet",
			SkipPlanning: true,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// NOTE(review): set membership only proves each token appears somewhere
	// in args; it does not verify flag/value pairing or ordering.
	argMap := make(map[string]bool)
	for _, a := range args {
		argMap[a] = true
	}
	for _, want := range []string{"-p", "fix the bug", "--output-format", "stream-json", "--verbose", "--model", "sonnet"} {
		if !argMap[want] {
			t.Errorf("missing arg %q in %v", want, args)
		}
	}
}
-
// TestClaudeRunner_BuildArgs_FullConfig checks that every configurable agent
// field is reflected somewhere in the generated CLI arguments.
func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:               "claude",
			Instructions:       "implement feature",
			Model:              "opus",
			MaxBudgetUSD:       5.0,
			PermissionMode:     "bypassPermissions",
			SystemPromptAppend: "Follow TDD",
			AllowedTools:       []string{"Bash", "Edit"},
			DisallowedTools:    []string{"Write"},
			ContextFiles:       []string{"/src"},
			AdditionalArgs:     []string{"--verbose"},
			SkipPlanning:       true,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// Check key args are present.
	// NOTE(review): set membership does not verify flag/value pairing; e.g.
	// "--verbose" here could be satisfied by either the base args or
	// AdditionalArgs.
	argMap := make(map[string]bool)
	for _, a := range args {
		argMap[a] = true
	}

	requiredArgs := []string{
		"-p", "implement feature", "--output-format", "stream-json",
		"--model", "opus", "--max-budget-usd", "5.00",
		"--permission-mode", "bypassPermissions",
		"--append-system-prompt", "Follow TDD",
		"--allowedTools", "Bash", "Edit",
		"--disallowedTools", "Write",
		"--add-dir", "/src",
		"--verbose",
	}
	for _, req := range requiredArgs {
		if !argMap[req] {
			t.Errorf("missing arg %q in %v", req, args)
		}
	}
}
-
// TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions checks the unattended
// default: an unset PermissionMode becomes --permission-mode bypassPermissions.
func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "do work",
			SkipPlanning: true,
			// PermissionMode intentionally not set
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// Scan for the adjacent flag/value pair.
	found := false
	for i, a := range args {
		if a == "--permission-mode" && i+1 < len(args) && args[i+1] == "bypassPermissions" {
			found = true
		}
	}
	if !found {
		t.Errorf("expected --permission-mode bypassPermissions when PermissionMode is empty, args: %v", args)
	}
}
-
// TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode checks that an
// explicitly configured PermissionMode is passed through unmodified.
func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:           "claude",
			Instructions:   "do work",
			PermissionMode: "default",
			SkipPlanning:   true,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// Return on the first --permission-mode occurrence after verifying its value.
	for i, a := range args {
		if a == "--permission-mode" && i+1 < len(args) {
			if args[i+1] != "default" {
				t.Errorf("expected --permission-mode default, got %q", args[i+1])
			}
			return
		}
	}
	t.Errorf("--permission-mode flag not found in args: %v", args)
}
-
// TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose checks that --verbose is
// always emitted (required for the stream-json output to include all events).
func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "do something",
			SkipPlanning: true,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	found := false
	for _, a := range args {
		if a == "--verbose" {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("--verbose missing from args: %v", args)
	}
}
-
// TestClaudeRunner_BuildArgs_PreamblePrepended checks that when planning is
// enabled (SkipPlanning=false), the -p prompt starts with the planning
// preamble and still ends with the original instructions.
func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "fix the bug",
			SkipPlanning: false,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// The -p value should start with the preamble and end with the original instructions.
	if len(args) < 2 || args[0] != "-p" {
		t.Fatalf("expected -p as first arg, got: %v", args)
	}
	if !strings.HasPrefix(args[1], "## Runtime Environment") {
		t.Errorf("instructions should start with planning preamble, got prefix: %q", args[1][:min(len(args[1]), 20)])
	}
	if !strings.Contains(args[1], "$CLAUDOMATOR_PROJECT_DIR") {
		t.Errorf("preamble should mention $CLAUDOMATOR_PROJECT_DIR")
	}
	if !strings.HasSuffix(args[1], "fix the bug") {
		t.Errorf("instructions should end with original instructions")
	}
}
-
// TestClaudeRunner_BuildArgs_PreambleAddsBash checks that Bash is appended to
// --allowedTools when planning is active, since the preamble instructs the
// agent to use shell commands (curl POSTs, question files).
func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "do work",
			AllowedTools: []string{"Read"},
			SkipPlanning: false,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// Bash should be appended to allowed tools.
	foundBash := false
	for i, a := range args {
		if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" {
			foundBash = true
		}
	}
	if !foundBash {
		t.Errorf("Bash should be added to --allowedTools when preamble is active: %v", args)
	}
}
-
// TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated checks that Bash is not
// appended a second time when the task already lists it in AllowedTools.
func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) {
	r := &ClaudeRunner{}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "do work",
			AllowedTools: []string{"Bash", "Read"},
			SkipPlanning: false,
		},
	}

	args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")

	// Count Bash occurrences in --allowedTools values.
	bashCount := 0
	for i, a := range args {
		if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" {
			bashCount++
		}
	}
	if bashCount != 1 {
		t.Errorf("Bash should appear exactly once in --allowedTools, got %d: %v", bashCount, args)
	}
}
-
-// TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession verifies that when a
-// resume execution is itself blocked again, the stored SessionID is the original
-// resumed session, not the new execution's own UUID. Without this, a second
-// block-and-resume cycle passes the wrong --resume session ID and fails.
-func TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession(t *testing.T) {
- logDir := t.TempDir()
- r := &ClaudeRunner{
- BinaryPath: "true", // exits 0, no output
- Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
- LogDir: logDir,
- }
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "claude",
- Instructions: "continue",
- SkipPlanning: true,
- },
- }
- exec := &storage.Execution{
- ID: "resume-exec-uuid",
- TaskID: "task-1",
- ResumeSessionID: "original-session-uuid",
- ResumeAnswer: "yes",
- }
-
- // Run completes successfully (binary is "true").
- _ = r.Run(context.Background(), tk, exec)
-
- // SessionID must be the original session (ResumeSessionID), not the new
- // exec's own ID. If it were exec.ID, a second blocked-then-resumed cycle
- // would use the wrong --resume argument and fail.
- if exec.SessionID != "original-session-uuid" {
- t.Errorf("SessionID after resume Run: want %q, got %q", "original-session-uuid", exec.SessionID)
- }
-}
-
-func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
- r := &ClaudeRunner{
- BinaryPath: "true", // would succeed if it ran
- Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
- LogDir: t.TempDir(),
- }
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "claude",
- ProjectDir: "/nonexistent/path/does/not/exist",
- SkipPlanning: true,
- },
- }
- exec := &storage.Execution{ID: "test-exec"}
-
- err := r.Run(context.Background(), tk, exec)
-
- if err == nil {
- t.Fatal("expected error for inaccessible working_dir, got nil")
- }
- if !strings.Contains(err.Error(), "project_dir") {
- t.Errorf("expected 'project_dir' in error, got: %v", err)
- }
-}
-
-func TestClaudeRunner_BinaryPath_Default(t *testing.T) {
- r := &ClaudeRunner{}
- if r.binaryPath() != "claude" {
- t.Errorf("want 'claude', got %q", r.binaryPath())
- }
-}
-
-func TestClaudeRunner_BinaryPath_Custom(t *testing.T) {
- r := &ClaudeRunner{BinaryPath: "/usr/local/bin/claude"}
- if r.binaryPath() != "/usr/local/bin/claude" {
- t.Errorf("want custom path, got %q", r.binaryPath())
- }
-}
-
// TestExecOnce_NoGoroutineLeak_OnNaturalExit verifies that execOnce does not
// leave behind any goroutines when the subprocess exits normally (no context
// cancellation). Both the pgid-kill goroutine and the parseStream goroutine
// must have exited before execOnce returns.
//
// NOTE(review): goroutine-count comparisons are inherently racy if tests run
// in parallel; this test assumes sequential execution — confirm it is never
// marked t.Parallel().
func TestExecOnce_NoGoroutineLeak_OnNaturalExit(t *testing.T) {
	logDir := t.TempDir()
	r := &ClaudeRunner{
		BinaryPath: "true", // exits immediately with status 0, produces no output
		Logger:     slog.New(slog.NewTextHandler(io.Discard, nil)),
		LogDir:     logDir,
	}
	e := &storage.Execution{
		ID:          "goroutine-leak-test",
		TaskID:      "task-id",
		StdoutPath:  filepath.Join(logDir, "stdout.log"),
		StderrPath:  filepath.Join(logDir, "stderr.log"),
		ArtifactDir: logDir,
	}

	// Let any goroutines from test infrastructure settle before sampling.
	runtime.Gosched()
	baseline := runtime.NumGoroutine()

	if err := r.execOnce(context.Background(), []string{}, "", "", e); err != nil {
		t.Fatalf("execOnce failed: %v", err)
	}

	// Give the scheduler a moment to let any leaked goroutines actually exit.
	// In correct code the goroutines exit before execOnce returns, so this is
	// just a safety buffer for the scheduler.
	time.Sleep(10 * time.Millisecond)
	runtime.Gosched()

	after := runtime.NumGoroutine()
	if after > baseline {
		t.Errorf("goroutine leak: %d goroutines before execOnce, %d after (leaked %d)",
			baseline, after, after-baseline)
	}
}
-
// initGitRepo turns dir into a git repository with a single commit so it can
// serve as a clone source in tests.
func initGitRepo(t *testing.T, dir string) {
	t.Helper()
	// run executes one git command and fails the test on any error.
	run := func(args ...string) {
		t.Helper()
		if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil {
			t.Fatalf("%v: %v\n%s", args, err, out)
		}
	}
	run("git", "-c", "safe.directory=*", "-C", dir, "init", "-b", "main")
	run("git", "-c", "safe.directory=*", "-C", dir, "config", "user.email", "test@test")
	run("git", "-c", "safe.directory=*", "-C", dir, "config", "user.name", "test")
	if err := os.WriteFile(filepath.Join(dir, "init.txt"), []byte("init"), 0644); err != nil {
		t.Fatal(err)
	}
	run("git", "-c", "safe.directory=*", "-C", dir, "add", ".")
	run("git", "-c", "safe.directory=*", "-C", dir, "commit", "-m", "init")
}
-
-func TestSandboxCloneSource_PrefersLocalRemote(t *testing.T) {
- dir := t.TempDir()
- initGitRepo(t, dir)
- // Add a "local" remote pointing to a bare repo.
- bare := t.TempDir()
- exec.Command("git", "init", "--bare", bare).Run()
- exec.Command("git", "-C", dir, "remote", "add", "local", bare).Run()
- exec.Command("git", "-C", dir, "remote", "add", "origin", "https://example.com/repo").Run()
-
- got := sandboxCloneSource(dir)
- if got != bare {
- t.Errorf("expected bare repo path %q, got %q", bare, got)
- }
-}
-
-func TestSandboxCloneSource_FallsBackToOrigin(t *testing.T) {
- dir := t.TempDir()
- initGitRepo(t, dir)
- originURL := "https://example.com/origin-repo"
- exec.Command("git", "-C", dir, "remote", "add", "origin", originURL).Run()
-
- got := sandboxCloneSource(dir)
- if got != originURL {
- t.Errorf("expected origin URL %q, got %q", originURL, got)
- }
-}
-
-func TestSandboxCloneSource_FallsBackToProjectDir(t *testing.T) {
- dir := t.TempDir()
- initGitRepo(t, dir)
- // No remotes configured.
- got := sandboxCloneSource(dir)
- if got != dir {
- t.Errorf("expected projectDir %q (no remotes), got %q", dir, got)
- }
-}
-
-func TestSetupSandbox_ClonesGitRepo(t *testing.T) {
- src := t.TempDir()
- initGitRepo(t, src)
-
- sandbox, err := setupSandbox(src, slog.New(slog.NewTextHandler(io.Discard, nil)))
- if err != nil {
- t.Fatalf("setupSandbox: %v", err)
- }
- t.Cleanup(func() { os.RemoveAll(sandbox) })
-
- // Force sandbox to master if it cloned as main
- exec.Command("git", gitSafe("-C", sandbox, "checkout", "master")...).Run()
-
- // Debug sandbox
- logOut, _ := exec.Command("git", "-C", sandbox, "log", "-1").CombinedOutput()
- fmt.Printf("DEBUG: sandbox log: %s\n", string(logOut))
-
- // Verify sandbox is a git repo with at least one commit.
- out, err := exec.Command("git", "-C", sandbox, "log", "--oneline").Output()
- if err != nil {
- t.Fatalf("git log in sandbox: %v", err)
- }
- if len(strings.TrimSpace(string(out))) == 0 {
- t.Error("expected at least one commit in sandbox, got empty log")
- }
-}
-
-func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) {
- // A plain directory (not a git repo) should be initialised then cloned.
- src := t.TempDir()
-
- sandbox, err := setupSandbox(src, slog.New(slog.NewTextHandler(io.Discard, nil)))
- if err != nil {
- t.Fatalf("setupSandbox on plain dir: %v", err)
- }
- t.Cleanup(func() { os.RemoveAll(sandbox) })
-
- if _, err := os.Stat(filepath.Join(sandbox, ".git")); err != nil {
- t.Errorf("sandbox should be a git repo: %v", err)
- }
-}
-
// TestTeardownSandbox_AutocommitsChanges verifies that an uncommitted file
// left in the sandbox is autocommitted, pushed to origin, recorded on the
// execution, and that the sandbox directory is removed afterwards.
func TestTeardownSandbox_AutocommitsChanges(t *testing.T) {
	// Create a bare repo as origin so push succeeds.
	bare := t.TempDir()
	if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil {
		t.Fatalf("git init bare: %v\n%s", err, out)
	}

	// Create a sandbox directly.
	sandbox := t.TempDir()
	initGitRepo(t, sandbox)
	if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
		t.Fatalf("git remote add: %v\n%s", err, out)
	}
	// Initial push to establish origin/main
	if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil {
		t.Fatalf("git push initial: %v\n%s", err, out)
	}

	// Capture startHEAD so teardown can tell which commits are new.
	headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output()
	if err != nil {
		t.Fatalf("rev-parse HEAD: %v", err)
	}
	startHEAD := strings.TrimSpace(string(headOut))

	// Leave an uncommitted file in the sandbox.
	if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("autocommit me"), 0644); err != nil {
		t.Fatal(err)
	}

	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
	execRecord := &storage.Execution{}

	err = teardownSandbox("", sandbox, startHEAD, logger, execRecord)
	if err != nil {
		t.Fatalf("expected autocommit to succeed, got error: %v", err)
	}

	// Sandbox should be removed after successful autocommit and push.
	if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) {
		t.Error("sandbox should have been removed after successful autocommit and push")
	}

	// Verify the commit exists in the bare repo.
	out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output()
	if err != nil {
		t.Fatalf("git log in bare repo: %v", err)
	}
	if !strings.Contains(string(out), "chore: autocommit uncommitted changes") {
		t.Errorf("expected autocommit message in log, got: %q", string(out))
	}

	// Verify the commit was captured in execRecord.
	if len(execRecord.Commits) == 0 {
		t.Error("expected at least one commit in execRecord")
	} else if !strings.Contains(execRecord.Commits[0].Message, "chore: autocommit uncommitted changes") {
		t.Errorf("unexpected commit message: %q", execRecord.Commits[0].Message)
	}
}
-
// TestTeardownSandbox_BuildFailure_BlocksAutocommit verifies that a failing
// build in the sandbox aborts the autocommit: nothing is pushed and the
// sandbox is preserved for inspection.
func TestTeardownSandbox_BuildFailure_BlocksAutocommit(t *testing.T) {
	bare := t.TempDir()
	if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil {
		t.Fatalf("git init bare: %v\n%s", err, out)
	}

	sandbox := t.TempDir()
	initGitRepo(t, sandbox)
	if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
		t.Fatalf("git remote add: %v\n%s", err, out)
	}

	// Capture startHEAD
	headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output()
	if err != nil {
		t.Fatalf("rev-parse HEAD: %v", err)
	}
	startHEAD := strings.TrimSpace(string(headOut))

	// Leave an uncommitted file.
	if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil {
		t.Fatal(err)
	}

	// Add a failing Makefile.
	makefile := "build:\n\t@echo 'build failed'\n\texit 1\n"
	if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil {
		t.Fatal(err)
	}

	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
	execRecord := &storage.Execution{}

	err = teardownSandbox("", sandbox, startHEAD, logger, execRecord)
	if err == nil {
		t.Error("expected teardown to fail due to build failure, but it succeeded")
	} else if !strings.Contains(err.Error(), "build failed before autocommit") {
		t.Errorf("expected build failure error message, got: %v", err)
	}

	// Sandbox should NOT be removed if teardown failed.
	if _, statErr := os.Stat(sandbox); os.IsNotExist(statErr) {
		t.Error("sandbox should have been preserved after build failure")
	}

	// Verify no new commit in bare repo.
	// NOTE(review): err is left unchecked here — presumably because `git log
	// HEAD` fails on an empty bare repo, which is itself a valid "nothing
	// pushed" state. Confirm intent.
	out, err := exec.Command("git", "-C", bare, "log", "HEAD").CombinedOutput()
	if strings.Contains(string(out), "chore: autocommit uncommitted changes") {
		t.Error("autocommit should not have been pushed after build failure")
	}
}
-
// TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit verifies that when the
// sandbox build succeeds, the uncommitted change is autocommitted, pushed to
// origin, and the sandbox is removed.
func TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit(t *testing.T) {
	bare := t.TempDir()
	if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil {
		t.Fatalf("git init bare: %v\n%s", err, out)
	}

	sandbox := t.TempDir()
	initGitRepo(t, sandbox)
	if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil {
		t.Fatalf("git remote add: %v\n%s", err, out)
	}

	// Capture startHEAD
	headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output()
	if err != nil {
		t.Fatalf("rev-parse HEAD: %v", err)
	}
	startHEAD := strings.TrimSpace(string(headOut))

	// Leave an uncommitted file.
	if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil {
		t.Fatal(err)
	}

	// Add a successful Makefile.
	makefile := "build:\n\t@echo 'build succeeded'\n"
	if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil {
		t.Fatal(err)
	}

	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
	execRecord := &storage.Execution{}

	err = teardownSandbox("", sandbox, startHEAD, logger, execRecord)
	if err != nil {
		t.Fatalf("expected teardown to succeed after build success, got error: %v", err)
	}

	// Sandbox should be removed after success.
	if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) {
		t.Error("sandbox should have been removed after successful build and autocommit")
	}

	// Verify new commit in bare repo.
	out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output()
	if err != nil {
		t.Fatalf("git log in bare repo: %v", err)
	}
	if !strings.Contains(string(out), "chore: autocommit uncommitted changes") {
		t.Errorf("expected autocommit message in log, got: %q", string(out))
	}
}
-
-
-func TestTeardownSandbox_CleanSandboxWithNoNewCommits_RemovesSandbox(t *testing.T) {
- src := t.TempDir()
- initGitRepo(t, src)
- logger := slog.New(slog.NewTextHandler(io.Discard, nil))
- sandbox, err := setupSandbox(src, logger)
- if err != nil {
- t.Fatalf("setupSandbox: %v", err)
- }
-
- execRecord := &storage.Execution{}
-
- headOut, _ := exec.Command("git", "-C", sandbox, "rev-parse", "HEAD").Output()
- startHEAD := strings.TrimSpace(string(headOut))
-
- // Sandbox has no new commits beyond origin; teardown should succeed and remove it.
- if err := teardownSandbox(src, sandbox, startHEAD, logger, execRecord); err != nil {
- t.Fatalf("teardownSandbox: %v", err)
- }
- if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) {
- t.Error("sandbox should have been removed after clean teardown")
- os.RemoveAll(sandbox)
- }
-}
-
// TestBlockedError_IncludesSandboxDir verifies that when a task is blocked in a
// sandbox, the BlockedError carries the sandbox path so the resume execution can
// run in the same directory (where Claude's session files are stored).
func TestBlockedError_IncludesSandboxDir(t *testing.T) {
	src := t.TempDir()
	initGitRepo(t, src)

	logDir := t.TempDir()

	// Use a script that writes question.json to the env-var path and exits 0
	// (simulating a blocked agent that asks a question before exiting).
	scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh")
	if err := os.WriteFile(scriptPath, []byte(`#!/bin/sh
if [ -n "$CLAUDOMATOR_QUESTION_FILE" ]; then
	printf '{"text":"Should I continue?"}' > "$CLAUDOMATOR_QUESTION_FILE"
fi
`), 0755); err != nil {
		t.Fatalf("write script: %v", err)
	}

	r := &ClaudeRunner{
		BinaryPath: scriptPath,
		Logger:     slog.New(slog.NewTextHandler(io.Discard, nil)),
		LogDir:     logDir,
	}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			Instructions: "do something",
			ProjectDir:   src,
			SkipPlanning: true,
		},
	}
	// NOTE: this local shadows the os/exec package within this function.
	exec := &storage.Execution{ID: "blocked-exec-uuid", TaskID: "task-1"}

	err := r.Run(context.Background(), tk, exec)

	// Run must surface the question as a typed *BlockedError.
	var blocked *BlockedError
	if !errors.As(err, &blocked) {
		t.Fatalf("expected BlockedError, got: %v", err)
	}
	if blocked.SandboxDir == "" {
		t.Error("BlockedError.SandboxDir should be set when task runs in a sandbox")
	}
	// Sandbox should still exist (preserved for resume).
	if _, statErr := os.Stat(blocked.SandboxDir); os.IsNotExist(statErr) {
		t.Error("sandbox directory should be preserved when blocked")
	} else {
		os.RemoveAll(blocked.SandboxDir) // cleanup
	}
}
-
// TestClaudeRunner_Run_ResumeUsesStoredSandboxDir verifies that when a resume
// execution has SandboxDir set, the runner uses that directory (not project_dir)
// as the working directory, so Claude finds its session files there.
func TestClaudeRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) {
	logDir := t.TempDir()
	sandboxDir := t.TempDir()
	cwdFile := filepath.Join(logDir, "cwd.txt")

	// Use a script that writes its working directory to a file in logDir (stable path).
	scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh")
	script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
	if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
		t.Fatalf("write script: %v", err)
	}

	r := &ClaudeRunner{
		BinaryPath: scriptPath,
		Logger:     slog.New(slog.NewTextHandler(io.Discard, nil)),
		LogDir:     logDir,
	}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			ProjectDir:   sandboxDir, // must exist; resume overrides it with SandboxDir anyway
			SkipPlanning: true,
		},
	}
	// NOTE: this local shadows the os/exec package within this function.
	exec := &storage.Execution{
		ID:              "resume-exec-uuid",
		TaskID:          "task-1",
		ResumeSessionID: "original-session",
		ResumeAnswer:    "yes",
		SandboxDir:      sandboxDir,
	}

	// Return value intentionally ignored: only the observed working dir matters.
	_ = r.Run(context.Background(), tk, exec)

	got, err := os.ReadFile(cwdFile)
	if err != nil {
		t.Fatalf("cwd file not written: %v", err)
	}
	// The runner should have executed claude in sandboxDir, not in project_dir.
	if string(got) != sandboxDir {
		t.Errorf("resume working dir: want %q, got %q", sandboxDir, string(got))
	}
}
-
// TestClaudeRunner_Run_StaleSandboxDir_ClonesAfresh verifies that when a resume
// execution points at a sandbox directory that no longer exists (e.g. /tmp was
// purged), the runner falls back to cloning a fresh sandbox instead of running
// in the stale path or directly in project_dir.
func TestClaudeRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) {
	logDir := t.TempDir()
	projectDir := t.TempDir()
	initGitRepo(t, projectDir)

	// The fake claude records its working directory so the test can assert
	// where the runner actually executed it.
	cwdFile := filepath.Join(logDir, "cwd.txt")
	scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh")
	script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
	if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
		t.Fatalf("write script: %v", err)
	}

	r := &ClaudeRunner{
		BinaryPath: scriptPath,
		Logger:     slog.New(slog.NewTextHandler(io.Discard, nil)),
		LogDir:     logDir,
	}
	tk := &task.Task{
		Agent: task.AgentConfig{
			Type:         "claude",
			ProjectDir:   projectDir,
			SkipPlanning: true,
		},
	}
	// Point to a sandbox that no longer exists (e.g. /tmp was purged).
	staleSandbox := filepath.Join(t.TempDir(), "gone")
	e := &storage.Execution{
		ID:              "resume-exec-2",
		TaskID:          "task-2",
		ResumeSessionID: "session-abc",
		ResumeAnswer:    "ok",
		SandboxDir:      staleSandbox,
	}

	if err := r.Run(context.Background(), tk, e); err != nil {
		t.Fatalf("Run with stale sandbox: %v", err)
	}

	got, err := os.ReadFile(cwdFile)
	if err != nil {
		t.Fatalf("cwd file not written: %v", err)
	}
	// Should have run in a fresh sandbox (not the stale path, not the raw projectDir).
	// The sandbox is removed after teardown, so we only check what it wasn't.
	cwd := string(got)
	if cwd == staleSandbox {
		t.Error("ran in stale sandbox dir that doesn't exist")
	}
	if cwd == projectDir {
		t.Error("ran directly in project_dir; expected a fresh sandbox clone")
	}
	// cwd should look like a claudomator sandbox path.
	if !strings.Contains(cwd, "claudomator-sandbox-") {
		t.Errorf("expected sandbox path, got %q", cwd)
	}
}
-
-func TestIsCompletionReport(t *testing.T) {
- tests := []struct {
- name string
- json string
- expected bool
- }{
- {
- name: "real question with options",
- json: `{"text": "Should I proceed with implementation?", "options": ["Yes", "No"]}`,
- expected: false,
- },
- {
- name: "real question no options",
- json: `{"text": "Which approach do you prefer?"}`,
- expected: false,
- },
- {
- name: "completion report no options no question mark",
- json: `{"text": "All tests pass. Implementation complete. Summary written to CLAUDOMATOR_SUMMARY_FILE."}`,
- expected: true,
- },
- {
- name: "completion report with empty options",
- json: `{"text": "Feature implemented and committed.", "options": []}`,
- expected: true,
- },
- {
- name: "invalid json treated as not a report",
- json: `not json`,
- expected: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got := isCompletionReport(tt.json)
- if got != tt.expected {
- t.Errorf("isCompletionReport(%q) = %v, want %v", tt.json, got, tt.expected)
- }
- })
- }
-}
-
-func TestTailFile_ReturnsLastNLines(t *testing.T) {
- f, err := os.CreateTemp("", "tailfile-*")
- if err != nil {
- t.Fatal(err)
- }
- defer os.Remove(f.Name())
- for i := 1; i <= 30; i++ {
- fmt.Fprintf(f, "line %d\n", i)
- }
- f.Close()
-
- got := tailFile(f.Name(), 5)
- lines := strings.Split(got, "\n")
- if len(lines) != 5 {
- t.Fatalf("want 5 lines, got %d: %q", len(lines), got)
- }
- if lines[0] != "line 26" || lines[4] != "line 30" {
- t.Errorf("want lines 26-30, got: %q", got)
- }
-}
-
-func TestTailFile_MissingFile_ReturnsEmpty(t *testing.T) {
- got := tailFile("/nonexistent/path/file.log", 10)
- if got != "" {
- t.Errorf("want empty string for missing file, got %q", got)
- }
-}
-
-func TestGitSafe_PrependsSafeDirectory(t *testing.T) {
- got := gitSafe("-C", "/some/path", "status")
- want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"}
- if len(got) != len(want) {
- t.Fatalf("gitSafe() = %v, want %v", got, want)
- }
- for i := range want {
- if got[i] != want[i] {
- t.Errorf("gitSafe()[%d] = %q, want %q", i, got[i], want[i])
- }
- }
-}
diff --git a/internal/executor/container.go b/internal/executor/container.go
new file mode 100644
index 0000000..c43e201
--- /dev/null
+++ b/internal/executor/container.go
@@ -0,0 +1,380 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// ContainerRunner executes an agent inside a container.
+type ContainerRunner struct {
+ Image string // default image if not specified in task
+ Logger *slog.Logger
+ LogDir string
+ APIURL string
+ DropsDir string
+ SSHAuthSock string // optional path to host SSH agent
+ ClaudeBinary string // optional path to claude binary in container
+ GeminiBinary string // optional path to gemini binary in container
+ ClaudeConfigDir string // host path to ~/.claude; mounted into container for auth credentials
+ // Command allows mocking exec.CommandContext for tests.
+ Command func(ctx context.Context, name string, arg ...string) *exec.Cmd
+}
+
+func (r *ContainerRunner) command(ctx context.Context, name string, arg ...string) *exec.Cmd {
+ if r.Command != nil {
+ return r.Command(ctx, name, arg...)
+ }
+ return exec.CommandContext(ctx, name, arg...)
+}
+
+func (r *ContainerRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
// Run executes task t inside a container and records results on e.
//
// High-level flow:
//  1. resolve the clone source (repository_url, falling back to project_dir),
//  2. create — or, on resume, reuse — a host workspace and clone into it,
//  3. stage logs, a secrets env file, the instructions file, and a writable
//     agent $HOME,
//  4. `docker run` the agent, streaming stdout through parseStream,
//  5. interpret question/summary files, then push new commits on success.
//
// Returns a *BlockedError when the agent left a genuine question file, in
// which case the workspace is preserved so a resume can run in place.
func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	var err error
	repoURL := t.RepositoryURL
	if repoURL == "" {
		repoURL = t.Agent.RepositoryURL
	}
	if repoURL == "" {
		// Fallback to project_dir if repository_url is not set (legacy support).
		// Prefer the 'local' bare remote so that git push succeeds after execution
		// (pushing to a non-bare working copy on a checked-out branch is rejected by git).
		if t.Agent.ProjectDir != "" {
			repoURL = t.Agent.ProjectDir
			if out, err2 := exec.Command("git", "-C", t.Agent.ProjectDir, "remote", "get-url", "local").Output(); err2 == nil {
				repoURL = strings.TrimSpace(string(out))
			}
		} else {
			return fmt.Errorf("task %s has no repository_url or project_dir", t.ID)
		}
	}

	// Image precedence: task override > runner default > hard-coded default.
	image := t.Agent.ContainerImage
	if image == "" {
		image = r.Image
	}
	if image == "" {
		image = "claudomator-agent:latest"
	}

	// 1. Setup workspace on host
	var workspace string
	isResume := false
	if e.SandboxDir != "" {
		// A previous blocked execution preserved its workspace; reuse it so the
		// agent finds its session state. A stale (deleted) path falls through
		// to a fresh clone below.
		if _, err = os.Stat(e.SandboxDir); err == nil {
			workspace = e.SandboxDir
			isResume = true
			r.Logger.Info("resuming in preserved workspace", "path", workspace)
		}
	}

	if workspace == "" {
		workspace, err = os.MkdirTemp("", "claudomator-workspace-*")
		if err != nil {
			return fmt.Errorf("creating workspace: %w", err)
		}
		// chmod applied after clone; see step 2.
	}

	// Note: workspace is only removed on success. On failure, it's preserved for debugging.
	// If the task becomes BLOCKED, it's also preserved for resumption.
	success := false
	isBlocked := false
	defer func() {
		if success && !isBlocked {
			os.RemoveAll(workspace)
		} else {
			r.Logger.Warn("preserving workspace", "path", workspace, "success", success, "blocked", isBlocked)
		}
	}()

	// 2. Clone repo into workspace if not resuming.
	// git clone requires the target directory to not exist; remove the MkdirTemp-created dir first.
	if !isResume {
		if err := os.Remove(workspace); err != nil {
			return fmt.Errorf("removing workspace before clone: %w", err)
		}
		r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace)
		if out, err := r.command(ctx, "git", "clone", repoURL, workspace).CombinedOutput(); err != nil {
			return fmt.Errorf("git clone failed: %w\n%s", err, string(out))
		}
		if err = os.Chmod(workspace, 0755); err != nil {
			return fmt.Errorf("chmod cloned workspace: %w", err)
		}
	}
	// Record the workspace so a later blocked/failed execution can resume here.
	e.SandboxDir = workspace

	// 3. Prepare logs
	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		logDir = filepath.Join(workspace, ".claudomator-logs")
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	e.StdoutPath = filepath.Join(logDir, "stdout.log")
	e.StderrPath = filepath.Join(logDir, "stderr.log")
	e.ArtifactDir = logDir

	stdoutFile, err := os.Create(e.StdoutPath)
	if err != nil {
		return fmt.Errorf("creating stdout log: %w", err)
	}
	defer stdoutFile.Close()

	stderrFile, err := os.Create(e.StderrPath)
	if err != nil {
		return fmt.Errorf("creating stderr log: %w", err)
	}
	defer stderrFile.Close()

	// 4. Run container

	// Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect'
	// NOTE(review): the env file lives inside the git workspace; it is
	// untracked, but an agent running `git add -A` could commit secrets —
	// consider relocating it outside the repo.
	envFile := filepath.Join(workspace, ".claudomator-env")
	envContent := fmt.Sprintf("ANTHROPIC_API_KEY=%s\nGOOGLE_API_KEY=%s\nGEMINI_API_KEY=%s\n", os.Getenv("ANTHROPIC_API_KEY"), os.Getenv("GOOGLE_API_KEY"), os.Getenv("GEMINI_API_KEY"))
	if err := os.WriteFile(envFile, []byte(envContent), 0600); err != nil {
		return fmt.Errorf("writing env file: %w", err)
	}

	// Inject custom instructions via file to avoid CLI length limits
	instructionsFile := filepath.Join(workspace, ".claudomator-instructions.txt")
	if err := os.WriteFile(instructionsFile, []byte(t.Agent.Instructions), 0644); err != nil {
		return fmt.Errorf("writing instructions: %w", err)
	}

	// Set up a writable $HOME staging dir so any agent tool (claude, gemini, etc.)
	// can freely create subdirs (session-env, .gemini, .cache, …) without hitting
	// a non-existent or read-only home. We copy only the claude credentials into it.
	agentHome := filepath.Join(workspace, ".agent-home")
	if err := os.MkdirAll(filepath.Join(agentHome, ".claude"), 0755); err != nil {
		return fmt.Errorf("creating agent home staging dir: %w", err)
	}
	if err := os.MkdirAll(filepath.Join(agentHome, ".gemini"), 0755); err != nil {
		return fmt.Errorf("creating .gemini dir: %w", err)
	}
	if r.ClaudeConfigDir != "" {
		// credentials (best-effort: errors deliberately ignored, the agent may
		// still authenticate via the env file)
		if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".credentials.json")); readErr == nil {
			_ = os.WriteFile(filepath.Join(agentHome, ".claude", ".credentials.json"), srcData, 0600)
		}
		// settings (used by claude CLI; copy so it can write updates without hitting the host)
		if srcData, readErr := os.ReadFile(filepath.Join(filepath.Dir(r.ClaudeConfigDir), ".claude.json")); readErr == nil {
			_ = os.WriteFile(filepath.Join(agentHome, ".claude.json"), srcData, 0644)
		}
	}

	args := r.buildDockerArgs(workspace, agentHome, e.TaskID)
	innerCmd := r.buildInnerCmd(t, e, isResume)

	// docker run <args> <image> <inner command...>
	fullArgs := append(args, image)
	fullArgs = append(fullArgs, innerCmd...)

	r.Logger.Info("starting container", "image", image, "taskID", t.ID)
	cmd := r.command(ctx, "docker", fullArgs...)
	cmd.Stderr = stderrFile
	// Own process group so the whole tree can be killed on cancellation.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	// Use os.Pipe for stdout so we can parse it in real-time
	var stdoutR, stdoutW *os.File
	stdoutR, stdoutW, err = os.Pipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	cmd.Stdout = stdoutW

	if err := cmd.Start(); err != nil {
		stdoutW.Close()
		stdoutR.Close()
		return fmt.Errorf("starting container: %w", err)
	}
	// Close our copy of the write end so the reader sees EOF when the child exits.
	stdoutW.Close()

	// Watch for context cancellation to kill the process group (Issue 1)
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			r.Logger.Info("killing container process group due to context cancellation", "taskID", t.ID)
			// Negative pid targets the whole process group created above.
			syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		case <-done:
		}
	}()

	// Stream stdout to the log file and parse cost/errors.
	var costUSD float64
	var sessionID string
	var streamErr error
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
		stdoutR.Close()
	}()

	waitErr := cmd.Wait()
	wg.Wait()

	e.CostUSD = costUSD
	if sessionID != "" {
		e.SessionID = sessionID
	}

	// Check whether the agent left a question before exiting.
	questionFile := filepath.Join(logDir, "question.json")
	if data, readErr := os.ReadFile(questionFile); readErr == nil {
		os.Remove(questionFile) // consumed
		questionJSON := strings.TrimSpace(string(data))
		if isCompletionReport(questionJSON) {
			// Some agents misuse the question file for a final report; treat it
			// as a summary instead of blocking.
			r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
			e.Summary = extractQuestionText(questionJSON)
		} else {
			isBlocked = true
			success = true // We consider BLOCKED as a "success" for workspace preservation
			if e.SessionID == "" {
				r.Logger.Warn("missing session ID; resume will start fresh", "taskID", e.TaskID)
			}
			return &BlockedError{
				QuestionJSON: questionJSON,
				SessionID:    e.SessionID,
				SandboxDir:   workspace,
			}
		}
	}

	// Read agent summary if written.
	summaryFile := filepath.Join(logDir, "summary.txt")
	if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
		os.Remove(summaryFile) // consumed
		e.Summary = strings.TrimSpace(string(summaryData))
	}

	// 5. Post-execution: push changes if successful
	if waitErr == nil && streamErr == nil {
		// Check if there are any commits to push (HEAD ahead of origin/HEAD).
		// If origin/HEAD doesn't exist (e.g. fresh clone with no commits), we attempt push anyway.
		hasCommits := true
		if out, err := r.command(ctx, "git", "-C", workspace, "rev-list", "origin/HEAD..HEAD").CombinedOutput(); err == nil {
			if len(strings.TrimSpace(string(out))) == 0 {
				hasCommits = false
			}
		}

		if hasCommits {
			r.Logger.Info("pushing changes back to remote", "url", repoURL)
			if out, err := r.command(ctx, "git", "-C", workspace, "push", "origin", "HEAD").CombinedOutput(); err != nil {
				r.Logger.Warn("git push failed", "error", err, "output", string(out))
				return fmt.Errorf("git push failed: %w\n%s", err, string(out))
			}
		} else {
			r.Logger.Info("no new commits to push", "taskID", t.ID)
		}
		success = true
	}

	if waitErr != nil {
		return fmt.Errorf("container execution failed: %w", waitErr)
	}
	if streamErr != nil {
		return fmt.Errorf("stream parsing failed: %w", streamErr)
	}

	return nil
}
+
+func (r *ContainerRunner) buildDockerArgs(workspace, claudeHome, taskID string) []string {
+ // --env-file takes a HOST path.
+ hostEnvFile := filepath.Join(workspace, ".claudomator-env")
+
+ // Replace localhost with host.docker.internal so the container can reach the host API.
+ apiURL := strings.ReplaceAll(r.APIURL, "localhost", "host.docker.internal")
+
+ args := []string{
+ "run", "--rm",
+ // Allow container to reach the host via host.docker.internal.
+ "--add-host=host.docker.internal:host-gateway",
+ // Run as the current process UID:GID so the container can read host-owned files.
+ fmt.Sprintf("--user=%d:%d", os.Getuid(), os.Getgid()),
+ "-v", workspace + ":/workspace",
+ "-v", claudeHome + ":/home/agent",
+ "-w", "/workspace",
+ "--env-file", hostEnvFile,
+ "-e", "HOME=/home/agent",
+ "-e", "CLAUDOMATOR_API_URL=" + apiURL,
+ "-e", "CLAUDOMATOR_TASK_ID=" + taskID,
+ "-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir,
+ }
+ if r.SSHAuthSock != "" {
+ args = append(args, "-v", r.SSHAuthSock+":/tmp/ssh-auth.sock", "-e", "SSH_AUTH_SOCK=/tmp/ssh-auth.sock")
+ }
+ return args
+}
+
+func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string {
+ // Claude CLI uses -p for prompt text. To pass a file, we use a shell to cat it.
+ // We use a shell variable to capture the expansion to avoid quoting issues with instructions contents.
+ // The outer single quotes around the sh -c argument prevent host-side expansion.
+
+ claudeBin := r.ClaudeBinary
+ if claudeBin == "" {
+ claudeBin = "claude"
+ }
+ geminiBin := r.GeminiBinary
+ if geminiBin == "" {
+ geminiBin = "gemini"
+ }
+
+ if t.Agent.Type == "gemini" {
+ return []string{"sh", "-c", fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", geminiBin)}
+ }
+
+ // Claude
+ var claudeCmd strings.Builder
+ claudeCmd.WriteString(fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", claudeBin))
+ if isResume && e.ResumeSessionID != "" {
+ claudeCmd.WriteString(fmt.Sprintf(" --resume %s", e.ResumeSessionID))
+ }
+ claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions")
+
+ return []string{"sh", "-c", claudeCmd.String()}
+}
+
+
+func (r *ContainerRunner) fallbackGitInit(repoURL, workspace string) error {
+ // Ensure directory exists
+ if err := os.MkdirAll(workspace, 0755); err != nil {
+ return err
+ }
+ // If it's a local directory but not a repo, init it.
+ cmds := [][]string{
+ gitSafe("-C", workspace, "init"),
+ gitSafe("-C", workspace, "add", "-A"),
+ gitSafe("-C", workspace, "commit", "--allow-empty", "-m", "chore: initial commit"),
+ }
+ // If it was a local path, maybe we should have copied it?
+ // git clone handle local paths fine if they are repos.
+ // This fallback is only if it's NOT a repo.
+ for _, args := range cmds {
+ if out, err := r.command(context.Background(), "git", args...).CombinedOutput(); err != nil {
+ return fmt.Errorf("git init failed: %w\n%s", err, out)
+ }
+ }
+ return nil
+}
diff --git a/internal/executor/container_test.go b/internal/executor/container_test.go
new file mode 100644
index 0000000..f97f2b5
--- /dev/null
+++ b/internal/executor/container_test.go
@@ -0,0 +1,244 @@
+package executor
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log/slog"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/thepeterstone/claudomator/internal/storage"
+	"github.com/thepeterstone/claudomator/internal/task"
+)
+
+// TestContainerRunner_BuildDockerArgs pins the exact docker argument vector,
+// including mount points, env plumbing, and the SSH agent forward.
+func TestContainerRunner_BuildDockerArgs(t *testing.T) {
+	runner := &ContainerRunner{
+		APIURL:      "http://localhost:8484",
+		DropsDir:    "/data/drops",
+		SSHAuthSock: "/tmp/ssh.sock",
+	}
+
+	got := runner.buildDockerArgs("/tmp/ws", "/tmp/ws/.agent-home", "task-123")
+
+	want := []string{
+		"run", "--rm",
+		"--add-host=host.docker.internal:host-gateway",
+		fmt.Sprintf("--user=%d:%d", os.Getuid(), os.Getgid()),
+		"-v", "/tmp/ws:/workspace",
+		"-v", "/tmp/ws/.agent-home:/home/agent",
+		"-w", "/workspace",
+		"--env-file", "/tmp/ws/.claudomator-env",
+		"-e", "HOME=/home/agent",
+		"-e", "CLAUDOMATOR_API_URL=http://host.docker.internal:8484",
+		"-e", "CLAUDOMATOR_TASK_ID=task-123",
+		"-e", "CLAUDOMATOR_DROP_DIR=/data/drops",
+		"-v", "/tmp/ssh.sock:/tmp/ssh-auth.sock",
+		"-e", "SSH_AUTH_SOCK=/tmp/ssh-auth.sock",
+	}
+
+	if len(got) != len(want) {
+		t.Fatalf("expected %d args, got %d. Got: %v", len(want), len(got), got)
+	}
+	for i := range want {
+		if got[i] != want[i] {
+			t.Errorf("arg %d: expected %q, got %q", i, want[i], got[i])
+		}
+	}
+}
+
+// TestContainerRunner_BuildInnerCmd covers the sh -c command built for each
+// agent type, the --resume flag on resumed claude runs, and custom binary
+// path overrides.
+func TestContainerRunner_BuildInnerCmd(t *testing.T) {
+	runner := &ContainerRunner{}
+
+	t.Run("claude-fresh", func(t *testing.T) {
+		tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}}
+		// Named ex (not exec) to avoid shadowing the os/exec package import.
+		ex := &storage.Execution{}
+		cmd := runner.buildInnerCmd(tk, ex, false)
+
+		cmdStr := strings.Join(cmd, " ")
+		if strings.Contains(cmdStr, "--resume") {
+			t.Errorf("unexpected --resume flag in fresh run: %q", cmdStr)
+		}
+		if !strings.Contains(cmdStr, "INST=$(cat /workspace/.claudomator-instructions.txt); claude -p \"$INST\"") {
+			t.Errorf("expected cat instructions in sh command, got %q", cmdStr)
+		}
+	})
+
+	t.Run("claude-resume", func(t *testing.T) {
+		tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}}
+		ex := &storage.Execution{ResumeSessionID: "orig-session-123"}
+		cmd := runner.buildInnerCmd(tk, ex, true)
+
+		cmdStr := strings.Join(cmd, " ")
+		if !strings.Contains(cmdStr, "--resume orig-session-123") {
+			t.Errorf("expected --resume flag with correct session ID, got %q", cmdStr)
+		}
+	})
+
+	t.Run("gemini", func(t *testing.T) {
+		tk := &task.Task{Agent: task.AgentConfig{Type: "gemini"}}
+		ex := &storage.Execution{}
+		cmd := runner.buildInnerCmd(tk, ex, false)
+
+		cmdStr := strings.Join(cmd, " ")
+		if !strings.Contains(cmdStr, "gemini -p \"$INST\"") {
+			t.Errorf("expected gemini command with safer quoting, got %q", cmdStr)
+		}
+	})
+
+	t.Run("custom-binaries", func(t *testing.T) {
+		runnerCustom := &ContainerRunner{
+			ClaudeBinary: "/usr/bin/claude-v2",
+			GeminiBinary: "/usr/local/bin/gemini-pro",
+		}
+
+		tkClaude := &task.Task{Agent: task.AgentConfig{Type: "claude"}}
+		cmdClaude := runnerCustom.buildInnerCmd(tkClaude, &storage.Execution{}, false)
+		if !strings.Contains(strings.Join(cmdClaude, " "), "/usr/bin/claude-v2 -p") {
+			t.Errorf("expected custom claude binary, got %q", cmdClaude)
+		}
+
+		tkGemini := &task.Task{Agent: task.AgentConfig{Type: "gemini"}}
+		cmdGemini := runnerCustom.buildInnerCmd(tkGemini, &storage.Execution{}, false)
+		if !strings.Contains(strings.Join(cmdGemini, " "), "/usr/local/bin/gemini-pro -p") {
+			t.Errorf("expected custom gemini binary, got %q", cmdGemini)
+		}
+	})
+}
+
+// TestContainerRunner_Run_PreservesWorkspaceOnFailure verifies that when the
+// container exits non-zero, Run still records SandboxDir and leaves the
+// sandbox on disk for post-mortem inspection.
+func TestContainerRunner_Run_PreservesWorkspaceOnFailure(t *testing.T) {
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+	runner := &ContainerRunner{
+		Logger: logger,
+		Image:  "busybox",
+		Command: func(ctx context.Context, name string, arg ...string) *exec.Cmd {
+			// Mock docker run to exit 1 so Run fails after workspace setup.
+			if name == "docker" {
+				return exec.Command("sh", "-c", "exit 1")
+			}
+			// Mock git clone: create the target directory and succeed.
+			if name == "git" && len(arg) > 0 && arg[0] == "clone" {
+				dir := arg[len(arg)-1]
+				if err := os.MkdirAll(dir, 0755); err != nil {
+					t.Errorf("mock git clone mkdir: %v", err)
+				}
+				return exec.Command("true")
+			}
+			return exec.Command("true")
+		},
+	}
+
+	tk := &task.Task{
+		ID:            "test-task",
+		RepositoryURL: "https://github.com/example/repo.git",
+		Agent:         task.AgentConfig{Type: "claude"},
+	}
+	// Named rec (not exec) to avoid shadowing the os/exec package import.
+	rec := &storage.Execution{ID: "test-exec", TaskID: "test-task"}
+
+	err := runner.Run(context.Background(), tk, rec)
+	if err == nil {
+		t.Fatal("expected error due to mocked docker failure")
+	}
+
+	// Verify SandboxDir was set and the directory survived the failure.
+	if rec.SandboxDir == "" {
+		t.Fatal("expected SandboxDir to be set even on failure")
+	}
+	// Remove the preserved sandbox regardless of assertion outcome, so a
+	// failing assertion does not leak the directory.
+	t.Cleanup(func() { os.RemoveAll(rec.SandboxDir) })
+	if _, statErr := os.Stat(rec.SandboxDir); statErr != nil {
+		t.Errorf("expected sandbox directory to be preserved, but stat failed: %v", statErr)
+	}
+}
+
+// TestBlockedError_IncludesSandboxDir checks that BlockedError carries the
+// preserved sandbox path and renders the expected message. (Exercising the
+// full Run() path would require mocking 'docker run'; the struct contract is
+// what matters here.)
+func TestBlockedError_IncludesSandboxDir(t *testing.T) {
+	err := &BlockedError{
+		QuestionJSON: `{"text":"?"}`,
+		SessionID:    "s1",
+		SandboxDir:   "/tmp/s1",
+	}
+	if !strings.Contains(err.Error(), "task blocked") {
+		t.Errorf("wrong error message: %v", err)
+	}
+	// The test name promises this: the sandbox path must survive on the error.
+	if err.SandboxDir != "/tmp/s1" {
+		t.Errorf("SandboxDir not carried: %q", err.SandboxDir)
+	}
+}
+
+// TestIsCompletionReport exercises the question-vs-report heuristic across
+// real questions, completion reports, and malformed input.
+func TestIsCompletionReport(t *testing.T) {
+	cases := []struct {
+		name string
+		json string
+		want bool
+	}{
+		{"real question with options", `{"text": "Should I proceed with implementation?", "options": ["Yes", "No"]}`, false},
+		{"real question no options", `{"text": "Which approach do you prefer?"}`, false},
+		{"completion report no options no question mark", `{"text": "All tests pass. Implementation complete. Summary written to CLAUDOMATOR_SUMMARY_FILE."}`, true},
+		{"completion report with empty options", `{"text": "Feature implemented and committed.", "options": []}`, true},
+		{"invalid json treated as not a report", `not json`, false},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			if got := isCompletionReport(tc.json); got != tc.want {
+				t.Errorf("isCompletionReport(%q) = %v, want %v", tc.json, got, tc.want)
+			}
+		})
+	}
+}
+
+func TestTailFile_ReturnsLastNLines(t *testing.T) {
+ f, err := os.CreateTemp("", "tailfile-*")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+ for i := 1; i <= 30; i++ {
+ fmt.Fprintf(f, "line %d\n", i)
+ }
+ f.Close()
+
+ got := tailFile(f.Name(), 5)
+ lines := strings.Split(strings.TrimSpace(got), "\n")
+ if len(lines) != 5 {
+ t.Fatalf("want 5 lines, got %d: %q", len(lines), got)
+ }
+ if lines[0] != "line 26" || lines[4] != "line 30" {
+ t.Errorf("want lines 26-30, got: %q", got)
+ }
+}
+
+// TestGitSafe_PrependsSafeDirectory pins the safe.directory config override
+// that gitSafe injects ahead of the caller's arguments.
+func TestGitSafe_PrependsSafeDirectory(t *testing.T) {
+	want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"}
+	got := gitSafe("-C", "/some/path", "status")
+	if len(got) != len(want) {
+		t.Fatalf("gitSafe() = %v, want %v", got, want)
+	}
+	for i, w := range want {
+		if got[i] != w {
+			t.Errorf("gitSafe()[%d] = %q, want %q", i, got[i], w)
+		}
+	}
+}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 878a32d..e91d435 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -600,10 +600,17 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
// Execution record should be closed as FAILED.
execs, _ := store.ListExecutions(tk.ID)
- if len(execs) == 0 || execs[0].Status != "FAILED" {
+ var failedExec *storage.Execution
+ for _, e := range execs {
+ if e.ID == "exec-stale-1" {
+ failedExec = e
+ break
+ }
+ }
+ if failedExec == nil || failedExec.Status != "FAILED" {
t.Errorf("execution status: want FAILED, got %+v", execs)
}
- if execs[0].ErrorMsg == "" {
+ if failedExec.ErrorMsg == "" {
t.Error("expected non-empty error message on recovered execution")
}
diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go
deleted file mode 100644
index b1a245c..0000000
--- a/internal/executor/gemini.go
+++ /dev/null
@@ -1,228 +0,0 @@
-package executor
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log/slog"
- "os"
- "path/filepath"
- "strings"
- "sync"
-
- "github.com/thepeterstone/claudomator/internal/storage"
- "github.com/thepeterstone/claudomator/internal/task"
-)
-
-// GeminiRunner spawns the `gemini` CLI in non-interactive mode.
-type GeminiRunner struct {
- BinaryPath string // defaults to "gemini"
- Logger *slog.Logger
- LogDir string // base directory for execution logs
- APIURL string // base URL of the Claudomator API, passed to subprocesses
- DropsDir string // path to the drops directory, passed to subprocesses
-}
-
-// ExecLogDir returns the log directory for the given execution ID.
-func (r *GeminiRunner) ExecLogDir(execID string) string {
- if r.LogDir == "" {
- return ""
- }
- return filepath.Join(r.LogDir, execID)
-}
-
-func (r *GeminiRunner) binaryPath() string {
- if r.BinaryPath != "" {
- return r.BinaryPath
- }
- return "gemini"
-}
-
-// Run executes a gemini <instructions> invocation, streaming output to log files.
-func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
- if t.Agent.ProjectDir != "" {
- if _, err := os.Stat(t.Agent.ProjectDir); err != nil {
- return fmt.Errorf("project_dir %q: %w", t.Agent.ProjectDir, err)
- }
- }
-
- logDir := r.ExecLogDir(e.ID)
- if logDir == "" {
- logDir = e.ID
- }
- if err := os.MkdirAll(logDir, 0700); err != nil {
- return fmt.Errorf("creating log dir: %w", err)
- }
-
- if e.StdoutPath == "" {
- e.StdoutPath = filepath.Join(logDir, "stdout.log")
- e.StderrPath = filepath.Join(logDir, "stderr.log")
- e.ArtifactDir = logDir
- }
-
- if e.SessionID == "" {
- e.SessionID = e.ID
- }
-
- questionFile := filepath.Join(logDir, "question.json")
- args := r.buildArgs(t, e, questionFile)
-
- // Gemini CLI doesn't necessarily have the same rate limiting behavior as Claude,
- // but we'll use a similar execution pattern.
- err := r.execOnce(ctx, args, t.Agent.ProjectDir, t.Agent.ProjectDir, e)
- if err != nil {
- return err
- }
-
- // Check whether the agent left a question before exiting.
- data, readErr := os.ReadFile(questionFile)
- if readErr == nil {
- os.Remove(questionFile) // consumed
- return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
- }
- return nil
-}
-
-func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error {
- // Temporarily bypass external command execution to debug pipe.
- // We will simulate outputting to stdoutW directly.
-
- stdoutFile, err := os.Create(e.StdoutPath)
- if err != nil {
- return fmt.Errorf("creating stdout log: %w", err)
- }
- defer stdoutFile.Close()
-
- stderrFile, err := os.Create(e.StderrPath)
- if err != nil {
- return fmt.Errorf("creating stderr log: %w", err)
- }
- defer stderrFile.Close()
-
- stdoutR, stdoutW, err := os.Pipe()
- if err != nil {
- return fmt.Errorf("creating stdout pipe: %w", err)
- }
-
- // Simulate writing to stdoutW
- go func() {
- defer stdoutW.Close() // Close the writer when done.
- fmt.Fprintf(stdoutW, "```json\n")
- fmt.Fprintf(stdoutW, "{\"type\":\"content_block_start\",\"content_block\":{\"text\":\"Hello, Gemini!\",\"type\":\"text\"}}\n")
- fmt.Fprintf(stdoutW, "{\"type\":\"content_block_delta\",\"content_block\":{\"text\":\" How are you?\"}}\n")
- fmt.Fprintf(stdoutW, "{\"type\":\"content_block_end\"}\n")
- fmt.Fprintf(stdoutW, "{\"type\":\"message_delta\",\"message\":{\"role\":\"model\"}}\n")
- fmt.Fprintf(stdoutW, "{\"type\":\"message_end\"}\n")
- fmt.Fprintf(stdoutW, "```\n")
- }()
-
-
- var streamErr error
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- _, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger)
- stdoutR.Close()
- }()
-
- wg.Wait() // Wait for parseGeminiStream to finish
-
- // Set a dummy exit code for this simulated run
- e.ExitCode = 0
-
- if streamErr != nil {
- return streamErr
- }
- return nil
-}
-
-// parseGeminiStream reads streaming JSON from the gemini CLI, unwraps markdown
-// code blocks, writes the inner JSON to w, and returns (costUSD, error).
-// For now, it focuses on unwrapping and writing, not detailed parsing of cost/errors.
-func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
- fullOutput, err := io.ReadAll(r)
- if err != nil {
- return 0, fmt.Errorf("reading full gemini output: %w", err)
- }
- logger.Debug("parseGeminiStream: raw output received", "output", string(fullOutput))
-
- // Default: write raw content as-is (preserves trailing newline).
- jsonContent := string(fullOutput)
-
- // Unwrap markdown code fences if present.
- trimmed := strings.TrimSpace(jsonContent)
- if jsonStartIdx := strings.Index(trimmed, "```json"); jsonStartIdx != -1 {
- if jsonEndIdx := strings.LastIndex(trimmed, "```"); jsonEndIdx != -1 && jsonEndIdx > jsonStartIdx {
- inner := trimmed[jsonStartIdx+len("```json") : jsonEndIdx]
- jsonContent = strings.TrimSpace(inner) + "\n"
- } else {
- logger.Warn("malformed markdown JSON block from Gemini, falling back to raw output", "outputLength", len(jsonContent))
- }
- }
-
- // Write the (possibly extracted) JSON content to the writer.
- if _, writeErr := w.Write([]byte(jsonContent)); writeErr != nil {
- return 0, fmt.Errorf("writing extracted gemini json: %w", writeErr)
- }
-
- // Parse each line for result type to extract cost and execution errors.
- var resultErr error
- var costUSD float64
- for _, line := range strings.Split(jsonContent, "\n") {
- line = strings.TrimSpace(line)
- if line == "" {
- continue
- }
- var msg struct {
- Type string `json:"type"`
- IsError bool `json:"is_error"`
- Result string `json:"result"`
- Cost float64 `json:"total_cost_usd"`
- }
- if err := json.Unmarshal([]byte(line), &msg); err != nil {
- continue
- }
- if msg.Type == "result" {
- costUSD = msg.Cost
- if msg.IsError {
- resultErr = fmt.Errorf("gemini execution error: %s", msg.Result)
- }
- }
- }
-
- return costUSD, resultErr
-}
-
-func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
- // Gemini CLI uses a different command structure: gemini "instructions" [flags]
-
- instructions := t.Agent.Instructions
- if !t.Agent.SkipPlanning {
- instructions = withPlanningPreamble(instructions)
- }
-
- args := []string{
- "-p", instructions,
- "--output-format", "stream-json",
- "--yolo", // auto-approve all tools (equivalent to Claude's bypassPermissions)
- }
-
- // Note: Gemini CLI flags might differ from Claude CLI.
- // Assuming common flags for now, but these may need adjustment.
- if t.Agent.Model != "" {
- args = append(args, "--model", t.Agent.Model)
- }
-
- // Gemini CLI doesn't use --session-id for the first run in the same way,
- // or it might use it differently. For now we assume compatibility.
- if e.SessionID != "" {
- // If it's a resume, it might use different flags.
- if e.ResumeSessionID != "" {
- // This is a placeholder for Gemini's resume logic
- }
- }
-
- return args
-}
diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go
deleted file mode 100644
index 4b0339e..0000000
--- a/internal/executor/gemini_test.go
+++ /dev/null
@@ -1,179 +0,0 @@
-package executor
-
-import (
- "bytes"
- "context"
- "io"
- "log/slog"
- "strings"
- "testing"
-
- "github.com/thepeterstone/claudomator/internal/storage"
- "github.com/thepeterstone/claudomator/internal/task"
-)
-
-func TestGeminiRunner_BuildArgs_BasicTask(t *testing.T) {
- r := &GeminiRunner{}
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "gemini",
- Instructions: "fix the bug",
- Model: "gemini-2.5-flash-lite",
- SkipPlanning: true,
- },
- }
-
- args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
-
- // Gemini CLI: instructions passed via -p for non-interactive mode
- if len(args) < 2 || args[0] != "-p" || args[1] != "fix the bug" {
- t.Errorf("expected -p <instructions> as first args, got: %v", args)
- }
-
- argMap := make(map[string]bool)
- for _, a := range args {
- argMap[a] = true
- }
- for _, want := range []string{"--output-format", "stream-json", "--model", "gemini-2.5-flash-lite"} {
- if !argMap[want] {
- t.Errorf("missing arg %q in %v", want, args)
- }
- }
-}
-
-func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) {
- r := &GeminiRunner{}
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "gemini",
- Instructions: "fix the bug",
- SkipPlanning: false,
- },
- }
-
- args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
-
- if len(args) < 2 || args[0] != "-p" {
- t.Fatalf("expected -p <instructions> as first args, got: %v", args)
- }
- if !strings.HasPrefix(args[1], planningPreamble) {
- t.Errorf("instructions should start with planning preamble")
- }
- if !strings.HasSuffix(args[1], "fix the bug") {
- t.Errorf("instructions should end with original instructions")
- }
-}
-
-func TestGeminiRunner_BuildArgs_IncludesYolo(t *testing.T) {
- r := &GeminiRunner{}
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "gemini",
- Instructions: "write a doc",
- SkipPlanning: true,
- },
- }
- args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
- argMap := make(map[string]bool)
- for _, a := range args {
- argMap[a] = true
- }
- if !argMap["--yolo"] {
- t.Errorf("expected --yolo in gemini args (enables all tools); got: %v", args)
- }
-}
-
-func TestGeminiRunner_BuildArgs_IncludesPromptFlag(t *testing.T) {
- r := &GeminiRunner{}
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "gemini",
- Instructions: "do the thing",
- SkipPlanning: true,
- },
- }
- args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
- // Instructions must be passed via -p/--prompt for non-interactive headless mode,
- // not as a bare positional (which starts interactive mode).
- found := false
- for i, a := range args {
- if (a == "-p" || a == "--prompt") && i+1 < len(args) && args[i+1] == "do the thing" {
- found = true
- break
- }
- }
- if !found {
- t.Errorf("expected instructions passed via -p/--prompt flag; got: %v", args)
- }
-}
-
-func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) {
- r := &GeminiRunner{
- BinaryPath: "true", // would succeed if it ran
- Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
- LogDir: t.TempDir(),
- }
- tk := &task.Task{
- Agent: task.AgentConfig{
- Type: "gemini",
- ProjectDir: "/nonexistent/path/does/not/exist",
- SkipPlanning: true,
- },
- }
- exec := &storage.Execution{ID: "test-exec"}
-
- err := r.Run(context.Background(), tk, exec)
-
- if err == nil {
- t.Fatal("expected error for inaccessible project_dir, got nil")
- }
- if !strings.Contains(err.Error(), "project_dir") {
- t.Errorf("expected 'project_dir' in error, got: %v", err)
- }
-}
-
-func TestGeminiRunner_BinaryPath_Default(t *testing.T) {
- r := &GeminiRunner{}
- if r.binaryPath() != "gemini" {
- t.Errorf("want 'gemini', got %q", r.binaryPath())
- }
-}
-
-func TestGeminiRunner_BinaryPath_Custom(t *testing.T) {
- r := &GeminiRunner{BinaryPath: "/usr/local/bin/gemini"}
- if r.binaryPath() != "/usr/local/bin/gemini" {
- t.Errorf("want custom path, got %q", r.binaryPath())
- }
-}
-
-
-func TestParseGeminiStream_ParsesStructuredOutput(t *testing.T) {
- // Simulate a stream-json input with various message types, including a result with error and cost.
- input := streamLine(`{"type":"content_block_start","content_block":{"text":"Hello,"}}`) +
- streamLine(`{"type":"content_block_delta","content_block":{"text":" World!"}}`) +
- streamLine(`{"type":"content_block_end"}`) +
- streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong","total_cost_usd":0.123}`)
-
- reader := strings.NewReader(input)
- var writer bytes.Buffer // To capture what's written to the output log
- logger := slog.New(slog.NewTextHandler(io.Discard, nil))
-
- cost, err := parseGeminiStream(reader, &writer, logger)
-
- if err == nil {
- t.Errorf("expected an error, got nil")
- }
- if !strings.Contains(err.Error(), "something went wrong") {
- t.Errorf("expected error message to contain 'something went wrong', got: %v", err)
- }
-
- if cost != 0.123 {
- t.Errorf("expected cost 0.123, got %f", cost)
- }
-
- // Verify that the writer received the content (even if parseGeminiStream isn't fully parsing it yet)
- expectedWriterContent := input
- if writer.String() != expectedWriterContent {
- t.Errorf("writer content mismatch:\nwant:\n%s\ngot:\n%s", expectedWriterContent, writer.String())
- }
-}
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
new file mode 100644
index 0000000..9e4530b
--- /dev/null
+++ b/internal/executor/helpers.go
@@ -0,0 +1,174 @@
+package executor
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "strings"
+)
+
+// BlockedError is returned by Run when the agent wrote a question file and exited.
+// The pool transitions the task to BLOCKED and stores the question for the user.
+type BlockedError struct {
+	QuestionJSON string // raw JSON from the question file
+	SessionID    string // claude session to resume once the user answers
+	SandboxDir   string // preserved sandbox path; resume must run here so Claude finds its session files
+}
+
+// Error includes the raw question JSON so logs show why the task stopped.
+func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }
+
+// parseStream reads streaming JSON lines from the claude CLI, tees every byte
+// to w, and returns (costUSD, sessionID, error). The error is non-nil when
+// the stream signals task failure:
+//   - a result message has is_error:true
+//   - a rate limit request was rejected
+//   - a tool_result was denied due to missing permissions
+func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, string, error) {
+	tee := io.TeeReader(r, w)
+	scanner := bufio.NewScanner(tee)
+	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines
+
+	var totalCost float64
+	var sessionID string
+	var streamErr error
+
+Loop:
+	for scanner.Scan() {
+		line := scanner.Bytes()
+		var msg map[string]interface{}
+		if err := json.Unmarshal(line, &msg); err != nil {
+			continue // tolerate non-JSON noise between messages
+		}
+
+		msgType, _ := msg["type"].(string)
+		switch msgType {
+		case "system":
+			// The init message carries the session ID needed for --resume.
+			if subtype, ok := msg["subtype"].(string); ok && subtype == "init" {
+				if sid, ok := msg["session_id"].(string); ok {
+					sessionID = sid
+				}
+			}
+		case "rate_limit_event":
+			if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
+				status, _ := info["status"].(string)
+				if status == "rejected" {
+					streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
+					// Stop reading immediately since we can't continue anyway.
+					break Loop
+				}
+			}
+		case "assistant":
+			if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
+				streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
+			}
+		case "result":
+			if isErr, _ := msg["is_error"].(bool); isErr {
+				result, _ := msg["result"].(string)
+				if result != "" {
+					streamErr = fmt.Errorf("claude task failed: %s", result)
+				} else {
+					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
+				}
+			}
+			// Modern versions report cost here; the cost_usd check after the
+			// switch covers older versions.
+			if cost, ok := msg["total_cost_usd"].(float64); ok {
+				totalCost = cost
+			}
+		case "user":
+			// Detect permission-denial tool_results. These occur when permission_mode
+			// is not bypassPermissions and claude exits 0 without completing its task.
+			if err := permissionDenialError(msg); err != nil && streamErr == nil {
+				streamErr = err
+			}
+		}
+
+		// Legacy cost field used by older claude versions.
+		if cost, ok := msg["cost_usd"].(float64); ok {
+			totalCost = cost
+		}
+	}
+
+	// Surface scanner failures (e.g. a line exceeding the 1MB buffer) instead
+	// of silently truncating the stream — but never mask a task failure.
+	if err := scanner.Err(); err != nil && streamErr == nil {
+		streamErr = fmt.Errorf("reading claude stream: %w", err)
+	}
+
+	return totalCost, sessionID, streamErr
+}
+
+
+// permissionDenialError inspects a "user" stream message for tool_result
+// entries that were denied due to missing permissions. It returns a non-nil
+// error for the first denial found, nil otherwise.
+func permissionDenialError(msg map[string]interface{}) error {
+	message, ok := msg["message"].(map[string]interface{})
+	if !ok {
+		return nil
+	}
+	content, ok := message["content"].([]interface{})
+	if !ok {
+		return nil
+	}
+	for _, item := range content {
+		entry, isMap := item.(map[string]interface{})
+		if !isMap || entry["type"] != "tool_result" {
+			continue
+		}
+		if denied, _ := entry["is_error"].(bool); !denied {
+			continue
+		}
+		text, _ := entry["content"].(string)
+		// Only permission-style errors fail the task; ordinary tool errors
+		// (compile failures etc.) are part of normal agent operation.
+		looksLikeDenial := strings.Contains(text, "requested permissions") ||
+			strings.Contains(text, "haven't granted")
+		if looksLikeDenial {
+			return fmt.Errorf("permission denied by host: %s", text)
+		}
+	}
+	return nil
+}
+
+// tailFile returns the last n lines of the file at path, or empty string if
+// the file cannot be read. Used to surface subprocess stderr on failure.
+func tailFile(path string, n int) string {
+ f, err := os.Open(path)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+
+ var lines []string
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ lines = append(lines, scanner.Text())
+ if len(lines) > n {
+ lines = lines[1:]
+ }
+ }
+ return strings.Join(lines, "\n")
+}
+
+// gitSafe prefixes git arguments with a config override marking every
+// directory as safe, so git works on workspaces owned by other UIDs.
+func gitSafe(args ...string) []string {
+	out := make([]string, 0, len(args)+2)
+	out = append(out, "-c", "safe.directory=*")
+	return append(out, args...)
+}
+
+// isCompletionReport reports whether a question-file JSON looks like a
+// completion report rather than a real user question. Heuristic: no options
+// (or empty options) and no "?" anywhere in the text. Unparseable JSON is
+// treated as a real question.
+func isCompletionReport(questionJSON string) bool {
+	var parsed struct {
+		Text    string   `json:"text"`
+		Options []string `json:"options"`
+	}
+	if json.Unmarshal([]byte(questionJSON), &parsed) != nil {
+		return false
+	}
+	hasOptions := len(parsed.Options) > 0
+	looksLikeQuestion := strings.Contains(parsed.Text, "?")
+	return !hasOptions && !looksLikeQuestion
+}
+
+// extractQuestionText pulls the "text" field out of a question-file JSON,
+// falling back to the raw string when parsing fails.
+func extractQuestionText(questionJSON string) string {
+	var parsed struct {
+		Text string `json:"text"`
+	}
+	if unmarshalErr := json.Unmarshal([]byte(questionJSON), &parsed); unmarshalErr != nil {
+		return questionJSON
+	}
+	return strings.TrimSpace(parsed.Text)
+}
diff --git a/internal/executor/stream_test.go b/internal/executor/stream_test.go
index 10eb858..11a6178 100644
--- a/internal/executor/stream_test.go
+++ b/internal/executor/stream_test.go
@@ -12,7 +12,7 @@ func streamLine(json string) string { return json + "\n" }
func TestParseStream_ResultIsError_ReturnsError(t *testing.T) {
input := streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong"}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err == nil {
t.Fatal("expected error when result.is_error=true, got nil")
}
@@ -27,7 +27,7 @@ func TestParseStream_PermissionDenied_ReturnsError(t *testing.T) {
input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"Claude requested permissions to write to /foo/bar.go, but you haven't granted it yet.","tool_use_id":"tu_abc"}]}}`) +
streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"I need permission","total_cost_usd":0.1}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err == nil {
t.Fatal("expected error for permission denial, got nil")
}
@@ -40,7 +40,7 @@ func TestParseStream_Success_ReturnsNilError(t *testing.T) {
input := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"Done."}]}}`) +
streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"All tests pass.","total_cost_usd":0.05}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("expected nil error for success stream, got: %v", err)
}
@@ -49,7 +49,7 @@ func TestParseStream_Success_ReturnsNilError(t *testing.T) {
func TestParseStream_ExtractsCostFromResultMessage(t *testing.T) {
input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","total_cost_usd":1.2345}`)
- cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ cost, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -62,7 +62,7 @@ func TestParseStream_ExtractsCostFromLegacyCostUSD(t *testing.T) {
// Some versions emit cost_usd at the top level rather than total_cost_usd.
input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","cost_usd":0.99}`)
- cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ cost, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -78,8 +78,21 @@ func TestParseStream_NonToolResultIsError_DoesNotFail(t *testing.T) {
input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"exit status 1","tool_use_id":"tu_xyz"}]}}`) +
streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"Fixed it.","total_cost_usd":0.2}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("non-permission tool errors should not fail the task, got: %v", err)
}
}
+
+// TestParseStream_ExtractsSessionID verifies the session ID from the system
+// init message is surfaced by parseStream.
+func TestParseStream_ExtractsSessionID(t *testing.T) {
+	input := streamLine(`{"type":"system","subtype":"init","session_id":"sess-999"}`) +
+		streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"ok","total_cost_usd":0.01}`)
+
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+	_, sid, err := parseStream(strings.NewReader(input), io.Discard, logger)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if sid != "sess-999" {
+		t.Errorf("want session ID sess-999, got %q", sid)
+	}
+}