// Package executor runs tasks by spawning external processes.
package executor

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
type ClaudeRunner struct {
	BinaryPath string // defaults to "claude"
	Logger     *slog.Logger
	LogDir     string // base directory for execution logs
	APIURL     string // base URL of the Claudomator API, passed to subprocesses
}

// binaryPath returns the configured BinaryPath, or "claude" when it is empty.
func (r *ClaudeRunner) binaryPath() string {
	if r.BinaryPath != "" {
		return r.BinaryPath
	}
	return "claude"
}

// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
//
// Before the first attempt it:
//   - builds the CLI arguments once (reused verbatim by every attempt),
//   - fails fast if t.Claude.WorkingDir is set but cannot be stat'ed,
//   - creates <r.LogDir>/<e.ID> (mode 0700) and records the stdout/stderr log
//     paths and the artifact directory on e.
//
// The retry policy itself lives in runWithBackoff, which is defined outside
// this view; the limits quoted above are the arguments passed to it here.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	args := r.buildArgs(t)

	if t.Claude.WorkingDir != "" {
		if _, err := os.Stat(t.Claude.WorkingDir); err != nil {
			return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err)
		}
	}

	// Setup log directory once; retries overwrite the log files.
	logDir := filepath.Join(r.LogDir, e.ID)
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	e.StdoutPath = filepath.Join(logDir, "stdout.log")
	e.StderrPath = filepath.Join(logDir, "stderr.log")
	e.ArtifactDir = logDir

	// attempt counts how many times the callback has been invoked so far; the
	// warning below is therefore emitted on every invocation after the first.
	attempt := 0
	return runWithBackoff(ctx, 3, 5*time.Second, func() error {
		if attempt > 0 {
			// This delay is computed only for the log line.
			// NOTE(review): presumably it mirrors the schedule runWithBackoff
			// actually sleeps on — confirm against that helper.
			delay := 5 * time.Second * (1 << (attempt - 1))
			r.Logger.Warn("rate-limited by Claude API, retrying",
				"attempt", attempt,
				"delay", delay,
			)
		}
		attempt++
		return r.execOnce(ctx, t, args, e)
	})
}

// execOnce runs the claude subprocess once, streaming output to e's log paths.
//
// The subprocess inherits the current environment plus CLAUDOMATOR_API_URL and
// CLAUDOMATOR_TASK_ID. Its stdout goes through a pipe that is copied to
// e.StdoutPath and parsed for cost; its stderr is written directly to
// e.StderrPath. Both log files are created with os.Create, so an existing file
// from an earlier attempt is truncated.
//
// On return e.CostUSD holds the value reported by streamAndParseCost.
// e.ExitCode is set to 0 on success and to the process exit code when Wait
// returns an *exec.ExitError; for any other Wait error e.ExitCode is left
// untouched.
func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error {
	cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
	cmd.Env = append(os.Environ(),
		"CLAUDOMATOR_API_URL="+r.APIURL,
		"CLAUDOMATOR_TASK_ID="+t.ID,
	)
	// Put the subprocess in its own process group so we can SIGKILL the entire
	// group (MCP servers, bash children, etc.) on cancellation.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if t.Claude.WorkingDir != "" {
		cmd.Dir = t.Claude.WorkingDir
	}

	stdoutFile, err := os.Create(e.StdoutPath)
	if err != nil {
		return fmt.Errorf("creating stdout log: %w", err)
	}
	defer stdoutFile.Close()
	stderrFile, err := os.Create(e.StderrPath)
	if err != nil {
		return fmt.Errorf("creating stderr log: %w", err)
	}
	defer stderrFile.Close()

	// Use os.Pipe for stdout so we own the read-end lifetime.
	// cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
	// cmd.Wait() to close it before our goroutine finishes reading.
	stdoutR, stdoutW, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
	cmd.Stderr = stderrFile

	if err := cmd.Start(); err != nil {
		// Nothing was started, so both pipe ends are still ours to release.
		stdoutW.Close()
		stdoutR.Close()
		return fmt.Errorf("starting claude: %w", err)
	}
	// Close our write-end immediately; the subprocess holds its own copy.
	// The goroutine below gets EOF when the subprocess exits.
	stdoutW.Close()

	// killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
	killDone := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			// SIGKILL the entire process group to reap orphan children.
			// The negative pid addresses the group created by Setpgid above.
			syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		case <-killDone:
		}
	}()

	// Stream stdout to the log file and parse cost.
	// wg ensures costUSD is fully written before we read it after cmd.Wait().
	var costUSD float64
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		costUSD = streamAndParseCost(stdoutR, stdoutFile, r.Logger)
		stdoutR.Close()
	}()

	// NOTE(review): killDone is closed before wg.Wait(). If a descendant
	// process outlives claude while still holding the pipe's write end, the
	// reader goroutine would presumably not see EOF, and a later ctx
	// cancellation would no longer trigger the group kill — confirm whether
	// that case can occur in practice.
	waitErr := cmd.Wait()
	close(killDone) // stop the pgid-kill goroutine
	wg.Wait()       // drain remaining stdout before reading costUSD

	if waitErr != nil {
		if exitErr, ok := waitErr.(*exec.ExitError); ok {
			e.ExitCode = exitErr.ExitCode()
		}
		// Cost is recorded even for failed runs.
		e.CostUSD = costUSD
		return fmt.Errorf("claude exited with error: %w", waitErr)
	}
	e.ExitCode = 0
	e.CostUSD = costUSD
	return nil
}

