summaryrefslogtreecommitdiff
path: root/internal/executor/claude.go
blob: b97f202d62b30f19dfb0108ee7efa1f1cf967002 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
package executor

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// ClaudeRunner runs tasks by launching the `claude` CLI as a non-interactive
// subprocess.
type ClaudeRunner struct {
	// BinaryPath is the claude executable to invoke; "claude" is used when empty.
	BinaryPath string

	// Logger receives diagnostics emitted by the runner.
	Logger *slog.Logger

	// LogDir is the base directory under which per-execution logs are written.
	LogDir string

	// APIURL is the base URL of the Claudomator API, handed to subprocesses.
	APIURL string
}

// BlockedError is the error Run returns when the agent wrote a question file
// and then exited. Callers (the pool) are expected to park the task as BLOCKED
// and surface the question to the user.
type BlockedError struct {
	// QuestionJSON holds the raw JSON read from the question file.
	QuestionJSON string

	// SessionID identifies the claude session to resume once the user answers.
	SessionID string
}

// Error implements the error interface; the message embeds the raw question JSON.
func (e *BlockedError) Error() string {
	question := e.QuestionJSON
	return fmt.Sprintf("task blocked: %s", question)
}

// ExecLogDir reports the directory that holds the logs for execution execID:
// LogDir joined with execID, or "" when no LogDir is configured.
// Implements LogPather so the pool can persist paths before execution starts.
func (r *ClaudeRunner) ExecLogDir(execID string) string {
	if base := r.LogDir; base != "" {
		return filepath.Join(base, execID)
	}
	return ""
}

// binaryPath resolves the executable to launch: the configured BinaryPath, or
// the bare command name "claude" when none is set.
func (r *ClaudeRunner) binaryPath() string {
	if r.BinaryPath == "" {
		return "claude"
	}
	return r.BinaryPath
}

// Run executes a claude -p invocation for t, streaming the subprocess output
// to the execution's log files.
//
// The invocation is wrapped in runWithBackoff (3 attempts, 5s base delay),
// which is relied on to retry rate-limit failures with exponential backoff.
// If the agent leaves a question file behind and exits cleanly, Run consumes
// the file and returns *BlockedError so the session can be resumed once the
// user has answered.
//
// Side effects on e: StdoutPath, StderrPath, ArtifactDir and SessionID are
// filled in when empty; execOnce additionally records cost and exit code.
//
// Errors: a missing working directory, a log directory that cannot be created,
// any error surfaced by the subprocess run, or an existing question file that
// cannot be read.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	if t.Claude.WorkingDir != "" {
		if _, err := os.Stat(t.Claude.WorkingDir); err != nil {
			return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err)
		}
	}

	// Setup log directory once; retries overwrite the log files.
	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		logDir = e.ID // fallback for tests without LogDir set
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	if e.StdoutPath == "" {
		e.StdoutPath = filepath.Join(logDir, "stdout.log")
		e.StderrPath = filepath.Join(logDir, "stderr.log")
		e.ArtifactDir = logDir
	}
	// A caller may have pre-populated StdoutPath only. Fill the remaining
	// paths so execOnce never creates a log at "" or exports a relative
	// question-file path to the subprocess.
	if e.StderrPath == "" {
		e.StderrPath = filepath.Join(logDir, "stderr.log")
	}
	if e.ArtifactDir == "" {
		e.ArtifactDir = logDir
	}

	// Pre-assign session ID so we can resume after a BLOCKED state.
	// If this is a resume execution the session ID is already set.
	if e.SessionID == "" {
		e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
	}

	// Must be the same path execOnce exports as CLAUDOMATOR_QUESTION_FILE;
	// otherwise a question written by the agent would never be noticed.
	questionFile := filepath.Join(e.ArtifactDir, "question.json")
	args := r.buildArgs(t, e, questionFile)

	const baseDelay = 5 * time.Second
	attempt := 0
	err := runWithBackoff(ctx, 3, baseDelay, func() error {
		if attempt > 0 {
			// Logged for operators only; the actual sleeping is done by runWithBackoff.
			delay := baseDelay * (1 << (attempt - 1))
			r.Logger.Warn("rate-limited by Claude API, retrying",
				"attempt", attempt,
				"delay", delay,
			)
		}
		attempt++
		return r.execOnce(ctx, args, t.Claude.WorkingDir, e)
	})
	if err != nil {
		return err
	}

	// Check whether the agent left a question before exiting.
	data, readErr := os.ReadFile(questionFile)
	switch {
	case readErr == nil:
		os.Remove(questionFile) // consumed; removal is best-effort
		return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
	case os.IsNotExist(readErr):
		// No question: the agent ran to completion.
		return nil
	default:
		// The file exists but could not be read. Reporting success here would
		// silently drop the agent's question.
		return fmt.Errorf("reading question file: %w", readErr)
	}
}

