package executor

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
type ClaudeRunner struct {
	BinaryPath string       // defaults to "claude"
	Logger     *slog.Logger // used by Run without a nil check, so it must be set
	LogDir     string       // base directory for execution logs
	APIURL     string       // base URL of the Claudomator API, passed to subprocesses
}

// BlockedError is returned by Run when the agent wrote a question file and exited.
// The pool transitions the task to BLOCKED and stores the question for the user.
type BlockedError struct {
	QuestionJSON string // raw JSON from the question file
	SessionID    string // claude session to resume once the user answers
	SandboxDir   string // preserved sandbox path; resume must run here so Claude finds its session files
}

// Error implements the error interface; the message embeds the raw question JSON.
func (e *BlockedError) Error() string {
	return fmt.Sprintf("task blocked: %s", e.QuestionJSON)
}

// ExecLogDir returns the log directory for the given execution ID, or "" when
// no LogDir is configured.
// Implements LogPather so the pool can persist paths before execution starts.
func (r *ClaudeRunner) ExecLogDir(execID string) string {
	if r.LogDir == "" {
		return ""
	}
	return filepath.Join(r.LogDir, execID)
}

// binaryPath returns the configured BinaryPath, falling back to "claude"
// (resolved through PATH by os/exec) when it is empty.
func (r *ClaudeRunner) binaryPath() string {
	if r.BinaryPath != "" {
		return r.BinaryPath
	}
	return "claude"
}

// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
// If the agent writes a question file and exits, Run returns *BlockedError.
//
// When project_dir is set and this is not a resume execution, Run clones the
// project into a temp sandbox, runs the agent there, then pushes the resulting
// commits to the repository the sandbox was cloned from (see teardownSandbox);
// the project_dir working copy itself is not modified. On failure the sandbox
// is preserved and its path is included in the error.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { projectDir := t.Agent.ProjectDir // Validate project_dir exists when set. if projectDir != "" { if _, err := os.Stat(projectDir); err != nil { return fmt.Errorf("project_dir %q: %w", projectDir, err) } } // Setup log directory once; retries overwrite the log files. logDir := r.ExecLogDir(e.ID) if logDir == "" { logDir = e.ID // fallback for tests without LogDir set } if err := os.MkdirAll(logDir, 0700); err != nil { return fmt.Errorf("creating log dir: %w", err) } if e.StdoutPath == "" { e.StdoutPath = filepath.Join(logDir, "stdout.log") e.StderrPath = filepath.Join(logDir, "stderr.log") e.ArtifactDir = logDir } // Pre-assign session ID so we can resume after a BLOCKED state. // For resume executions, the claude session continues under the original // session ID (the one passed to --resume). Using the new exec's own UUID // would cause a second block-and-resume cycle to pass the wrong --resume // argument. if e.SessionID == "" { if e.ResumeSessionID != "" { e.SessionID = e.ResumeSessionID } else { e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) } } // For new (non-resume) executions with a project_dir, clone into a sandbox. // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude // finds its session files under the same project slug. If no sandbox was // preserved (e.g. task had no project_dir), fall back to project_dir. var sandboxDir string var startHEAD string effectiveWorkingDir := projectDir if e.ResumeSessionID != "" { if e.SandboxDir != "" { if _, statErr := os.Stat(e.SandboxDir); statErr == nil { effectiveWorkingDir = e.SandboxDir } else { // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot). // Clone a fresh sandbox so the task can run rather than fail immediately. 
r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) e.SandboxDir = "" if projectDir != "" { var err error sandboxDir, err = setupSandbox(projectDir) if err != nil { return fmt.Errorf("setting up sandbox: %w", err) } effectiveWorkingDir = sandboxDir r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) } } } } else if projectDir != "" { var err error sandboxDir, err = setupSandbox(projectDir) if err != nil { return fmt.Errorf("setting up sandbox: %w", err) } effectiveWorkingDir = sandboxDir r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) } if effectiveWorkingDir != "" { // Capture the initial HEAD so we can identify new commits later. headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() startHEAD = strings.TrimSpace(string(headOut)) } questionFile := filepath.Join(logDir, "question.json") args := r.buildArgs(t, e, questionFile) attempt := 0 err := runWithBackoff(ctx, 3, 5*time.Second, func() error { if attempt > 0 { delay := 5 * time.Second * (1 << (attempt - 1)) r.Logger.Warn("rate-limited by Claude API, retrying", "attempt", attempt, "delay", delay, ) } attempt++ return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e) }) if err != nil { if sandboxDir != "" { return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) } return err } // Check whether the agent left a question before exiting. data, readErr := os.ReadFile(questionFile) if readErr == nil { os.Remove(questionFile) // consumed questionJSON := strings.TrimSpace(string(data)) // If the agent wrote a completion report instead of a real question, // extract the text as the summary and fall through to normal completion. 
if isCompletionReport(questionJSON) { r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) e.Summary = extractQuestionText(questionJSON) } else { // Preserve sandbox on BLOCKED — agent may have partial work and its // Claude session files are stored under the sandbox's project slug. // The resume execution must run in the same directory. return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} } } // Read agent summary if written. summaryFile := filepath.Join(logDir, "summary.txt") if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { os.Remove(summaryFile) // consumed e.Summary = strings.TrimSpace(string(summaryData)) } // Merge sandbox back to project_dir and clean up. if sandboxDir != "" { if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) } } return nil } // isCompletionReport returns true when a question-file JSON looks like a // completion report rather than a real user question. Heuristic: no options // (or empty options) and no "?" anywhere in the text. func isCompletionReport(questionJSON string) bool { var q struct { Text string `json:"text"` Options []string `json:"options"` } if err := json.Unmarshal([]byte(questionJSON), &q); err != nil { return false } return len(q.Options) == 0 && !strings.Contains(q.Text, "?") } // extractQuestionText returns the "text" field from a question-file JSON, or // the raw string if parsing fails. func extractQuestionText(questionJSON string) string { var q struct { Text string `json:"text"` } if err := json.Unmarshal([]byte(questionJSON), &q); err != nil { return questionJSON } return strings.TrimSpace(q.Text) } // gitSafe returns git arguments that prepend "-c safe.directory=*" so that // commands succeed regardless of the repository owner. 
This is needed when // claudomator operates on project directories owned by a different OS user. func gitSafe(args ...string) []string { return append([]string{"-c", "safe.directory=*"}, args...) } // sandboxCloneSource returns the URL to clone the sandbox from. It prefers a // remote named "local" (a local bare repo that accepts pushes cleanly), then // falls back to "origin", then to the working copy path itself. func sandboxCloneSource(projectDir string) string { for _, remote := range []string{"local", "origin"} { out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", remote)...).Output() if err == nil && len(strings.TrimSpace(string(out))) > 0 { return strings.TrimSpace(string(out)) } } return projectDir } // setupSandbox prepares a temporary git clone of projectDir. // If projectDir is not a git repo it is initialised with an initial commit first. func setupSandbox(projectDir string) (string, error) { // Ensure projectDir is a git repo; initialise if not. if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil { cmds := [][]string{ gitSafe("-C", projectDir, "init"), gitSafe("-C", projectDir, "add", "-A"), gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"), } for _, args := range cmds { if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out) } } } src := sandboxCloneSource(projectDir) tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*") if err != nil { return "", fmt.Errorf("creating sandbox dir: %w", err) } // git clone requires the target to not exist; remove the placeholder first. 
if err := os.Remove(tempDir); err != nil { return "", fmt.Errorf("removing temp dir placeholder: %w", err) } out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput() if err != nil { return "", fmt.Errorf("git clone: %w\n%s", err, out) } return tempDir, nil } // teardownSandbox verifies the sandbox is clean and pushes new commits to the // canonical bare repo. If the push is rejected because another task pushed // concurrently, it fetches and rebases then retries once. // // The working copy (projectDir) is NOT updated automatically — it is the // developer's workspace and is pulled manually. This avoids permission errors // from mixed-owner .git/objects directories. func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error { // Automatically commit uncommitted changes. out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() if err != nil { return fmt.Errorf("git status: %w", err) } if len(strings.TrimSpace(string(out))) > 0 { logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir) cmds := [][]string{ gitSafe("-C", sandboxDir, "add", "-A"), gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"), } for _, args := range cmds { if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out) } } } // Capture commits before pushing/deleting. // Use startHEAD..HEAD to find all commits made during this execution. logRange := "origin/HEAD..HEAD" if startHEAD != "" && startHEAD != "HEAD" { logRange = startHEAD + "..HEAD" } logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...) 
logOut, logErr := logCmd.CombinedOutput() if logErr == nil { lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") logger.Debug("captured commits", "count", len(lines), "range", logRange) for _, line := range lines { if line == "" { continue } parts := strings.SplitN(line, "|", 2) if len(parts) == 2 { execRecord.Commits = append(execRecord.Commits, task.GitCommit{ Hash: parts[0], Message: parts[1], }) } } } else { logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut)) } // Check whether there are any new commits to push. ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output() if err != nil { logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange) } if strings.TrimSpace(string(ahead)) == "0" { os.RemoveAll(sandboxDir) return nil } // Push from sandbox → bare repo (sandbox's origin is the bare repo). if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil { // If rejected due to concurrent push, fetch+rebase and retry once. 
if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") { logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir) if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil { return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2) } // Re-capture commits after rebase (hashes might have changed) execRecord.Commits = nil logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output() if logErr == nil { lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") for _, line := range lines { parts := strings.SplitN(line, "|", 2) if len(parts) == 2 { execRecord.Commits = append(execRecord.Commits, task.GitCommit{ Hash: parts[0], Message: parts[1], }) } } } if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil { return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3) } } else { return fmt.Errorf("git push to origin: %w\n%s", err, out) } } logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir) os.RemoveAll(sandboxDir) return nil } // execOnce runs the claude subprocess once, streaming output to e's log paths. func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { cmd := exec.CommandContext(ctx, r.binaryPath(), args...) cmd.Env = append(os.Environ(), "CLAUDOMATOR_API_URL="+r.APIURL, "CLAUDOMATOR_TASK_ID="+e.TaskID, "CLAUDOMATOR_PROJECT_DIR="+projectDir, "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), ) // Put the subprocess in its own process group so we can SIGKILL the entire // group (MCP servers, bash children, etc.) on cancellation. 
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} if workingDir != "" { cmd.Dir = workingDir } stdoutFile, err := os.Create(e.StdoutPath) if err != nil { return fmt.Errorf("creating stdout log: %w", err) } defer stdoutFile.Close() stderrFile, err := os.Create(e.StderrPath) if err != nil { return fmt.Errorf("creating stderr log: %w", err) } defer stderrFile.Close() // Use os.Pipe for stdout so we own the read-end lifetime. // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing // cmd.Wait() to close it before our goroutine finishes reading. stdoutR, stdoutW, err := os.Pipe() if err != nil { return fmt.Errorf("creating stdout pipe: %w", err) } cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait cmd.Stderr = stderrFile if err := cmd.Start(); err != nil { stdoutW.Close() stdoutR.Close() return fmt.Errorf("starting claude: %w", err) } // Close our write-end immediately; the subprocess holds its own copy. // The goroutine below gets EOF when the subprocess exits. stdoutW.Close() // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. // // Safety: this goroutine cannot block indefinitely. The select has two arms: // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel). // The goroutine sends SIGKILL and exits immediately. // • killDone — closed by close(killDone) below, immediately after cmd.Wait() // returns. This fires when the process exits for any reason (natural exit, // SIGKILL from the ctx arm, or any other signal). The goroutine exits without // doing anything. // // Therefore: for a task that completes normally with a long-lived (non-cancelled) // context, the killDone arm fires and the goroutine exits. There is no path where // this goroutine outlives execOnce(). killDone := make(chan struct{}) go func() { select { case <-ctx.Done(): // SIGKILL the entire process group to reap orphan children. 
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) case <-killDone: } }() // Stream stdout to the log file and parse cost/errors. // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). var costUSD float64 var streamErr error var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) stdoutR.Close() }() waitErr := cmd.Wait() close(killDone) // stop the pgid-kill goroutine wg.Wait() // drain remaining stdout before reading costUSD/streamErr e.CostUSD = costUSD if waitErr != nil { if exitErr, ok := waitErr.(*exec.ExitError); ok { e.ExitCode = exitErr.ExitCode() } // If the stream captured a rate-limit or quota message, return it // so callers can distinguish it from a generic exit-status failure. if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) { return streamErr } if tail := tailFile(e.StderrPath, 20); tail != "" { return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail) } return fmt.Errorf("claude exited with error: %w", waitErr) } e.ExitCode = 0 if streamErr != nil { return streamErr } return nil } func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { // Resume execution: the agent already has context; just deliver the answer. if e.ResumeSessionID != "" { args := []string{ "-p", e.ResumeAnswer, "--resume", e.ResumeSessionID, "--output-format", "stream-json", "--verbose", } permMode := t.Agent.PermissionMode if permMode == "" { permMode = "bypassPermissions" } args = append(args, "--permission-mode", permMode) if t.Agent.Model != "" { args = append(args, "--model", t.Agent.Model) } return args } instructions := t.Agent.Instructions allowedTools := t.Agent.AllowedTools if !t.Agent.SkipPlanning { instructions = withPlanningPreamble(instructions) // Ensure Bash is available so the agent can POST subtasks and ask questions. 
hasBash := false for _, tool := range allowedTools { if tool == "Bash" { hasBash = true break } } if !hasBash { allowedTools = append(allowedTools, "Bash") } } args := []string{ "-p", instructions, "--session-id", e.SessionID, "--output-format", "stream-json", "--verbose", } if t.Agent.Model != "" { args = append(args, "--model", t.Agent.Model) } if t.Agent.MaxBudgetUSD > 0 { args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) } // Default to bypassPermissions — claudomator runs tasks unattended, so // prompting for write access would always stall execution. Tasks that need // a more restrictive mode can set permission_mode explicitly. permMode := t.Agent.PermissionMode if permMode == "" { permMode = "bypassPermissions" } args = append(args, "--permission-mode", permMode) if t.Agent.SystemPromptAppend != "" { args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) } for _, tool := range allowedTools { args = append(args, "--allowedTools", tool) } for _, tool := range t.Agent.DisallowedTools { args = append(args, "--disallowedTools", tool) } for _, f := range t.Agent.ContextFiles { args = append(args, "--add-dir", f) } args = append(args, t.Agent.AdditionalArgs...) return args } // parseStream reads streaming JSON from claude, writes to w, and returns // (costUSD, error). 
// error is non-nil if the stream signals task failure:
//   - result message has is_error:true
//   - a tool_result was denied due to missing permissions
//   - a rate_limit_event was rejected, or an assistant message reports rate_limit
//
// A rejected rate_limit_event ends parsing at once, so no later message can
// replace the rate-limit error that callers use to decide on a retry. Whenever
// parsing stops before EOF (rejection, or a line longer than the 1MB scanner
// limit) the rest of r is still copied to w, keeping the log complete and the
// writing process unblocked. logger may be nil; it only receives warnings.
func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
	tee := io.TeeReader(r, w)
	scanner := bufio.NewScanner(tee)
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines

	var totalCost float64
	var streamErr error

