package executor

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
type ClaudeRunner struct {
	BinaryPath string // defaults to "claude"
	Logger     *slog.Logger
	LogDir     string // base directory for execution logs
	APIURL     string // base URL of the Claudomator API, passed to subprocesses
}

// BlockedError is returned by Run when the agent wrote a question file and exited.
// The pool transitions the task to BLOCKED and stores the question for the user.
type BlockedError struct {
	QuestionJSON string // raw JSON from the question file, surrounding whitespace trimmed
	SessionID    string // claude session to resume once the user answers
}

// Error implements the error interface; the message embeds the raw question JSON.
func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }

// ExecLogDir returns the log directory for the given execution ID, or "" when
// r.LogDir is unset.
// Implements LogPather so the pool can persist paths before execution starts.
func (r *ClaudeRunner) ExecLogDir(execID string) string {
	if r.LogDir == "" {
		return ""
	}
	return filepath.Join(r.LogDir, execID)
}

// binaryPath returns the configured claude binary, defaulting to "claude"
// (resolved via PATH by os/exec).
func (r *ClaudeRunner) binaryPath() string {
	if r.BinaryPath != "" {
		return r.BinaryPath
	}
	return "claude"
}

// Run executes a `claude -p` invocation for t, streaming output to the
// execution's log files.
//
// Retries are delegated to runWithBackoff (limit 3, base delay 5s), which is
// expected to back off on rate-limit errors. If the agent leaves a question
// file after a successful run, Run consumes it and returns *BlockedError.
//
// When project_dir is set and this is not a resume execution, Run clones the
// project into a temp sandbox, runs the agent there, then fast-forwards
// project_dir to the sandbox's commits. On failure or BLOCKED the sandbox is
// preserved; failure errors include its path.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	projectDir := t.Agent.ProjectDir

	// Validate project_dir exists when set.
	if projectDir != "" {
		if _, err := os.Stat(projectDir); err != nil {
			return fmt.Errorf("project_dir %q: %w", projectDir, err)
		}
	}

	// Set up the log directory once; retries overwrite the log files.
	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		logDir = e.ID // fallback for tests without LogDir set
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}

	// Default any path the pool has not already persisted. Each field is
	// filled independently so a partially-populated execution never passes an
	// empty path to os.Create or to the agent's environment.
	if e.StdoutPath == "" {
		e.StdoutPath = filepath.Join(logDir, "stdout.log")
	}
	if e.StderrPath == "" {
		e.StderrPath = filepath.Join(logDir, "stderr.log")
	}
	if e.ArtifactDir == "" {
		e.ArtifactDir = logDir
	}
	if e.ArtifactDir != logDir {
		// The agent writes its question file here, so the directory must exist.
		if err := os.MkdirAll(e.ArtifactDir, 0700); err != nil {
			return fmt.Errorf("creating artifact dir: %w", err)
		}
	}

	// Pre-assign the session ID so the session can be resumed after BLOCKED.
	// Resume executions arrive with it already set.
	if e.SessionID == "" {
		e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
	}

	// New (non-resume) executions with a project_dir run in a sandbox clone.
	// Resume executions run directly in project_dir to pick up the previous session.
	var sandboxDir string
	effectiveWorkingDir := projectDir
	if projectDir != "" && e.ResumeSessionID == "" {
		var err error
		sandboxDir, err = setupSandbox(projectDir)
		if err != nil {
			return fmt.Errorf("setting up sandbox: %w", err)
		}
		effectiveWorkingDir = sandboxDir
		r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
	}

	// execOnce points the agent at <ArtifactDir>/question.json through
	// CLAUDOMATOR_QUESTION_FILE, so read the question from the same place.
	questionFile := filepath.Join(e.ArtifactDir, "question.json")
	args := r.buildArgs(t, e, questionFile)

	attempt := 0
	err := runWithBackoff(ctx, 3, 5*time.Second, func() error {
		if attempt > 0 {
			// Informational only: assumes runWithBackoff doubles from 5s.
			delay := 5 * time.Second * (1 << (attempt - 1))
			r.Logger.Warn("rate-limited by Claude API, retrying",
				"attempt", attempt,
				"delay", delay,
			)
		}
		attempt++
		// Drop any question file from an earlier attempt so a BLOCKED result can
		// only come from the attempt that actually finished. Not-exist is the
		// normal case, so the error is deliberately ignored.
		_ = os.Remove(questionFile)
		return r.execOnce(ctx, args, effectiveWorkingDir, e)
	})
	if err != nil {
		if sandboxDir != "" {
			return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
		}
		return err
	}

	// Check whether the agent left a question before exiting.
	if data, readErr := os.ReadFile(questionFile); readErr == nil {
		os.Remove(questionFile) // consumed; best-effort
		// Preserve the sandbox on BLOCKED — the agent may have partial work.
		// NOTE(review): the resume execution runs in project_dir, not in this
		// sandbox, so that partial work is not visible to it — confirm intended.
		return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
	}

	// Merge the sandbox back to project_dir and clean up.
	if sandboxDir != "" {
		if mergeErr := teardownSandbox(projectDir, sandboxDir, r.Logger); mergeErr != nil {
			return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
		}
	}
	return nil
}

