diff options
Diffstat (limited to 'docs/packages/executor.md')
| -rw-r--r-- | docs/packages/executor.md | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/docs/packages/executor.md b/docs/packages/executor.md new file mode 100644 index 0000000..29d6a0c --- /dev/null +++ b/docs/packages/executor.md @@ -0,0 +1,447 @@ +# Package: executor + +`internal/executor` — Bounded goroutine pool that drives agent subprocesses. + +--- + +## 1. Overview + +The `executor` package is Claudomator's task-running engine. Its central type, `Pool`, maintains a bounded set of concurrent worker goroutines. Each goroutine picks a task from an internal work queue, selects the appropriate `Runner` implementation (Claude or Gemini), and launches the agent as an OS subprocess. When a subprocess finishes the pool classifies the outcome, updates storage, and emits a `Result` on a channel that the API layer consumes. + +Key responsibilities: + +- **Concurrency control** — never exceed `maxConcurrent` simultaneous workers. +- **Load-balanced agent selection** — prefer the agent type with the fewest active tasks; skip rate-limited agents. +- **Dynamic model selection** — the `Classifier` picks the cheapest model that fits the task's complexity. +- **Dependency gating** — poll storage until all `depends_on` tasks reach COMPLETED before starting a worker. +- **Question / block / resume cycle** — detect when an agent needs user input, park the task as BLOCKED, and re-run it with the user's answer when one arrives. +- **Rate-limit back-pressure** — exponential backoff within a run; per-agent cooldown across runs. +- **Sandbox isolation** — optional `project_dir` support clones the project into a temp directory, runs the agent there, then merges committed changes back. + +--- + +## 2. Pool + +### Struct fields + +```go +type Pool struct { + maxConcurrent int // global concurrency ceiling + runners map[string]Runner // "claude" → ClaudeRunner, "gemini" → GeminiRunner + store Store // storage interface (GetTask, UpdateTaskState, …) + logger *slog.Logger + depPollInterval time.Duration // how often waitForDependencies polls; default 5 s + + mu sync.Mutex + active int // number of goroutines currently in execute/executeResume + activePerAgent map[string]int // per-agent-type active count (used for load balancing) + rateLimited map[string]time.Time // agentType → cooldown deadline + cancels map[string]context.CancelFunc // taskID → cancellation function + + resultCh chan *Result // emits results; buffered at maxConcurrent*2 + workCh chan workItem // internal work queue; buffered at maxConcurrent*10+100 + doneCh chan struct{} // signals when a worker slot is freed; buffered at maxConcurrent + + Questions *QuestionRegistry + Classifier *Classifier +} +``` + +### Concurrency model + +The pool enforces a **global** ceiling (`maxConcurrent`) rather than per-agent-type limits. Within that ceiling, `activePerAgent` tracks how many goroutines are running for each agent type; this counter is the input to the load balancer, not an additional hard limit. + +A single long-lived `dispatch()` goroutine serialises the slot-allocation decision. It blocks on `doneCh` when all `maxConcurrent` slots are taken, then launches a worker goroutine as soon as any slot is freed. This prevents `Submit()` from blocking the caller while still honouring the concurrency ceiling. + +``` +workCh (buffered) ──► dispatch goroutine ──► execute goroutine (×N) + │ │ + └── waits on doneCh ◄─┘ (on exit) +``` + +### Dependency polling loop + +When a task has a non-empty `DependsOn` list, `waitForDependencies()` is called inside the worker goroutine (after agent selection but before subprocess launch). It polls `store.GetTask()` for each dependency at `depPollInterval` intervals (default 5 s). It returns: + +- `nil` — all dependencies reached COMPLETED. +- error — a dependency reached a terminal failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED) or the context was cancelled. + +--- + +## 3. Pool.Submit() + +```go +func (p *Pool) Submit(ctx context.Context, t *task.Task) error +func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error +``` + +`Submit()` is non-blocking: it sends a `workItem` to `workCh`. If `workCh` is full (capacity `maxConcurrent*10+100`) it returns an error immediately rather than blocking the caller. + +`SubmitResume()` is used to re-queue a BLOCKED or TIMED_OUT task. The provided `exec` must have `ResumeSessionID` and `ResumeAnswer` set; the pool routes it through `executeResume()` instead of `execute()`, skipping agent selection and classification. + +**Goroutine lifecycle** (normal path via `execute()`): + +1. `dispatch` dequeues the `workItem` and waits for a free slot. +2. `dispatch` increments `p.active` under the mutex and launches `go execute(ctx, t)`. +3. `execute` selects the agent, optionally classifies the model, and increments `activePerAgent`. +4. `execute` waits for dependencies, creates the execution record in storage, applies a timeout, and calls `runner.Run()`. +5. On return, `execute` calls `handleRunResult()`, decrements `p.active` and `activePerAgent`, and sends a token to `doneCh`. +6. `dispatch` unblocks and processes the next `workItem`. + +--- + +## 4. Runner Interface + +```go +type Runner interface { + Run(ctx context.Context, t *task.Task, exec *storage.Execution) error +} + +// Optional extension — lets the pool persist log paths before execution starts. +type LogPather interface { + ExecLogDir(execID string) string +} +``` + +`Run()` is responsible for the full lifecycle of a single subprocess invocation, including log file management and cost extraction. It returns: + +- `nil` — task succeeded. +- `*BlockedError` — agent wrote a question file; pool transitions task to BLOCKED. +- any other error — pool classifies as FAILED, TIMED_OUT, CANCELLED, or BUDGET_EXCEEDED based on context and error text. + +The pool selects a runner by looking up `t.Agent.Type` in the `runners` map. If `t.Agent.Type` is empty it falls back to `"claude"`. + +--- + +## 5. ClaudeRunner + +```go +type ClaudeRunner struct { + BinaryPath string // defaults to "claude" + Logger *slog.Logger + LogDir string // base directory for per-execution log directories + APIURL string // injected as CLAUDOMATOR_API_URL env var +} +``` + +### Subprocess invocation + +`Run()` calls `buildArgs()` to construct the `claude` CLI invocation, then passes it through `runWithBackoff()` (up to 3 attempts on rate-limit errors, 5 s base delay, exponential backoff, capped at 5 min). + +**New execution flags:** + +| Flag | Value | +|---|---| +| `-p` | task instructions (optionally prefixed with planning preamble) | +| `--session-id` | execution UUID (doubles as the session UUID for resumability) | +| `--output-format stream-json` | machine-readable JSONL output | +| `--verbose` | include cost/token fields | +| `--model` | from `t.Agent.Model` (if set) | +| `--max-budget-usd` | from `t.Agent.MaxBudgetUSD` (if > 0) | +| `--permission-mode` | `bypassPermissions` by default; overridable per task | +| `--append-system-prompt` | from `t.Agent.SystemPromptAppend` | +| `--allowedTools` | repeated for each entry in `t.Agent.AllowedTools` | +| `--disallowedTools` | repeated for each entry in `t.Agent.DisallowedTools` | +| `--add-dir` | repeated for each entry in `t.Agent.ContextFiles` | + +**Resume execution flags** (when `exec.ResumeSessionID != ""`): + +| Flag | Value | +|---|---| +| `-p` | `exec.ResumeAnswer` (the user's answer) | +| `--resume` | `exec.ResumeSessionID` (original session to continue) | +| `--output-format stream-json` | | +| `--verbose` | | +| `--permission-mode` | same default/override as above | +| `--model` | from `t.Agent.Model` (if set) | + +**Environment variables** injected into every subprocess: + +``` +CLAUDOMATOR_API_URL — base URL for subtask creation and question answers +CLAUDOMATOR_TASK_ID — parent task ID (used in subtask POST bodies) +CLAUDOMATOR_QUESTION_FILE — path where the agent writes a question JSON +``` + +### Stream-JSON output parsing + +`execOnce()` attaches an `os.Pipe` to `cmd.Stdout` and spawns a goroutine that calls `parseStream()`. The goroutine tees the stream to the log file while scanning for JSONL events: + +| Event type | Action | +|---|---| +| `rate_limit_event` with `status: rejected` | sets `streamErr` to a rate-limit error | +| `assistant` with `error: rate_limit` | sets `streamErr` to a rate-limit error | +| `result` with `is_error: true` | sets `streamErr` to a task-failure error | +| `result` with `total_cost_usd` | captures total cost | +| `user` containing denied `tool_result` | sets `streamErr` to a permission-denial error | +| any message with `cost_usd` | updates running cost (legacy field) | + +`costUSD` and `streamErr` are read after `cmd.Wait()` returns and the goroutine drains. + +### Cost and token extraction + +`exec.CostUSD` is set from `total_cost_usd` (preferred) or `cost_usd` (legacy) in the stream. Token counts are not extracted at the runner level; they are available in the raw stdout log. + +### Project sandbox enforcement + +When `t.Agent.ProjectDir` is set on a new (non-resume) execution: + +1. `setupSandbox()` ensures `projectDir` is a git repository (initialising with an empty commit if needed), then `git clone --no-hardlinks` it into a `claudomator-sandbox-*` temp directory. It prefers a remote named `"local"` over `"origin"` as the clone source. +2. The subprocess runs with its working directory set to the sandbox. +3. On success, `teardownSandbox()` verifies the sandbox has no uncommitted changes, counts commits ahead of `origin/HEAD`, and pushes them to the bare repo. If the push is rejected due to a concurrent task, it fetches and rebases then retries once. +4. The sandbox is removed on success. On any failure the sandbox is preserved and its path is included in the error message. + +Resume executions skip sandboxing and run directly in `projectDir` so the agent can pick up its previous session state. + +### Question file mechanism + +The agent preamble instructs the agent to write a JSON file to `$CLAUDOMATOR_QUESTION_FILE` and exit immediately when it needs user input. After `execOnce()` returns successfully (exit code 0), `Run()` attempts to read that file: + +- If the file exists: it is consumed (`os.Remove`), and `Run()` returns `&BlockedError{QuestionJSON: ..., SessionID: e.SessionID}`. +- If the file is absent: `Run()` returns `nil` (success). + +--- + +## 6. GeminiRunner + +```go +type GeminiRunner struct { + BinaryPath string // defaults to "gemini" + Logger *slog.Logger + LogDir string + APIURL string +} +``` + +`GeminiRunner` follows the same pattern as `ClaudeRunner` with these differences: + +| Aspect | ClaudeRunner | GeminiRunner | +|---|---|---| +| Binary | `claude` | `gemini` | +| Flag structure | `claude -p <instructions> --session-id … --output-format stream-json …` | `gemini <instructions> --output-format stream-json …` (instructions are the first positional argument) | +| Session/resume | `--session-id` for new; `--resume` for resume | Session ID stored on `exec` but resume flag handling is a stub (not yet implemented) | +| Rate-limit retries | `runWithBackoff` (up to 3 retries) | Single attempt; no retry loop | +| Sandbox | Full clone + teardown + push | Not implemented; runs directly in `projectDir` | +| Stream parsing | `parseStream()` | `parseStream()` (shared implementation; assumes compatible JSONL format) | +| Question file | Same mechanism | Same mechanism | + +`GeminiRunner` implements `LogPather` identically to `ClaudeRunner`. + +--- + +## 7. Classifier + +```go +type Classifier struct { + GeminiBinaryPath string // defaults to "gemini" +} + +type Classification struct { + AgentType string `json:"agent_type"` + Model string `json:"model"` + Reason string `json:"reason"` +} + +type SystemStatus struct { + ActiveTasks map[string]int // agentType → number of active tasks + RateLimited map[string]bool // agentType → true if currently rate-limited +} +``` + +`Classify()` is called once per task, after load-balanced agent selection but before the subprocess starts. Its sole output is the `Model` field; `AgentType` is already fixed by the load balancer. + +**Decision factors baked into the prompt:** + +- The already-chosen agent type (Claude or Gemini). +- The task `Name` and `Instructions`. +- A hard-coded model tier list: + - Claude: `haiku-4-5` (cheap/fast) → `sonnet-4-6` (default) → `opus-4-6` (hard tasks only) + - Gemini: `gemini-2.5-flash-lite` (cheap) → `gemini-2.5-flash` (default) → `gemini-2.5-pro` (hard tasks only) + +**Implementation**: `Classify()` invokes `gemini --prompt <prompt> --model gemini-2.5-flash-lite --output-format json`. It then strips markdown code fences and "Loaded cached credentials" noise before unmarshalling the JSON response into a `Classification`. If classification fails, the pool logs the error and proceeds with whatever model was already on `t.Agent.Model`. + +--- + +## 8. QuestionRegistry + +```go +type QuestionRegistry struct { + mu sync.Mutex + questions map[string]*PendingQuestion // keyed by toolUseID +} + +type PendingQuestion struct { + TaskID string + ToolUseID string + Input json.RawMessage + AnswerCh chan string // buffered(1); closed when answer delivered +} +``` + +The registry tracks questions that are waiting for a human answer. It is populated via the stream-parsing path (`streamAndParseWithQuestions`) when an agent emits an `AskUserQuestion` tool_use event in the JSONL stream. + +| Method | Description | +|---|---| +| `Register(taskID, toolUseID, input)` | Adds a `PendingQuestion`; returns the buffered answer channel. | +| `Answer(toolUseID, answer)` | Delivers an answer, removes the entry, returns `false` if not found. | +| `Get(toolUseID)` | Returns the pending question or `nil`. | +| `PendingForTask(taskID)` | Returns all pending questions for a task (used by the API to list them). | +| `Remove(toolUseID)` | Removes without answering (used on task cancellation). | + +**How a blocked task is resumed:** + +1. The API receives a `POST /api/tasks/{id}/answer` with the user's answer. +2. It calls `QuestionRegistry.Answer(toolUseID, answer)`, which unblocks any goroutine waiting on `AnswerCh`. +3. Separately (for the file-based path), the API creates a resume `Execution` with `ResumeSessionID` and `ResumeAnswer`, then calls `Pool.SubmitResume()`. +4. The pool runs `executeResume()`, which invokes `runner.Run()` with `--resume <session>` and the answer as `-p`. + +--- + +## 9. Rate Limiter + +Rate limiting is implemented at two levels: + +### Per-invocation retry (within `ClaudeRunner.Run`) + +`runWithBackoff(ctx, maxRetries=3, baseDelay=5s, fn)` calls the subprocess, detects rate-limit errors (`isRateLimitError`), and retries with exponential backoff: + +``` +attempt 0: immediate +attempt 1: 5 s delay (or Retry-After header value if present) +attempt 2: 10 s delay +attempt 3: 20 s delay (capped at 5 min) +``` + +`isRateLimitError` matches: `"rate limit"`, `"too many requests"`, `"429"`, `"overloaded"`. + +Quota-exhausted errors (`isQuotaExhausted`) — strings like `"hit your limit"` or `"status: rejected"` — are **not** retried; they propagate immediately to `handleRunResult`. + +### Per-agent-type pool cooldown + +When `handleRunResult` detects a rate-limit or quota error it records a cooldown deadline in `p.rateLimited[agentType]`: + +- Transient rate limit: 1 minute (or `Retry-After` value from the error message). +- Quota exhausted: 5 hours. + +During agent selection in `execute()`, the load balancer skips agents whose cooldown deadline has not passed yet. If all agents are on cooldown, the agent with the fewest active tasks is chosen anyway (best-effort fallback). + +--- + +## 10. BlockedError + +```go +type BlockedError struct { + QuestionJSON string // raw JSON: {"text": "...", "options": [...]} + SessionID string // claude session to resume once the user answers +} +``` + +`BlockedError` is returned by `Runner.Run()` when the agent wrote a question to `$CLAUDOMATOR_QUESTION_FILE` and exited. It is **not** a fatal error — it is a signal that the task requires human input before it can continue. + +**Pool handling** (`handleRunResult`): + +```go +var blockedErr *BlockedError +if errors.As(err, &blockedErr) { + exec.Status = "BLOCKED" + store.UpdateTaskState(t.ID, task.StateBlocked) + store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) +} +``` + +The pool does **not** emit a failure result; it still sends a `Result` to `resultCh` with `Err` set to the `BlockedError`. The API layer reads `exec.Status == "BLOCKED"` and exposes the question to the caller. Once the user answers, the API calls `Pool.SubmitResume()` and the task transitions back to RUNNING. + +--- + +## 11. Result Struct and handleRunResult + +```go +type Result struct { + TaskID string + Execution *storage.Execution + Err error +} +``` + +`handleRunResult` is shared between `execute()` and `executeResume()`. It inspects the error returned by `Runner.Run()` and maps it to an execution status and task state: + +| Condition | exec.Status | task.State | +|---|---|---| +| `*BlockedError` | `BLOCKED` | `StateBlocked` | +| `ctx.Err() == DeadlineExceeded` | `TIMED_OUT` | `StateTimedOut` | +| `ctx.Err() == Canceled` | `CANCELLED` | `StateCancelled` | +| `isQuotaExhausted(err)` | `BUDGET_EXCEEDED` | `StateBudgetExceeded` | +| any other non-nil error | `FAILED` | `StateFailed` | +| nil, root task, subtasks exist | `BLOCKED` | `StateBlocked` (waits for subtasks) | +| nil, root task, no subtasks | `READY` | `StateReady` | +| nil, subtask | `COMPLETED` | `StateCompleted` | + +After a subtask completes, `maybeUnblockParent()` checks whether all sibling subtasks are also COMPLETED; if so, the parent transitions from BLOCKED to READY. + +--- + +## Diagrams + +### Sequence: Pool.Submit → goroutine → Runner.Run → state update + +```mermaid +sequenceDiagram + participant Caller + participant Pool + participant Dispatch as dispatch goroutine + participant Worker as execute goroutine + participant Classifier + participant Runner + participant Store + + Caller->>Pool: Submit(ctx, task) + Pool->>Dispatch: workCh ← workItem + Dispatch->>Dispatch: wait for free slot (doneCh) + Dispatch->>Worker: go execute(ctx, task) + + Worker->>Pool: snapshot activePerAgent / rateLimited + Pool-->>Worker: SystemStatus + Worker->>Worker: pickAgent(status) → agentType + Worker->>Classifier: Classify(taskName, instructions, status, agentType) + Classifier-->>Worker: Classification{Model} + + Worker->>Store: waitForDependencies (poll every 5s) + Store-->>Worker: all deps COMPLETED + + Worker->>Store: CreateExecution(exec) + Worker->>Store: UpdateTaskState(RUNNING) + + Worker->>Runner: Run(ctx, task, exec) + Note over Runner: subprocess runs,<br/>stream parsed + Runner-->>Worker: error or nil + + Worker->>Pool: handleRunResult(ctx, task, exec, err, agentType) + Pool->>Store: UpdateTaskState(new state) + Pool->>Store: UpdateExecution(exec) + Pool->>Caller: resultCh ← Result{TaskID, Execution, Err} + + Worker->>Dispatch: doneCh ← token (slot freed) +``` + +### Flowchart: Question-handling flow + +```mermaid +flowchart TD + A([Agent subprocess running]) --> B{Need user input?} + B -- No --> C([Task completes normally]) + B -- Yes --> D["Write JSON to\n$CLAUDOMATOR_QUESTION_FILE\n{text, options}"] + D --> E[Agent exits with code 0] + E --> F[Runner reads question file] + F --> G["Return &BlockedError\n{QuestionJSON, SessionID}"] + G --> H[Pool.handleRunResult detects BlockedError] + H --> I[exec.Status = BLOCKED] + H --> J[Store.UpdateTaskState → StateBlocked] + H --> K[Store.UpdateTaskQuestion → QuestionJSON] + K --> L[API exposes question to user] + L --> M[User submits answer via API] + M --> N["API creates Execution:\nResumeSessionID = original SessionID\nResumeAnswer = user answer"] + N --> O[Pool.SubmitResume → workCh] + O --> P[dispatch launches executeResume goroutine] + P --> Q["Runner.Run with\n--resume ResumeSessionID\n-p ResumeAnswer"] + Q --> R([Claude session continues from where it left off]) +``` |
