summaryrefslogtreecommitdiff
path: root/internal/executor/claude.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-05 18:51:50 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-05 18:51:50 +0000
commitcf83444a9d341ae362e65a9f995100c69176887c (patch)
tree0dc12aea9510d10d9e60e9c58473cbdb9db5db47 /internal/executor/claude.go
parent680e5f668637248073c1f8f7e3547810ab1ada36 (diff)
Rescue work from claudomator-work: question/answer, ratelimit, start-next-task
Merges features developed in /site/doot.terst.org/claudomator-work (a stale clone) into the canonical repo: - executor: QuestionRegistry for human-in-the-loop answers, rate limit detection and exponential backoff retry (ratelimit.go, question.go) - executor/claude.go: process group isolation (SIGKILL orphans on cancel), os.Pipe for reliable stdout drain, backoff retry on rate limits - api/scripts.go: POST /api/scripts/start-next-task handler - api/server.go: startNextTaskScript field, answer-question route, BroadcastQuestion for WebSocket question events - web: Cancel/Restart buttons, question banner UI, log viewer, validate section, WebSocket auto-connect All tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/claude.go')
-rw-r--r--internal/executor/claude.go98
1 files changed, 73 insertions, 25 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index 7b3884c..0029331 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -10,6 +10,9 @@ import (
"os"
"os/exec"
"path/filepath"
+ "sync"
+ "syscall"
+ "time"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
@@ -31,71 +34,116 @@ func (r *ClaudeRunner) binaryPath() string {
}
// Run executes a claude -p invocation, streaming output to log files.
+// It retries up to 3 times on rate-limit errors using exponential backoff.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
args := r.buildArgs(t)
- cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
- cmd.Env = append(os.Environ(),
- "CLAUDOMATOR_API_URL="+r.APIURL,
- "CLAUDOMATOR_TASK_ID="+t.ID,
- )
if t.Claude.WorkingDir != "" {
if _, err := os.Stat(t.Claude.WorkingDir); err != nil {
return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err)
}
- cmd.Dir = t.Claude.WorkingDir
}
- // Setup log directory for this execution.
+ // Setup log directory once; retries overwrite the log files.
logDir := filepath.Join(r.LogDir, e.ID)
if err := os.MkdirAll(logDir, 0700); err != nil {
return fmt.Errorf("creating log dir: %w", err)
}
-
- stdoutPath := filepath.Join(logDir, "stdout.log")
- stderrPath := filepath.Join(logDir, "stderr.log")
- e.StdoutPath = stdoutPath
- e.StderrPath = stderrPath
+ e.StdoutPath = filepath.Join(logDir, "stdout.log")
+ e.StderrPath = filepath.Join(logDir, "stderr.log")
e.ArtifactDir = logDir
- stdoutFile, err := os.Create(stdoutPath)
+ attempt := 0
+ return runWithBackoff(ctx, 3, 5*time.Second, func() error {
+ if attempt > 0 {
+ delay := 5 * time.Second * (1 << (attempt - 1))
+ r.Logger.Warn("rate-limited by Claude API, retrying",
+ "attempt", attempt,
+ "delay", delay,
+ )
+ }
+ attempt++
+ return r.execOnce(ctx, t, args, e)
+ })
+}
+
+// execOnce runs the claude subprocess once, streaming output to e's log paths.
+func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error {
+ cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
+ cmd.Env = append(os.Environ(),
+ "CLAUDOMATOR_API_URL="+r.APIURL,
+ "CLAUDOMATOR_TASK_ID="+t.ID,
+ )
+ // Put the subprocess in its own process group so we can SIGKILL the entire
+ // group (MCP servers, bash children, etc.) on cancellation.
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ if t.Claude.WorkingDir != "" {
+ cmd.Dir = t.Claude.WorkingDir
+ }
+
+ stdoutFile, err := os.Create(e.StdoutPath)
if err != nil {
return fmt.Errorf("creating stdout log: %w", err)
}
defer stdoutFile.Close()
- stderrFile, err := os.Create(stderrPath)
+ stderrFile, err := os.Create(e.StderrPath)
if err != nil {
return fmt.Errorf("creating stderr log: %w", err)
}
defer stderrFile.Close()
- stdoutPipe, err := cmd.StdoutPipe()
+ // Use os.Pipe for stdout so we own the read-end lifetime.
+ // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
+ // cmd.Wait() to close it before our goroutine finishes reading.
+ stdoutR, stdoutW, err := os.Pipe()
if err != nil {
return fmt.Errorf("creating stdout pipe: %w", err)
}
- stderrPipe, err := cmd.StderrPipe()
- if err != nil {
- return fmt.Errorf("creating stderr pipe: %w", err)
- }
+ cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
+ cmd.Stderr = stderrFile
if err := cmd.Start(); err != nil {
+ stdoutW.Close()
+ stdoutR.Close()
return fmt.Errorf("starting claude: %w", err)
}
+ // Close our write-end immediately; the subprocess holds its own copy.
+ // The goroutine below gets EOF when the subprocess exits.
+ stdoutW.Close()
+
+ // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
+ killDone := make(chan struct{})
+ go func() {
+ select {
+ case <-ctx.Done():
+ // SIGKILL the entire process group to reap orphan children.
+ syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+ case <-killDone:
+ }
+ }()
- // Stream output to log files and parse cost info.
+ // Stream stdout to the log file and parse cost.
+ // wg ensures costUSD is fully written before we read it after cmd.Wait().
var costUSD float64
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
- costUSD = streamAndParseCost(stdoutPipe, stdoutFile, r.Logger)
+ defer wg.Done()
+ costUSD = streamAndParseCost(stdoutR, stdoutFile, r.Logger)
+ stdoutR.Close()
}()
- go io.Copy(stderrFile, stderrPipe)
- if err := cmd.Wait(); err != nil {
- if exitErr, ok := err.(*exec.ExitError); ok {
+ waitErr := cmd.Wait()
+ close(killDone) // stop the pgid-kill goroutine
+ wg.Wait() // drain remaining stdout before reading costUSD
+
+ if waitErr != nil {
+ if exitErr, ok := waitErr.(*exec.ExitError); ok {
e.ExitCode = exitErr.ExitCode()
}
e.CostUSD = costUSD
- return fmt.Errorf("claude exited with error: %w", err)
+ return fmt.Errorf("claude exited with error: %w", waitErr)
}
e.ExitCode = 0