// setupSandbox prepares a temporary git clone of projectDir and returns its path.
// If projectDir is not a git repo it is first initialised with a commit of
// everything it contains. On clone failure no temp directory is left behind.
func setupSandbox(projectDir string) (string, error) {
	// Ensure projectDir is a git repo; initialise if not.
	// NOTE(review): `rev-parse --git-dir` also succeeds in a subdirectory of
	// another repository, in which case the clone below may fail — confirm.
	if err := exec.Command("git", "-C", projectDir, "rev-parse", "--git-dir").Run(); err != nil {
		initSteps := [][]string{
			{"git", "-C", projectDir, "init"},
			{"git", "-C", projectDir, "add", "-A"},
			{"git", "-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"},
		}
		for _, args := range initSteps {
			if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { //nolint:gosec
				// args[3:] names the failing step (init / add / commit).
				return "", fmt.Errorf("git init %s (%s): %w\n%s", projectDir, strings.Join(args[3:], " "), err, out)
			}
		}
	}

	tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*")
	if err != nil {
		return "", fmt.Errorf("creating sandbox dir: %w", err)
	}
	// git clone requires the target not to exist, so remove the placeholder
	// and let git recreate it under the same unique name.
	if err := os.Remove(tempDir); err != nil {
		return "", fmt.Errorf("removing temp dir placeholder: %w", err)
	}
	out, err := exec.Command("git", "clone", "--no-hardlinks", projectDir, tempDir).CombinedOutput()
	if err != nil {
		os.RemoveAll(tempDir) // best-effort: don't leak a partial clone
		return "", fmt.Errorf("git clone: %w\n%s", err, out)
	}
	return tempDir, nil
}

