diff options
Diffstat (limited to 'internal/executor/claude.go')
| -rw-r--r-- | internal/executor/claude.go | 552 |
1 files changed, 552 insertions, 0 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go new file mode 100644 index 0000000..3c87f26 --- /dev/null +++ b/internal/executor/claude.go @@ -0,0 +1,552 @@ +package executor + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "github.com/thepeterstone/claudomator/internal/retry" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// ClaudeRunner spawns the `claude` CLI in non-interactive mode. +type ClaudeRunner struct { + BinaryPath string // defaults to "claude" + Logger *slog.Logger + LogDir string // base directory for execution logs + APIURL string // base URL of the Claudomator API, passed to subprocesses +} + +// BlockedError is returned by Run when the agent wrote a question file and exited. +// The pool transitions the task to BLOCKED and stores the question for the user. +// ExecLogDir returns the log directory for the given execution ID. +// Implements LogPather so the pool can persist paths before execution starts. +func (r *ClaudeRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +func (r *ClaudeRunner) binaryPath() string { + if r.BinaryPath != "" { + return r.BinaryPath + } + return "claude" +} + +// Run executes a claude -p invocation, streaming output to log files. +// It retries up to 3 times on rate-limit errors using exponential backoff. +// If the agent writes a question file and exits, Run returns *BlockedError. +// +// When project_dir is set and this is not a resume execution, Run clones the +// project into a temp sandbox, runs the agent there, then merges committed +// changes back to project_dir. On failure the sandbox is preserved and its +// path is included in the error. +func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + projectDir := t.Agent.ProjectDir + + // Validate project_dir exists when set. + if projectDir != "" { + if _, err := os.Stat(projectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", projectDir, err) + } + } + + // Setup log directory once; retries overwrite the log files. + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = e.ID // fallback for tests without LogDir set + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + if e.StdoutPath == "" { + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + } + + // Pre-assign session ID so we can resume after a BLOCKED state. + // For resume executions, the claude session continues under the original + // session ID (the one passed to --resume). Using the new exec's own UUID + // would cause a second block-and-resume cycle to pass the wrong --resume + // argument. + if e.SessionID == "" { + if e.ResumeSessionID != "" { + e.SessionID = e.ResumeSessionID + } else { + e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) + } + } + + // For new (non-resume) executions with a project_dir, clone into a sandbox. + // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude + // finds its session files under the same project slug. If no sandbox was + // preserved (e.g. task had no project_dir), fall back to project_dir. + var sandboxDir string + var startHEAD string + effectiveWorkingDir := projectDir + if e.ResumeSessionID != "" { + if e.SandboxDir != "" { + if _, statErr := os.Stat(e.SandboxDir); statErr == nil { + effectiveWorkingDir = e.SandboxDir + } else { + // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot). + // Clone a fresh sandbox so the task can run rather than fail immediately. + r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) + e.SandboxDir = "" + if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + + effectiveWorkingDir = sandboxDir + r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) + } + } + } + } else if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + + effectiveWorkingDir = sandboxDir + r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) + } + + if effectiveWorkingDir != "" { + // Capture the initial HEAD so we can identify new commits later. + headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() + startHEAD = strings.TrimSpace(string(headOut)) + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + + attempt := 0 + err := retry.RunWithBackoff(ctx, 3, 5*time.Second, func() error { + if attempt > 0 { + delay := 5 * time.Second * (1 << (attempt - 1)) + r.Logger.Warn("rate-limited by Claude API, retrying", + "attempt", attempt, + "delay", delay, + ) + } + attempt++ + return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e) + }) + if err != nil { + if sandboxDir != "" { + return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) + } + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) // consumed + questionJSON := strings.TrimSpace(string(data)) + // If the agent wrote a completion report instead of a real question, + // extract the text as the summary and fall through to normal completion. + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + // Preserve sandbox on BLOCKED — agent may have partial work and its + // Claude session files are stored under the sandbox's project slug. + // The resume execution must run in the same directory. + return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) // consumed + e.Summary = strings.TrimSpace(string(summaryData)) + } + + // Merge sandbox back to project_dir and clean up. + if sandboxDir != "" { + if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { + return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) + } + } + return nil +} + +// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a +// remote named "local" (a local bare repo that accepts pushes cleanly), then +// falls back to "origin", then to the working copy path itself. +func sandboxCloneSource(projectDir string) string { + for _, remote := range []string{"local", "origin"} { + out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", remote)...).Output() + if err == nil { + u := strings.TrimSpace(string(out)) + if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) { + return u + } + } + } + return projectDir +} + +// setupSandbox prepares a temporary git clone of projectDir. +// If projectDir is not a git repo it is initialised with an initial commit first. +func setupSandbox(projectDir string, logger *slog.Logger) (string, error) { + // Ensure projectDir is a git repo; initialise if not. + if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil { + cmds := [][]string{ + gitSafe("-C", projectDir, "init"), + gitSafe("-C", projectDir, "add", "-A"), + gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"), + } + for _, args := range cmds { + if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec + return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out) + } + } + } + + src := sandboxCloneSource(projectDir) + + tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*") + if err != nil { + return "", fmt.Errorf("creating sandbox dir: %w", err) + } + // git clone requires the target to not exist; remove the placeholder first. + if err := os.Remove(tempDir); err != nil { + return "", fmt.Errorf("removing temp dir placeholder: %w", err) + } + out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput() + if err != nil { + return "", fmt.Errorf("git clone: %w\n%s", err, out) + } + return tempDir, nil +} + +// teardownSandbox verifies the sandbox is clean and pushes new commits to the +// canonical bare repo. If the push is rejected because another task pushed +// concurrently, it fetches and rebases then retries once. +// +// The working copy (projectDir) is NOT updated automatically — it is the +// developer's workspace and is pulled manually. This avoids permission errors +// from mixed-owner .git/objects directories. +func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error { + // Automatically commit uncommitted changes. + out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() + if err != nil { + return fmt.Errorf("git status: %w", err) + } + if len(strings.TrimSpace(string(out))) > 0 { + logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir) + + // Run build before autocommitting. + if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil { + logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir) + if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil { + return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) + } + } else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil { + logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir) + cmd := exec.Command("./gradlew", "build") + cmd.Dir = sandboxDir + if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { + return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) + } + } else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil { + logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir) + cmd := exec.Command("go", "build", "./...") + cmd.Dir = sandboxDir + if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { + return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) + } + } + + cmds := [][]string{ + gitSafe("-C", sandboxDir, "add", "-A"), + gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"), + } + for _, args := range cmds { + if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { + return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out) + } + } + } + + // Capture commits before pushing/deleting. + // Use startHEAD..HEAD to find all commits made during this execution. + logRange := "origin/HEAD..HEAD" + if startHEAD != "" && startHEAD != "HEAD" { + logRange = startHEAD + "..HEAD" + } + + logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...) + logOut, logErr := logCmd.CombinedOutput() + if logErr == nil { + lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") + logger.Debug("captured commits", "count", len(lines), "range", logRange) + for _, line := range lines { + if line == "" { + continue + } + parts := strings.SplitN(line, "|", 2) + if len(parts) == 2 { + execRecord.Commits = append(execRecord.Commits, task.GitCommit{ + Hash: parts[0], + Message: parts[1], + }) + } + } + } else { + logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut)) + } + + // Check whether there are any new commits to push. + ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output() + if err != nil { + logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange) + } + if strings.TrimSpace(string(ahead)) == "0" { + os.RemoveAll(sandboxDir) + return nil + } + + // Push from sandbox → bare repo (sandbox's origin is the bare repo). + if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil { + // If rejected due to concurrent push, fetch+rebase and retry once. + if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") { + logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir) + if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil { + return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2) + } + // Re-capture commits after rebase (hashes might have changed) + execRecord.Commits = nil + logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output() + if logErr == nil { + lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") + for _, line := range lines { + parts := strings.SplitN(line, "|", 2) + if len(parts) == 2 { + execRecord.Commits = append(execRecord.Commits, task.GitCommit{ + Hash: parts[0], + Message: parts[1], + }) + } + } + } + + if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil { + return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3) + } + } else { + return fmt.Errorf("git push to origin: %w\n%s", err, out) + } + } + + logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir) + os.RemoveAll(sandboxDir) + return nil +} + +// execOnce runs the claude subprocess once, streaming output to e's log paths. +func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) + cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_PROJECT_DIR="+projectDir, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), + ) + // Put the subprocess in its own process group so we can SIGKILL the entire + // group (MCP servers, bash children, etc.) on cancellation. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if workingDir != "" { + cmd.Dir = workingDir + } + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + // Use os.Pipe for stdout so we own the read-end lifetime. + // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing + // cmd.Wait() to close it before our goroutine finishes reading. + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait + cmd.Stderr = stderrFile + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting claude: %w", err) + } + // Close our write-end immediately; the subprocess holds its own copy. + // The goroutine below gets EOF when the subprocess exits. + stdoutW.Close() + + // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. + // + // Safety: this goroutine cannot block indefinitely. The select has two arms: + // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel). + // The goroutine sends SIGKILL and exits immediately. + // • killDone — closed by close(killDone) below, immediately after cmd.Wait() + // returns. This fires when the process exits for any reason (natural exit, + // SIGKILL from the ctx arm, or any other signal). The goroutine exits without + // doing anything. + // + // Therefore: for a task that completes normally with a long-lived (non-cancelled) + // context, the killDone arm fires and the goroutine exits. There is no path where + // this goroutine outlives execOnce(). + killDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + // SIGKILL the entire process group to reap orphan children. + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } + }() + + // Stream stdout to the log file and parse cost/errors. + // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). + var costUSD float64 + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + costUSD, _, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + close(killDone) // stop the pgid-kill goroutine + wg.Wait() // drain remaining stdout before reading costUSD/streamErr + + e.CostUSD = costUSD + + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + e.ExitCode = exitErr.ExitCode() + } + // If the stream captured a rate-limit or quota message, return it + // so callers can distinguish it from a generic exit-status failure. + if retry.IsRateLimitError(streamErr) || isQuotaExhausted(streamErr) { + return streamErr + } + if tail := tailFile(e.StderrPath, 20); tail != "" { + return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail) + } + return fmt.Errorf("claude exited with error: %w", waitErr) + } + + e.ExitCode = 0 + if streamErr != nil { + return streamErr + } + return nil +} + +func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Resume execution: the agent already has context; just deliver the answer. + if e.ResumeSessionID != "" { + args := []string{ + "-p", e.ResumeAnswer, + "--resume", e.ResumeSessionID, + "--output-format", "stream-json", + "--verbose", + } + permMode := t.Agent.PermissionMode + if permMode == "" { + permMode = "bypassPermissions" + } + args = append(args, "--permission-mode", permMode) + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + return args + } + + instructions := t.Agent.Instructions + allowedTools := t.Agent.AllowedTools + + if !t.Agent.SkipPlanning { + instructions = withPlanningPreamble(instructions) + // Ensure Bash is available so the agent can POST subtasks and ask questions. + hasBash := false + for _, tool := range allowedTools { + if tool == "Bash" { + hasBash = true + break + } + } + if !hasBash { + allowedTools = append(allowedTools, "Bash") + } + } + + args := []string{ + "-p", instructions, + "--session-id", e.SessionID, + "--output-format", "stream-json", + "--verbose", + } + + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + if t.Agent.MaxBudgetUSD > 0 { + args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) + } + // Default to bypassPermissions — claudomator runs tasks unattended, so + // prompting for write access would always stall execution. Tasks that need + // a more restrictive mode can set permission_mode explicitly. + permMode := t.Agent.PermissionMode + if permMode == "" { + permMode = "bypassPermissions" + } + args = append(args, "--permission-mode", permMode) + if t.Agent.SystemPromptAppend != "" { + args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) + } + for _, tool := range allowedTools { + args = append(args, "--allowedTools", tool) + } + for _, tool := range t.Agent.DisallowedTools { + args = append(args, "--disallowedTools", tool) + } + for _, f := range t.Agent.ContextFiles { + args = append(args, "--add-dir", f) + } + args = append(args, t.Agent.AdditionalArgs...) + + return args +} + |
