summaryrefslogtreecommitdiff
path: root/internal/executor/helpers.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/helpers.go')
-rw-r--r--internal/executor/helpers.go165
1 files changed, 165 insertions, 0 deletions
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
new file mode 100644
index 0000000..5ffde8e
--- /dev/null
+++ b/internal/executor/helpers.go
@@ -0,0 +1,165 @@
+package executor
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "strings"
+)
+
+// BlockedError is returned by Run when the agent wrote a question file and exited.
+// The pool transitions the task to BLOCKED and stores the question for the user.
+type BlockedError struct {
+ QuestionJSON string // raw JSON from the question file
+ SessionID string // claude session to resume once the user answers
+ SandboxDir string // preserved sandbox path; resume must run here so Claude finds its session files
+}
+
+func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }
+
+// parseStream reads streaming JSON from claude, writes to w, and returns
+// (costUSD, error). error is non-nil if the stream signals task failure:
+// - result message has is_error:true
+// - a tool_result was denied due to missing permissions
+func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
+ tee := io.TeeReader(r, w)
+ scanner := bufio.NewScanner(tee)
+ scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines
+
+ var totalCost float64
+ var streamErr error
+
+ for scanner.Scan() {
+ line := scanner.Bytes()
+ var msg map[string]interface{}
+ if err := json.Unmarshal(line, &msg); err != nil {
+ continue
+ }
+
+ msgType, _ := msg["type"].(string)
+ switch msgType {
+ case "rate_limit_event":
+ if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
+ status, _ := info["status"].(string)
+ if status == "rejected" {
+ streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
+ // Immediately break since we can't continue anyway
+ break
+ }
+ }
+ case "assistant":
+ if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
+ streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
+ }
+ case "result":
+ if isErr, _ := msg["is_error"].(bool); isErr {
+ result, _ := msg["result"].(string)
+ if result != "" {
+ streamErr = fmt.Errorf("claude task failed: %s", result)
+ } else {
+ streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
+ }
+ }
+ // Prefer total_cost_usd from result message; fall through to legacy check below.
+ if cost, ok := msg["total_cost_usd"].(float64); ok {
+ totalCost = cost
+ }
+ case "user":
+ // Detect permission-denial tool_results. These occur when permission_mode
+ // is not bypassPermissions and claude exits 0 without completing its task.
+ if err := permissionDenialError(msg); err != nil && streamErr == nil {
+ streamErr = err
+ }
+ }
+
+ // Legacy cost field used by older claude versions.
+ if cost, ok := msg["cost_usd"].(float64); ok {
+ totalCost = cost
+ }
+ }
+
+ return totalCost, streamErr
+}
+
+// permissionDenialError inspects a "user" stream message for tool_result entries
+// that were denied due to missing permissions. Returns an error if found.
+func permissionDenialError(msg map[string]interface{}) error {
+ message, ok := msg["message"].(map[string]interface{})
+ if !ok {
+ return nil
+ }
+ content, ok := message["content"].([]interface{})
+ if !ok {
+ return nil
+ }
+ for _, item := range content {
+ itemMap, ok := item.(map[string]interface{})
+ if !ok {
+ continue
+ }
+ if itemMap["type"] != "tool_result" {
+ continue
+ }
+ if isErr, _ := itemMap["is_error"].(bool); !isErr {
+ continue
+ }
+ text, _ := itemMap["content"].(string)
+ if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") {
+ return fmt.Errorf("permission denied by host: %s", text)
+ }
+ }
+ return nil
+}
+
+// tailFile returns the last n lines of the file at path, or empty string if
+// the file cannot be read. Used to surface subprocess stderr on failure.
+func tailFile(path string, n int) string {
+ f, err := os.Open(path)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+
+ var lines []string
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ lines = append(lines, scanner.Text())
+ if len(lines) > n {
+ lines = lines[1:]
+ }
+ }
+ return strings.Join(lines, "\n")
+}
+
+func gitSafe(args ...string) []string {
+ return append([]string{"-c", "safe.directory=*"}, args...)
+}
+
+// isCompletionReport returns true when a question-file JSON looks like a
+// completion report rather than a real user question. Heuristic: no options
+// (or empty options) and no "?" anywhere in the text.
+func isCompletionReport(questionJSON string) bool {
+ var q struct {
+ Text string `json:"text"`
+ Options []string `json:"options"`
+ }
+ if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
+ return false
+ }
+ return len(q.Options) == 0 && !strings.Contains(q.Text, "?")
+}
+
+// extractQuestionText returns the "text" field from a question-file JSON, or
+// the raw string if parsing fails.
+func extractQuestionText(questionJSON string) string {
+ var q struct {
+ Text string `json:"text"`
+ }
+ if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
+ return questionJSON
+ }
+ return strings.TrimSpace(q.Text)
+}