diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-08 21:03:50 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-08 21:03:50 +0000 |
| commit | 632ea5a44731af94b6238f330a3b5440906c8ae7 (patch) | |
| tree | d8c780412598d66b89ef390b5729e379fdfd9d5b /internal/executor | |
| parent | 406247b14985ab57902e8e42898dc8cb8960290d (diff) | |
| parent | 93a4c852bf726b00e8014d385165f847763fa214 (diff) | |
merge: pull latest from master and resolve conflicts
- Resolve conflicts in API server, CLI, and executor.
- Maintain Gemini classification and assignment logic.
- Update UI to use generic agent config and project_dir.
- Fix ProjectDir/WorkingDir inconsistencies in Gemini runner.
- All tests passing after merge.
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/claude.go | 119 | ||||
| -rw-r--r-- | internal/executor/claude_test.go | 6 | ||||
| -rw-r--r-- | internal/executor/executor.go | 136 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 30 | ||||
| -rw-r--r-- | internal/executor/gemini.go | 8 | ||||
| -rw-r--r-- | internal/executor/gemini_test.go | 10 | ||||
| -rw-r--r-- | internal/executor/ratelimit.go | 14 |
7 files changed, 256 insertions, 67 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go index 86a2ba5..e504369 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -55,10 +55,18 @@ func (r *ClaudeRunner) binaryPath() string { // Run executes a claude -p invocation, streaming output to log files. // It retries up to 3 times on rate-limit errors using exponential backoff. // If the agent writes a question file and exits, Run returns *BlockedError. +// +// When project_dir is set and this is not a resume execution, Run clones the +// project into a temp sandbox, runs the agent there, then merges committed +// changes back to project_dir. On failure the sandbox is preserved and its +// path is included in the error. func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - if t.Agent.WorkingDir != "" { - if _, err := os.Stat(t.Agent.WorkingDir); err != nil { - return fmt.Errorf("working_dir %q: %w", t.Agent.WorkingDir, err) + projectDir := t.Agent.ProjectDir + + // Validate project_dir exists when set. + if projectDir != "" { + if _, err := os.Stat(projectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", projectDir, err) } } @@ -82,6 +90,20 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) } + // For new (non-resume) executions with a project_dir, clone into a sandbox. + // Resume executions run directly in project_dir to pick up the previous session. + var sandboxDir string + effectiveWorkingDir := projectDir + if projectDir != "" && e.ResumeSessionID == "" { + var err error + sandboxDir, err = setupSandbox(projectDir) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + effectiveWorkingDir = sandboxDir + r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) + } + questionFile := filepath.Join(logDir, "question.json") args := r.buildArgs(t, e, questionFile) @@ -95,9 +117,12 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi ) } attempt++ - return r.execOnce(ctx, args, t.Agent.WorkingDir, e) + return r.execOnce(ctx, args, effectiveWorkingDir, e) }) if err != nil { + if sandboxDir != "" { + return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) + } return err } @@ -105,8 +130,89 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi data, readErr := os.ReadFile(questionFile) if readErr == nil { os.Remove(questionFile) // consumed + // Preserve sandbox on BLOCKED — agent may have partial work. return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID} } + + // Merge sandbox back to project_dir and clean up. + if sandboxDir != "" { + if mergeErr := teardownSandbox(projectDir, sandboxDir, r.Logger); mergeErr != nil { + return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) + } + } + return nil +} + +// setupSandbox prepares a temporary git clone of projectDir. +// If projectDir is not a git repo it is initialised with an initial commit first. +func setupSandbox(projectDir string) (string, error) { + // Ensure projectDir is a git repo; initialise if not. + check := exec.Command("git", "-C", projectDir, "rev-parse", "--git-dir") + if err := check.Run(); err != nil { + // Not a git repo — init and commit everything. + cmds := [][]string{ + {"git", "-C", projectDir, "init"}, + {"git", "-C", projectDir, "add", "-A"}, + {"git", "-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"}, + } + for _, args := range cmds { + if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { //nolint:gosec + return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out) + } + } + } + + tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*") + if err != nil { + return "", fmt.Errorf("creating sandbox dir: %w", err) + } + + // Clone into the pre-created dir (git clone requires the target to not exist, + // so remove it first and let git recreate it). + if err := os.Remove(tempDir); err != nil { + return "", fmt.Errorf("removing temp dir placeholder: %w", err) + } + out, err := exec.Command("git", "clone", "--local", projectDir, tempDir).CombinedOutput() + if err != nil { + return "", fmt.Errorf("git clone: %w\n%s", err, out) + } + return tempDir, nil +} + +// teardownSandbox verifies the sandbox is clean, merges commits back to +// projectDir via fast-forward, then removes the sandbox. +func teardownSandbox(projectDir, sandboxDir string, logger *slog.Logger) error { + // Fail if agent left uncommitted changes. + out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() + if err != nil { + return fmt.Errorf("git status: %w", err) + } + if len(strings.TrimSpace(string(out))) > 0 { + return fmt.Errorf("uncommitted changes in sandbox (agent must commit all work):\n%s", out) + } + + // Check whether there are any new commits to merge. + ahead, err := exec.Command("git", "-C", sandboxDir, "rev-list", "--count", "origin/HEAD..HEAD").Output() + if err != nil { + // No origin/HEAD (e.g. fresh init with no prior commits) — proceed anyway. + logger.Warn("could not determine commits ahead of origin; proceeding with merge", "err", err) + } + if strings.TrimSpace(string(ahead)) == "0" { + // Nothing to merge — clean up and return. + os.RemoveAll(sandboxDir) + return nil + } + + // Fetch new commits from sandbox into project_dir and fast-forward merge. + if out, err := exec.Command("git", "-C", projectDir, "fetch", sandboxDir, "HEAD").CombinedOutput(); err != nil { + return fmt.Errorf("git fetch from sandbox: %w\n%s", err, out) + } + if out, err := exec.Command("git", "-C", projectDir, "merge", "--ff-only", "FETCH_HEAD").CombinedOutput(); err != nil { + return fmt.Errorf("git merge --ff-only FETCH_HEAD: %w\n%s", err, out) + } + + logger.Info("sandbox merged and cleaned up", "sandbox", sandboxDir, "project_dir", projectDir) + os.RemoveAll(sandboxDir) return nil } @@ -189,6 +295,11 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s if exitErr, ok := waitErr.(*exec.ExitError); ok { e.ExitCode = exitErr.ExitCode() } + // If the stream captured a rate-limit or quota message, return it + // so callers can distinguish it from a generic exit-status failure. + if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) { + return streamErr + } return fmt.Errorf("claude exited with error: %w", waitErr) } diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go index dba7470..b5380f4 100644 --- a/internal/executor/claude_test.go +++ b/internal/executor/claude_test.go @@ -233,7 +233,7 @@ func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { tk := &task.Task{ Agent: task.AgentConfig{ Type: "claude", - WorkingDir: "/nonexistent/path/does/not/exist", + ProjectDir: "/nonexistent/path/does/not/exist", SkipPlanning: true, }, } @@ -244,8 +244,8 @@ func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { if err == nil { t.Fatal("expected error for inaccessible working_dir, got nil") } - if !strings.Contains(err.Error(), "working_dir") { - t.Errorf("expected 'working_dir' in error, got: %v", err) + if !strings.Contains(err.Error(), "project_dir") { + t.Errorf("expected 'project_dir' in error, got: %v", err) } } diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 6bd1c68..d1c8e72 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -26,12 +26,20 @@ type Runner interface { Run(ctx context.Context, t *task.Task, exec *storage.Execution) error } +// workItem is an entry in the pool's internal work queue. +type workItem struct { + ctx context.Context + task *task.Task + exec *storage.Execution // non-nil for resume submissions +} + // Pool manages a bounded set of concurrent task workers. type Pool struct { - maxConcurrent int - runners map[string]Runner - store *storage.DB - logger *slog.Logger + maxConcurrent int + runners map[string]Runner + store *storage.DB + logger *slog.Logger + depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s mu sync.Mutex active int @@ -39,6 +47,8 @@ type Pool struct { rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel resultCh chan *Result + workCh chan workItem // internal bounded queue; Submit enqueues here + doneCh chan struct{} // signals when a worker slot is freed Questions *QuestionRegistry Classifier *Classifier } @@ -54,33 +64,57 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store *storage.DB, lo if maxConcurrent < 1 { maxConcurrent = 1 } - return &Pool{ - maxConcurrent: maxConcurrent, - runners: runners, - store: store, - logger: logger, - activePerAgent: make(map[string]int), - rateLimited: make(map[string]time.Time), - cancels: make(map[string]context.CancelFunc), - resultCh: make(chan *Result, maxConcurrent*2), - Questions: NewQuestionRegistry(), + p := &Pool{ + maxConcurrent: maxConcurrent, + runners: runners, + store: store, + logger: logger, + depPollInterval: 5 * time.Second, + activePerAgent: make(map[string]int), + rateLimited: make(map[string]time.Time), + cancels: make(map[string]context.CancelFunc), + resultCh: make(chan *Result, maxConcurrent*2), + workCh: make(chan workItem, maxConcurrent*10+100), + doneCh: make(chan struct{}, maxConcurrent), + Questions: NewQuestionRegistry(), } + go p.dispatch() + return p } -// Submit dispatches a task for execution. Blocks if pool is at capacity. -func (p *Pool) Submit(ctx context.Context, t *task.Task) error { - p.mu.Lock() - if p.active >= p.maxConcurrent { - active := p.active - max := p.maxConcurrent - p.mu.Unlock() - return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) +// dispatch is a long-running goroutine that reads from the internal work queue +// and launches goroutines as soon as a pool slot is available. This prevents +// tasks from being rejected when the pool is temporarily at capacity. +func (p *Pool) dispatch() { + for item := range p.workCh { + for { + p.mu.Lock() + if p.active < p.maxConcurrent { + p.active++ + p.mu.Unlock() + if item.exec != nil { + go p.executeResume(item.ctx, item.task, item.exec) + } else { + go p.execute(item.ctx, item.task) + } + break + } + p.mu.Unlock() + <-p.doneCh // wait for a worker to finish + } } - p.active++ - p.mu.Unlock() +} - go p.execute(ctx, t) - return nil +// Submit enqueues a task for execution. Returns an error only if the internal +// work queue is full. When the pool is at capacity the task is buffered and +// dispatched as soon as a slot becomes available. +func (p *Pool) Submit(ctx context.Context, t *task.Task) error { + select { + case p.workCh <- workItem{ctx: ctx, task: t}: + return nil + default: + return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh)) + } } // Results returns the channel for reading execution results. @@ -104,18 +138,18 @@ func (p *Pool) Cancel(taskID string) bool { // SubmitResume re-queues a blocked task using the provided resume execution. // The execution must have ResumeSessionID and ResumeAnswer set. func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { - p.mu.Lock() - if p.active >= p.maxConcurrent { - active := p.active - max := p.maxConcurrent - p.mu.Unlock() - return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) + if t.State != task.StateBlocked && t.State != task.StateTimedOut { + return fmt.Errorf("task %s must be in BLOCKED or TIMED_OUT state to resume (current: %s)", t.ID, t.State) + } + if exec.ResumeSessionID == "" { + return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID) + } + select { + case p.workCh <- workItem{ctx: ctx, task: t, exec: exec}: + return nil + default: + return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh)) } - p.active++ - p.mu.Unlock() - - go p.executeResume(ctx, t, exec) - return nil } func (p *Pool) getRunner(t *task.Task) (Runner, error) { @@ -145,6 +179,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.active-- p.activePerAgent[agentType]-- p.mu.Unlock() + select { + case p.doneCh <- struct{}{}: + default: + } }() runner, err := p.getRunner(t) @@ -178,7 +216,15 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex } else { ctx, cancel = context.WithCancel(ctx) } - defer cancel() + p.mu.Lock() + p.cancels[t.ID] = cancel + p.mu.Unlock() + defer func() { + cancel() + p.mu.Lock() + delete(p.cancels, t.ID) + p.mu.Unlock() + }() err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() @@ -207,6 +253,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else if isQuotaExhausted(err) { + exec.Status = "BUDGET_EXCEEDED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() @@ -276,6 +326,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.active-- p.activePerAgent[agentType]-- p.mu.Unlock() + select { + case p.doneCh <- struct{}{}: + default: + } }() runner, err := p.getRunner(t) @@ -390,6 +444,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else if isQuotaExhausted(err) { + exec.Status = "BUDGET_EXCEEDED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() @@ -444,7 +502,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-time.After(p.depPollInterval): } } } diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 9ad0617..028e5cf 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -224,27 +224,35 @@ func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) { } } -func TestPool_AtCapacity(t *testing.T) { +// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than +// returning an error when the pool is at capacity. Both tasks should eventually complete. +func TestPool_QueuedWhenAtCapacity(t *testing.T) { store := testStore(t) - runner := &mockRunner{delay: time.Second} + runner := &mockRunner{delay: 100 * time.Millisecond} runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(1, runners, store, logger) - tk1 := makeTask("cap-1") + tk1 := makeTask("queue-1") store.CreateTask(tk1) - pool.Submit(context.Background(), tk1) + if err := pool.Submit(context.Background(), tk1); err != nil { + t.Fatalf("first submit: %v", err) + } - // Pool is at capacity, second submit should fail. - time.Sleep(10 * time.Millisecond) // let goroutine start - tk2 := makeTask("cap-2") + // Second submit must succeed (queued) even though pool slot is taken. + tk2 := makeTask("queue-2") store.CreateTask(tk2) - err := pool.Submit(context.Background(), tk2) - if err == nil { - t.Fatal("expected capacity error") + if err := pool.Submit(context.Background(), tk2); err != nil { + t.Fatalf("second submit: %v — expected task to be queued, not rejected", err) } - <-pool.Results() // drain + // Both tasks must complete. + for i := 0; i < 2; i++ { + r := <-pool.Results() + if r.Err != nil { + t.Errorf("task %s error: %v", r.TaskID, r.Err) + } + } } // logPatherMockRunner is a mockRunner that also implements LogPather, diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go index 3cabed5..956d8b5 100644 --- a/internal/executor/gemini.go +++ b/internal/executor/gemini.go @@ -40,9 +40,9 @@ func (r *GeminiRunner) binaryPath() string { // Run executes a gemini <instructions> invocation, streaming output to log files. func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - if t.Agent.WorkingDir != "" { - if _, err := os.Stat(t.Agent.WorkingDir); err != nil { - return fmt.Errorf("working_dir %q: %w", t.Agent.WorkingDir, err) + if t.Agent.ProjectDir != "" { + if _, err := os.Stat(t.Agent.ProjectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", t.Agent.ProjectDir, err) } } @@ -68,7 +68,7 @@ func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi // Gemini CLI doesn't necessarily have the same rate limiting behavior as Claude, // but we'll use a similar execution pattern. - err := r.execOnce(ctx, args, t.Agent.WorkingDir, e) + err := r.execOnce(ctx, args, t.Agent.ProjectDir, e) if err != nil { return err } diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go index c7acc3c..42253da 100644 --- a/internal/executor/gemini_test.go +++ b/internal/executor/gemini_test.go @@ -63,7 +63,7 @@ func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) { } } -func TestGeminiRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { +func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) { r := &GeminiRunner{ BinaryPath: "true", // would succeed if it ran Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), @@ -72,7 +72,7 @@ func TestGeminiRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { tk := &task.Task{ Agent: task.AgentConfig{ Type: "gemini", - WorkingDir: "/nonexistent/path/does/not/exist", + ProjectDir: "/nonexistent/path/does/not/exist", SkipPlanning: true, }, } @@ -81,10 +81,10 @@ func TestGeminiRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { err := r.Run(context.Background(), tk, exec) if err == nil { - t.Fatal("expected error for inaccessible working_dir, got nil") + t.Fatal("expected error for inaccessible project_dir, got nil") } - if !strings.Contains(err.Error(), "working_dir") { - t.Errorf("expected 'working_dir' in error, got: %v", err) + if !strings.Contains(err.Error(), "project_dir") { + t.Errorf("expected 'project_dir' in error, got: %v", err) } } diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go index 884da43..deaad18 100644 --- a/internal/executor/ratelimit.go +++ b/internal/executor/ratelimit.go @@ -13,7 +13,8 @@ var retryAfterRe = regexp.MustCompile(`(?i)retry[-_ ]after[:\s]+(\d+)`) const maxBackoffDelay = 5 * time.Minute -// isRateLimitError returns true if err looks like a Claude API rate-limit response. +// isRateLimitError returns true if err looks like a transient Claude API +// rate-limit that is worth retrying (e.g. per-minute/per-request throttle). func isRateLimitError(err error) bool { if err == nil { return false @@ -25,6 +26,17 @@ func isRateLimitError(err error) bool { strings.Contains(msg, "overloaded") } +// isQuotaExhausted returns true if err indicates the 5-hour usage quota is +// fully exhausted. Unlike transient rate limits, these should not be retried. +func isQuotaExhausted(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "hit your limit") || + strings.Contains(msg, "you've hit your limit") +} + // parseRetryAfter extracts a Retry-After duration from an error message. // Returns 0 if no retry-after value is found. func parseRetryAfter(msg string) time.Duration { |
