# Package: executor `internal/executor` — Bounded goroutine pool that drives agent subprocesses. --- ## 1. Overview The `executor` package is Claudomator's task-running engine. Its central type, `Pool`, maintains a bounded set of concurrent worker goroutines. Each goroutine picks a task from an internal work queue, selects the appropriate `Runner` implementation (Claude or Gemini), and launches the agent as an OS subprocess. When a subprocess finishes the pool classifies the outcome, updates storage, and emits a `Result` on a channel that the API layer consumes. Key responsibilities: - **Concurrency control** — never exceed `maxConcurrent` simultaneous workers. - **Load-balanced agent selection** — prefer the agent type with the fewest active tasks; skip rate-limited agents. - **Dynamic model selection** — the `Classifier` picks the cheapest model that fits the task's complexity. - **Dependency gating** — poll storage until all `depends_on` tasks reach COMPLETED before starting a worker. - **Question / block / resume cycle** — detect when an agent needs user input, park the task as BLOCKED, and re-run it with the user's answer when one arrives. - **Rate-limit back-pressure** — exponential backoff within a run; per-agent cooldown across runs. - **Sandbox isolation** — optional `project_dir` support clones the project into a temp directory, runs the agent there, then merges committed changes back. --- ## 2. Pool ### Struct fields ```go type Pool struct { maxConcurrent int // global concurrency ceiling runners map[string]Runner // "claude" → ClaudeRunner, "gemini" → GeminiRunner store Store // storage interface (GetTask, UpdateTaskState, …) logger *slog.Logger depPollInterval time.Duration // how often waitForDependencies polls; default 5 s mu sync.Mutex active int // number of goroutines currently in execute/executeResume activePerAgent map[string]int // per-agent-type active count (used for load balancing) rateLimited map[string]time.Time // agentType → cooldown deadline cancels map[string]context.CancelFunc // taskID → cancellation function resultCh chan *Result // emits results; buffered at maxConcurrent*2 workCh chan workItem // internal work queue; buffered at maxConcurrent*10+100 doneCh chan struct{} // signals when a worker slot is freed; buffered at maxConcurrent Questions *QuestionRegistry Classifier *Classifier } ``` ### Concurrency model The pool enforces a **global** ceiling (`maxConcurrent`) rather than per-agent-type limits. Within that ceiling, `activePerAgent` tracks how many goroutines are running for each agent type; this counter is the input to the load balancer, not an additional hard limit. A single long-lived `dispatch()` goroutine serialises the slot-allocation decision. It blocks on `doneCh` when all `maxConcurrent` slots are taken, then launches a worker goroutine as soon as any slot is freed. This prevents `Submit()` from blocking the caller while still honouring the concurrency ceiling. ``` workCh (buffered) ──► dispatch goroutine ──► execute goroutine (×N) │ │ └── waits on doneCh ◄─┘ (on exit) ``` ### Dependency polling loop When a task has a non-empty `DependsOn` list, `waitForDependencies()` is called inside the worker goroutine (after agent selection but before subprocess launch). It polls `store.GetTask()` for each dependency at `depPollInterval` intervals (default 5 s). It returns: - `nil` — all dependencies reached COMPLETED. - error — a dependency reached a terminal failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED) or the context was cancelled. --- ## 3. Pool.Submit() ```go func (p *Pool) Submit(ctx context.Context, t *task.Task) error func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error ``` `Submit()` is non-blocking: it sends a `workItem` to `workCh`. If `workCh` is full (capacity `maxConcurrent*10+100`) it returns an error immediately rather than blocking the caller. `SubmitResume()` is used to re-queue a BLOCKED or TIMED_OUT task. The provided `exec` must have `ResumeSessionID` and `ResumeAnswer` set; the pool routes it through `executeResume()` instead of `execute()`, skipping agent selection and classification. **Goroutine lifecycle** (normal path via `execute()`): 1. `dispatch` dequeues the `workItem` and waits for a free slot. 2. `dispatch` increments `p.active` under the mutex and launches `go execute(ctx, t)`. 3. `execute` selects the agent, optionally classifies the model, and increments `activePerAgent`. 4. `execute` waits for dependencies, creates the execution record in storage, applies a timeout, and calls `runner.Run()`. 5. On return, `execute` calls `handleRunResult()`, decrements `p.active` and `activePerAgent`, and sends a token to `doneCh`. 6. `dispatch` unblocks and processes the next `workItem`. --- ## 4. Runner Interface ```go type Runner interface { Run(ctx context.Context, t *task.Task, exec *storage.Execution) error } // Optional extension — lets the pool persist log paths before execution starts. type LogPather interface { ExecLogDir(execID string) string } ``` `Run()` is responsible for the full lifecycle of a single subprocess invocation, including log file management and cost extraction. It returns: - `nil` — task succeeded. - `*BlockedError` — agent wrote a question file; pool transitions task to BLOCKED. - any other error — pool classifies as FAILED, TIMED_OUT, CANCELLED, or BUDGET_EXCEEDED based on context and error text. The pool selects a runner by looking up `t.Agent.Type` in the `runners` map. If `t.Agent.Type` is empty it falls back to `"claude"`. --- ## 5. ClaudeRunner ```go type ClaudeRunner struct { BinaryPath string // defaults to "claude" Logger *slog.Logger LogDir string // base directory for per-execution log directories APIURL string // injected as CLAUDOMATOR_API_URL env var } ``` ### Subprocess invocation `Run()` calls `buildArgs()` to construct the `claude` CLI invocation, then passes it through `runWithBackoff()` (up to 3 attempts on rate-limit errors, 5 s base delay, exponential backoff, capped at 5 min). **New execution flags:** | Flag | Value | |---|---| | `-p` | task instructions (optionally prefixed with planning preamble) | | `--session-id` | execution UUID (doubles as the session UUID for resumability) | | `--output-format stream-json` | machine-readable JSONL output | | `--verbose` | include cost/token fields | | `--model` | from `t.Agent.Model` (if set) | | `--max-budget-usd` | from `t.Agent.MaxBudgetUSD` (if > 0) | | `--permission-mode` | `bypassPermissions` by default; overridable per task | | `--append-system-prompt` | from `t.Agent.SystemPromptAppend` | | `--allowedTools` | repeated for each entry in `t.Agent.AllowedTools` | | `--disallowedTools` | repeated for each entry in `t.Agent.DisallowedTools` | | `--add-dir` | repeated for each entry in `t.Agent.ContextFiles` | **Resume execution flags** (when `exec.ResumeSessionID != ""`): | Flag | Value | |---|---| | `-p` | `exec.ResumeAnswer` (the user's answer) | | `--resume` | `exec.ResumeSessionID` (original session to continue) | | `--output-format stream-json` | | | `--verbose` | | | `--permission-mode` | same default/override as above | | `--model` | from `t.Agent.Model` (if set) | **Environment variables** injected into every subprocess: ``` CLAUDOMATOR_API_URL — base URL for subtask creation and question answers CLAUDOMATOR_TASK_ID — parent task ID (used in subtask POST bodies) CLAUDOMATOR_QUESTION_FILE — path where the agent writes a question JSON ``` ### Stream-JSON output parsing `execOnce()` attaches an `os.Pipe` to `cmd.Stdout` and spawns a goroutine that calls `parseStream()`. The goroutine tees the stream to the log file while scanning for JSONL events: | Event type | Action | |---|---| | `rate_limit_event` with `status: rejected` | sets `streamErr` to a rate-limit error | | `assistant` with `error: rate_limit` | sets `streamErr` to a rate-limit error | | `result` with `is_error: true` | sets `streamErr` to a task-failure error | | `result` with `total_cost_usd` | captures total cost | | `user` containing denied `tool_result` | sets `streamErr` to a permission-denial error | | any message with `cost_usd` | updates running cost (legacy field) | `costUSD` and `streamErr` are read after `cmd.Wait()` returns and the goroutine drains. ### Cost and token extraction `exec.CostUSD` is set from `total_cost_usd` (preferred) or `cost_usd` (legacy) in the stream. Token counts are not extracted at the runner level; they are available in the raw stdout log. ### Project sandbox enforcement When `t.Agent.ProjectDir` is set on a new (non-resume) execution: 1. `setupSandbox()` ensures `projectDir` is a git repository (initialising with an empty commit if needed), then `git clone --no-hardlinks` it into a `claudomator-sandbox-*` temp directory. It prefers a remote named `"local"` over `"origin"` as the clone source. 2. The subprocess runs with its working directory set to the sandbox. 3. On success, `teardownSandbox()` verifies the sandbox has no uncommitted changes, counts commits ahead of `origin/HEAD`, and pushes them to the bare repo. If the push is rejected due to a concurrent task, it fetches and rebases then retries once. 4. The sandbox is removed on success. On any failure the sandbox is preserved and its path is included in the error message. Resume executions skip sandboxing and run directly in `projectDir` so the agent can pick up its previous session state. ### Question file mechanism The agent preamble instructs the agent to write a JSON file to `$CLAUDOMATOR_QUESTION_FILE` and exit immediately when it needs user input. After `execOnce()` returns successfully (exit code 0), `Run()` attempts to read that file: - If the file exists: it is consumed (`os.Remove`), and `Run()` returns `&BlockedError{QuestionJSON: ..., SessionID: e.SessionID}`. - If the file is absent: `Run()` returns `nil` (success). --- ## 6. GeminiRunner ```go type GeminiRunner struct { BinaryPath string // defaults to "gemini" Logger *slog.Logger LogDir string APIURL string } ``` `GeminiRunner` follows the same pattern as `ClaudeRunner` with these differences: | Aspect | ClaudeRunner | GeminiRunner | |---|---|---| | Binary | `claude` | `gemini` | | Flag structure | `claude -p --session-id … --output-format stream-json …` | `gemini --output-format stream-json …` (instructions are the first positional argument) | | Session/resume | `--session-id` for new; `--resume` for resume | Session ID stored on `exec` but resume flag handling is a stub (not yet implemented) | | Rate-limit retries | `runWithBackoff` (up to 3 retries) | Single attempt; no retry loop | | Sandbox | Full clone + teardown + push | Not implemented; runs directly in `projectDir` | | Stream parsing | `parseStream()` | `parseStream()` (shared implementation; assumes compatible JSONL format) | | Question file | Same mechanism | Same mechanism | `GeminiRunner` implements `LogPather` identically to `ClaudeRunner`. --- ## 7. Classifier ```go type Classifier struct { GeminiBinaryPath string // defaults to "gemini" } type Classification struct { AgentType string `json:"agent_type"` Model string `json:"model"` Reason string `json:"reason"` } type SystemStatus struct { ActiveTasks map[string]int // agentType → number of active tasks RateLimited map[string]bool // agentType → true if currently rate-limited } ``` `Classify()` is called once per task, after load-balanced agent selection but before the subprocess starts. Its sole output is the `Model` field; `AgentType` is already fixed by the load balancer. **Decision factors baked into the prompt:** - The already-chosen agent type (Claude or Gemini). - The task `Name` and `Instructions`. - A hard-coded model tier list: - Claude: `haiku-4-5` (cheap/fast) → `sonnet-4-6` (default) → `opus-4-6` (hard tasks only) - Gemini: `gemini-2.5-flash-lite` (cheap) → `gemini-2.5-flash` (default) → `gemini-2.5-pro` (hard tasks only) **Implementation**: `Classify()` invokes `gemini --prompt --model gemini-2.5-flash-lite --output-format json`. It then strips markdown code fences and "Loaded cached credentials" noise before unmarshalling the JSON response into a `Classification`. If classification fails, the pool logs the error and proceeds with whatever model was already on `t.Agent.Model`. --- ## 8. QuestionRegistry ```go type QuestionRegistry struct { mu sync.Mutex questions map[string]*PendingQuestion // keyed by toolUseID } type PendingQuestion struct { TaskID string ToolUseID string Input json.RawMessage AnswerCh chan string // buffered(1); closed when answer delivered } ``` The registry tracks questions that are waiting for a human answer. It is populated via the stream-parsing path (`streamAndParseWithQuestions`) when an agent emits an `AskUserQuestion` tool_use event in the JSONL stream. | Method | Description | |---|---| | `Register(taskID, toolUseID, input)` | Adds a `PendingQuestion`; returns the buffered answer channel. | | `Answer(toolUseID, answer)` | Delivers an answer, removes the entry, returns `false` if not found. | | `Get(toolUseID)` | Returns the pending question or `nil`. | | `PendingForTask(taskID)` | Returns all pending questions for a task (used by the API to list them). | | `Remove(toolUseID)` | Removes without answering (used on task cancellation). | **How a blocked task is resumed:** 1. The API receives a `POST /api/tasks/{id}/answer` with the user's answer. 2. It calls `QuestionRegistry.Answer(toolUseID, answer)`, which unblocks any goroutine waiting on `AnswerCh`. 3. Separately (for the file-based path), the API creates a resume `Execution` with `ResumeSessionID` and `ResumeAnswer`, then calls `Pool.SubmitResume()`. 4. The pool runs `executeResume()`, which invokes `runner.Run()` with `--resume ` and the answer as `-p`. --- ## 9. Rate Limiter Rate limiting is implemented at two levels: ### Per-invocation retry (within `ClaudeRunner.Run`) `runWithBackoff(ctx, maxRetries=3, baseDelay=5s, fn)` calls the subprocess, detects rate-limit errors (`isRateLimitError`), and retries with exponential backoff: ``` attempt 0: immediate attempt 1: 5 s delay (or Retry-After header value if present) attempt 2: 10 s delay attempt 3: 20 s delay (capped at 5 min) ``` `isRateLimitError` matches: `"rate limit"`, `"too many requests"`, `"429"`, `"overloaded"`. Quota-exhausted errors (`isQuotaExhausted`) — strings like `"hit your limit"` or `"status: rejected"` — are **not** retried; they propagate immediately to `handleRunResult`. ### Per-agent-type pool cooldown When `handleRunResult` detects a rate-limit or quota error it records a cooldown deadline in `p.rateLimited[agentType]`: - Transient rate limit: 1 minute (or `Retry-After` value from the error message). - Quota exhausted: 5 hours. During agent selection in `execute()`, the load balancer skips agents whose cooldown deadline has not passed yet. If all agents are on cooldown, the agent with the fewest active tasks is chosen anyway (best-effort fallback). --- ## 10. BlockedError ```go type BlockedError struct { QuestionJSON string // raw JSON: {"text": "...", "options": [...]} SessionID string // claude session to resume once the user answers } ``` `BlockedError` is returned by `Runner.Run()` when the agent wrote a question to `$CLAUDOMATOR_QUESTION_FILE` and exited. It is **not** a fatal error — it is a signal that the task requires human input before it can continue. **Pool handling** (`handleRunResult`): ```go var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" store.UpdateTaskState(t.ID, task.StateBlocked) store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) } ``` The pool does **not** emit a failure result; it still sends a `Result` to `resultCh` with `Err` set to the `BlockedError`. The API layer reads `exec.Status == "BLOCKED"` and exposes the question to the caller. Once the user answers, the API calls `Pool.SubmitResume()` and the task transitions back to RUNNING. --- ## 11. Result Struct and handleRunResult ```go type Result struct { TaskID string Execution *storage.Execution Err error } ``` `handleRunResult` is shared between `execute()` and `executeResume()`. It inspects the error returned by `Runner.Run()` and maps it to an execution status and task state: | Condition | exec.Status | task.State | |---|---|---| | `*BlockedError` | `BLOCKED` | `StateBlocked` | | `ctx.Err() == DeadlineExceeded` | `TIMED_OUT` | `StateTimedOut` | | `ctx.Err() == Canceled` | `CANCELLED` | `StateCancelled` | | `isQuotaExhausted(err)` | `BUDGET_EXCEEDED` | `StateBudgetExceeded` | | any other non-nil error | `FAILED` | `StateFailed` | | nil, root task, subtasks exist | `BLOCKED` | `StateBlocked` (waits for subtasks) | | nil, root task, no subtasks | `READY` | `StateReady` | | nil, subtask | `COMPLETED` | `StateCompleted` | After a subtask completes, `maybeUnblockParent()` checks whether all sibling subtasks are also COMPLETED; if so, the parent transitions from BLOCKED to READY. --- ## Diagrams ### Sequence: Pool.Submit → goroutine → Runner.Run → state update ```mermaid sequenceDiagram participant Caller participant Pool participant Dispatch as dispatch goroutine participant Worker as execute goroutine participant Classifier participant Runner participant Store Caller->>Pool: Submit(ctx, task) Pool->>Dispatch: workCh ← workItem Dispatch->>Dispatch: wait for free slot (doneCh) Dispatch->>Worker: go execute(ctx, task) Worker->>Pool: snapshot activePerAgent / rateLimited Pool-->>Worker: SystemStatus Worker->>Worker: pickAgent(status) → agentType Worker->>Classifier: Classify(taskName, instructions, status, agentType) Classifier-->>Worker: Classification{Model} Worker->>Store: waitForDependencies (poll every 5s) Store-->>Worker: all deps COMPLETED Worker->>Store: CreateExecution(exec) Worker->>Store: UpdateTaskState(RUNNING) Worker->>Runner: Run(ctx, task, exec) Note over Runner: subprocess runs,
stream parsed Runner-->>Worker: error or nil Worker->>Pool: handleRunResult(ctx, task, exec, err, agentType) Pool->>Store: UpdateTaskState(new state) Pool->>Store: UpdateExecution(exec) Pool->>Caller: resultCh ← Result{TaskID, Execution, Err} Worker->>Dispatch: doneCh ← token (slot freed) ``` ### Flowchart: Question-handling flow ```mermaid flowchart TD A([Agent subprocess running]) --> B{Need user input?} B -- No --> C([Task completes normally]) B -- Yes --> D["Write JSON to\n$CLAUDOMATOR_QUESTION_FILE\n{text, options}"] D --> E[Agent exits with code 0] E --> F[Runner reads question file] F --> G["Return &BlockedError\n{QuestionJSON, SessionID}"] G --> H[Pool.handleRunResult detects BlockedError] H --> I[exec.Status = BLOCKED] H --> J[Store.UpdateTaskState → StateBlocked] H --> K[Store.UpdateTaskQuestion → QuestionJSON] K --> L[API exposes question to user] L --> M[User submits answer via API] M --> N["API creates Execution:\nResumeSessionID = original SessionID\nResumeAnswer = user answer"] N --> O[Pool.SubmitResume → workCh] O --> P[dispatch launches executeResume goroutine] P --> Q["Runner.Run with\n--resume ResumeSessionID\n-p ResumeAnswer"] Q --> R([Claude session continues from where it left off]) ```