// buildArgs assembles the claude CLI argument list for t.
//
// The list always starts with -p <instructions> --output-format stream-json
// --verbose. Optional flags (--model, --max-budget-usd, --permission-mode,
// --append-system-prompt) are added only when the corresponding task field is
// non-zero, followed by one --allowedTools / --disallowedTools / --add-dir pair
// per entry, and finally t.Claude.AdditionalArgs verbatim.
//
// Unless t.Claude.SkipPlanning is set, the instructions are passed through
// withPlanningPreamble (defined outside this view) and "Bash" is added to the
// allowed tools if it is not already listed.
func (r *ClaudeRunner) buildArgs(t *task.Task) []string {
	instructions := t.Claude.Instructions
	allowedTools := t.Claude.AllowedTools

	if !t.Claude.SkipPlanning {
		instructions = withPlanningPreamble(instructions)
		// Ensure Bash is available so the agent can POST subtasks.
		hasBash := false
		for _, tool := range allowedTools {
			if tool == "Bash" {
				hasBash = true
				break
			}
		}
		if !hasBash {
			// NOTE(review): allowedTools aliases t.Claude.AllowedTools, so this
			// append can write into the task's backing array when it has spare
			// capacity — confirm no caller relies on that memory.
			allowedTools = append(allowedTools, "Bash")
		}
	}

	args := []string{
		"-p", instructions,
		"--output-format", "stream-json",
		"--verbose",
	}

	if t.Claude.Model != "" {
		args = append(args, "--model", t.Claude.Model)
	}
	if t.Claude.MaxBudgetUSD > 0 {
		// The budget is rendered with two decimals, so positive values that
		// round to zero at that precision are passed as "0.00".
		args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Claude.MaxBudgetUSD))
	}
	if t.Claude.PermissionMode != "" {
		args = append(args, "--permission-mode", t.Claude.PermissionMode)
	}
	if t.Claude.SystemPromptAppend != "" {
		args = append(args, "--append-system-prompt", t.Claude.SystemPromptAppend)
	}
	for _, tool := range allowedTools {
		args = append(args, "--allowedTools", tool)
	}
	for _, tool := range t.Claude.DisallowedTools {
		args = append(args, "--disallowedTools", tool)
	}
	for _, f := range t.Claude.ContextFiles {
		args = append(args, "--add-dir", f)
	}
	args = append(args, t.Claude.AdditionalArgs...)

	return args
}

// streamAndParseCost reads streaming JSON from claude and writes to the log file,
// extracting cost data from the stream.
//
// Every byte read from r is copied to w before it is inspected, so w receives
// the stream verbatim. Each line is decoded as a JSON object; when an object
// has a numeric "cost_usd" field, that value replaces the running result, so
// the value returned is the last one seen. Lines that are not JSON objects are
// ignored.
//
// A line longer than 1 MiB is still copied to w but is skipped for parsing
// (with a warning) instead of aborting the stream, so later lines keep being
// logged and parsed. Reading stops at EOF or at the first read/write error; a
// non-EOF error is logged. logger may be nil, in which case nothing is logged.
func streamAndParseCost(r io.Reader, w io.Writer, logger *slog.Logger) float64 {
	// maxLineBytes caps how much of a single line is buffered for JSON parsing.
	const maxLineBytes = 1024 * 1024

	// ReadLine hands back long lines in buffer-sized fragments, which lets us
	// step over an oversized line without giving up on the rest of the stream.
	br := bufio.NewReaderSize(io.TeeReader(r, w), 64*1024)

	var (
		totalCost float64
		line      []byte // fragments of the current line accumulated so far
		oversized bool   // current line exceeded maxLineBytes; discard it
	)
	for {
		chunk, isPrefix, err := br.ReadLine()
		if err != nil {
			if err != io.EOF && logger != nil {
				logger.Warn("reading claude stdout", "error", err)
			}
			return totalCost
		}

		switch {
		case oversized:
			// Already discarding this line; keep consuming its fragments.
		case len(line)+len(chunk) > maxLineBytes:
			oversized = true
			line = line[:0]
			if logger != nil {
				logger.Warn("skipping oversized stdout line while parsing cost",
					"limit_bytes", maxLineBytes,
				)
			}
		default:
			line = append(line, chunk...)
		}
		if isPrefix {
			// More fragments of the same line follow.
			continue
		}

		if !oversized {
			var msg map[string]any
			if err := json.Unmarshal(line, &msg); err == nil {
				// Extract cost from result messages.
				if cost, ok := msg["cost_usd"].(float64); ok {
					totalCost = cost
				}
			}
		}
		line = line[:0]
		oversized = false
	}
}