diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-18 00:52:49 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-18 07:54:48 +0000 |
| commit | 5814e7d6bdec659bb8ca10cc18447a821c59ad4c (patch) | |
| tree | 2106c5a34ae709d4628368a4314f7b1fea076243 /internal/executor/claude.go | |
| parent | 0fb4e3e81c20b2e2b58040772b747ec1dd9e09e7 (diff) | |
fix: comprehensive addressing of container execution review feedback
- Fix Critical Bug 1: Only remove workspace on success, preserve on failure/BLOCKED.
- Fix Critical Bug 2: Use correct Claude flag (--resume) and pass instructions via file.
- Fix Critical Bug 3: Actually mount and use the instructions file in the container.
- Address Design Issue 4: Implement Resume/BLOCKED detection and host-side workspace re-use.
- Address Design Issue 5: Consolidate RepositoryURL to Task level and fix API fallback.
- Address Design Issue 6: Make agent images configurable per runner type via CLI flags.
- Address Design Issue 7: Secure API keys via .claudomator-env file and --env-file flag.
- Address Code Quality 8: Add unit tests for ContainerRunner arg construction.
- Address Code Quality 9: Fix indentation regression in app.js.
- Address Code Quality 10: Clean up orphaned Claude/Gemini runner files and move helpers.
- Fix tests: Update server_test.go and executor_test.go to work with new model.
Diffstat (limited to 'internal/executor/claude.go')
| -rw-r--r-- | internal/executor/claude.go | 714 |
1 files changed, 0 insertions, 714 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go deleted file mode 100644 index 6346aa8..0000000 --- a/internal/executor/claude.go +++ /dev/null @@ -1,714 +0,0 @@ -package executor - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "syscall" - "time" - - "github.com/thepeterstone/claudomator/internal/storage" - "github.com/thepeterstone/claudomator/internal/task" -) - -// ClaudeRunner spawns the `claude` CLI in non-interactive mode. -type ClaudeRunner struct { - BinaryPath string // defaults to "claude" - Logger *slog.Logger - LogDir string // base directory for execution logs - APIURL string // base URL of the Claudomator API, passed to subprocesses - DropsDir string // path to the drops directory, passed to subprocesses -} - -// BlockedError is returned by Run when the agent wrote a question file and exited. -// The pool transitions the task to BLOCKED and stores the question for the user. -type BlockedError struct { - QuestionJSON string // raw JSON from the question file - SessionID string // claude session to resume once the user answers - SandboxDir string // preserved sandbox path; resume must run here so Claude finds its session files -} - -func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) } - -// ExecLogDir returns the log directory for the given execution ID. -// Implements LogPather so the pool can persist paths before execution starts. -func (r *ClaudeRunner) ExecLogDir(execID string) string { - if r.LogDir == "" { - return "" - } - return filepath.Join(r.LogDir, execID) -} - -func (r *ClaudeRunner) binaryPath() string { - if r.BinaryPath != "" { - return r.BinaryPath - } - return "claude" -} - -// Run executes a claude -p invocation, streaming output to log files. -// It retries up to 3 times on rate-limit errors using exponential backoff. -// If the agent writes a question file and exits, Run returns *BlockedError. -// -// When project_dir is set and this is not a resume execution, Run clones the -// project into a temp sandbox, runs the agent there, then merges committed -// changes back to project_dir. On failure the sandbox is preserved and its -// path is included in the error. -func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - projectDir := t.Agent.ProjectDir - - // Validate project_dir exists when set. - if projectDir != "" { - if _, err := os.Stat(projectDir); err != nil { - return fmt.Errorf("project_dir %q: %w", projectDir, err) - } - } - - // Setup log directory once; retries overwrite the log files. - logDir := r.ExecLogDir(e.ID) - if logDir == "" { - logDir = e.ID // fallback for tests without LogDir set - } - if err := os.MkdirAll(logDir, 0700); err != nil { - return fmt.Errorf("creating log dir: %w", err) - } - if e.StdoutPath == "" { - e.StdoutPath = filepath.Join(logDir, "stdout.log") - e.StderrPath = filepath.Join(logDir, "stderr.log") - e.ArtifactDir = logDir - } - - // Pre-assign session ID so we can resume after a BLOCKED state. - // For resume executions, the claude session continues under the original - // session ID (the one passed to --resume). Using the new exec's own UUID - // would cause a second block-and-resume cycle to pass the wrong --resume - // argument. - if e.SessionID == "" { - if e.ResumeSessionID != "" { - e.SessionID = e.ResumeSessionID - } else { - e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) - } - } - - // For new (non-resume) executions with a project_dir, clone into a sandbox. - // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude - // finds its session files under the same project slug. If no sandbox was - // preserved (e.g. task had no project_dir), fall back to project_dir. - var sandboxDir string - var startHEAD string - effectiveWorkingDir := projectDir - if e.ResumeSessionID != "" { - if e.SandboxDir != "" { - if _, statErr := os.Stat(e.SandboxDir); statErr == nil { - effectiveWorkingDir = e.SandboxDir - } else { - // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot). - // Clone a fresh sandbox so the task can run rather than fail immediately. - r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) - e.SandboxDir = "" - if projectDir != "" { - var err error - sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) - if err != nil { - return fmt.Errorf("setting up sandbox: %w", err) - } - - effectiveWorkingDir = sandboxDir - r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) - } - } - } - } else if projectDir != "" { - var err error - sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) - if err != nil { - return fmt.Errorf("setting up sandbox: %w", err) - } - - effectiveWorkingDir = sandboxDir - r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) - } - - if effectiveWorkingDir != "" { - // Capture the initial HEAD so we can identify new commits later. - headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() - startHEAD = strings.TrimSpace(string(headOut)) - } - - questionFile := filepath.Join(logDir, "question.json") - args := r.buildArgs(t, e, questionFile) - - attempt := 0 - err := runWithBackoff(ctx, 3, 5*time.Second, func() error { - if attempt > 0 { - delay := 5 * time.Second * (1 << (attempt - 1)) - r.Logger.Warn("rate-limited by Claude API, retrying", - "attempt", attempt, - "delay", delay, - ) - } - attempt++ - return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e) - }) - if err != nil { - if sandboxDir != "" { - return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) - } - return err - } - - // Check whether the agent left a question before exiting. - data, readErr := os.ReadFile(questionFile) - if readErr == nil { - os.Remove(questionFile) // consumed - questionJSON := strings.TrimSpace(string(data)) - // If the agent wrote a completion report instead of a real question, - // extract the text as the summary and fall through to normal completion. - if isCompletionReport(questionJSON) { - r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) - e.Summary = extractQuestionText(questionJSON) - } else { - // Preserve sandbox on BLOCKED — agent may have partial work and its - // Claude session files are stored under the sandbox's project slug. - // The resume execution must run in the same directory. - return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} - } - } - - // Read agent summary if written. - summaryFile := filepath.Join(logDir, "summary.txt") - if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { - os.Remove(summaryFile) // consumed - e.Summary = strings.TrimSpace(string(summaryData)) - } - - // Merge sandbox back to project_dir and clean up. - if sandboxDir != "" { - if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { - return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) - } - } - return nil -} - -// isCompletionReport returns true when a question-file JSON looks like a -// completion report rather than a real user question. Heuristic: no options -// (or empty options) and no "?" anywhere in the text. -func isCompletionReport(questionJSON string) bool { - var q struct { - Text string `json:"text"` - Options []string `json:"options"` - } - if err := json.Unmarshal([]byte(questionJSON), &q); err != nil { - return false - } - return len(q.Options) == 0 && !strings.Contains(q.Text, "?") -} - -// extractQuestionText returns the "text" field from a question-file JSON, or -// the raw string if parsing fails. -func extractQuestionText(questionJSON string) string { - var q struct { - Text string `json:"text"` - } - if err := json.Unmarshal([]byte(questionJSON), &q); err != nil { - return questionJSON - } - return strings.TrimSpace(q.Text) -} - -// gitSafe returns git arguments that prepend "-c safe.directory=*" so that -// commands succeed regardless of the repository owner. This is needed when -// claudomator operates on project directories owned by a different OS user. -func gitSafe(args ...string) []string { - return append([]string{"-c", "safe.directory=*"}, args...) -} - -// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a -// remote named "local" (a local bare repo that accepts pushes cleanly), then -// falls back to "origin", then to the working copy path itself. -func sandboxCloneSource(projectDir string) string { - // Prefer "local" remote, but only if it points to a local path (accepts pushes). - if out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", "local")...).Output(); err == nil { - u := strings.TrimSpace(string(out)) - if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) { - return u - } - } - // Fall back to "origin" — any URL scheme is acceptable for cloning. - if out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", "origin")...).Output(); err == nil { - if u := strings.TrimSpace(string(out)); u != "" { - return u - } - } - return projectDir -} - -// setupSandbox prepares a temporary git clone of projectDir. -// If projectDir is not a git repo it is initialised with an initial commit first. -func setupSandbox(projectDir string, logger *slog.Logger) (string, error) { - // Ensure projectDir is a git repo; initialise if not. - if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil { - cmds := [][]string{ - gitSafe("-C", projectDir, "init"), - gitSafe("-C", projectDir, "add", "-A"), - gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"), - } - for _, args := range cmds { - if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec - return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out) - } - } - } - - src := sandboxCloneSource(projectDir) - - tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*") - if err != nil { - return "", fmt.Errorf("creating sandbox dir: %w", err) - } - // git clone requires the target to not exist; remove the placeholder first. - if err := os.Remove(tempDir); err != nil { - return "", fmt.Errorf("removing temp dir placeholder: %w", err) - } - out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput() - if err != nil { - return "", fmt.Errorf("git clone: %w\n%s", err, out) - } - return tempDir, nil -} - -// teardownSandbox verifies the sandbox is clean and pushes new commits to the -// canonical bare repo. If the push is rejected because another task pushed -// concurrently, it fetches and rebases then retries once. -// -// The working copy (projectDir) is NOT updated automatically — it is the -// developer's workspace and is pulled manually. This avoids permission errors -// from mixed-owner .git/objects directories. -func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error { - // Automatically commit uncommitted changes. - out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() - if err != nil { - return fmt.Errorf("git status: %w", err) - } - if len(strings.TrimSpace(string(out))) > 0 { - logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir) - - // Run build before autocommitting. - if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil { - logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir) - if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil { - return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) - } - } else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil { - logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir) - cmd := exec.Command("./gradlew", "build") - cmd.Dir = sandboxDir - if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { - return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) - } - } else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil { - logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir) - cmd := exec.Command("go", "build", "./...") - cmd.Dir = sandboxDir - if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { - return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) - } - } - - cmds := [][]string{ - gitSafe("-C", sandboxDir, "add", "-A"), - gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"), - } - for _, args := range cmds { - if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { - return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out) - } - } - } - - // Capture commits before pushing/deleting. - // Use startHEAD..HEAD to find all commits made during this execution. - logRange := "origin/HEAD..HEAD" - if startHEAD != "" && startHEAD != "HEAD" { - logRange = startHEAD + "..HEAD" - } - - logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...) - logOut, logErr := logCmd.CombinedOutput() - if logErr == nil { - lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") - logger.Debug("captured commits", "count", len(lines), "range", logRange) - for _, line := range lines { - if line == "" { - continue - } - parts := strings.SplitN(line, "|", 2) - if len(parts) == 2 { - execRecord.Commits = append(execRecord.Commits, task.GitCommit{ - Hash: parts[0], - Message: parts[1], - }) - } - } - } else { - logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut)) - } - - // Check whether there are any new commits to push. - ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output() - if err != nil { - logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange) - } - if strings.TrimSpace(string(ahead)) == "0" { - os.RemoveAll(sandboxDir) - return nil - } - - // Push from sandbox → bare repo (sandbox's origin is the bare repo). - if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil { - // If rejected due to concurrent push, fetch+rebase and retry once. - if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") { - logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir) - if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil { - return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2) - } - // Re-capture commits after rebase (hashes might have changed) - execRecord.Commits = nil - logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output() - if logErr == nil { - lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") - for _, line := range lines { - parts := strings.SplitN(line, "|", 2) - if len(parts) == 2 { - execRecord.Commits = append(execRecord.Commits, task.GitCommit{ - Hash: parts[0], - Message: parts[1], - }) - } - } - } - - if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil { - return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3) - } - } else { - return fmt.Errorf("git push to origin: %w\n%s", err, out) - } - } - - logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir) - os.RemoveAll(sandboxDir) - return nil -} - -// execOnce runs the claude subprocess once, streaming output to e's log paths. -func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { - cmd := exec.CommandContext(ctx, r.binaryPath(), args...) - cmd.Env = append(os.Environ(), - "CLAUDOMATOR_API_URL="+r.APIURL, - "CLAUDOMATOR_TASK_ID="+e.TaskID, - "CLAUDOMATOR_PROJECT_DIR="+projectDir, - "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), - "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), - "CLAUDOMATOR_DROP_DIR="+r.DropsDir, - ) - // Put the subprocess in its own process group so we can SIGKILL the entire - // group (MCP servers, bash children, etc.) on cancellation. - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if workingDir != "" { - cmd.Dir = workingDir - } - - stdoutFile, err := os.Create(e.StdoutPath) - if err != nil { - return fmt.Errorf("creating stdout log: %w", err) - } - defer stdoutFile.Close() - - stderrFile, err := os.Create(e.StderrPath) - if err != nil { - return fmt.Errorf("creating stderr log: %w", err) - } - defer stderrFile.Close() - - // Use os.Pipe for stdout so we own the read-end lifetime. - // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing - // cmd.Wait() to close it before our goroutine finishes reading. - stdoutR, stdoutW, err := os.Pipe() - if err != nil { - return fmt.Errorf("creating stdout pipe: %w", err) - } - cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait - cmd.Stderr = stderrFile - - if err := cmd.Start(); err != nil { - stdoutW.Close() - stdoutR.Close() - return fmt.Errorf("starting claude: %w", err) - } - // Close our write-end immediately; the subprocess holds its own copy. - // The goroutine below gets EOF when the subprocess exits. - stdoutW.Close() - - // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. - // - // Safety: this goroutine cannot block indefinitely. The select has two arms: - // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel). - // The goroutine sends SIGKILL and exits immediately. - // • killDone — closed by close(killDone) below, immediately after cmd.Wait() - // returns. This fires when the process exits for any reason (natural exit, - // SIGKILL from the ctx arm, or any other signal). The goroutine exits without - // doing anything. - // - // Therefore: for a task that completes normally with a long-lived (non-cancelled) - // context, the killDone arm fires and the goroutine exits. There is no path where - // this goroutine outlives execOnce(). - killDone := make(chan struct{}) - go func() { - select { - case <-ctx.Done(): - // SIGKILL the entire process group to reap orphan children. - syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) - case <-killDone: - } - }() - - // Stream stdout to the log file and parse cost/errors. - // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). - var costUSD float64 - var streamErr error - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) - stdoutR.Close() - }() - - waitErr := cmd.Wait() - close(killDone) // stop the pgid-kill goroutine - wg.Wait() // drain remaining stdout before reading costUSD/streamErr - - e.CostUSD = costUSD - - if waitErr != nil { - if exitErr, ok := waitErr.(*exec.ExitError); ok { - e.ExitCode = exitErr.ExitCode() - } - // If the stream captured a rate-limit or quota message, return it - // so callers can distinguish it from a generic exit-status failure. - if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) { - return streamErr - } - if tail := tailFile(e.StderrPath, 20); tail != "" { - return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail) - } - return fmt.Errorf("claude exited with error: %w", waitErr) - } - - e.ExitCode = 0 - if streamErr != nil { - return streamErr - } - return nil -} - -func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { - // Resume execution: the agent already has context; just deliver the answer. - if e.ResumeSessionID != "" { - args := []string{ - "-p", e.ResumeAnswer, - "--resume", e.ResumeSessionID, - "--output-format", "stream-json", - "--verbose", - } - permMode := t.Agent.PermissionMode - if permMode == "" { - permMode = "bypassPermissions" - } - args = append(args, "--permission-mode", permMode) - if t.Agent.Model != "" { - args = append(args, "--model", t.Agent.Model) - } - return args - } - - instructions := t.Agent.Instructions - allowedTools := t.Agent.AllowedTools - - if !t.Agent.SkipPlanning { - instructions = withPlanningPreamble(instructions) - // Ensure Bash is available so the agent can POST subtasks and ask questions. - hasBash := false - for _, tool := range allowedTools { - if tool == "Bash" { - hasBash = true - break - } - } - if !hasBash { - allowedTools = append(allowedTools, "Bash") - } - } - - args := []string{ - "-p", instructions, - "--session-id", e.SessionID, - "--output-format", "stream-json", - "--verbose", - } - - if t.Agent.Model != "" { - args = append(args, "--model", t.Agent.Model) - } - if t.Agent.MaxBudgetUSD > 0 { - args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) - } - // Default to bypassPermissions — claudomator runs tasks unattended, so - // prompting for write access would always stall execution. Tasks that need - // a more restrictive mode can set permission_mode explicitly. - permMode := t.Agent.PermissionMode - if permMode == "" { - permMode = "bypassPermissions" - } - args = append(args, "--permission-mode", permMode) - if t.Agent.SystemPromptAppend != "" { - args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) - } - for _, tool := range allowedTools { - args = append(args, "--allowedTools", tool) - } - for _, tool := range t.Agent.DisallowedTools { - args = append(args, "--disallowedTools", tool) - } - for _, f := range t.Agent.ContextFiles { - args = append(args, "--add-dir", f) - } - args = append(args, t.Agent.AdditionalArgs...) - - return args -} - -// parseStream reads streaming JSON from claude, writes to w, and returns -// (costUSD, error). error is non-nil if the stream signals task failure: -// - result message has is_error:true -// - a tool_result was denied due to missing permissions -func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { - tee := io.TeeReader(r, w) - scanner := bufio.NewScanner(tee) - scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines - - var totalCost float64 - var streamErr error - - for scanner.Scan() { - line := scanner.Bytes() - var msg map[string]interface{} - if err := json.Unmarshal(line, &msg); err != nil { - continue - } - - msgType, _ := msg["type"].(string) - switch msgType { - case "rate_limit_event": - if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok { - status, _ := info["status"].(string) - if status == "rejected" { - streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg) - // Immediately break since we can't continue anyway - break - } - } - case "assistant": - if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" { - streamErr = fmt.Errorf("claude rate limit reached: %v", msg) - } - case "result": - if isErr, _ := msg["is_error"].(bool); isErr { - result, _ := msg["result"].(string) - if result != "" { - streamErr = fmt.Errorf("claude task failed: %s", result) - } else { - streamErr = fmt.Errorf("claude task failed (is_error=true in result)") - } - } - // Prefer total_cost_usd from result message; fall through to legacy check below. - if cost, ok := msg["total_cost_usd"].(float64); ok { - totalCost = cost - } - case "user": - // Detect permission-denial tool_results. These occur when permission_mode - // is not bypassPermissions and claude exits 0 without completing its task. - if err := permissionDenialError(msg); err != nil && streamErr == nil { - streamErr = err - } - } - - // Legacy cost field used by older claude versions. - if cost, ok := msg["cost_usd"].(float64); ok { - totalCost = cost - } - } - - return totalCost, streamErr -} - -// permissionDenialError inspects a "user" stream message for tool_result entries -// that were denied due to missing permissions. Returns an error if found. -func permissionDenialError(msg map[string]interface{}) error { - message, ok := msg["message"].(map[string]interface{}) - if !ok { - return nil - } - content, ok := message["content"].([]interface{}) - if !ok { - return nil - } - for _, item := range content { - itemMap, ok := item.(map[string]interface{}) - if !ok { - continue - } - if itemMap["type"] != "tool_result" { - continue - } - if isErr, _ := itemMap["is_error"].(bool); !isErr { - continue - } - text, _ := itemMap["content"].(string) - if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") { - return fmt.Errorf("permission denied by host: %s", text) - } - } - return nil -} - -// tailFile returns the last n lines of the file at path, or empty string if -// the file cannot be read. Used to surface subprocess stderr on failure. -func tailFile(path string, n int) string { - f, err := os.Open(path) - if err != nil { - return "" - } - defer f.Close() - - var lines []string - scanner := bufio.NewScanner(f) - for scanner.Scan() { - lines = append(lines, scanner.Text()) - if len(lines) > n { - lines = lines[1:] - } - } - return strings.Join(lines, "\n") -} |
