summaryrefslogtreecommitdiff
path: root/docs/packages/executor.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/packages/executor.md')
-rw-r--r--docs/packages/executor.md447
1 files changed, 447 insertions, 0 deletions
diff --git a/docs/packages/executor.md b/docs/packages/executor.md
new file mode 100644
index 0000000..29d6a0c
--- /dev/null
+++ b/docs/packages/executor.md
@@ -0,0 +1,447 @@
+# Package: executor
+
+`internal/executor` — Bounded goroutine pool that drives agent subprocesses.
+
+---
+
+## 1. Overview
+
+The `executor` package is Claudomator's task-running engine. Its central type, `Pool`, maintains a bounded set of concurrent worker goroutines. Each goroutine picks a task from an internal work queue, selects the appropriate `Runner` implementation (Claude or Gemini), and launches the agent as an OS subprocess. When a subprocess finishes the pool classifies the outcome, updates storage, and emits a `Result` on a channel that the API layer consumes.
+
+Key responsibilities:
+
+- **Concurrency control** — never exceed `maxConcurrent` simultaneous workers.
+- **Load-balanced agent selection** — prefer the agent type with the fewest active tasks; skip rate-limited agents.
+- **Dynamic model selection** — the `Classifier` picks the cheapest model that fits the task's complexity.
+- **Dependency gating** — poll storage until all `depends_on` tasks reach COMPLETED before starting a worker.
+- **Question / block / resume cycle** — detect when an agent needs user input, park the task as BLOCKED, and re-run it with the user's answer when one arrives.
+- **Rate-limit back-pressure** — exponential backoff within a run; per-agent cooldown across runs.
+- **Sandbox isolation** — optional `project_dir` support clones the project into a temp directory, runs the agent there, then merges committed changes back.
+
+---
+
+## 2. Pool
+
+### Struct fields
+
+```go
+type Pool struct {
+ maxConcurrent int // global concurrency ceiling
+ runners map[string]Runner // "claude" → ClaudeRunner, "gemini" → GeminiRunner
+ store Store // storage interface (GetTask, UpdateTaskState, …)
+ logger *slog.Logger
+ depPollInterval time.Duration // how often waitForDependencies polls; default 5 s
+
+ mu sync.Mutex
+ active int // number of goroutines currently in execute/executeResume
+ activePerAgent map[string]int // per-agent-type active count (used for load balancing)
+ rateLimited map[string]time.Time // agentType → cooldown deadline
+ cancels map[string]context.CancelFunc // taskID → cancellation function
+
+ resultCh chan *Result // emits results; buffered at maxConcurrent*2
+ workCh chan workItem // internal work queue; buffered at maxConcurrent*10+100
+ doneCh chan struct{} // signals when a worker slot is freed; buffered at maxConcurrent
+
+ Questions *QuestionRegistry
+ Classifier *Classifier
+}
+```
+
+### Concurrency model
+
+The pool enforces a **global** ceiling (`maxConcurrent`) rather than per-agent-type limits. Within that ceiling, `activePerAgent` tracks how many goroutines are running for each agent type; this counter is the input to the load balancer, not an additional hard limit.
+
+A single long-lived `dispatch()` goroutine serialises the slot-allocation decision. It blocks on `doneCh` when all `maxConcurrent` slots are taken, then launches a worker goroutine as soon as any slot is freed. This prevents `Submit()` from blocking the caller while still honouring the concurrency ceiling.
+
+```
+workCh (buffered) ──► dispatch goroutine ──► execute goroutine (×N)
+ │ │
+ └── waits on doneCh ◄─┘ (on exit)
+```
+
+### Dependency polling loop
+
+When a task has a non-empty `DependsOn` list, `waitForDependencies()` is called inside the worker goroutine (after agent selection but before subprocess launch). It polls `store.GetTask()` for each dependency at `depPollInterval` intervals (default 5 s). It returns:
+
+- `nil` — all dependencies reached COMPLETED.
+- error — a dependency reached a terminal failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED) or the context was cancelled.
+
+---
+
+## 3. Pool.Submit()
+
+```go
+func (p *Pool) Submit(ctx context.Context, t *task.Task) error
+func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error
+```
+
+`Submit()` is non-blocking: it sends a `workItem` to `workCh`. If `workCh` is full (capacity `maxConcurrent*10+100`) it returns an error immediately rather than blocking the caller.
+
+`SubmitResume()` is used to re-queue a BLOCKED or TIMED_OUT task. The provided `exec` must have `ResumeSessionID` and `ResumeAnswer` set; the pool routes it through `executeResume()` instead of `execute()`, skipping agent selection and classification.
+
+**Goroutine lifecycle** (normal path via `execute()`):
+
+1. `dispatch` dequeues the `workItem` and waits for a free slot.
+2. `dispatch` increments `p.active` under the mutex and launches `go execute(ctx, t)`.
+3. `execute` selects the agent, optionally classifies the model, and increments `activePerAgent`.
+4. `execute` waits for dependencies, creates the execution record in storage, applies a timeout, and calls `runner.Run()`.
+5. On return, `execute` calls `handleRunResult()`, decrements `p.active` and `activePerAgent`, and sends a token to `doneCh`.
+6. `dispatch` unblocks and processes the next `workItem`.
+
+---
+
+## 4. Runner Interface
+
+```go
+type Runner interface {
+ Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
+}
+
+// Optional extension — lets the pool persist log paths before execution starts.
+type LogPather interface {
+ ExecLogDir(execID string) string
+}
+```
+
+`Run()` is responsible for the full lifecycle of a single subprocess invocation, including log file management and cost extraction. It returns:
+
+- `nil` — task succeeded.
+- `*BlockedError` — agent wrote a question file; pool transitions task to BLOCKED.
+- any other error — pool classifies as FAILED, TIMED_OUT, CANCELLED, or BUDGET_EXCEEDED based on context and error text.
+
+The pool selects a runner by looking up `t.Agent.Type` in the `runners` map. If `t.Agent.Type` is empty it falls back to `"claude"`.
+
+---
+
+## 5. ClaudeRunner
+
+```go
+type ClaudeRunner struct {
+ BinaryPath string // defaults to "claude"
+ Logger *slog.Logger
+ LogDir string // base directory for per-execution log directories
+ APIURL string // injected as CLAUDOMATOR_API_URL env var
+}
+```
+
+### Subprocess invocation
+
+`Run()` calls `buildArgs()` to construct the `claude` CLI invocation, then passes it through `runWithBackoff()` (up to 3 attempts on rate-limit errors, 5 s base delay, exponential backoff, capped at 5 min).
+
+**New execution flags:**
+
+| Flag | Value |
+|---|---|
+| `-p` | task instructions (optionally prefixed with planning preamble) |
+| `--session-id` | execution UUID (doubles as the session UUID for resumability) |
+| `--output-format stream-json` | machine-readable JSONL output |
+| `--verbose` | include cost/token fields |
+| `--model` | from `t.Agent.Model` (if set) |
+| `--max-budget-usd` | from `t.Agent.MaxBudgetUSD` (if > 0) |
+| `--permission-mode` | `bypassPermissions` by default; overridable per task |
+| `--append-system-prompt` | from `t.Agent.SystemPromptAppend` |
+| `--allowedTools` | repeated for each entry in `t.Agent.AllowedTools` |
+| `--disallowedTools` | repeated for each entry in `t.Agent.DisallowedTools` |
+| `--add-dir` | repeated for each entry in `t.Agent.ContextFiles` |
+
+**Resume execution flags** (when `exec.ResumeSessionID != ""`):
+
+| Flag | Value |
+|---|---|
+| `-p` | `exec.ResumeAnswer` (the user's answer) |
+| `--resume` | `exec.ResumeSessionID` (original session to continue) |
+| `--output-format stream-json` | |
+| `--verbose` | |
+| `--permission-mode` | same default/override as above |
+| `--model` | from `t.Agent.Model` (if set) |
+
+**Environment variables** injected into every subprocess:
+
+```
+CLAUDOMATOR_API_URL — base URL for subtask creation and question answers
+CLAUDOMATOR_TASK_ID — parent task ID (used in subtask POST bodies)
+CLAUDOMATOR_QUESTION_FILE — path where the agent writes a question JSON
+```
+
+### Stream-JSON output parsing
+
+`execOnce()` attaches an `os.Pipe` to `cmd.Stdout` and spawns a goroutine that calls `parseStream()`. The goroutine tees the stream to the log file while scanning for JSONL events:
+
+| Event type | Action |
+|---|---|
+| `rate_limit_event` with `status: rejected` | sets `streamErr` to a rate-limit error |
+| `assistant` with `error: rate_limit` | sets `streamErr` to a rate-limit error |
+| `result` with `is_error: true` | sets `streamErr` to a task-failure error |
+| `result` with `total_cost_usd` | captures total cost |
+| `user` containing denied `tool_result` | sets `streamErr` to a permission-denial error |
+| any message with `cost_usd` | updates running cost (legacy field) |
+
+`costUSD` and `streamErr` are read after `cmd.Wait()` returns and the goroutine drains.
+
+### Cost and token extraction
+
+`exec.CostUSD` is set from `total_cost_usd` (preferred) or `cost_usd` (legacy) in the stream. Token counts are not extracted at the runner level; they are available in the raw stdout log.
+
+### Project sandbox enforcement
+
+When `t.Agent.ProjectDir` is set on a new (non-resume) execution:
+
+1. `setupSandbox()` ensures `projectDir` is a git repository (initialising with an empty commit if needed), then `git clone --no-hardlinks` it into a `claudomator-sandbox-*` temp directory. It prefers a remote named `"local"` over `"origin"` as the clone source.
+2. The subprocess runs with its working directory set to the sandbox.
+3. On success, `teardownSandbox()` verifies the sandbox has no uncommitted changes, counts commits ahead of `origin/HEAD`, and pushes them to the bare repo. If the push is rejected due to a concurrent task, it fetches and rebases then retries once.
+4. The sandbox is removed on success. On any failure the sandbox is preserved and its path is included in the error message.
+
+Resume executions skip sandboxing and run directly in `projectDir` so the agent can pick up its previous session state.
+
+### Question file mechanism
+
+The agent preamble instructs the agent to write a JSON file to `$CLAUDOMATOR_QUESTION_FILE` and exit immediately when it needs user input. After `execOnce()` returns successfully (exit code 0), `Run()` attempts to read that file:
+
+- If the file exists: it is consumed (`os.Remove`), and `Run()` returns `&BlockedError{QuestionJSON: ..., SessionID: e.SessionID}`.
+- If the file is absent: `Run()` returns `nil` (success).
+
+---
+
+## 6. GeminiRunner
+
+```go
+type GeminiRunner struct {
+ BinaryPath string // defaults to "gemini"
+ Logger *slog.Logger
+ LogDir string
+ APIURL string
+}
+```
+
+`GeminiRunner` follows the same pattern as `ClaudeRunner` with these differences:
+
+| Aspect | ClaudeRunner | GeminiRunner |
+|---|---|---|
+| Binary | `claude` | `gemini` |
+| Flag structure | `claude -p <instructions> --session-id … --output-format stream-json …` | `gemini <instructions> --output-format stream-json …` (instructions are the first positional argument) |
+| Session/resume | `--session-id` for new; `--resume` for resume | Session ID stored on `exec` but resume flag handling is a stub (not yet implemented) |
+| Rate-limit retries | `runWithBackoff` (up to 3 retries) | Single attempt; no retry loop |
+| Sandbox | Full clone + teardown + push | Not implemented; runs directly in `projectDir` |
+| Stream parsing | `parseStream()` | `parseStream()` (shared implementation; assumes compatible JSONL format) |
+| Question file | Same mechanism | Same mechanism |
+
+`GeminiRunner` implements `LogPather` identically to `ClaudeRunner`.
+
+---
+
+## 7. Classifier
+
+```go
+type Classifier struct {
+ GeminiBinaryPath string // defaults to "gemini"
+}
+
+type Classification struct {
+ AgentType string `json:"agent_type"`
+ Model string `json:"model"`
+ Reason string `json:"reason"`
+}
+
+type SystemStatus struct {
+ ActiveTasks map[string]int // agentType → number of active tasks
+ RateLimited map[string]bool // agentType → true if currently rate-limited
+}
+```
+
+`Classify()` is called once per task, after load-balanced agent selection but before the subprocess starts. Its sole output is the `Model` field; `AgentType` is already fixed by the load balancer.
+
+**Decision factors baked into the prompt:**
+
+- The already-chosen agent type (Claude or Gemini).
+- The task `Name` and `Instructions`.
+- A hard-coded model tier list:
+ - Claude: `haiku-4-5` (cheap/fast) → `sonnet-4-6` (default) → `opus-4-6` (hard tasks only)
+ - Gemini: `gemini-2.5-flash-lite` (cheap) → `gemini-2.5-flash` (default) → `gemini-2.5-pro` (hard tasks only)
+
+**Implementation**: `Classify()` invokes `gemini --prompt <prompt> --model gemini-2.5-flash-lite --output-format json`. It then strips markdown code fences and "Loaded cached credentials" noise before unmarshalling the JSON response into a `Classification`. If classification fails, the pool logs the error and proceeds with whatever model was already on `t.Agent.Model`.
+
+---
+
+## 8. QuestionRegistry
+
+```go
+type QuestionRegistry struct {
+ mu sync.Mutex
+ questions map[string]*PendingQuestion // keyed by toolUseID
+}
+
+type PendingQuestion struct {
+ TaskID string
+ ToolUseID string
+ Input json.RawMessage
+ AnswerCh chan string // buffered(1); closed when answer delivered
+}
+```
+
+The registry tracks questions that are waiting for a human answer. It is populated via the stream-parsing path (`streamAndParseWithQuestions`) when an agent emits an `AskUserQuestion` tool_use event in the JSONL stream.
+
+| Method | Description |
+|---|---|
+| `Register(taskID, toolUseID, input)` | Adds a `PendingQuestion`; returns the buffered answer channel. |
+| `Answer(toolUseID, answer)` | Delivers an answer, removes the entry, returns `false` if not found. |
+| `Get(toolUseID)` | Returns the pending question or `nil`. |
+| `PendingForTask(taskID)` | Returns all pending questions for a task (used by the API to list them). |
+| `Remove(toolUseID)` | Removes without answering (used on task cancellation). |
+
+**How a blocked task is resumed:**
+
+1. The API receives a `POST /api/tasks/{id}/answer` with the user's answer.
+2. It calls `QuestionRegistry.Answer(toolUseID, answer)`, which unblocks any goroutine waiting on `AnswerCh`.
+3. Separately (for the file-based path), the API creates a resume `Execution` with `ResumeSessionID` and `ResumeAnswer`, then calls `Pool.SubmitResume()`.
+4. The pool runs `executeResume()`, which invokes `runner.Run()` with `--resume <session>` and the answer as `-p`.
+
+---
+
+## 9. Rate Limiter
+
+Rate limiting is implemented at two levels:
+
+### Per-invocation retry (within `ClaudeRunner.Run`)
+
+`runWithBackoff(ctx, maxRetries=3, baseDelay=5s, fn)` calls the subprocess, detects rate-limit errors (`isRateLimitError`), and retries with exponential backoff:
+
+```
+attempt 0: immediate
+attempt 1: 5 s delay (or Retry-After header value if present)
+attempt 2: 10 s delay
+attempt 3: 20 s delay (capped at 5 min)
+```
+
+`isRateLimitError` matches: `"rate limit"`, `"too many requests"`, `"429"`, `"overloaded"`.
+
+Quota-exhausted errors (`isQuotaExhausted`) — strings like `"hit your limit"` or `"status: rejected"` — are **not** retried; they propagate immediately to `handleRunResult`.
+
+### Per-agent-type pool cooldown
+
+When `handleRunResult` detects a rate-limit or quota error it records a cooldown deadline in `p.rateLimited[agentType]`:
+
+- Transient rate limit: 1 minute (or `Retry-After` value from the error message).
+- Quota exhausted: 5 hours.
+
+During agent selection in `execute()`, the load balancer skips agents whose cooldown deadline has not passed yet. If all agents are on cooldown, the agent with the fewest active tasks is chosen anyway (best-effort fallback).
+
+---
+
+## 10. BlockedError
+
+```go
+type BlockedError struct {
+ QuestionJSON string // raw JSON: {"text": "...", "options": [...]}
+ SessionID string // claude session to resume once the user answers
+}
+```
+
+`BlockedError` is returned by `Runner.Run()` when the agent wrote a question to `$CLAUDOMATOR_QUESTION_FILE` and exited. It is **not** a fatal error — it is a signal that the task requires human input before it can continue.
+
+**Pool handling** (`handleRunResult`):
+
+```go
+var blockedErr *BlockedError
+if errors.As(err, &blockedErr) {
+ exec.Status = "BLOCKED"
+ store.UpdateTaskState(t.ID, task.StateBlocked)
+ store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
+}
+```
+
+The pool does **not** emit a failure result; it still sends a `Result` to `resultCh` with `Err` set to the `BlockedError`. The API layer reads `exec.Status == "BLOCKED"` and exposes the question to the caller. Once the user answers, the API calls `Pool.SubmitResume()` and the task transitions back to RUNNING.
+
+---
+
+## 11. Result Struct and handleRunResult
+
+```go
+type Result struct {
+ TaskID string
+ Execution *storage.Execution
+ Err error
+}
+```
+
+`handleRunResult` is shared between `execute()` and `executeResume()`. It inspects the error returned by `Runner.Run()` and maps it to an execution status and task state:
+
+| Condition | exec.Status | task.State |
+|---|---|---|
+| `*BlockedError` | `BLOCKED` | `StateBlocked` |
+| `ctx.Err() == DeadlineExceeded` | `TIMED_OUT` | `StateTimedOut` |
+| `ctx.Err() == Canceled` | `CANCELLED` | `StateCancelled` |
+| `isQuotaExhausted(err)` | `BUDGET_EXCEEDED` | `StateBudgetExceeded` |
+| any other non-nil error | `FAILED` | `StateFailed` |
+| nil, root task, subtasks exist | `BLOCKED` | `StateBlocked` (waits for subtasks) |
+| nil, root task, no subtasks | `READY` | `StateReady` |
+| nil, subtask | `COMPLETED` | `StateCompleted` |
+
+After a subtask completes, `maybeUnblockParent()` checks whether all sibling subtasks are also COMPLETED; if so, the parent transitions from BLOCKED to READY.
+
+---
+
+## Diagrams
+
+### Sequence: Pool.Submit → goroutine → Runner.Run → state update
+
+```mermaid
+sequenceDiagram
+ participant Caller
+ participant Pool
+ participant Dispatch as dispatch goroutine
+ participant Worker as execute goroutine
+ participant Classifier
+ participant Runner
+ participant Store
+
+ Caller->>Pool: Submit(ctx, task)
+ Pool->>Dispatch: workCh ← workItem
+ Dispatch->>Dispatch: wait for free slot (doneCh)
+ Dispatch->>Worker: go execute(ctx, task)
+
+ Worker->>Pool: snapshot activePerAgent / rateLimited
+ Pool-->>Worker: SystemStatus
+ Worker->>Worker: pickAgent(status) → agentType
+ Worker->>Classifier: Classify(taskName, instructions, status, agentType)
+ Classifier-->>Worker: Classification{Model}
+
+ Worker->>Store: waitForDependencies (poll every 5s)
+ Store-->>Worker: all deps COMPLETED
+
+ Worker->>Store: CreateExecution(exec)
+ Worker->>Store: UpdateTaskState(RUNNING)
+
+ Worker->>Runner: Run(ctx, task, exec)
+ Note over Runner: subprocess runs,<br/>stream parsed
+ Runner-->>Worker: error or nil
+
+ Worker->>Pool: handleRunResult(ctx, task, exec, err, agentType)
+ Pool->>Store: UpdateTaskState(new state)
+ Pool->>Store: UpdateExecution(exec)
+ Pool->>Caller: resultCh ← Result{TaskID, Execution, Err}
+
+ Worker->>Dispatch: doneCh ← token (slot freed)
+```
+
+### Flowchart: Question-handling flow
+
+```mermaid
+flowchart TD
+ A([Agent subprocess running]) --> B{Need user input?}
+ B -- No --> C([Task completes normally])
+ B -- Yes --> D["Write JSON to\n$CLAUDOMATOR_QUESTION_FILE\n{text, options}"]
+ D --> E[Agent exits with code 0]
+ E --> F[Runner reads question file]
+ F --> G["Return &BlockedError\n{QuestionJSON, SessionID}"]
+ G --> H[Pool.handleRunResult detects BlockedError]
+ H --> I[exec.Status = BLOCKED]
+ H --> J[Store.UpdateTaskState → StateBlocked]
+ H --> K[Store.UpdateTaskQuestion → QuestionJSON]
+ K --> L[API exposes question to user]
+ L --> M[User submits answer via API]
+ M --> N["API creates Execution:\nResumeSessionID = original SessionID\nResumeAnswer = user answer"]
+ N --> O[Pool.SubmitResume → workCh]
+ O --> P[dispatch launches executeResume goroutine]
+ P --> Q["Runner.Run with\n--resume ResumeSessionID\n-p ResumeAnswer"]
+ Q --> R([Claude session continues from where it left off])
+```