diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-05 18:51:50 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-05 18:51:50 +0000 |
| commit | cf83444a9d341ae362e65a9f995100c69176887c (patch) | |
| tree | 0dc12aea9510d10d9e60e9c58473cbdb9db5db47 /internal/executor/claude.go | |
| parent | 680e5f668637248073c1f8f7e3547810ab1ada36 (diff) | |
Rescue work from claudomator-work: question/answer, ratelimit, start-next-task
Merges features developed in /site/doot.terst.org/claudomator-work (a
stale clone) into the canonical repo:
- executor: QuestionRegistry for human-in-the-loop answers, rate limit
detection and exponential backoff retry (ratelimit.go, question.go)
- executor/claude.go: process group isolation (SIGKILL orphans on cancel),
os.Pipe for reliable stdout drain, backoff retry on rate limits
- api/scripts.go: POST /api/scripts/start-next-task handler
- api/server.go: startNextTaskScript field, answer-question route,
BroadcastQuestion for WebSocket question events
- web: Cancel/Restart buttons, question banner UI, log viewer, validate
section, WebSocket auto-connect
All tests pass.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/claude.go')
| -rw-r--r-- | internal/executor/claude.go | 98 |
1 files changed, 73 insertions, 25 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go index 7b3884c..0029331 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -10,6 +10,9 @@ import ( "os" "os/exec" "path/filepath" + "sync" + "syscall" + "time" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" @@ -31,71 +34,116 @@ func (r *ClaudeRunner) binaryPath() string { } // Run executes a claude -p invocation, streaming output to log files. +// It retries up to 3 times on rate-limit errors using exponential backoff. func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { args := r.buildArgs(t) - cmd := exec.CommandContext(ctx, r.binaryPath(), args...) - cmd.Env = append(os.Environ(), - "CLAUDOMATOR_API_URL="+r.APIURL, - "CLAUDOMATOR_TASK_ID="+t.ID, - ) if t.Claude.WorkingDir != "" { if _, err := os.Stat(t.Claude.WorkingDir); err != nil { return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err) } - cmd.Dir = t.Claude.WorkingDir } - // Setup log directory for this execution. + // Setup log directory once; retries overwrite the log files. logDir := filepath.Join(r.LogDir, e.ID) if err := os.MkdirAll(logDir, 0700); err != nil { return fmt.Errorf("creating log dir: %w", err) } - - stdoutPath := filepath.Join(logDir, "stdout.log") - stderrPath := filepath.Join(logDir, "stderr.log") - e.StdoutPath = stdoutPath - e.StderrPath = stderrPath + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") e.ArtifactDir = logDir - stdoutFile, err := os.Create(stdoutPath) + attempt := 0 + return runWithBackoff(ctx, 3, 5*time.Second, func() error { + if attempt > 0 { + delay := 5 * time.Second * (1 << (attempt - 1)) + r.Logger.Warn("rate-limited by Claude API, retrying", + "attempt", attempt, + "delay", delay, + ) + } + attempt++ + return r.execOnce(ctx, t, args, e) + }) +} + +// execOnce runs the claude subprocess once, streaming output to e's log paths. +func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error { + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) + cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+t.ID, + ) + // Put the subprocess in its own process group so we can SIGKILL the entire + // group (MCP servers, bash children, etc.) on cancellation. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if t.Claude.WorkingDir != "" { + cmd.Dir = t.Claude.WorkingDir + } + + stdoutFile, err := os.Create(e.StdoutPath) if err != nil { return fmt.Errorf("creating stdout log: %w", err) } defer stdoutFile.Close() - stderrFile, err := os.Create(stderrPath) + stderrFile, err := os.Create(e.StderrPath) if err != nil { return fmt.Errorf("creating stderr log: %w", err) } defer stderrFile.Close() - stdoutPipe, err := cmd.StdoutPipe() + // Use os.Pipe for stdout so we own the read-end lifetime. + // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing + // cmd.Wait() to close it before our goroutine finishes reading. + stdoutR, stdoutW, err := os.Pipe() if err != nil { return fmt.Errorf("creating stdout pipe: %w", err) } - stderrPipe, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("creating stderr pipe: %w", err) - } + cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait + cmd.Stderr = stderrFile if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() return fmt.Errorf("starting claude: %w", err) } + // Close our write-end immediately; the subprocess holds its own copy. + // The goroutine below gets EOF when the subprocess exits. + stdoutW.Close() + + // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. + killDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + // SIGKILL the entire process group to reap orphan children. + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } + }() - // Stream output to log files and parse cost info. + // Stream stdout to the log file and parse cost. + // wg ensures costUSD is fully written before we read it after cmd.Wait(). var costUSD float64 + var wg sync.WaitGroup + wg.Add(1) go func() { - costUSD = streamAndParseCost(stdoutPipe, stdoutFile, r.Logger) + defer wg.Done() + costUSD = streamAndParseCost(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() }() - go io.Copy(stderrFile, stderrPipe) - if err := cmd.Wait(); err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { + waitErr := cmd.Wait() + close(killDone) // stop the pgid-kill goroutine + wg.Wait() // drain remaining stdout before reading costUSD + + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { e.ExitCode = exitErr.ExitCode() } e.CostUSD = costUSD - return fmt.Errorf("claude exited with error: %w", err) + return fmt.Errorf("claude exited with error: %w", waitErr) } e.ExitCode = 0 |
