1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
|
package executor
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
)
// ClaudeRunner spawns the `claude` CLI in non-interactive mode.
//
// BinaryPath and LogDir may be left empty: the binary then resolves to
// "claude" and Run falls back to a log directory named after the execution ID.
type ClaudeRunner struct {
// BinaryPath is the claude executable to launch; defaults to "claude".
BinaryPath string // defaults to "claude"
// Logger receives sandbox, retry and completion-report messages. Run calls it
// without a nil check — NOTE(review): presumably must be non-nil; confirm.
Logger *slog.Logger
// LogDir is the base directory; each execution logs into LogDir/<execution ID>.
LogDir string // base directory for execution logs
// APIURL is exported to the subprocess as CLAUDOMATOR_API_URL.
APIURL string // base URL of the Claudomator API, passed to subprocesses
}
// BlockedError is the error Run returns when the agent wrote a question file
// and exited instead of finishing its work. The pool transitions the task to
// BLOCKED and stores the question for the user.
type BlockedError struct {
	// QuestionJSON is the raw JSON read from the question file.
	QuestionJSON string
	// SessionID is the claude session to resume once the user answers.
	SessionID string
	// SandboxDir is the preserved sandbox path; the resume must run there so
	// Claude finds its session files.
	SandboxDir string
}

// Error implements the error interface; the message embeds the raw question JSON.
func (e *BlockedError) Error() string {
	return fmt.Sprint("task blocked: ", e.QuestionJSON)
}
// ExecLogDir maps an execution ID to its log directory, LogDir/<execID>.
// It yields "" when no LogDir is configured.
// Implements LogPather so the pool can persist paths before execution starts.
func (r *ClaudeRunner) ExecLogDir(execID string) string {
	if base := r.LogDir; base != "" {
		return filepath.Join(base, execID)
	}
	return ""
}
// binaryPath resolves the executable to launch: the configured BinaryPath, or
// plain "claude" when none was set.
func (r *ClaudeRunner) binaryPath() string {
	if r.BinaryPath == "" {
		return "claude"
	}
	return r.BinaryPath
}
// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
// If the agent writes a question file and exits, Run returns *BlockedError.
//
// When project_dir is set and this is not a resume execution, Run clones the
// project into a temp sandbox, runs the agent there, then merges committed
// changes back to project_dir. On failure the sandbox is preserved and its
// path is included in the error.
//
// Resume executions run in the sandbox recorded on e.SandboxDir when there is
// one. If the agent blocks again during a resume, the returned BlockedError
// carries that same sandbox path so further resumes keep running in the
// directory that holds the Claude session files.
//
// Side effects on e: StdoutPath, StderrPath and ArtifactDir are filled in when
// StdoutPath is empty; SessionID is assigned when empty; Summary is set from
// a completion-report question file and/or summary.txt when present.
//
// NOTE(review): a resume execution that completes in a preserved sandbox is
// neither merged nor removed by this function (teardown only covers a sandbox
// created by this call) — confirm that the caller handles it.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	projectDir := t.Agent.ProjectDir

	// Validate project_dir exists when set.
	if projectDir != "" {
		if _, err := os.Stat(projectDir); err != nil {
			return fmt.Errorf("project_dir %q: %w", projectDir, err)
		}
	}

	// Setup log directory once; retries overwrite the log files.
	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		logDir = e.ID // fallback for tests without LogDir set
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	if e.StdoutPath == "" {
		e.StdoutPath = filepath.Join(logDir, "stdout.log")
		e.StderrPath = filepath.Join(logDir, "stderr.log")
		e.ArtifactDir = logDir
	}

	// Pre-assign session ID so we can resume after a BLOCKED state.
	// For resume executions, the claude session continues under the original
	// session ID (the one passed to --resume). Using the new exec's own UUID
	// would cause a second block-and-resume cycle to pass the wrong --resume
	// argument.
	if e.SessionID == "" {
		if e.ResumeSessionID != "" {
			e.SessionID = e.ResumeSessionID
		} else {
			e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
		}
	}

	// For new (non-resume) executions with a project_dir, clone into a sandbox.
	// Resume executions run in the preserved sandbox (e.SandboxDir) so Claude
	// finds its session files under the same project slug. If no sandbox was
	// preserved (e.g. task had no project_dir), fall back to project_dir.
	//
	// sandboxDir is the sandbox created by THIS call (the one we tear down on
	// success). activeSandbox is whichever sandbox the agent actually runs in,
	// including one inherited from an earlier blocked execution; it is what a
	// BlockedError must report.
	var sandboxDir, activeSandbox string
	effectiveWorkingDir := projectDir
	if e.ResumeSessionID != "" {
		if e.SandboxDir != "" {
			effectiveWorkingDir = e.SandboxDir
			activeSandbox = e.SandboxDir
		}
	} else if projectDir != "" {
		var err error
		sandboxDir, err = setupSandbox(projectDir)
		if err != nil {
			return fmt.Errorf("setting up sandbox: %w", err)
		}
		effectiveWorkingDir = sandboxDir
		activeSandbox = sandboxDir
		r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
	}

	questionFile := filepath.Join(logDir, "question.json")
	args := r.buildArgs(t, e, questionFile)

	attempt := 0
	err := runWithBackoff(ctx, 3, 5*time.Second, func() error {
		if attempt > 0 {
			// Only used for the log line; the actual waiting is done by runWithBackoff.
			delay := 5 * time.Second * (1 << (attempt - 1))
			r.Logger.Warn("rate-limited by Claude API, retrying",
				"attempt", attempt,
				"delay", delay,
			)
		}
		attempt++
		return r.execOnce(ctx, args, effectiveWorkingDir, e)
	})
	if err != nil {
		if sandboxDir != "" {
			return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
		}
		return err
	}

	// Check whether the agent left a question before exiting.
	data, readErr := os.ReadFile(questionFile)
	if readErr == nil {
		os.Remove(questionFile) // consumed
		questionJSON := strings.TrimSpace(string(data))
		// If the agent wrote a completion report instead of a real question,
		// extract the text as the summary and fall through to normal completion.
		if isCompletionReport(questionJSON) {
			r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
			e.Summary = extractQuestionText(questionJSON)
		} else {
			// Preserve sandbox on BLOCKED — agent may have partial work and its
			// Claude session files are stored under the sandbox's project slug.
			// The resume execution must run in the same directory, so report the
			// sandbox the agent ran in even when it was inherited from an
			// earlier blocked execution.
			return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: activeSandbox}
		}
	}

	// Read agent summary if written.
	summaryFile := filepath.Join(logDir, "summary.txt")
	if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
		os.Remove(summaryFile) // consumed
		e.Summary = strings.TrimSpace(string(summaryData))
	}

	// Merge sandbox back to project_dir and clean up.
	if sandboxDir != "" {
		if mergeErr := teardownSandbox(projectDir, sandboxDir, r.Logger); mergeErr != nil {
			return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
		}
	}
	return nil
}
// isCompletionReport reports whether a question-file JSON document looks like
// a completion report rather than a genuine question for the user.
//
// Heuristic: the document must parse, carry no options (absent or empty), and
// its text must not contain a "?" anywhere. Unparseable input is never a
// completion report.
func isCompletionReport(questionJSON string) bool {
	var parsed struct {
		Text    string   `json:"text"`
		Options []string `json:"options"`
	}
	if json.Unmarshal([]byte(questionJSON), &parsed) != nil {
		return false
	}
	if len(parsed.Options) > 0 {
		return false
	}
	return !strings.Contains(parsed.Text, "?")
}
// extractQuestionText pulls the whitespace-trimmed "text" field out of a
// question-file JSON document. When the document cannot be decoded the input
// is handed back unchanged.
func extractQuestionText(questionJSON string) string {
	payload := struct {
		Text string `json:"text"`
	}{}
	if decodeErr := json.Unmarshal([]byte(questionJSON), &payload); decodeErr != nil {
		return questionJSON
	}
	return strings.TrimSpace(payload.Text)
}
// gitSafe builds a git argument list that starts with "-c safe.directory=*"
// followed by args, so the command succeeds regardless of who owns the
// repository. This is needed when claudomator operates on project directories
// owned by a different OS user.
func gitSafe(args ...string) []string {
	full := make([]string, 0, len(args)+2)
	full = append(full, "-c", "safe.directory=*")
	return append(full, args...)
}
// sandboxCloneSource picks where a sandbox should be cloned from. It asks git
// for the URL of the remote named "local" first (a local bare repo that
// accepts pushes cleanly), then "origin"; the first remote that resolves to a
// non-blank URL wins. When neither does, the working copy path itself is used.
func sandboxCloneSource(projectDir string) string {
	candidates := [...]string{"local", "origin"}
	for i := range candidates {
		raw, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", candidates[i])...).Output()
		if err != nil {
			continue
		}
		if url := strings.TrimSpace(string(raw)); url != "" {
			return url
		}
	}
	return projectDir
}
// setupSandbox prepares a temporary git clone of projectDir and returns its path.
// If projectDir is not a git repo it is initialised with an initial commit first.
//
// The clone source is chosen by sandboxCloneSource. On any error the returned
// path is "" and nothing is left behind in the temp directory: a failed clone
// is removed on a best-effort basis, because the caller never learns the path
// and could not clean it up.
func setupSandbox(projectDir string) (string, error) {
	// Ensure projectDir is a git repo; initialise if not.
	if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil {
		bootstrap := [][]string{
			gitSafe("-C", projectDir, "init"),
			gitSafe("-C", projectDir, "add", "-A"),
			gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"),
		}
		for _, args := range bootstrap {
			if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec
				return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out)
			}
		}
	}

	src := sandboxCloneSource(projectDir)

	// MkdirTemp is used only to reserve a unique name.
	tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*")
	if err != nil {
		return "", fmt.Errorf("creating sandbox dir: %w", err)
	}
	// Remove the placeholder so that git creates the target directory itself.
	if err := os.Remove(tempDir); err != nil {
		return "", fmt.Errorf("removing temp dir placeholder: %w", err)
	}

	out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput()
	if err != nil {
		// Best effort: git normally removes a failed clone itself, but if it
		// was interrupted a partial checkout may remain. The removal error is
		// deliberately ignored — the clone failure is the one worth reporting.
		_ = os.RemoveAll(tempDir)
		return "", fmt.Errorf("git clone: %w\n%s", err, out)
	}
	return tempDir, nil
}
// teardownSandbox verifies the sandbox is clean and pushes new commits to the
// canonical bare repo. If the push is rejected because another task pushed
// concurrently, it fetches and rebases then retries once.
//
// The rebase targets the sandbox's currently checked-out branch, which is the
// branch "push origin HEAD" pushes to; "master" is used only when the branch
// name cannot be determined (e.g. detached HEAD).
//
// The working copy (projectDir) is NOT updated automatically — it is the
// developer's workspace and is pulled manually. This avoids permission errors
// from mixed-owner .git/objects directories. projectDir is therefore unused
// here and kept only for the call signature.
//
// On success the sandbox directory is removed; a failed removal is logged but
// not treated as an error. On any returned error the sandbox is left in place.
func teardownSandbox(projectDir, sandboxDir string, logger *slog.Logger) error {
	// git builds a command that runs inside the sandbox.
	git := func(args ...string) *exec.Cmd {
		return exec.Command("git", append([]string{"-C", sandboxDir}, args...)...) //nolint:gosec
	}
	// removeSandbox deletes the sandbox; cleanup is best-effort.
	removeSandbox := func() {
		if err := os.RemoveAll(sandboxDir); err != nil {
			logger.Warn("could not remove sandbox", "sandbox", sandboxDir, "err", err)
		}
	}

	// Fail if agent left uncommitted changes.
	status, err := git("status", "--porcelain").Output()
	if err != nil {
		return fmt.Errorf("git status: %w", err)
	}
	if len(strings.TrimSpace(string(status))) > 0 {
		return fmt.Errorf("uncommitted changes in sandbox (agent must commit all work):\n%s", status)
	}

	// Check whether there are any new commits to push. If the count cannot be
	// determined we push anyway; a no-op push is harmless.
	ahead, err := git("rev-list", "--count", "origin/HEAD..HEAD").Output()
	if err != nil {
		logger.Warn("could not determine commits ahead of origin; proceeding", "err", err)
	}
	if strings.TrimSpace(string(ahead)) == "0" {
		removeSandbox()
		return nil
	}

	// Push from sandbox → bare repo (sandbox's origin is the bare repo).
	pushOut, pushErr := git("push", "origin", "HEAD").CombinedOutput()
	if pushErr != nil {
		rejected := strings.Contains(string(pushOut), "fetch first") ||
			strings.Contains(string(pushOut), "non-fast-forward")
		if !rejected {
			return fmt.Errorf("git push to origin: %w\n%s", pushErr, pushOut)
		}

		// Rejected due to a concurrent push: fetch+rebase and retry once.
		logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir)

		// Rebase onto the branch we are pushing, not a fixed name, so repos
		// whose default branch is not "master" work too.
		branch := "master"
		if name, err := git("symbolic-ref", "--quiet", "--short", "HEAD").Output(); err == nil {
			if current := strings.TrimSpace(string(name)); current != "" {
				branch = current
			}
		}
		if out, err := git("pull", "--rebase", "origin", branch).CombinedOutput(); err != nil {
			return fmt.Errorf("git rebase before retry push: %w\n%s", err, out)
		}
		if out, err := git("push", "origin", "HEAD").CombinedOutput(); err != nil {
			return fmt.Errorf("git push to origin (after rebase): %w\n%s", err, out)
		}
	}

	logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir)
	removeSandbox()
	return nil
}
// execOnce runs the claude subprocess once, streaming output to e's log paths.
//
// The subprocess runs in workingDir (when non-empty) with the parent
// environment plus CLAUDOMATOR_API_URL, CLAUDOMATOR_TASK_ID,
// CLAUDOMATOR_QUESTION_FILE and CLAUDOMATOR_SUMMARY_FILE; the two file paths
// are derived from e.ArtifactDir. Stdout is copied into e.StdoutPath while
// parseStream inspects it; stderr is written directly to e.StderrPath. Both
// log files are created (truncated) on every call.
//
// Side effects on e: CostUSD is always overwritten with the cost parsed from
// the stream; ExitCode is set from the *exec.ExitError when Wait reports one
// and to 0 after a clean exit.
//
// Error precedence: when Wait fails, the stream error is returned if
// isRateLimitError or isQuotaExhausted accepts it, otherwise the Wait error is
// wrapped and returned. After a clean exit any stream error is returned as-is.
func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error {
cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
// Tell the agent where the API lives and where to drop its question/summary files.
cmd.Env = append(os.Environ(),
"CLAUDOMATOR_API_URL="+r.APIURL,
"CLAUDOMATOR_TASK_ID="+e.TaskID,
"CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
"CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"),
)
// Put the subprocess in its own process group so we can SIGKILL the entire
// group (MCP servers, bash children, etc.) on cancellation.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if workingDir != "" {
cmd.Dir = workingDir
}
// os.Create truncates, so a retry starts with empty log files.
stdoutFile, err := os.Create(e.StdoutPath)
if err != nil {
return fmt.Errorf("creating stdout log: %w", err)
}
defer stdoutFile.Close()
stderrFile, err := os.Create(e.StderrPath)
if err != nil {
return fmt.Errorf("creating stderr log: %w", err)
}
defer stderrFile.Close()
// Use os.Pipe for stdout so we own the read-end lifetime.
// cmd.StdoutPipe() would add the read-end to closeAfterWait, causing
// cmd.Wait() to close it before our goroutine finishes reading.
stdoutR, stdoutW, err := os.Pipe()
if err != nil {
return fmt.Errorf("creating stdout pipe: %w", err)
}
cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait
cmd.Stderr = stderrFile
if err := cmd.Start(); err != nil {
// Nothing was started, so both pipe ends are still ours to close.
stdoutW.Close()
stdoutR.Close()
return fmt.Errorf("starting claude: %w", err)
}
// Close our write-end immediately; the subprocess holds its own copy.
// The goroutine below gets EOF when the subprocess exits.
// NOTE(review): EOF arrives only once every holder of the write end has
// closed it; a descendant that inherits stdout and outlives the subprocess
// would presumably keep wg.Wait() below blocked — confirm whether that can
// happen in practice.
stdoutW.Close()
// killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine.
//
// Safety: this goroutine cannot block indefinitely. The select has two arms:
// • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel).
// The goroutine sends SIGKILL and exits immediately.
// • killDone — closed by close(killDone) below, immediately after cmd.Wait()
// returns. This fires when the process exits for any reason (natural exit,
// SIGKILL from the ctx arm, or any other signal). The goroutine exits without
// doing anything.
//
// Therefore: for a task that completes normally with a long-lived (non-cancelled)
// context, the killDone arm fires and the goroutine exits. There is no path where
// this goroutine outlives execOnce().
killDone := make(chan struct{})
go func() {
select {
case <-ctx.Done():
// SIGKILL the entire process group to reap orphan children.
// The negative PID addresses the process group created via Setpgid above.
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
case <-killDone:
}
}()
// Stream stdout to the log file and parse cost/errors.
// wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait().
var costUSD float64
var streamErr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
// We own the read end (see os.Pipe comment above), so close it once drained.
stdoutR.Close()
}()
waitErr := cmd.Wait()
close(killDone) // stop the pgid-kill goroutine
wg.Wait() // drain remaining stdout before reading costUSD/streamErr
// Record the cost even when the run failed.
e.CostUSD = costUSD
if waitErr != nil {
// A Wait error that is not an *exec.ExitError leaves e.ExitCode untouched.
if exitErr, ok := waitErr.(*exec.ExitError); ok {
e.ExitCode = exitErr.ExitCode()
}
// If the stream captured a rate-limit or quota message, return it
// so callers can distinguish it from a generic exit-status failure.
if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
return streamErr
}
return fmt.Errorf("claude exited with error: %w", waitErr)
}
e.ExitCode = 0
// The process exited cleanly, but the stream may still have signalled a failure.
if streamErr != nil {
return streamErr
}
return nil
}
// buildArgs assembles the claude CLI argument list for one execution.
//
// Resume executions (e.ResumeSessionID set) only deliver the user's answer to
// the existing session: -p <answer>, --resume <session>, stream-json output,
// --verbose, the permission mode and, when configured, the model.
//
// Fresh executions pass the task instructions (wrapped by
// withPlanningPreamble unless SkipPlanning is set), the pre-assigned session
// ID, and every configured option: model, budget, permission mode,
// system-prompt suffix, allowed/disallowed tools, context directories and any
// additional raw arguments, in that order.
//
// The task is never modified: the allowed-tools list is extended on a private
// copy. questionFile is accepted for the call signature but not used here.
func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
	// Default to bypassPermissions — claudomator runs tasks unattended, so
	// prompting for write access would always stall execution. Tasks that need
	// a more restrictive mode can set permission_mode explicitly.
	permMode := t.Agent.PermissionMode
	if permMode == "" {
		permMode = "bypassPermissions"
	}

	// Resume execution: the agent already has context; just deliver the answer.
	if e.ResumeSessionID != "" {
		args := []string{
			"-p", e.ResumeAnswer,
			"--resume", e.ResumeSessionID,
			"--output-format", "stream-json",
			"--verbose",
			"--permission-mode", permMode,
		}
		if t.Agent.Model != "" {
			args = append(args, "--model", t.Agent.Model)
		}
		return args
	}

	instructions := t.Agent.Instructions
	allowedTools := t.Agent.AllowedTools

	if !t.Agent.SkipPlanning {
		instructions = withPlanningPreamble(instructions)
		// Ensure Bash is available so the agent can POST subtasks and ask questions.
		hasBash := false
		for _, tool := range allowedTools {
			if tool == "Bash" {
				hasBash = true
				break
			}
		}
		if !hasBash {
			// Cap the capacity so append allocates a new array instead of
			// writing into spare capacity shared with t.Agent.AllowedTools.
			allowedTools = append(allowedTools[:len(allowedTools):len(allowedTools)], "Bash")
		}
	}

	args := []string{
		"-p", instructions,
		"--session-id", e.SessionID,
		"--output-format", "stream-json",
		"--verbose",
	}
	if t.Agent.Model != "" {
		args = append(args, "--model", t.Agent.Model)
	}
	if t.Agent.MaxBudgetUSD > 0 {
		args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD))
	}
	args = append(args, "--permission-mode", permMode)
	if t.Agent.SystemPromptAppend != "" {
		args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend)
	}
	for _, tool := range allowedTools {
		args = append(args, "--allowedTools", tool)
	}
	for _, tool := range t.Agent.DisallowedTools {
		args = append(args, "--disallowedTools", tool)
	}
	for _, f := range t.Agent.ContextFiles {
		args = append(args, "--add-dir", f)
	}
	args = append(args, t.Agent.AdditionalArgs...)

	return args
}
// parseStream reads streaming JSON from claude, writes to w, and returns
// (costUSD, error). error is non-nil if the stream signals task failure:
//   - a rate_limit_event whose rate_limit_info.status is "rejected"
//   - an assistant message whose error is "rate_limit"
//   - result message has is_error:true
//   - a tool_result was denied due to missing permissions (only reported when
//     no other failure has been recorded yet)
//
// Every byte read from r is copied to w, including lines that are not valid
// JSON (those are skipped for parsing). The cost is the last total_cost_usd
// seen on a result message or the last legacy cost_usd seen on any message.
//
// r is always consumed to EOF. Lines longer than 1 MiB cannot be parsed; when
// one is encountered parsing stops, a warning is logged (if logger is
// non-nil) and the rest of the stream is still copied to w so the log stays
// complete and the writer on the other end of r is never left blocked.
// Messages after such a line are not inspected.
func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
	const maxLine = 1024 * 1024 // 1MB buffer for large lines

	tee := io.TeeReader(r, w)
	scanner := bufio.NewScanner(tee)
	scanner.Buffer(make([]byte, maxLine), maxLine)

	var totalCost float64
	var streamErr error

	for scanner.Scan() {
		var msg map[string]interface{}
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			continue // not a JSON object; already copied to w by the tee
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "rate_limit_event":
			if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
				if status, _ := info["status"].(string); status == "rejected" {
					// Keep reading: the remaining output must still reach the
					// log, and the subprocess must not block on a full pipe.
					streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
				}
			}
		case "assistant":
			if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
				streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
			}
		case "result":
			if isErr, _ := msg["is_error"].(bool); isErr {
				result, _ := msg["result"].(string)
				if result != "" {
					streamErr = fmt.Errorf("claude task failed: %s", result)
				} else {
					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
				}
			}
			// Prefer total_cost_usd from result message; the legacy check
			// below still runs for this message.
			if cost, ok := msg["total_cost_usd"].(float64); ok {
				totalCost = cost
			}
		case "user":
			// Detect permission-denial tool_results. These occur when permission_mode
			// is not bypassPermissions and claude exits 0 without completing its task.
			if err := permissionDenialError(msg); err != nil && streamErr == nil {
				streamErr = err
			}
		}

		// Legacy cost field used by older claude versions.
		if cost, ok := msg["cost_usd"].(float64); ok {
			totalCost = cost
		}
	}

	// Scan stops early on an oversized line or a read error. Drain whatever is
	// left through the tee so the log is complete and the producer is not left
	// writing into a pipe nobody reads. This is not a task failure signalled
	// by the stream, so it is logged rather than returned.
	if scanErr := scanner.Err(); scanErr != nil {
		_, drainErr := io.Copy(io.Discard, tee)
		if logger != nil {
			logger.Warn("stopped parsing claude output; remaining output logged unparsed",
				"err", scanErr,
				"drain_err", drainErr,
			)
		}
	}

	return totalCost, streamErr
}
// permissionDenialError scans a "user" stream message for a tool_result entry
// that failed because permissions were missing.
//
// It walks msg["message"]["content"] and looks for an entry with
// type "tool_result", is_error true, and a string content mentioning either
// "requested permissions" or "haven't granted". The first such entry produces
// the returned error; a message with any other shape yields nil.
func permissionDenialError(msg map[string]interface{}) error {
	// Failed assertions leave nil values, which index and range safely.
	envelope, _ := msg["message"].(map[string]interface{})
	entries, _ := envelope["content"].([]interface{})

	for _, raw := range entries {
		entry, isMap := raw.(map[string]interface{})
		if !isMap || entry["type"] != "tool_result" {
			continue
		}
		if failed, _ := entry["is_error"].(bool); !failed {
			continue
		}
		body, _ := entry["content"].(string)
		if strings.Contains(body, "requested permissions") || strings.Contains(body, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", body)
		}
	}
	return nil
}
|