// execOnce runs the claude subprocess once, streaming output to e's log paths.
//
// stdout is piped through parseStream, which copies it to e.StdoutPath while
// extracting cost and failure signals; stderr goes straight to e.StderrPath.
// Both log files are truncated on every call. The subprocess runs in its own
// process group, which is SIGKILLed as a whole if ctx is cancelled.
//
// On return e.CostUSD holds the parsed cost. e.ExitCode is 0 on a clean exit,
// or the process exit code when the process reported one. The returned error
// is non-nil if a log file or pipe cannot be created, the process cannot be
// started, the process exits unsuccessfully, or the output stream signals a
// task failure.
func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error {
	cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
	cmd.Env = append(os.Environ(),
		"CLAUDOMATOR_API_URL="+r.APIURL,
		"CLAUDOMATOR_TASK_ID="+e.TaskID,
		"CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
	)
	// Put the subprocess in its own process group so we can SIGKILL the entire
	// group (MCP servers, bash children, etc.) on cancellation.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if workingDir != "" {
		cmd.Dir = workingDir
	}

	stdoutFile, err := os.Create(e.StdoutPath)
	if err != nil {
		return fmt.Errorf("creating stdout log: %w", err)
	}
	defer stdoutFile.Close()

	stderrFile, err := os.Create(e.StderrPath)
	if err != nil {
		return fmt.Errorf("creating stderr log: %w", err)
	}
	defer stderrFile.Close()

	// Use os.Pipe for stdout so we own the read-end lifetime.
	// cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
	// cmd.Wait() to close it before our goroutine finishes reading.
	stdoutR, stdoutW, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
	cmd.Stderr = stderrFile

	if err := cmd.Start(); err != nil {
		stdoutW.Close()
		stdoutR.Close()
		return fmt.Errorf("starting claude: %w", err)
	}
	// Close our write-end immediately; the subprocess holds its own copy.
	// The goroutine below gets EOF when the subprocess exits.
	stdoutW.Close()

	// killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
	killDone := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			// SIGKILL the entire process group to reap orphan children.
			// Best-effort: the group may already be gone, so the error is ignored.
			_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		case <-killDone:
		}
	}()

	// Stream stdout to the log file and parse cost/errors.
	// wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait().
	var costUSD float64
	var streamErr error
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
		stdoutR.Close()
	}()

	waitErr := cmd.Wait()
	close(killDone) // stop the pgid-kill goroutine
	wg.Wait()       // drain remaining stdout before reading costUSD/streamErr

	e.CostUSD = costUSD

	if waitErr != nil {
		if exitErr, ok := waitErr.(*exec.ExitError); ok {
			e.ExitCode = exitErr.ExitCode()
		}
		if streamErr != nil {
			// Keep the failure detail claude reported on its output stream;
			// the bare exit status says nothing about why the run failed.
			return fmt.Errorf("claude exited with error: %w (%v)", waitErr, streamErr)
		}
		return fmt.Errorf("claude exited with error: %w", waitErr)
	}

	e.ExitCode = 0
	// A clean exit can still carry a task failure reported in the stream.
	return streamErr
}

// buildArgs assembles the claude CLI arguments for one execution.
//
// A resume execution (e.ResumeSessionID set) only delivers the user's answer
// to the existing session, plus the permission mode and optional model. A
// fresh execution sends the task instructions under e.SessionID together with
// the task's model, budget, system-prompt, tool, context and additional-arg
// settings. Unless planning is skipped, the instructions are wrapped by
// withPlanningPreamble and the Bash tool is added to the allowed tools.
//
// questionFile is accepted for signature compatibility but not used here; the
// subprocess is told the question-file path through its environment instead.
// t is never modified.
func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
	// Default to bypassPermissions — claudomator runs tasks unattended, so
	// prompting for write access would always stall execution. Tasks that need
	// a more restrictive mode can set permission_mode explicitly.
	permMode := t.Claude.PermissionMode
	if permMode == "" {
		permMode = "bypassPermissions"
	}

	// Resume execution: the agent already has context; just deliver the answer.
	if e.ResumeSessionID != "" {
		args := []string{
			"-p", e.ResumeAnswer,
			"--resume", e.ResumeSessionID,
			"--output-format", "stream-json",
			"--verbose",
			"--permission-mode", permMode,
		}
		if t.Claude.Model != "" {
			args = append(args, "--model", t.Claude.Model)
		}
		return args
	}

	instructions := t.Claude.Instructions
	allowedTools := t.Claude.AllowedTools

	if !t.Claude.SkipPlanning {
		instructions = withPlanningPreamble(instructions)
		// Ensure Bash is available so the agent can POST subtasks and ask questions.
		hasBash := false
		for _, tool := range allowedTools {
			if tool == "Bash" {
				hasBash = true
				break
			}
		}
		if !hasBash {
			// The full-slice expression caps capacity at length, so append must
			// copy instead of writing "Bash" into spare capacity of the task's
			// own AllowedTools backing array.
			allowedTools = append(allowedTools[:len(allowedTools):len(allowedTools)], "Bash")
		}
	}

	args := []string{
		"-p", instructions,
		"--session-id", e.SessionID,
		"--output-format", "stream-json",
		"--verbose",
	}

	if t.Claude.Model != "" {
		args = append(args, "--model", t.Claude.Model)
	}
	if t.Claude.MaxBudgetUSD > 0 {
		args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Claude.MaxBudgetUSD))
	}
	args = append(args, "--permission-mode", permMode)
	if t.Claude.SystemPromptAppend != "" {
		args = append(args, "--append-system-prompt", t.Claude.SystemPromptAppend)
	}
	for _, tool := range allowedTools {
		args = append(args, "--allowedTools", tool)
	}
	for _, tool := range t.Claude.DisallowedTools {
		args = append(args, "--disallowedTools", tool)
	}
	for _, f := range t.Claude.ContextFiles {
		args = append(args, "--add-dir", f)
	}
	// Caller-supplied flags go last, after every generated flag.
	args = append(args, t.Claude.AdditionalArgs...)

	return args
}

