summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/claude.go119
-rw-r--r--internal/executor/claude_test.go6
-rw-r--r--internal/executor/executor.go136
-rw-r--r--internal/executor/executor_test.go30
-rw-r--r--internal/executor/gemini.go8
-rw-r--r--internal/executor/gemini_test.go10
-rw-r--r--internal/executor/ratelimit.go14
7 files changed, 256 insertions, 67 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index 86a2ba5..e504369 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -55,10 +55,18 @@ func (r *ClaudeRunner) binaryPath() string {
// Run executes a claude -p invocation, streaming output to log files.
// It retries up to 3 times on rate-limit errors using exponential backoff.
// If the agent writes a question file and exits, Run returns *BlockedError.
+//
+// When project_dir is set and this is not a resume execution, Run clones the
+// project into a temp sandbox, runs the agent there, then merges committed
+// changes back to project_dir. On failure the sandbox is preserved and its
+// path is included in the error.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
- if t.Agent.WorkingDir != "" {
- if _, err := os.Stat(t.Agent.WorkingDir); err != nil {
- return fmt.Errorf("working_dir %q: %w", t.Agent.WorkingDir, err)
+ projectDir := t.Agent.ProjectDir
+
+ // Validate project_dir exists when set.
+ if projectDir != "" {
+ if _, err := os.Stat(projectDir); err != nil {
+ return fmt.Errorf("project_dir %q: %w", projectDir, err)
}
}
@@ -82,6 +90,20 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs)
}
+ // For new (non-resume) executions with a project_dir, clone into a sandbox.
+ // Resume executions run directly in project_dir to pick up the previous session.
+ var sandboxDir string
+ effectiveWorkingDir := projectDir
+ if projectDir != "" && e.ResumeSessionID == "" {
+ var err error
+ sandboxDir, err = setupSandbox(projectDir)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+
questionFile := filepath.Join(logDir, "question.json")
args := r.buildArgs(t, e, questionFile)
@@ -95,9 +117,12 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
)
}
attempt++
- return r.execOnce(ctx, args, t.Agent.WorkingDir, e)
+ return r.execOnce(ctx, args, effectiveWorkingDir, e)
})
if err != nil {
+ if sandboxDir != "" {
+ return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir)
+ }
return err
}
@@ -105,8 +130,89 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
data, readErr := os.ReadFile(questionFile)
if readErr == nil {
os.Remove(questionFile) // consumed
+ // Preserve sandbox on BLOCKED — agent may have partial work.
return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
}
+
+ // Merge sandbox back to project_dir and clean up.
+ if sandboxDir != "" {
+ if mergeErr := teardownSandbox(projectDir, sandboxDir, r.Logger); mergeErr != nil {
+ return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir)
+ }
+ }
+ return nil
+}
+
+// setupSandbox prepares a temporary git clone of projectDir.
+// If projectDir is not a git repo it is initialised with an initial commit first.
+func setupSandbox(projectDir string) (string, error) {
+ // Ensure projectDir is a git repo; initialise if not.
+ check := exec.Command("git", "-C", projectDir, "rev-parse", "--git-dir")
+ if err := check.Run(); err != nil {
+ // Not a git repo — init and commit everything.
+ cmds := [][]string{
+ {"git", "-C", projectDir, "init"},
+ {"git", "-C", projectDir, "add", "-A"},
+ {"git", "-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"},
+ }
+ for _, args := range cmds {
+ if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { //nolint:gosec
+ return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out)
+ }
+ }
+ }
+
+ tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*")
+ if err != nil {
+ return "", fmt.Errorf("creating sandbox dir: %w", err)
+ }
+
+ // Clone into the pre-created dir (git clone requires the target to not exist,
+ // so remove it first and let git recreate it).
+ if err := os.Remove(tempDir); err != nil {
+ return "", fmt.Errorf("removing temp dir placeholder: %w", err)
+ }
+ out, err := exec.Command("git", "clone", "--local", projectDir, tempDir).CombinedOutput()
+ if err != nil {
+ return "", fmt.Errorf("git clone: %w\n%s", err, out)
+ }
+ return tempDir, nil
+}
+
+// teardownSandbox verifies the sandbox is clean, merges commits back to
+// projectDir via fast-forward, then removes the sandbox.
+func teardownSandbox(projectDir, sandboxDir string, logger *slog.Logger) error {
+ // Fail if agent left uncommitted changes.
+ out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output()
+ if err != nil {
+ return fmt.Errorf("git status: %w", err)
+ }
+ if len(strings.TrimSpace(string(out))) > 0 {
+ return fmt.Errorf("uncommitted changes in sandbox (agent must commit all work):\n%s", out)
+ }
+
+ // Check whether there are any new commits to merge.
+ ahead, err := exec.Command("git", "-C", sandboxDir, "rev-list", "--count", "origin/HEAD..HEAD").Output()
+ if err != nil {
+ // No origin/HEAD (e.g. fresh init with no prior commits) — proceed anyway.
+ logger.Warn("could not determine commits ahead of origin; proceeding with merge", "err", err)
+ }
+ if strings.TrimSpace(string(ahead)) == "0" {
+ // Nothing to merge — clean up and return.
+ os.RemoveAll(sandboxDir)
+ return nil
+ }
+
+ // Fetch new commits from sandbox into project_dir and fast-forward merge.
+ if out, err := exec.Command("git", "-C", projectDir, "fetch", sandboxDir, "HEAD").CombinedOutput(); err != nil {
+ return fmt.Errorf("git fetch from sandbox: %w\n%s", err, out)
+ }
+ if out, err := exec.Command("git", "-C", projectDir, "merge", "--ff-only", "FETCH_HEAD").CombinedOutput(); err != nil {
+ return fmt.Errorf("git merge --ff-only FETCH_HEAD: %w\n%s", err, out)
+ }
+
+ logger.Info("sandbox merged and cleaned up", "sandbox", sandboxDir, "project_dir", projectDir)
+ os.RemoveAll(sandboxDir)
return nil
}
@@ -189,6 +295,11 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s
if exitErr, ok := waitErr.(*exec.ExitError); ok {
e.ExitCode = exitErr.ExitCode()
}
+ // If the stream captured a rate-limit or quota message, return it
+ // so callers can distinguish it from a generic exit-status failure.
+ if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
+ return streamErr
+ }
return fmt.Errorf("claude exited with error: %w", waitErr)
}
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
index dba7470..b5380f4 100644
--- a/internal/executor/claude_test.go
+++ b/internal/executor/claude_test.go
@@ -233,7 +233,7 @@ func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
tk := &task.Task{
Agent: task.AgentConfig{
Type: "claude",
- WorkingDir: "/nonexistent/path/does/not/exist",
+ ProjectDir: "/nonexistent/path/does/not/exist",
SkipPlanning: true,
},
}
@@ -244,8 +244,8 @@ func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
if err == nil {
t.Fatal("expected error for inaccessible working_dir, got nil")
}
- if !strings.Contains(err.Error(), "working_dir") {
- t.Errorf("expected 'working_dir' in error, got: %v", err)
+ if !strings.Contains(err.Error(), "project_dir") {
+ t.Errorf("expected 'project_dir' in error, got: %v", err)
}
}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 6bd1c68..d1c8e72 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -26,12 +26,20 @@ type Runner interface {
Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
}
+// workItem is an entry in the pool's internal work queue.
+type workItem struct {
+ ctx context.Context
+ task *task.Task
+ exec *storage.Execution // non-nil for resume submissions
+}
+
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
- maxConcurrent int
- runners map[string]Runner
- store *storage.DB
- logger *slog.Logger
+ maxConcurrent int
+ runners map[string]Runner
+ store *storage.DB
+ logger *slog.Logger
+ depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
mu sync.Mutex
active int
@@ -39,6 +47,8 @@ type Pool struct {
rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
resultCh chan *Result
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
Questions *QuestionRegistry
Classifier *Classifier
}
@@ -54,33 +64,57 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store *storage.DB, lo
if maxConcurrent < 1 {
maxConcurrent = 1
}
- return &Pool{
- maxConcurrent: maxConcurrent,
- runners: runners,
- store: store,
- logger: logger,
- activePerAgent: make(map[string]int),
- rateLimited: make(map[string]time.Time),
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, maxConcurrent*2),
- Questions: NewQuestionRegistry(),
+ p := &Pool{
+ maxConcurrent: maxConcurrent,
+ runners: runners,
+ store: store,
+ logger: logger,
+ depPollInterval: 5 * time.Second,
+ activePerAgent: make(map[string]int),
+ rateLimited: make(map[string]time.Time),
+ cancels: make(map[string]context.CancelFunc),
+ resultCh: make(chan *Result, maxConcurrent*2),
+ workCh: make(chan workItem, maxConcurrent*10+100),
+ doneCh: make(chan struct{}, maxConcurrent),
+ Questions: NewQuestionRegistry(),
}
+ go p.dispatch()
+ return p
}
-// Submit dispatches a task for execution. Blocks if pool is at capacity.
-func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
- p.mu.Lock()
- if p.active >= p.maxConcurrent {
- active := p.active
- max := p.maxConcurrent
- p.mu.Unlock()
- return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+// dispatch is a long-running goroutine that reads from the internal work queue
+// and launches goroutines as soon as a pool slot is available. This prevents
+// tasks from being rejected when the pool is temporarily at capacity.
+func (p *Pool) dispatch() {
+ for item := range p.workCh {
+ for {
+ p.mu.Lock()
+ if p.active < p.maxConcurrent {
+ p.active++
+ p.mu.Unlock()
+ if item.exec != nil {
+ go p.executeResume(item.ctx, item.task, item.exec)
+ } else {
+ go p.execute(item.ctx, item.task)
+ }
+ break
+ }
+ p.mu.Unlock()
+ <-p.doneCh // wait for a worker to finish
+ }
}
- p.active++
- p.mu.Unlock()
+}
- go p.execute(ctx, t)
- return nil
+// Submit enqueues a task for execution. Returns an error only if the internal
+// work queue is full. When the pool is at capacity the task is buffered and
+// dispatched as soon as a slot becomes available.
+func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+ select {
+ case p.workCh <- workItem{ctx: ctx, task: t}:
+ return nil
+ default:
+ return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
+ }
}
// Results returns the channel for reading execution results.
@@ -104,18 +138,18 @@ func (p *Pool) Cancel(taskID string) bool {
// SubmitResume re-queues a blocked task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
- p.mu.Lock()
- if p.active >= p.maxConcurrent {
- active := p.active
- max := p.maxConcurrent
- p.mu.Unlock()
- return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+ if t.State != task.StateBlocked && t.State != task.StateTimedOut {
+ return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State)
+ }
+ if exec.ResumeSessionID == "" {
+ return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
+ }
+ select {
+ case p.workCh <- workItem{ctx: ctx, task: t, exec: exec}:
+ return nil
+ default:
+ return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
}
- p.active++
- p.mu.Unlock()
-
- go p.executeResume(ctx, t, exec)
- return nil
}
func (p *Pool) getRunner(t *task.Task) (Runner, error) {
@@ -145,6 +179,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
p.active--
p.activePerAgent[agentType]--
p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
}()
runner, err := p.getRunner(t)
@@ -178,7 +216,15 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
} else {
ctx, cancel = context.WithCancel(ctx)
}
- defer cancel()
+ p.mu.Lock()
+ p.cancels[t.ID] = cancel
+ p.mu.Unlock()
+ defer func() {
+ cancel()
+ p.mu.Lock()
+ delete(p.cancels, t.ID)
+ p.mu.Unlock()
+ }()
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
@@ -207,6 +253,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
exec.Status = "CANCELLED"
exec.ErrorMsg = "execution cancelled"
p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else if isQuotaExhausted(err) {
+ exec.Status = "BUDGET_EXCEEDED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
} else {
exec.Status = "FAILED"
exec.ErrorMsg = err.Error()
@@ -276,6 +326,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.active--
p.activePerAgent[agentType]--
p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
}()
runner, err := p.getRunner(t)
@@ -390,6 +444,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
exec.Status = "CANCELLED"
exec.ErrorMsg = "execution cancelled"
p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else if isQuotaExhausted(err) {
+ exec.Status = "BUDGET_EXCEEDED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded)
} else {
exec.Status = "FAILED"
exec.ErrorMsg = err.Error()
@@ -444,7 +502,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
select {
case <-ctx.Done():
return ctx.Err()
- case <-time.After(5 * time.Second):
+ case <-time.After(p.depPollInterval):
}
}
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 9ad0617..028e5cf 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -224,27 +224,35 @@ func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
}
}
-func TestPool_AtCapacity(t *testing.T) {
+// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than
+// returning an error when the pool is at capacity. Both tasks should eventually complete.
+func TestPool_QueuedWhenAtCapacity(t *testing.T) {
store := testStore(t)
- runner := &mockRunner{delay: time.Second}
+ runner := &mockRunner{delay: 100 * time.Millisecond}
runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
pool := NewPool(1, runners, store, logger)
- tk1 := makeTask("cap-1")
+ tk1 := makeTask("queue-1")
store.CreateTask(tk1)
- pool.Submit(context.Background(), tk1)
+ if err := pool.Submit(context.Background(), tk1); err != nil {
+ t.Fatalf("first submit: %v", err)
+ }
- // Pool is at capacity, second submit should fail.
- time.Sleep(10 * time.Millisecond) // let goroutine start
- tk2 := makeTask("cap-2")
+ // Second submit must succeed (queued) even though pool slot is taken.
+ tk2 := makeTask("queue-2")
store.CreateTask(tk2)
- err := pool.Submit(context.Background(), tk2)
- if err == nil {
- t.Fatal("expected capacity error")
+ if err := pool.Submit(context.Background(), tk2); err != nil {
+ t.Fatalf("second submit: %v — expected task to be queued, not rejected", err)
}
- <-pool.Results() // drain
+ // Both tasks must complete.
+ for i := 0; i < 2; i++ {
+ r := <-pool.Results()
+ if r.Err != nil {
+ t.Errorf("task %s error: %v", r.TaskID, r.Err)
+ }
+ }
}
// logPatherMockRunner is a mockRunner that also implements LogPather,
diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go
index 3cabed5..956d8b5 100644
--- a/internal/executor/gemini.go
+++ b/internal/executor/gemini.go
@@ -40,9 +40,9 @@ func (r *GeminiRunner) binaryPath() string {
// Run executes a gemini <instructions> invocation, streaming output to log files.
func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
- if t.Agent.WorkingDir != "" {
- if _, err := os.Stat(t.Agent.WorkingDir); err != nil {
- return fmt.Errorf("working_dir %q: %w", t.Agent.WorkingDir, err)
+ if t.Agent.ProjectDir != "" {
+ if _, err := os.Stat(t.Agent.ProjectDir); err != nil {
+ return fmt.Errorf("project_dir %q: %w", t.Agent.ProjectDir, err)
}
}
@@ -68,7 +68,7 @@ func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
// Gemini CLI doesn't necessarily have the same rate limiting behavior as Claude,
// but we'll use a similar execution pattern.
- err := r.execOnce(ctx, args, t.Agent.WorkingDir, e)
+ err := r.execOnce(ctx, args, t.Agent.ProjectDir, e)
if err != nil {
return err
}
diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go
index c7acc3c..42253da 100644
--- a/internal/executor/gemini_test.go
+++ b/internal/executor/gemini_test.go
@@ -63,7 +63,7 @@ func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) {
}
}
-func TestGeminiRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
+func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) {
r := &GeminiRunner{
BinaryPath: "true", // would succeed if it ran
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
@@ -72,7 +72,7 @@ func TestGeminiRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
tk := &task.Task{
Agent: task.AgentConfig{
Type: "gemini",
- WorkingDir: "/nonexistent/path/does/not/exist",
+ ProjectDir: "/nonexistent/path/does/not/exist",
SkipPlanning: true,
},
}
@@ -81,10 +81,10 @@ func TestGeminiRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
err := r.Run(context.Background(), tk, exec)
if err == nil {
- t.Fatal("expected error for inaccessible working_dir, got nil")
+ t.Fatal("expected error for inaccessible project_dir, got nil")
}
- if !strings.Contains(err.Error(), "working_dir") {
- t.Errorf("expected 'working_dir' in error, got: %v", err)
+ if !strings.Contains(err.Error(), "project_dir") {
+ t.Errorf("expected 'project_dir' in error, got: %v", err)
}
}
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index 884da43..deaad18 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -13,7 +13,8 @@ var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`)
const maxBackoffDelay = 5 * time.Minute
-// isRateLimitError returns true if err looks like a Claude API rate-limit response.
+// isRateLimitError returns true if err looks like a transient Claude API
+// rate-limit that is worth retrying (e.g. per-minute/per-request throttle).
func isRateLimitError(err error) bool {
if err == nil {
return false
@@ -25,6 +26,17 @@ func isRateLimitError(err error) bool {
strings.Contains(msg, "overloaded")
}
+// isQuotaExhausted returns true if err indicates the 5-hour usage quota is
+// fully exhausted. Unlike transient rate limits, these should not be retried.
+func isQuotaExhausted(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := strings.ToLower(err.Error())
+ return strings.Contains(msg, "hit your limit") ||
+ strings.Contains(msg, "you've hit your limit")
+}
+
// parseRetryAfter extracts a Retry-After duration from an error message.
// Returns 0 if no retry-after value is found.
func parseRetryAfter(msg string) time.Duration {