summaryrefslogtreecommitdiff
path: root/internal/executor/claude.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/claude.go')
-rw-r--r--internal/executor/claude.go552
1 files changed, 552 insertions, 0 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
new file mode 100644
index 0000000..3c87f26
--- /dev/null
+++ b/internal/executor/claude.go
@@ -0,0 +1,552 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/retry"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
+type ClaudeRunner struct {
+ BinaryPath string // defaults to "claude"
+ Logger *slog.Logger
+ LogDir string // base directory for execution logs
+ APIURL string // base URL of the Claudomator API, passed to subprocesses
+}
+
+// BlockedError is returned by Run when the agent wrote a question file and exited.
+// The pool transitions the task to BLOCKED and stores the question for the user.
+// ExecLogDir returns the log directory for the given execution ID.
+// Implements LogPather so the pool can persist paths before execution starts.
+func (r *ClaudeRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+func (r *ClaudeRunner) binaryPath() string {
+ if r.BinaryPath != "" {
+ return r.BinaryPath
+ }
+ return "claude"
+}
+
+// Run executes a claude -p invocation, streaming output to log files.
+// It retries up to 3 times on rate-limit errors using exponential backoff.
+// If the agent writes a question file and exits, Run returns *BlockedError.
+//
+// When project_dir is set and this is not a resume execution, Run clones the
+// project into a temp sandbox, runs the agent there, then merges committed
+// changes back to project_dir. On failure the sandbox is preserved and its
+// path is included in the error.
+func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ projectDir := t.Agent.ProjectDir
+
+ // Validate project_dir exists when set.
+ if projectDir != "" {
+ if _, err := os.Stat(projectDir); err != nil {
+ return fmt.Errorf("project_dir %q: %w", projectDir, err)
+ }
+ }
+
+ // Setup log directory once; retries overwrite the log files.
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ logDir = e.ID // fallback for tests without LogDir set
+ }
+ if err := os.MkdirAll(logDir, 0700); err != nil {
+ return fmt.Errorf("creating log dir: %w", err)
+ }
+ if e.StdoutPath == "" {
+ e.StdoutPath = filepath.Join(logDir, "stdout.log")
+ e.StderrPath = filepath.Join(logDir, "stderr.log")
+ e.ArtifactDir = logDir
+ }
+
+ // Pre-assign session ID so we can resume after a BLOCKED state.
+ // For resume executions, the claude session continues under the original
+ // session ID (the one passed to --resume). Using the new exec's own UUID
+ // would cause a second block-and-resume cycle to pass the wrong --resume
+ // argument.
+ if e.SessionID == "" {
+ if e.ResumeSessionID != "" {
+ e.SessionID = e.ResumeSessionID
+ } else {
+ e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
+ }
+ }
+
+ // For new (non-resume) executions with a project_dir, clone into a sandbox.
+ // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude
+ // finds its session files under the same project slug. If no sandbox was
+ // preserved (e.g. task had no project_dir), fall back to project_dir.
+ var sandboxDir string
+ var startHEAD string
+ effectiveWorkingDir := projectDir
+ if e.ResumeSessionID != "" {
+ if e.SandboxDir != "" {
+ if _, statErr := os.Stat(e.SandboxDir); statErr == nil {
+ effectiveWorkingDir = e.SandboxDir
+ } else {
+ // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot).
+ // Clone a fresh sandbox so the task can run rather than fail immediately.
+ r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir)
+ e.SandboxDir = ""
+ if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+ }
+ }
+ } else if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+
+ if effectiveWorkingDir != "" {
+ // Capture the initial HEAD so we can identify new commits later.
+ headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output()
+ startHEAD = strings.TrimSpace(string(headOut))
+ }
+
+ questionFile := filepath.Join(logDir, "question.json")
+ args := r.buildArgs(t, e, questionFile)
+
+ attempt := 0
+ err := retry.RunWithBackoff(ctx, 3, 5*time.Second, func() error {
+ if attempt > 0 {
+ delay := 5 * time.Second * (1 << (attempt - 1))
+ r.Logger.Warn("rate-limited by Claude API, retrying",
+ "attempt", attempt,
+ "delay", delay,
+ )
+ }
+ attempt++
+ return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e)
+ })
+ if err != nil {
+ if sandboxDir != "" {
+ return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
+ }
+ return err
+ }
+
+ // Check whether the agent left a question before exiting.
+ data, readErr := os.ReadFile(questionFile)
+ if readErr == nil {
+ os.Remove(questionFile) // consumed
+ questionJSON := strings.TrimSpace(string(data))
+ // If the agent wrote a completion report instead of a real question,
+ // extract the text as the summary and fall through to normal completion.
+ if isCompletionReport(questionJSON) {
+ r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
+ e.Summary = extractQuestionText(questionJSON)
+ } else {
+ // Preserve sandbox on BLOCKED — agent may have partial work and its
+ // Claude session files are stored under the sandbox's project slug.
+ // The resume execution must run in the same directory.
+ return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir}
+ }
+ }
+
+ // Read agent summary if written.
+ summaryFile := filepath.Join(logDir, "summary.txt")
+ if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
+ os.Remove(summaryFile) // consumed
+ e.Summary = strings.TrimSpace(string(summaryData))
+ }
+
+ // Merge sandbox back to project_dir and clean up.
+ if sandboxDir != "" {
+ if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil {
+ return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
+ }
+ }
+ return nil
+}
+
+// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a
+// remote named "local" (a local bare repo that accepts pushes cleanly), then
+// falls back to "origin", then to the working copy path itself.
+func sandboxCloneSource(projectDir string) string {
+ for _, remote := range []string{"local", "origin"} {
+ out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", remote)...).Output()
+ if err == nil {
+ u := strings.TrimSpace(string(out))
+ if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) {
+ return u
+ }
+ }
+ }
+ return projectDir
+}
+
+// setupSandbox prepares a temporary git clone of projectDir.
+// If projectDir is not a git repo it is initialised with an initial commit first.
+func setupSandbox(projectDir string, logger *slog.Logger) (string, error) {
+ // Ensure projectDir is a git repo; initialise if not.
+ if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil {
+ cmds := [][]string{
+ gitSafe("-C", projectDir, "init"),
+ gitSafe("-C", projectDir, "add", "-A"),
+ gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"),
+ }
+ for _, args := range cmds {
+ if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec
+ return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out)
+ }
+ }
+ }
+
+ src := sandboxCloneSource(projectDir)
+
+ tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*")
+ if err != nil {
+ return "", fmt.Errorf("creating sandbox dir: %w", err)
+ }
+ // git clone requires the target to not exist; remove the placeholder first.
+ if err := os.Remove(tempDir); err != nil {
+ return "", fmt.Errorf("removing temp dir placeholder: %w", err)
+ }
+ out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput()
+ if err != nil {
+ return "", fmt.Errorf("git clone: %w\n%s", err, out)
+ }
+ return tempDir, nil
+}
+
+// teardownSandbox verifies the sandbox is clean and pushes new commits to the
+// canonical bare repo. If the push is rejected because another task pushed
+// concurrently, it fetches and rebases then retries once.
+//
+// The working copy (projectDir) is NOT updated automatically — it is the
+// developer's workspace and is pulled manually. This avoids permission errors
+// from mixed-owner .git/objects directories.
+func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error {
+ // Automatically commit uncommitted changes.
+ out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output()
+ if err != nil {
+ return fmt.Errorf("git status: %w", err)
+ }
+ if len(strings.TrimSpace(string(out))) > 0 {
+ logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir)
+
+ // Run build before autocommitting.
+ if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil {
+ logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir)
+ if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil {
+ return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
+ }
+ } else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil {
+ logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir)
+ cmd := exec.Command("./gradlew", "build")
+ cmd.Dir = sandboxDir
+ if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil {
+ return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
+ }
+ } else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil {
+ logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir)
+ cmd := exec.Command("go", "build", "./...")
+ cmd.Dir = sandboxDir
+ if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil {
+ return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut)
+ }
+ }
+
+ cmds := [][]string{
+ gitSafe("-C", sandboxDir, "add", "-A"),
+ gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"),
+ }
+ for _, args := range cmds {
+ if out, err := exec.Command("git", args...).CombinedOutput(); err != nil {
+ return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out)
+ }
+ }
+ }
+
+ // Capture commits before pushing/deleting.
+ // Use startHEAD..HEAD to find all commits made during this execution.
+ logRange := "origin/HEAD..HEAD"
+ if startHEAD != "" && startHEAD != "HEAD" {
+ logRange = startHEAD + "..HEAD"
+ }
+
+ logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...)
+ logOut, logErr := logCmd.CombinedOutput()
+ if logErr == nil {
+ lines := strings.Split(strings.TrimSpace(string(logOut)), "\n")
+ logger.Debug("captured commits", "count", len(lines), "range", logRange)
+ for _, line := range lines {
+ if line == "" {
+ continue
+ }
+ parts := strings.SplitN(line, "|", 2)
+ if len(parts) == 2 {
+ execRecord.Commits = append(execRecord.Commits, task.GitCommit{
+ Hash: parts[0],
+ Message: parts[1],
+ })
+ }
+ }
+ } else {
+ logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut))
+ }
+
+ // Check whether there are any new commits to push.
+ ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output()
+ if err != nil {
+ logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange)
+ }
+ if strings.TrimSpace(string(ahead)) == "0" {
+ os.RemoveAll(sandboxDir)
+ return nil
+ }
+
+ // Push from sandbox → bare repo (sandbox's origin is the bare repo).
+ if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil {
+ // If rejected due to concurrent push, fetch+rebase and retry once.
+ if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") {
+ logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir)
+ if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil {
+ return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2)
+ }
+ // Re-capture commits after rebase (hashes might have changed)
+ execRecord.Commits = nil
+ logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output()
+ if logErr == nil {
+ lines := strings.Split(strings.TrimSpace(string(logOut)), "\n")
+ for _, line := range lines {
+ parts := strings.SplitN(line, "|", 2)
+ if len(parts) == 2 {
+ execRecord.Commits = append(execRecord.Commits, task.GitCommit{
+ Hash: parts[0],
+ Message: parts[1],
+ })
+ }
+ }
+ }
+
+ if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil {
+ return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3)
+ }
+ } else {
+ return fmt.Errorf("git push to origin: %w\n%s", err, out)
+ }
+ }
+
+ logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir)
+ os.RemoveAll(sandboxDir)
+ return nil
+}
+
+// execOnce runs the claude subprocess once, streaming output to e's log paths.
+func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error {
+ cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
+ cmd.Env = append(os.Environ(),
+ "CLAUDOMATOR_API_URL="+r.APIURL,
+ "CLAUDOMATOR_TASK_ID="+e.TaskID,
+ "CLAUDOMATOR_PROJECT_DIR="+projectDir,
+ "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
+ "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"),
+ )
+ // Put the subprocess in its own process group so we can SIGKILL the entire
+ // group (MCP servers, bash children, etc.) on cancellation.
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ if workingDir != "" {
+ cmd.Dir = workingDir
+ }
+
+ stdoutFile, err := os.Create(e.StdoutPath)
+ if err != nil {
+ return fmt.Errorf("creating stdout log: %w", err)
+ }
+ defer stdoutFile.Close()
+
+ stderrFile, err := os.Create(e.StderrPath)
+ if err != nil {
+ return fmt.Errorf("creating stderr log: %w", err)
+ }
+ defer stderrFile.Close()
+
+ // Use os.Pipe for stdout so we own the read-end lifetime.
+ // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
+ // cmd.Wait() to close it before our goroutine finishes reading.
+ stdoutR, stdoutW, err := os.Pipe()
+ if err != nil {
+ return fmt.Errorf("creating stdout pipe: %w", err)
+ }
+ cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
+ cmd.Stderr = stderrFile
+
+ if err := cmd.Start(); err != nil {
+ stdoutW.Close()
+ stdoutR.Close()
+ return fmt.Errorf("starting claude: %w", err)
+ }
+ // Close our write-end immediately; the subprocess holds its own copy.
+ // The goroutine below gets EOF when the subprocess exits.
+ stdoutW.Close()
+
+ // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
+ //
+ // Safety: this goroutine cannot block indefinitely. The select has two arms:
+ // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel).
+ // The goroutine sends SIGKILL and exits immediately.
+ // • killDone — closed by close(killDone) below, immediately after cmd.Wait()
+ // returns. This fires when the process exits for any reason (natural exit,
+ // SIGKILL from the ctx arm, or any other signal). The goroutine exits without
+ // doing anything.
+ //
+ // Therefore: for a task that completes normally with a long-lived (non-cancelled)
+ // context, the killDone arm fires and the goroutine exits. There is no path where
+ // this goroutine outlives execOnce().
+ killDone := make(chan struct{})
+ go func() {
+ select {
+ case <-ctx.Done():
+ // SIGKILL the entire process group to reap orphan children.
+ syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+ case <-killDone:
+ }
+ }()
+
+ // Stream stdout to the log file and parse cost/errors.
+ // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait().
+ var costUSD float64
+ var streamErr error
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ costUSD, _, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
+ stdoutR.Close()
+ }()
+
+ waitErr := cmd.Wait()
+ close(killDone) // stop the pgid-kill goroutine
+ wg.Wait() // drain remaining stdout before reading costUSD/streamErr
+
+ e.CostUSD = costUSD
+
+ if waitErr != nil {
+ if exitErr, ok := waitErr.(*exec.ExitError); ok {
+ e.ExitCode = exitErr.ExitCode()
+ }
+ // If the stream captured a rate-limit or quota message, return it
+ // so callers can distinguish it from a generic exit-status failure.
+ if retry.IsRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
+ return streamErr
+ }
+ if tail := tailFile(e.StderrPath, 20); tail != "" {
+ return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail)
+ }
+ return fmt.Errorf("claude exited with error: %w", waitErr)
+ }
+
+ e.ExitCode = 0
+ if streamErr != nil {
+ return streamErr
+ }
+ return nil
+}
+
+func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
+ // Resume execution: the agent already has context; just deliver the answer.
+ if e.ResumeSessionID != "" {
+ args := []string{
+ "-p", e.ResumeAnswer,
+ "--resume", e.ResumeSessionID,
+ "--output-format", "stream-json",
+ "--verbose",
+ }
+ permMode := t.Agent.PermissionMode
+ if permMode == "" {
+ permMode = "bypassPermissions"
+ }
+ args = append(args, "--permission-mode", permMode)
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
+ }
+ return args
+ }
+
+ instructions := t.Agent.Instructions
+ allowedTools := t.Agent.AllowedTools
+
+ if !t.Agent.SkipPlanning {
+ instructions = withPlanningPreamble(instructions)
+ // Ensure Bash is available so the agent can POST subtasks and ask questions.
+ hasBash := false
+ for _, tool := range allowedTools {
+ if tool == "Bash" {
+ hasBash = true
+ break
+ }
+ }
+ if !hasBash {
+ allowedTools = append(allowedTools, "Bash")
+ }
+ }
+
+ args := []string{
+ "-p", instructions,
+ "--session-id", e.SessionID,
+ "--output-format", "stream-json",
+ "--verbose",
+ }
+
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
+ }
+ if t.Agent.MaxBudgetUSD > 0 {
+ args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD))
+ }
+ // Default to bypassPermissions — claudomator runs tasks unattended, so
+ // prompting for write access would always stall execution. Tasks that need
+ // a more restrictive mode can set permission_mode explicitly.
+ permMode := t.Agent.PermissionMode
+ if permMode == "" {
+ permMode = "bypassPermissions"
+ }
+ args = append(args, "--permission-mode", permMode)
+ if t.Agent.SystemPromptAppend != "" {
+ args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend)
+ }
+ for _, tool := range allowedTools {
+ args = append(args, "--allowedTools", tool)
+ }
+ for _, tool := range t.Agent.DisallowedTools {
+ args = append(args, "--disallowedTools", tool)
+ }
+ for _, f := range t.Agent.ContextFiles {
+ args = append(args, "--add-dir", f)
+ }
+ args = append(args, t.Agent.AdditionalArgs...)
+
+ return args
+}
+