summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/claude.go73
-rw-r--r--internal/executor/claude_test.go16
-rw-r--r--internal/executor/executor.go96
-rw-r--r--internal/executor/preamble.go19
4 files changed, 181 insertions, 23 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index d3f5751..b97f202 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -27,6 +27,15 @@ type ClaudeRunner struct {
APIURL string // base URL of the Claudomator API, passed to subprocesses
}
+// BlockedError is returned by Run when the agent wrote a question file and exited.
+// The pool transitions the task to BLOCKED and stores the question for the user.
+type BlockedError struct {
+ QuestionJSON string // raw JSON from the question file
+ SessionID string // claude session to resume once the user answers
+}
+
+func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }
+
// ExecLogDir returns the log directory for the given execution ID.
// Implements LogPather so the pool can persist paths before execution starts.
func (r *ClaudeRunner) ExecLogDir(execID string) string {
@@ -45,9 +54,8 @@ func (r *ClaudeRunner) binaryPath() string {
// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
+// If the agent writes a question file and exits, Run returns *BlockedError.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
- args := r.buildArgs(t)
-
if t.Claude.WorkingDir != "" {
if _, err := os.Stat(t.Claude.WorkingDir); err != nil {
return fmt.Errorf("working_dir %q: %w", t.Claude.WorkingDir, err)
@@ -55,11 +63,9 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
}
// Setup log directory once; retries overwrite the log files.
- // Use pre-set paths if the pool already populated them via LogPather;
- // otherwise fall back to computing from LogDir + execID.
logDir := r.ExecLogDir(e.ID)
if logDir == "" {
- logDir = e.ID // fallback: use execID as relative dir (tests without LogDir set)
+ logDir = e.ID // fallback for tests without LogDir set
}
if err := os.MkdirAll(logDir, 0700); err != nil {
return fmt.Errorf("creating log dir: %w", err)
@@ -70,8 +76,17 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
e.ArtifactDir = logDir
}
+ // Pre-assign session ID so we can resume after a BLOCKED state.
+ // If this is a resume execution the session ID is already set.
+ if e.SessionID == "" {
+ e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
+ }
+
+ questionFile := filepath.Join(logDir, "question.json")
+ args := r.buildArgs(t, e, questionFile)
+
attempt := 0
- return runWithBackoff(ctx, 3, 5*time.Second, func() error {
+ err := runWithBackoff(ctx, 3, 5*time.Second, func() error {
if attempt > 0 {
delay := 5 * time.Second * (1 << (attempt - 1))
r.Logger.Warn("rate-limited by Claude API, retrying",
@@ -80,22 +95,34 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
)
}
attempt++
- return r.execOnce(ctx, t, args, e)
+ return r.execOnce(ctx, args, t.Claude.WorkingDir, e)
})
+ if err != nil {
+ return err
+ }
+
+ // Check whether the agent left a question before exiting.
+ data, readErr := os.ReadFile(questionFile)
+ if readErr == nil {
+ os.Remove(questionFile) // consumed
+ return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
+ }
+ return nil
}
// execOnce runs the claude subprocess once, streaming output to e's log paths.
-func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string, e *storage.Execution) error {
+func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error {
cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
cmd.Env = append(os.Environ(),
"CLAUDOMATOR_API_URL="+r.APIURL,
- "CLAUDOMATOR_TASK_ID="+t.ID,
+ "CLAUDOMATOR_TASK_ID="+e.TaskID,
+ "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
)
// Put the subprocess in its own process group so we can SIGKILL the entire
// group (MCP servers, bash children, etc.) on cancellation.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
- if t.Claude.WorkingDir != "" {
- cmd.Dir = t.Claude.WorkingDir
+ if workingDir != "" {
+ cmd.Dir = workingDir
}
stdoutFile, err := os.Create(e.StdoutPath)
@@ -172,13 +199,32 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, t *task.Task, args []string
return nil
}
-func (r *ClaudeRunner) buildArgs(t *task.Task) []string {
+func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
+ // Resume execution: the agent already has context; just deliver the answer.
+ if e.ResumeSessionID != "" {
+ args := []string{
+ "-p", e.ResumeAnswer,
+ "--resume", e.ResumeSessionID,
+ "--output-format", "stream-json",
+ "--verbose",
+ }
+ permMode := t.Claude.PermissionMode
+ if permMode == "" {
+ permMode = "bypassPermissions"
+ }
+ args = append(args, "--permission-mode", permMode)
+ if t.Claude.Model != "" {
+ args = append(args, "--model", t.Claude.Model)
+ }
+ return args
+ }
+
instructions := t.Claude.Instructions
allowedTools := t.Claude.AllowedTools
if !t.Claude.SkipPlanning {
instructions = withPlanningPreamble(instructions)
- // Ensure Bash is available so the agent can POST subtasks.
+ // Ensure Bash is available so the agent can POST subtasks and ask questions.
hasBash := false
for _, tool := range allowedTools {
if tool == "Bash" {
@@ -193,6 +239,7 @@ func (r *ClaudeRunner) buildArgs(t *task.Task) []string {
args := []string{
"-p", instructions,
+ "--session-id", e.SessionID,
"--output-format", "stream-json",
"--verbose",
}
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
index edf13b5..056c7e8 100644
--- a/internal/executor/claude_test.go
+++ b/internal/executor/claude_test.go
@@ -21,7 +21,7 @@ func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
argMap := make(map[string]bool)
for _, a := range args {
@@ -51,7 +51,7 @@ func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// Check key args are present.
argMap := make(map[string]bool)
@@ -86,7 +86,7 @@ func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
found := false
for i, a := range args {
@@ -109,7 +109,7 @@ func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
for i, a := range args {
if a == "--permission-mode" && i+1 < len(args) {
@@ -131,7 +131,7 @@ func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
found := false
for _, a := range args {
@@ -154,7 +154,7 @@ func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// The -p value should start with the preamble and end with the original instructions.
if len(args) < 2 || args[0] != "-p" {
@@ -178,7 +178,7 @@ func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// Bash should be appended to allowed tools.
foundBash := false
@@ -202,7 +202,7 @@ func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) {
},
}
- args := r.buildArgs(tk)
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
// Count Bash occurrences in --allowedTools values.
bashCount := 0
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index eb23c02..f6932f4 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -2,6 +2,7 @@ package executor
import (
"context"
+ "errors"
"fmt"
"log/slog"
"path/filepath"
@@ -80,6 +81,94 @@ func (p *Pool) Results() <-chan *Result {
return p.resultCh
}
+// SubmitResume re-queues a blocked task using the provided resume execution.
+// The execution must have ResumeSessionID and ResumeAnswer set.
+func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
+ p.mu.Lock()
+ if p.active >= p.maxConcurrent {
+ active := p.active
+ max := p.maxConcurrent
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+ }
+ p.active++
+ p.mu.Unlock()
+
+ go p.executeResume(ctx, t, exec)
+ return nil
+}
+
+func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
+ defer func() {
+ p.mu.Lock()
+ p.active--
+ p.mu.Unlock()
+ }()
+
+ // Pre-populate log paths.
+ if lp, ok := p.runner.(LogPather); ok {
+ if logDir := lp.ExecLogDir(exec.ID); logDir != "" {
+ exec.StdoutPath = filepath.Join(logDir, "stdout.log")
+ exec.StderrPath = filepath.Join(logDir, "stderr.log")
+ exec.ArtifactDir = logDir
+ }
+ }
+ exec.StartTime = time.Now().UTC()
+ exec.Status = "RUNNING"
+
+ if err := p.store.CreateExecution(exec); err != nil {
+ p.logger.Error("failed to create resume execution record", "error", err)
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
+ p.logger.Error("failed to update task state", "error", err)
+ }
+
+ var cancel context.CancelFunc
+ if t.Timeout.Duration > 0 {
+ ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
+ } else {
+ ctx, cancel = context.WithCancel(ctx)
+ }
+ defer cancel()
+
+ err := p.runner.Run(ctx, t, exec)
+ exec.EndTime = time.Now().UTC()
+
+ if err != nil {
+ var blockedErr *BlockedError
+ if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ p.store.UpdateTaskState(t.ID, task.StateBlocked)
+ p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+ } else if ctx.Err() == context.DeadlineExceeded {
+ exec.Status = "TIMED_OUT"
+ exec.ErrorMsg = "execution timed out"
+ p.store.UpdateTaskState(t.ID, task.StateTimedOut)
+ } else if ctx.Err() == context.Canceled {
+ exec.Status = "CANCELLED"
+ exec.ErrorMsg = "execution cancelled"
+ p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else {
+ exec.Status = "FAILED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateFailed)
+ }
+ } else {
+ if t.ParentTaskID == "" {
+ exec.Status = "READY"
+ p.store.UpdateTaskState(t.ID, task.StateReady)
+ } else {
+ exec.Status = "COMPLETED"
+ p.store.UpdateTaskState(t.ID, task.StateCompleted)
+ }
+ }
+
+ if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
+ p.logger.Error("failed to update resume execution", "error", updateErr)
+ }
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+}
+
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -155,7 +244,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
exec.EndTime = time.Now().UTC()
if err != nil {
- if ctx.Err() == context.DeadlineExceeded {
+ var blockedErr *BlockedError
+ if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ p.store.UpdateTaskState(t.ID, task.StateBlocked)
+ p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+ } else if ctx.Err() == context.DeadlineExceeded {
exec.Status = "TIMED_OUT"
exec.ErrorMsg = "execution timed out"
p.store.UpdateTaskState(t.ID, task.StateTimedOut)
diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go
index cd1a2cc..8a2cce3 100644
--- a/internal/executor/preamble.go
+++ b/internal/executor/preamble.go
@@ -1,6 +1,23 @@
package executor
-const planningPreamble = `## Planning Step (do this first)
+const planningPreamble = `## Runtime Environment
+
+You are running as a background agent inside Claudomator. You cannot interact
+with the user directly. However, if you need a decision or clarification:
+
+**To ask the user a question and pause:**
+1. Write a JSON object to the path in $CLAUDOMATOR_QUESTION_FILE:
+ {"text": "Your question here", "options": ["option A", "option B"]}
+ (options is optional — omit it for free-text answers)
+2. Exit immediately. Do not wait. The task will be resumed with the user's answer
+ as the next message in this conversation.
+
+Only ask a question when truly blocked. Prefer making a reasonable decision
+and noting it in your output.
+
+---
+
+## Planning Step (do this first)
Before doing any implementation work: