package executor

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
)

// BlockedError is returned by Run when the agent wrote a question file and exited.
// The pool transitions the task to BLOCKED and stores the question for the user.
type BlockedError struct {
	QuestionJSON string // raw JSON from the question file
	SessionID    string // claude session to resume once the user answers
	SandboxDir   string // preserved sandbox path; resume must run here so Claude finds its session files
}

// Error implements the error interface. The message embeds the raw question JSON.
func (e *BlockedError) Error() string {
	return "task blocked: " + e.QuestionJSON
}

// parseStream reads newline-delimited JSON from claude on r, mirrors every byte
// it reads to w, and returns (costUSD, sessionID, error).
//
// costUSD is the most recent total_cost_usd seen on a "result" message; a legacy
// cost_usd field on any message overrides it. sessionID is taken from the
// "system" message with subtype "init". Lines that are not valid JSON are skipped.
//
// The returned error is non-nil if the stream signals task failure:
//   - a rate_limit_event reports status "rejected" (parsing stops at that message)
//   - an assistant message carries error "rate_limit"
//   - the result message has is_error:true
//   - a tool_result was denied due to missing permissions
//
// logger may be nil. It is only used to report a scan that stopped before the
// end of the stream (for example a line larger than the 1MB scanner buffer);
// such a condition does not change the returned error.
func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, string, error) {
	tee := io.TeeReader(r, w)
	scanner := bufio.NewScanner(tee)
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines

	var (
		totalCost   float64
		sessionID   string
		streamErr   error
		rateLimited bool
	)

scan:
	for scanner.Scan() {
		var msg map[string]any
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			continue
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "system":
			if subtype, _ := msg["subtype"].(string); subtype == "init" {
				if sid, ok := msg["session_id"].(string); ok {
					sessionID = sid
				}
			}
		case "rate_limit_event":
			// Indexing a nil map is safe, so a missing rate_limit_info simply
			// yields an empty status.
			info, _ := msg["rate_limit_info"].(map[string]any)
			if status, _ := info["status"].(string); status == "rejected" {
				streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
				rateLimited = true
				// A bare break would only leave the switch; the label is needed
				// to stop parsing so later messages cannot overwrite this error.
				break scan
			}
		case "assistant":
			if errStr, _ := msg["error"].(string); errStr == "rate_limit" {
				streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
			}
		case "result":
			if isErr, _ := msg["is_error"].(bool); isErr {
				if result, _ := msg["result"].(string); result != "" {
					streamErr = fmt.Errorf("claude task failed: %s", result)
				} else {
					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
				}
			}
			// Prefer total_cost_usd from result message; the legacy check below
			// still runs for this message.
			if cost, ok := msg["total_cost_usd"].(float64); ok {
				totalCost = cost
			}
		case "user":
			// Detect permission-denial tool_results. These occur when permission_mode
			// is not bypassPermissions and claude exits 0 without completing its task.
			if err := permissionDenialError(msg); err != nil && streamErr == nil {
				streamErr = err
			}
		}

		// Legacy cost field used by older claude versions.
		if cost, ok := msg["cost_usd"].(float64); ok {
			totalCost = cost
		}
	}

	scanErr := scanner.Err()
	if scanErr != nil && logger != nil {
		logger.Warn("claude stream scan stopped early", "error", scanErr)
	}
	if rateLimited || scanErr != nil {
		// Parsing stopped before EOF. Keep copying the remainder through the tee
		// so w still receives the complete stream and the producer is not left
		// blocked on an unread pipe. Best effort: there is nothing useful to do
		// with a copy error at this point.
		_, _ = io.Copy(io.Discard, tee)
	}
	return totalCost, sessionID, streamErr
}

// permissionDenialError inspects a "user" stream message for tool_result entries
// that were denied due to missing permissions. It returns an error describing
// the first such entry, or nil if there is none or the message does not have
// the expected message.content list.
func permissionDenialError(msg map[string]any) error {
	message, ok := msg["message"].(map[string]any)
	if !ok {
		return nil
	}
	content, ok := message["content"].([]any)
	if !ok {
		return nil
	}
	for _, item := range content {
		itemMap, ok := item.(map[string]any)
		if !ok || itemMap["type"] != "tool_result" {
			continue
		}
		if isErr, _ := itemMap["is_error"].(bool); !isErr {
			continue
		}
		text := toolResultText(itemMap["content"])
		if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", text)
		}
	}
	return nil
}

// toolResultText returns the textual payload of a tool_result "content" value.
// A plain string is returned as is; a list of blocks yields the "text" of every
// block whose type is "text", joined with newlines. Any other shape yields "".
func toolResultText(content any) string {
	switch c := content.(type) {
	case string:
		return c
	case []any:
		var parts []string
		for _, block := range c {
			b, ok := block.(map[string]any)
			if !ok || b["type"] != "text" {
				continue
			}
			if text, ok := b["text"].(string); ok {
				parts = append(parts, text)
			}
		}
		return strings.Join(parts, "\n")
	}
	return ""
}

// tailFile returns the last n lines of the file at path, or empty string if
// the file cannot be read. Used to surface subprocess stderr on failure.
//
// tailFile holds at most n lines in memory while scanning. Lines of up to 1 MiB
// are accepted; if a longer line or a read error stops the scan, the lines
// collected so far are returned (best effort). n <= 0 yields an empty string.
func tailFile(path string, n int) string {
	if n <= 0 {
		return ""
	}
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	// The default 64 KiB token limit makes Scan stop at the first long line,
	// which would silently drop everything after it.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > n {
			lines = lines[1:]
		}
	}
	// scanner.Err() is intentionally not surfaced: this helper only decorates
	// failure messages, so a partial tail is more useful than none.
	return strings.Join(lines, "\n")
}

// gitSafe returns a new argument list consisting of "-c", "safe.directory=*"
// followed by args. The input slice is not modified.
func gitSafe(args ...string) []string {
	out := make([]string, 0, len(args)+2)
	out = append(out, "-c", "safe.directory=*")
	return append(out, args...)
}

// isCompletionReport returns true when a question-file JSON looks like a
// completion report rather than a real user question. Heuristic: no options
// (or empty options) and no "?" anywhere in the text. JSON that cannot be
// decoded is never treated as a completion report.
func isCompletionReport(questionJSON string) bool {
	var q struct {
		Text    string   `json:"text"`
		Options []string `json:"options"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
		return false
	}
	if len(q.Options) > 0 {
		return false
	}
	return !strings.Contains(q.Text, "?")
}

// extractQuestionText returns the whitespace-trimmed "text" field from a
// question-file JSON, or the raw string if parsing fails.
func extractQuestionText(questionJSON string) string {
	var q struct {
		Text string `json:"text"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
		return questionJSON
	}
	return strings.TrimSpace(q.Text)
}