// teardownSandbox verifies the sandbox has no uncommitted or untracked changes,
// fast-forwards projectDir to the sandbox's HEAD when the sandbox has new
// commits, then removes the sandbox.
func teardownSandbox(projectDir, sandboxDir string, logger *slog.Logger) error { // Fail if agent left uncommitted changes. out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() if err != nil { return fmt.Errorf("git status: %w", err) } if len(strings.TrimSpace(string(out))) > 0 { return fmt.Errorf("uncommitted changes in sandbox (agent must commit all work):\n%s", out) } // Check whether there are any new commits to merge. ahead, err := exec.Command("git", "-C", sandboxDir, "rev-list", "--count", "origin/HEAD..HEAD").Output() if err != nil { // No origin/HEAD (e.g. fresh init with no prior commits) — proceed anyway. logger.Warn("could not determine commits ahead of origin; proceeding with merge", "err", err) } if strings.TrimSpace(string(ahead)) == "0" { // Nothing to merge — clean up and return. os.RemoveAll(sandboxDir) return nil } // Fetch new commits from sandbox into project_dir and fast-forward merge. // Use file:// prefix to force pack-protocol transfer instead of the local // optimization that hard-links objects — hard-linking fails across devices // and can fail with permission errors when the repo has mixed-owner objects. if out, err := exec.Command("git", "-C", projectDir, "fetch", "file://"+sandboxDir, "HEAD").CombinedOutput(); err != nil { return fmt.Errorf("git fetch from sandbox: %w\n%s", err, out) } if out, err := exec.Command("git", "-C", projectDir, "merge", "--ff-only", "FETCH_HEAD").CombinedOutput(); err != nil { return fmt.Errorf("git merge --ff-only FETCH_HEAD: %w\n%s", err, out) } logger.Info("sandbox merged and cleaned up", "sandbox", sandboxDir, "project_dir", projectDir) os.RemoveAll(sandboxDir) return nil } // execOnce runs the claude subprocess once, streaming output to e's log paths. func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error { cmd := exec.CommandContext(ctx, r.binaryPath(), args...) 
cmd.Env = append(os.Environ(), "CLAUDOMATOR_API_URL="+r.APIURL, "CLAUDOMATOR_TASK_ID="+e.TaskID, "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), ) // Put the subprocess in its own process group so we can SIGKILL the entire // group (MCP servers, bash children, etc.) on cancellation. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} if workingDir != "" { cmd.Dir = workingDir } stdoutFile, err := os.Create(e.StdoutPath) if err != nil { return fmt.Errorf("creating stdout log: %w", err) } defer stdoutFile.Close() stderrFile, err := os.Create(e.StderrPath) if err != nil { return fmt.Errorf("creating stderr log: %w", err) } defer stderrFile.Close() // Use os.Pipe for stdout so we own the read-end lifetime. // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing // cmd.Wait() to close it before our goroutine finishes reading. stdoutR, stdoutW, err := os.Pipe() if err != nil { return fmt.Errorf("creating stdout pipe: %w", err) } cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait cmd.Stderr = stderrFile if err := cmd.Start(); err != nil { stdoutW.Close() stdoutR.Close() return fmt.Errorf("starting claude: %w", err) } // Close our write-end immediately; the subprocess holds its own copy. // The goroutine below gets EOF when the subprocess exits. stdoutW.Close() // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. killDone := make(chan struct{}) go func() { select { case <-ctx.Done(): // SIGKILL the entire process group to reap orphan children. syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) case <-killDone: } }() // Stream stdout to the log file and parse cost/errors. // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). 
var costUSD float64 var streamErr error var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) stdoutR.Close() }() waitErr := cmd.Wait() close(killDone) // stop the pgid-kill goroutine wg.Wait() // drain remaining stdout before reading costUSD/streamErr e.CostUSD = costUSD if waitErr != nil { if exitErr, ok := waitErr.(*exec.ExitError); ok { e.ExitCode = exitErr.ExitCode() } // If the stream captured a rate-limit or quota message, return it // so callers can distinguish it from a generic exit-status failure. if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) { return streamErr } return fmt.Errorf("claude exited with error: %w", waitErr) } e.ExitCode = 0 if streamErr != nil { return streamErr } return nil } func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { // Resume execution: the agent already has context; just deliver the answer. if e.ResumeSessionID != "" { args := []string{ "-p", e.ResumeAnswer, "--resume", e.ResumeSessionID, "--output-format", "stream-json", "--verbose", } permMode := t.Agent.PermissionMode if permMode == "" { permMode = "bypassPermissions" } args = append(args, "--permission-mode", permMode) if t.Agent.Model != "" { args = append(args, "--model", t.Agent.Model) } return args } instructions := t.Agent.Instructions allowedTools := t.Agent.AllowedTools if !t.Agent.SkipPlanning { instructions = withPlanningPreamble(instructions) // Ensure Bash is available so the agent can POST subtasks and ask questions. 
hasBash := false for _, tool := range allowedTools { if tool == "Bash" { hasBash = true break } } if !hasBash { allowedTools = append(allowedTools, "Bash") } } args := []string{ "-p", instructions, "--session-id", e.SessionID, "--output-format", "stream-json", "--verbose", } if t.Agent.Model != "" { args = append(args, "--model", t.Agent.Model) } if t.Agent.MaxBudgetUSD > 0 { args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) } // Default to bypassPermissions — claudomator runs tasks unattended, so // prompting for write access would always stall execution. Tasks that need // a more restrictive mode can set permission_mode explicitly. permMode := t.Agent.PermissionMode if permMode == "" { permMode = "bypassPermissions" } args = append(args, "--permission-mode", permMode) if t.Agent.SystemPromptAppend != "" { args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) } for _, tool := range allowedTools { args = append(args, "--allowedTools", tool) } for _, tool := range t.Agent.DisallowedTools { args = append(args, "--disallowedTools", tool) } for _, f := range t.Agent.ContextFiles { args = append(args, "--add-dir", f) } args = append(args, t.Agent.AdditionalArgs...) return args } // parseStream reads streaming JSON from claude, writes to w, and returns // (costUSD, error). 
error is non-nil if the stream signals task failure: // - result message has is_error:true // - a tool_result was denied due to missing permissions func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { tee := io.TeeReader(r, w) scanner := bufio.NewScanner(tee) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines var totalCost float64 var streamErr error for scanner.Scan() { line := scanner.Bytes() var msg map[string]interface{} if err := json.Unmarshal(line, &msg); err != nil { continue } msgType, _ := msg["type"].(string) switch msgType { case "result": if isErr, _ := msg["is_error"].(bool); isErr { result, _ := msg["result"].(string) if result != "" { streamErr = fmt.Errorf("claude task failed: %s", result) } else { streamErr = fmt.Errorf("claude task failed (is_error=true in result)") } } // Prefer total_cost_usd from result message; fall through to legacy check below. if cost, ok := msg["total_cost_usd"].(float64); ok { totalCost = cost } case "user": // Detect permission-denial tool_results. These occur when permission_mode // is not bypassPermissions and claude exits 0 without completing its task. if err := permissionDenialError(msg); err != nil && streamErr == nil { streamErr = err } } // Legacy cost field used by older claude versions. if cost, ok := msg["cost_usd"].(float64); ok { totalCost = cost } } return totalCost, streamErr } // permissionDenialError inspects a "user" stream message for tool_result entries // that were denied due to missing permissions. Returns an error if found. 
func permissionDenialError(msg map[string]interface{}) error { message, ok := msg["message"].(map[string]interface{}) if !ok { return nil } content, ok := message["content"].([]interface{}) if !ok { return nil } for _, item := range content { itemMap, ok := item.(map[string]interface{}) if !ok { continue } if itemMap["type"] != "tool_result" { continue } if isErr, _ := itemMap["is_error"].(bool); !isErr { continue } text, _ := itemMap["content"].(string) if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") { return fmt.Errorf("permission denied by host: %s", text) } } return nil }