// parseStream reads newline-delimited streaming JSON from r, copies every byte
// it reads to w, and returns (costUSD, error).
//
// The returned error is non-nil if the stream signals task failure:
//   - a result message has is_error:true
//   - a tool_result was denied due to missing permissions
//
// costUSD is taken from total_cost_usd on a result message, or from the legacy
// cost_usd field on any message that carries no total_cost_usd. Lines that are
// not valid JSON are ignored. A line longer than 1 MiB is still copied to w
// but is not parsed; reading continues with the next line, so one oversized
// message cannot hide a later result or truncate the log. Read errors other
// than EOF end parsing and are only logged. logger may be nil.
func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
	// maxLineBytes bounds how much of a single line is buffered for parsing.
	const maxLineBytes = 1024 * 1024

	var totalCost float64
	var streamErr error

	// handleLine folds one complete line into totalCost / streamErr.
	handleLine := func(line []byte) {
		var msg map[string]interface{}
		if err := json.Unmarshal(line, &msg); err != nil {
			return // not a JSON object; nothing to extract
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "result":
			if isErr, _ := msg["is_error"].(bool); isErr {
				if result, _ := msg["result"].(string); result != "" {
					streamErr = fmt.Errorf("claude task failed: %s", result)
				} else {
					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
				}
			}
			// Prefer total_cost_usd from the result message; the legacy field
			// below must not override it when both are present.
			if cost, ok := msg["total_cost_usd"].(float64); ok {
				totalCost = cost
				return
			}
		case "user":
			// Detect permission-denial tool_results. These occur when permission_mode
			// is not bypassPermissions and claude exits 0 without completing its task.
			if err := permissionDenialError(msg); err != nil && streamErr == nil {
				streamErr = err
			}
		}

		// Legacy cost field used by older claude versions.
		if cost, ok := msg["cost_usd"].(float64); ok {
			totalCost = cost
		}
	}

	br := bufio.NewReaderSize(io.TeeReader(r, w), 64*1024)
	var (
		line    []byte // fragments of the current line, reused across lines
		tooLong bool   // current line exceeded maxLineBytes and is being skipped
	)
	for {
		// ReadLine hands back long lines in buffer-sized fragments, so an
		// oversized line can be skipped without buffering it or aborting.
		frag, isPrefix, err := br.ReadLine()
		if err != nil {
			if err != io.EOF && logger != nil {
				logger.Warn("reading claude output stream", "error", err)
			}
			break
		}
		if !tooLong {
			if len(line)+len(frag) > maxLineBytes {
				tooLong = true
				line = line[:0]
				if logger != nil {
					logger.Warn("skipping oversized line in claude output stream", "limit_bytes", maxLineBytes)
				}
			} else {
				line = append(line, frag...)
			}
		}
		if isPrefix {
			continue // more fragments of the same line follow
		}
		if !tooLong {
			handleLine(line)
		}
		line, tooLong = line[:0], false
	}
	// The stream can end right after a buffer-filling fragment, leaving a
	// final unterminated line that was never dispatched.
	if len(line) > 0 && !tooLong {
		handleLine(line)
	}

	return totalCost, streamErr
}

// permissionDenialError inspects a "user" stream message for tool_result
// entries that were denied due to missing permissions.
//
// It walks msg["message"]["content"] and looks at every tool_result item with
// is_error set. The item's content may be a plain string or a list of content
// blocks; for a list, the text of its "text" blocks is joined with newlines.
// If that text contains "requested permissions" or "haven't granted", an
// error quoting it is returned. Messages of any other shape yield nil.
func permissionDenialError(msg map[string]interface{}) error {
	message, ok := msg["message"].(map[string]interface{})
	if !ok {
		return nil
	}
	content, ok := message["content"].([]interface{})
	if !ok {
		return nil
	}
	for _, item := range content {
		itemMap, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		if kind, _ := itemMap["type"].(string); kind != "tool_result" {
			continue
		}
		if isErr, _ := itemMap["is_error"].(bool); !isErr {
			continue
		}

		// tool_result content is either a bare string or a list of blocks.
		var text string
		switch c := itemMap["content"].(type) {
		case string:
			text = c
		case []interface{}:
			var parts []string
			for _, block := range c {
				blockMap, ok := block.(map[string]interface{})
				if !ok {
					continue
				}
				if blockType, _ := blockMap["type"].(string); blockType != "text" {
					continue
				}
				if s, ok := blockMap["text"].(string); ok {
					parts = append(parts, s)
				}
			}
			text = strings.Join(parts, "\n")
		}

		if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", text)
		}
	}
	return nil
}