scan:
	for scanner.Scan() {
		line := scanner.Bytes()
		var msg map[string]interface{}
		if err := json.Unmarshal(line, &msg); err != nil {
			continue // non-JSON lines are logged via the tee but otherwise ignored
		}
		msgType, _ := msg["type"].(string)
		switch msgType {
		case "rate_limit_event":
			if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
				status, _ := info["status"].(string)
				if status == "rejected" {
					streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
					// Stop parsing: we can't continue anyway. The label is
					// required because a bare break only leaves the switch.
					break scan
				}
			}
		case "assistant":
			if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
				streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
			}
		case "result":
			if isErr, _ := msg["is_error"].(bool); isErr {
				result, _ := msg["result"].(string)
				if result != "" {
					streamErr = fmt.Errorf("claude task failed: %s", result)
				} else {
					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
				}
			}
			// Prefer total_cost_usd from result message; fall through to legacy check below.
			if cost, ok := msg["total_cost_usd"].(float64); ok {
				totalCost = cost
			}
		case "user":
			// Detect permission-denial tool_results. These occur when permission_mode
			// is not bypassPermissions and claude exits 0 without completing its task.
			if err := permissionDenialError(msg); err != nil && streamErr == nil {
				streamErr = err
			}
		}

		// Legacy cost field used by older claude versions.
		if cost, ok := msg["cost_usd"].(float64); ok {
			totalCost = cost
		}
	}

	if err := scanner.Err(); err != nil && logger != nil {
		logger.Warn("claude stream parsing stopped early", "err", err)
	}
	// Copy whatever was not scanned through the tee so w receives the full
	// stream. At EOF this is a no-op.
	if _, err := io.Copy(io.Discard, tee); err != nil && logger != nil {
		logger.Warn("draining claude stream", "err", err)
	}

	return totalCost, streamErr
}

// permissionDenialError inspects a "user" stream message for tool_result entries
// that were denied due to missing permissions. Returns an error if found.
//
// An entry counts as a denial when it has type "tool_result", is_error true,
// and string content mentioning "requested permissions" or "haven't granted".
// Messages that do not have the expected shape yield nil.
func permissionDenialError(msg map[string]interface{}) error {
	message, ok := msg["message"].(map[string]interface{})
	if !ok {
		return nil
	}
	content, ok := message["content"].([]interface{})
	if !ok {
		return nil
	}
	for _, item := range content {
		itemMap, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		if itemMap["type"] != "tool_result" {
			continue
		}
		if isErr, _ := itemMap["is_error"].(bool); !isErr {
			continue
		}
		text, _ := itemMap["content"].(string)
		if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", text)
		}
	}
	return nil
}

// tailFile returns the last n lines of the file at path, or empty string if
// the file cannot be read. Used to surface subprocess stderr on failure.
func tailFile(path string, n int) string {
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	var lines []string
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > n {
			lines = lines[1:] // keep a sliding window of at most n lines
		}
	}
	return strings.Join(lines, "\n")
}