diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-05 22:53:02 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-05 22:53:02 +0000 |
| commit | 4c0ee5c215b6b1965ee2ac30d9341f5e8fb6f569 (patch) | |
| tree | 63c2a13e48d5d1ce7781d34b63391c94792f8b46 /internal/executor | |
| parent | 9e790e35708f834abe1a09af52e43742e164cb63 (diff) | |
executor: detect stream-level failures when claude exits 0
Rename streamAndParseCost → parseStream (returns float64, error).
Detect two failure modes that claude reports via exit 0:
- result message with is_error:true
- tool_result permission denial ("haven't granted it yet")
Also fix cost extraction to read total_cost_usd from the result
message (the actual field name), keeping cost_usd as legacy fallback.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/claude.go | 88 | ||||
| -rw-r--r-- | internal/executor/stream_test.go | 85 |
2 files changed, 160 insertions, 13 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go index 0029331..815c21f 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "sync" "syscall" "time" @@ -123,31 +124,35 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string } }() - // Stream stdout to the log file and parse cost. - // wg ensures costUSD is fully written before we read it after cmd.Wait(). + // Stream stdout to the log file and parse cost/errors. + // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). var costUSD float64 + var streamErr error var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - costUSD = streamAndParseCost(stdoutR, stdoutFile, r.Logger) + costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) stdoutR.Close() }() waitErr := cmd.Wait() close(killDone) // stop the pgid-kill goroutine - wg.Wait() // drain remaining stdout before reading costUSD + wg.Wait() // drain remaining stdout before reading costUSD/streamErr + + e.CostUSD = costUSD if waitErr != nil { if exitErr, ok := waitErr.(*exec.ExitError); ok { e.ExitCode = exitErr.ExitCode() } - e.CostUSD = costUSD return fmt.Errorf("claude exited with error: %w", waitErr) } e.ExitCode = 0 - e.CostUSD = costUSD + if streamErr != nil { + return streamErr + } return nil } @@ -202,26 +207,83 @@ func (r *ClaudeRunner) buildArgs(t *task.Task) []string { return args } -// streamAndParseCost reads streaming JSON from claude and writes to the log file, -// extracting cost data from the stream. -func streamAndParseCost(r io.Reader, w io.Writer, logger *slog.Logger) float64 { +// parseStream reads streaming JSON from claude, writes to w, and returns +// (costUSD, error). error is non-nil if the stream signals task failure: +// - result message has is_error:true +// - a tool_result was denied due to missing permissions +func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { tee := io.TeeReader(r, w) scanner := bufio.NewScanner(tee) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines var totalCost float64 + var streamErr error + for scanner.Scan() { line := scanner.Bytes() var msg map[string]interface{} if err := json.Unmarshal(line, &msg); err != nil { continue } - // Extract cost from result messages. - if costData, ok := msg["cost_usd"]; ok { - if cost, ok := costData.(float64); ok { + + msgType, _ := msg["type"].(string) + switch msgType { + case "result": + if isErr, _ := msg["is_error"].(bool); isErr { + result, _ := msg["result"].(string) + if result != "" { + streamErr = fmt.Errorf("claude task failed: %s", result) + } else { + streamErr = fmt.Errorf("claude task failed (is_error=true in result)") + } + } + // Prefer total_cost_usd from result message; fall through to legacy check below. + if cost, ok := msg["total_cost_usd"].(float64); ok { totalCost = cost } + case "user": + // Detect permission-denial tool_results. These occur when permission_mode + // is not bypassPermissions and claude exits 0 without completing its task. + if err := permissionDenialError(msg); err != nil && streamErr == nil { + streamErr = err + } + } + + // Legacy cost field used by older claude versions. + if cost, ok := msg["cost_usd"].(float64); ok { + totalCost = cost + } + } + + return totalCost, streamErr +} + +// permissionDenialError inspects a "user" stream message for tool_result entries +// that were denied due to missing permissions. Returns an error if found. +func permissionDenialError(msg map[string]interface{}) error { + message, ok := msg["message"].(map[string]interface{}) + if !ok { + return nil + } + content, ok := message["content"].([]interface{}) + if !ok { + return nil + } + for _, item := range content { + itemMap, ok := item.(map[string]interface{}) + if !ok { + continue + } + if itemMap["type"] != "tool_result" { + continue + } + if isErr, _ := itemMap["is_error"].(bool); !isErr { + continue + } + text, _ := itemMap["content"].(string) + if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") { + return fmt.Errorf("permission denied by host: %s", text) } } - return totalCost + return nil } diff --git a/internal/executor/stream_test.go b/internal/executor/stream_test.go new file mode 100644 index 0000000..10eb858 --- /dev/null +++ b/internal/executor/stream_test.go @@ -0,0 +1,85 @@ +package executor + +import ( + "io" + "log/slog" + "strings" + "testing" +) + +// streamLine builds a single-line stream-json message. +func streamLine(json string) string { return json + "\n" } + +func TestParseStream_ResultIsError_ReturnsError(t *testing.T) { + input := streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong"}`) + _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err == nil { + t.Fatal("expected error when result.is_error=true, got nil") + } + if !strings.Contains(err.Error(), "something went wrong") { + t.Errorf("error should contain result text, got: %v", err) + } +} + +func TestParseStream_PermissionDenied_ReturnsError(t *testing.T) { + // Simulate the permission denial tool_result that Claude Code emits + // when permission_mode is not bypassPermissions. + input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"Claude requested permissions to write to /foo/bar.go, but you haven't granted it yet.","tool_use_id":"tu_abc"}]}}`) + + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"I need permission","total_cost_usd":0.1}`) + + _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err == nil { + t.Fatal("expected error for permission denial, got nil") + } + if !strings.Contains(err.Error(), "permission") { + t.Errorf("error should mention permission, got: %v", err) + } +} + +func TestParseStream_Success_ReturnsNilError(t *testing.T) { + input := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"Done."}]}}`) + + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"All tests pass.","total_cost_usd":0.05}`) + + _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err != nil { + t.Fatalf("expected nil error for success stream, got: %v", err) + } +} + +func TestParseStream_ExtractsCostFromResultMessage(t *testing.T) { + input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","total_cost_usd":1.2345}`) + + cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 1.2345 { + t.Errorf("want cost 1.2345, got %f", cost) + } +} + +func TestParseStream_ExtractsCostFromLegacyCostUSD(t *testing.T) { + // Some versions emit cost_usd at the top level rather than total_cost_usd. + input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","cost_usd":0.99}`) + + cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cost != 0.99 { + t.Errorf("want cost 0.99, got %f", cost) + } +} + +func TestParseStream_NonToolResultIsError_DoesNotFail(t *testing.T) { + // A tool_result with is_error:true that is NOT a permission denial + // (e.g. the tool ran but the command failed) should not mark the task failed — + // the agent can recover from individual tool errors. + input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"exit status 1","tool_use_id":"tu_xyz"}]}}`) + + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"Fixed it.","total_cost_usd":0.2}`) + + _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err != nil { + t.Fatalf("non-permission tool errors should not fail the task, got: %v", err) + } +} |
