summaryrefslogtreecommitdiff
path: root/internal/executor/helpers.go
blob: 9e4530b9c8df395e3d1ffc8ea99b3d3897087843 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package executor

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
)

// BlockedError is returned by Run when the agent wrote a question file and exited.
// The pool transitions the task to BLOCKED and stores the question for the user.
type BlockedError struct {
	QuestionJSON string // raw JSON from the question file
	SessionID    string // claude session to resume once the user answers
	SandboxDir   string // preserved sandbox path; resume must run here so Claude finds its session files
}

func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }

// parseStream reads streaming JSON from claude, writes to w, and returns
// (costUSD, error). error is non-nil if the stream signals task failure:
// - result message has is_error:true
// - a tool_result was denied due to missing permissions
func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, string, error) {
	tee := io.TeeReader(r, w)
	scanner := bufio.NewScanner(tee)
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines

	var totalCost float64
	var sessionID string
	var streamErr error

Loop:
	for scanner.Scan() {
		line := scanner.Bytes()
		var msg map[string]interface{}
		if err := json.Unmarshal(line, &msg); err != nil {
			continue
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "system":
			if subtype, ok := msg["subtype"].(string); ok && subtype == "init" {
				if sid, ok := msg["session_id"].(string); ok {
					sessionID = sid
				}
			}
		case "rate_limit_event":
			if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
				status, _ := info["status"].(string)
				if status == "rejected" {
					streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
					// Immediately break since we can't continue anyway
					break Loop
				}
			}
		case "assistant":
			if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
				streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
			}
		case "result":
			if isErr, _ := msg["is_error"].(bool); isErr {
				result, _ := msg["result"].(string)
				if result != "" {
					streamErr = fmt.Errorf("claude task failed: %s", result)
				} else {
					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
				}
			}
			// Prefer total_cost_usd from result message; fall through to legacy check below.
			if cost, ok := msg["total_cost_usd"].(float64); ok {
				totalCost = cost
			}
		case "user":
			// Detect permission-denial tool_results. These occur when permission_mode
			// is not bypassPermissions and claude exits 0 without completing its task.
			if err := permissionDenialError(msg); err != nil && streamErr == nil {
				streamErr = err
			}
		}

		// Legacy cost field used by older claude versions.
		if cost, ok := msg["cost_usd"].(float64); ok {
			totalCost = cost
		}
	}

	return totalCost, sessionID, streamErr
}


// permissionDenialError inspects a "user" stream message for tool_result entries
// that were denied due to missing permissions. Returns an error if found.
func permissionDenialError(msg map[string]interface{}) error {
	message, ok := msg["message"].(map[string]interface{})
	if !ok {
		return nil
	}
	content, ok := message["content"].([]interface{})
	if !ok {
		return nil
	}
	for _, item := range content {
		itemMap, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		if itemMap["type"] != "tool_result" {
			continue
		}
		if isErr, _ := itemMap["is_error"].(bool); !isErr {
			continue
		}
		text, _ := itemMap["content"].(string)
		if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", text)
		}
	}
	return nil
}

// tailFile returns the last n lines of the file at path, or empty string if
// the file cannot be read. Used to surface subprocess stderr on failure.
func tailFile(path string, n int) string {
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	var lines []string
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > n {
			lines = lines[1:]
		}
	}
	return strings.Join(lines, "\n")
}

func gitSafe(args ...string) []string {
	return append([]string{"-c", "safe.directory=*"}, args...)
}

// isCompletionReport returns true when a question-file JSON looks like a
// completion report rather than a real user question. Heuristic: no options
// (or empty options) and no "?" anywhere in the text.
func isCompletionReport(questionJSON string) bool {
	var q struct {
		Text    string   `json:"text"`
		Options []string `json:"options"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
		return false
	}
	return len(q.Options) == 0 && !strings.Contains(q.Text, "?")
}

// extractQuestionText returns the "text" field from a question-file JSON, or
// the raw string if parsing fails.
func extractQuestionText(questionJSON string) string {
	var q struct {
		Text string `json:"text"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
		return questionJSON
	}
	return strings.TrimSpace(q.Text)
}