# Package: executor

`internal/executor` — Bounded goroutine pool that drives agent subprocesses.

---

## 1. Overview

The `executor` package is Claudomator's task-running engine. Its central type, `Pool`, maintains a bounded set of concurrent worker goroutines. Each goroutine picks a task from an internal work queue, selects the appropriate `Runner` implementation (Claude or Gemini), and launches the agent as an OS subprocess. When a subprocess finishes, the pool classifies the outcome, updates storage, and emits a `Result` on a channel that the API layer consumes.

Key responsibilities:

- **Concurrency control** — never exceed `maxConcurrent` simultaneous workers.
- **Load-balanced agent selection** — prefer the agent type with the fewest active tasks; skip rate-limited agents.
- **Dynamic model selection** — the `Classifier` picks the cheapest model that fits the task's complexity.
- **Dependency gating** — poll storage until all `depends_on` tasks reach COMPLETED before launching the agent subprocess.
- **Question / block / resume cycle** — detect when an agent needs user input, park the task as BLOCKED, and re-run it with the user's answer when one arrives.
- **Rate-limit back-pressure** — exponential backoff within a run; per-agent cooldown across runs.
- **Sandbox isolation** — optional `project_dir` support clones the project into a temp directory, runs the agent there, then merges committed changes back.

---

## 2. Pool

### Struct fields

```go
type Pool struct {
    maxConcurrent   int                          // global concurrency ceiling
    runners         map[string]Runner            // "claude" → ClaudeRunner, "gemini" → GeminiRunner
    store           Store                        // storage interface (GetTask, UpdateTaskState, …)
    logger          *slog.Logger
    depPollInterval time.Duration                // how often waitForDependencies polls; default 5 s

    mu             sync.Mutex
    active         int                          // number of goroutines currently in execute/executeResume
    activePerAgent map[string]int               // per-agent-type active count (used for load balancing)
    rateLimited    map[string]time.Time         // agentType → cooldown deadline
    cancels        map[string]context.CancelFunc // taskID → cancellation function

    resultCh chan *Result    // emits results; buffered at maxConcurrent*2
    workCh   chan workItem   // internal work queue; buffered at maxConcurrent*10+100
    doneCh   chan struct{}   // signals when a worker slot is freed; buffered at maxConcurrent

    Questions  *QuestionRegistry
    Classifier *Classifier
}
```

### Concurrency model

The pool enforces a **global** ceiling (`maxConcurrent`) rather than per-agent-type limits. Within that ceiling, `activePerAgent` tracks how many goroutines are running for each agent type; this counter is the input to the load balancer, not an additional hard limit.

A single long-lived `dispatch()` goroutine serialises the slot-allocation decision. It blocks on `doneCh` when all `maxConcurrent` slots are taken, then launches a worker goroutine as soon as any slot is freed. This prevents `Submit()` from blocking the caller while still honouring the concurrency ceiling.

```
workCh (buffered) ──► dispatch goroutine ──► execute goroutine (×N)
                                │                     │
                                └── waits on doneCh ◄─┘ (on exit)
```
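
The slot-allocation loop can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's code: `miniPool`, `tryClaim`, `run`, and `peakConcurrency` are hypothetical names, work items are plain closures, and the non-blocking send on `doneCh` is one way to keep exiting workers from stalling when the dispatcher is not waiting.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// miniPool is a hypothetical, stripped-down stand-in for Pool.
type miniPool struct {
    maxConcurrent int
    mu            sync.Mutex
    active        int
    workCh        chan func()   // queued work
    doneCh        chan struct{} // tokens signalling a freed slot
}

func newMiniPool(maxConcurrent int) *miniPool {
    p := &miniPool{
        maxConcurrent: maxConcurrent,
        workCh:        make(chan func(), maxConcurrent*10+100),
        doneCh:        make(chan struct{}, maxConcurrent),
    }
    go p.dispatch()
    return p
}

// dispatch is the only goroutine that claims slots, so the ceiling check
// and the increment of active are never contested by another claimant.
func (p *miniPool) dispatch() {
    for work := range p.workCh {
        for !p.tryClaim() {
            <-p.doneCh // all slots taken: wait until a worker exits
        }
        go p.run(work)
    }
}

func (p *miniPool) tryClaim() bool {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.active >= p.maxConcurrent {
        return false
    }
    p.active++
    return true
}

func (p *miniPool) run(work func()) {
    defer func() {
        p.mu.Lock()
        p.active--
        p.mu.Unlock()
        select {
        case p.doneCh <- struct{}{}: // wake dispatch if it is waiting
        default: // buffer is full of unread tokens; dispatch will re-check
        }
    }()
    work()
}

// peakConcurrency submits n short tasks and reports the highest number
// that were observed running at the same time.
func peakConcurrency(maxConcurrent, n int) int {
    p := newMiniPool(maxConcurrent)
    var (
        mu            sync.Mutex
        running, peak int
        wg            sync.WaitGroup
    )
    wg.Add(n)
    for i := 0; i < n; i++ {
        p.workCh <- func() {
            defer wg.Done()
            mu.Lock()
            running++
            if running > peak {
                peak = running
            }
            mu.Unlock()
            time.Sleep(2 * time.Millisecond)
            mu.Lock()
            running--
            mu.Unlock()
        }
    }
    wg.Wait()
    close(p.workCh) // lets the dispatch goroutine exit
    return peak
}

func main() {
    fmt.Println("peak concurrency:", peakConcurrency(3, 20))
}
```

Because a stale token only causes the dispatcher to re-check the counter, the observed peak never exceeds the ceiling passed to `newMiniPool`.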

### Dependency polling loop

When a task has a non-empty `DependsOn` list, `waitForDependencies()` is called inside the worker goroutine (after agent selection but before subprocess launch). It polls `store.GetTask()` for each dependency once every `depPollInterval` (default 5 s) and returns:

- `nil` — all dependencies reached COMPLETED.
- error — a dependency reached a terminal failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED) or the context was cancelled.
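
A polling loop with these return conditions might look like the sketch below. It assumes task states are plain strings and replaces the `Store` interface with a hypothetical `depGetter` function; returning early on a store error is also an assumption, since the text above does not say how lookup failures are handled.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// depGetter is a hypothetical slice of the Store interface: it returns
// the current state of a task by ID.
type depGetter func(id string) (string, error)

func isTerminalFailure(state string) bool {
    switch state {
    case "FAILED", "TIMED_OUT", "CANCELLED", "BUDGET_EXCEEDED":
        return true
    }
    return false
}

// waitForDeps polls every dependency once per interval until all are
// COMPLETED, one fails terminally, or ctx is cancelled.
func waitForDeps(ctx context.Context, get depGetter, deps []string, interval time.Duration) error {
    for {
        allDone := true
        for _, id := range deps {
            state, err := get(id)
            if err != nil {
                return fmt.Errorf("dependency %s: %w", id, err) // assumption: fail fast
            }
            if isTerminalFailure(state) {
                return fmt.Errorf("dependency %s ended in state %s", id, state)
            }
            if state != "COMPLETED" {
                allDone = false
            }
        }
        if allDone {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(interval):
        }
    }
}

// demoWait runs waitForDeps against a fake store whose single dependency
// steps through the given states, one state per poll.
func demoWait(states ...string) error {
    i := 0
    get := func(string) (string, error) {
        s := states[i]
        if i < len(states)-1 {
            i++
        }
        return s, nil
    }
    return waitForDeps(context.Background(), get, []string{"dep-1"}, time.Millisecond)
}

func main() {
    fmt.Println(demoWait("PENDING", "RUNNING", "COMPLETED")) // prints "<nil>"
    fmt.Println(demoWait("RUNNING", "FAILED"))
}
```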

---

## 3. Pool.Submit()

```go
func (p *Pool) Submit(ctx context.Context, t *task.Task) error
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error
```

`Submit()` is non-blocking: it sends a `workItem` to `workCh`. If `workCh` is full (capacity `maxConcurrent*10+100`) it returns an error immediately rather than blocking the caller.

`SubmitResume()` is used to re-queue a BLOCKED or TIMED_OUT task. The provided `exec` must have `ResumeSessionID` and `ResumeAnswer` set; the pool routes it through `executeResume()` instead of `execute()`, skipping agent selection and classification.
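
The non-blocking enqueue in `Submit()` corresponds to Go's `select` with a `default` branch. The sketch below is illustrative only: `trySubmit`, `errQueueFull`, and the one-field `workItem` are hypothetical, and the real error text may differ.

```go
package main

import (
    "errors"
    "fmt"
)

// errQueueFull is a hypothetical sentinel; the real error may differ.
var errQueueFull = errors.New("executor: work queue full")

// workItem is simplified to a task ID for this sketch.
type workItem struct{ taskID string }

// trySubmit enqueues item if the buffer has room and fails immediately
// otherwise, so the caller is never blocked.
func trySubmit(workCh chan<- workItem, item workItem) error {
    select {
    case workCh <- item:
        return nil
    default:
        return errQueueFull
    }
}

func main() {
    workCh := make(chan workItem, 2) // tiny buffer to force the full case
    for _, id := range []string{"a", "b", "c"} {
        fmt.Println(id, trySubmit(workCh, workItem{taskID: id}))
    }
    // prints:
    // a <nil>
    // b <nil>
    // c executor: work queue full
}
```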

**Goroutine lifecycle** (normal path via `execute()`):

1. `dispatch` dequeues the `workItem` and waits for a free slot.
2. `dispatch` increments `p.active` under the mutex and launches `go execute(ctx, t)`.
3. `execute` selects the agent, optionally classifies the model, and increments `activePerAgent`.
4. `execute` waits for dependencies, creates the execution record in storage, applies a timeout, and calls `runner.Run()`.
5. On return, `execute` calls `handleRunResult()`, decrements `p.active` and `activePerAgent`, and sends a token to `doneCh`.
6. `dispatch` unblocks and processes the next `workItem`.

---

## 4. Runner Interface

```go
type Runner interface {
    Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
}

// Optional extension — lets the pool persist log paths before execution starts.
type LogPather interface {
    ExecLogDir(execID string) string
}
```

`Run()` is responsible for the full lifecycle of a single subprocess invocation, including log file management and cost extraction. It returns:

- `nil` — task succeeded.
- `*BlockedError` — agent wrote a question file; pool transitions task to BLOCKED.
- any other error — pool classifies as FAILED, TIMED_OUT, CANCELLED, or BUDGET_EXCEEDED based on context and error text.

The pool selects a runner by looking up `t.Agent.Type` in the `runners` map. If `t.Agent.Type` is empty it falls back to `"claude"`.
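
The lookup with its `"claude"` fallback can be sketched as below. The `runner` interface here is a reduced stand-in for `Runner`, `fakeRunner` and `lookupRunner` are hypothetical, and returning an error for an unregistered agent type is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// runner is a reduced stand-in for the Runner interface; only identity
// matters for this sketch.
type runner interface{ name() string }

type fakeRunner string

func (f fakeRunner) name() string { return string(f) }

// lookupRunner resolves an agent type to a runner, treating an empty
// type as "claude".
func lookupRunner(runners map[string]runner, agentType string) (runner, error) {
    if agentType == "" {
        agentType = "claude"
    }
    r, ok := runners[agentType]
    if !ok {
        // Assumption: unknown types are reported as an error.
        return nil, fmt.Errorf("no runner registered for agent type %q", agentType)
    }
    return r, nil
}

func main() {
    runners := map[string]runner{
        "claude": fakeRunner("claude"),
        "gemini": fakeRunner("gemini"),
    }
    r, _ := lookupRunner(runners, "")
    fmt.Println(r.name()) // prints "claude"
}
```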

---

## 5. ClaudeRunner

```go
type ClaudeRunner struct {
    BinaryPath string // defaults to "claude"
    Logger     *slog.Logger
    LogDir     string // base directory for per-execution log directories
    APIURL     string // injected as CLAUDOMATOR_API_URL env var
}
```

### Subprocess invocation

`Run()` calls `buildArgs()` to construct the `claude` CLI invocation, then passes it through `runWithBackoff()` (up to 3 retries on rate-limit errors, 5 s base delay, exponential backoff, capped at 5 min).

**New execution flags:**

| Flag | Value |
|---|---|
| `-p` | task instructions (optionally prefixed with planning preamble) |
| `--session-id` | execution UUID (doubles as the session UUID for resumability) |
| `--output-format stream-json` | machine-readable JSONL output |
| `--verbose` | include cost/token fields |
| `--model` | from `t.Agent.Model` (if set) |
| `--max-budget-usd` | from `t.Agent.MaxBudgetUSD` (if > 0) |
| `--permission-mode` | `bypassPermissions` by default; overridable per task |
| `--append-system-prompt` | from `t.Agent.SystemPromptAppend` |
| `--allowedTools` | repeated for each entry in `t.Agent.AllowedTools` |
| `--disallowedTools` | repeated for each entry in `t.Agent.DisallowedTools` |
| `--add-dir` | repeated for each entry in `t.Agent.ContextFiles` |

**Resume execution flags** (when `exec.ResumeSessionID != ""`):

| Flag | Value |
|---|---|
| `-p` | `exec.ResumeAnswer` (the user's answer) |
| `--resume` | `exec.ResumeSessionID` (original session to continue) |
| `--output-format stream-json` | |
| `--verbose` | |
| `--permission-mode` | same default/override as above |
| `--model` | from `t.Agent.Model` (if set) |
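
The two flag tables can be read as one argument builder with a resume branch. The sketch below is an approximation: `agentConfig` and `execInfo` are hypothetical flattened versions of `t.Agent` and `exec`, the flag order and the two-decimal budget formatting are assumptions, and the planning preamble is omitted.

```go
package main

import (
    "fmt"
    "strconv"
)

// agentConfig and execInfo are hypothetical, flattened stand-ins for the
// task's agent settings and the execution record.
type agentConfig struct {
    Instructions       string
    Model              string
    MaxBudgetUSD       float64
    PermissionMode     string
    SystemPromptAppend string
    AllowedTools       []string
    DisallowedTools    []string
    ContextFiles       []string
}

type execInfo struct {
    ID              string
    ResumeSessionID string
    ResumeAnswer    string
}

func buildArgs(a agentConfig, e execInfo) []string {
    perm := a.PermissionMode
    if perm == "" {
        perm = "bypassPermissions"
    }
    resume := e.ResumeSessionID != ""
    var args []string
    if resume {
        args = append(args, "-p", e.ResumeAnswer, "--resume", e.ResumeSessionID)
    } else {
        args = append(args, "-p", a.Instructions, "--session-id", e.ID)
    }
    args = append(args, "--output-format", "stream-json", "--verbose", "--permission-mode", perm)
    if a.Model != "" {
        args = append(args, "--model", a.Model)
    }
    if resume {
        return args // the remaining flags apply to new executions only
    }
    if a.MaxBudgetUSD > 0 {
        // Assumption: the budget is rendered with two decimals.
        args = append(args, "--max-budget-usd", strconv.FormatFloat(a.MaxBudgetUSD, 'f', 2, 64))
    }
    if a.SystemPromptAppend != "" {
        args = append(args, "--append-system-prompt", a.SystemPromptAppend)
    }
    for _, tool := range a.AllowedTools {
        args = append(args, "--allowedTools", tool)
    }
    for _, tool := range a.DisallowedTools {
        args = append(args, "--disallowedTools", tool)
    }
    for _, dir := range a.ContextFiles {
        args = append(args, "--add-dir", dir)
    }
    return args
}

// hasFlag reports whether flag appears anywhere in args.
func hasFlag(args []string, flag string) bool {
    for _, a := range args {
        if a == flag {
            return true
        }
    }
    return false
}

func main() {
    a := agentConfig{Instructions: "fix the tests", AllowedTools: []string{"Bash", "Read"}}
    fmt.Println(buildArgs(a, execInfo{ID: "exec-1"}))
    fmt.Println(buildArgs(a, execInfo{ID: "exec-2", ResumeSessionID: "exec-1", ResumeAnswer: "yes"}))
}
```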

**Environment variables** injected into every subprocess:

```
CLAUDOMATOR_API_URL          — base URL for subtask creation and question answers
CLAUDOMATOR_TASK_ID          — parent task ID (used in subtask POST bodies)
CLAUDOMATOR_QUESTION_FILE    — path where the agent writes a question JSON
```

### Stream-JSON output parsing

`execOnce()` attaches an `os.Pipe` to `cmd.Stdout` and spawns a goroutine that calls `parseStream()`. The goroutine tees the stream to the log file while scanning for JSONL events:

| Event type | Action |
|---|---|
| `rate_limit_event` with `status: rejected` | sets `streamErr` to a rate-limit error |
| `assistant` with `error: rate_limit` | sets `streamErr` to a rate-limit error |
| `result` with `is_error: true` | sets `streamErr` to a task-failure error |
| `result` with `total_cost_usd` | captures total cost |
| `user` containing denied `tool_result` | sets `streamErr` to a permission-denial error |
| any message with `cost_usd` | updates running cost (legacy field) |

`costUSD` and `streamErr` are read after `cmd.Wait()` returns and the goroutine drains.
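
A reduced version of the tee-and-scan loop is sketched below. It assumes every field named in the table sits at the top level of each JSON line, which may not match the real event shapes, and it leaves out the permission-denial case. `streamEvent`, `parseStreamSketch`, and `parseLines` are hypothetical names.

```go
package main

import (
    "bufio"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "strings"
)

// streamEvent holds only the fields this sketch inspects. Assumption:
// they are top-level keys of each JSONL event.
type streamEvent struct {
    Type         string   `json:"type"`
    Status       string   `json:"status"`
    Error        string   `json:"error"`
    IsError      bool     `json:"is_error"`
    TotalCostUSD *float64 `json:"total_cost_usd"`
    CostUSD      *float64 `json:"cost_usd"`
}

// parseStreamSketch copies everything it reads to logW and returns the
// captured cost plus the last stream-level error it saw.
func parseStreamSketch(r io.Reader, logW io.Writer) (float64, error) {
    var (
        costUSD   float64
        sawTotal  bool
        streamErr error
    )
    sc := bufio.NewScanner(io.TeeReader(r, logW))
    sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) // allow long JSON lines
    for sc.Scan() {
        var ev streamEvent
        if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
            continue // not JSON; it has still been written to the log
        }
        switch {
        case ev.Type == "rate_limit_event" && ev.Status == "rejected",
            ev.Type == "assistant" && ev.Error == "rate_limit":
            streamErr = errors.New("rate limit reported in stream")
        case ev.Type == "result" && ev.IsError:
            streamErr = errors.New("task failed: result event has is_error=true")
        }
        if ev.Type == "result" && ev.TotalCostUSD != nil {
            costUSD, sawTotal = *ev.TotalCostUSD, true // preferred field
        } else if ev.CostUSD != nil && !sawTotal {
            costUSD = *ev.CostUSD // legacy running cost
        }
    }
    if err := sc.Err(); err != nil && streamErr == nil {
        streamErr = err
    }
    return costUSD, streamErr
}

// parseLines is a convenience wrapper for in-memory input.
func parseLines(s string) (float64, error) {
    return parseStreamSketch(strings.NewReader(s), io.Discard)
}

func main() {
    cost, err := parseLines(`{"type":"assistant","cost_usd":0.01}` + "\n" +
        `{"type":"result","total_cost_usd":0.25}` + "\n")
    fmt.Println(cost, err) // prints "0.25 <nil>"
}
```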

### Cost and token extraction

`exec.CostUSD` is set from `total_cost_usd` (preferred) or `cost_usd` (legacy) in the stream. Token counts are not extracted at the runner level; they are available in the raw stdout log.

### Project sandbox enforcement

When `t.Agent.ProjectDir` is set on a new (non-resume) execution:

1. `setupSandbox()` ensures `projectDir` is a git repository (initialising with an empty commit if needed), then `git clone --no-hardlinks` it into a `claudomator-sandbox-*` temp directory. It prefers a remote named `"local"` over `"origin"` as the clone source.
2. The subprocess runs with its working directory set to the sandbox.
3. On success, `teardownSandbox()` verifies the sandbox has no uncommitted changes, counts commits ahead of `origin/HEAD`, and pushes them to the bare repo. If the push is rejected because of a concurrent task, it fetches, rebases, and retries the push once.
4. The sandbox is removed on success. On any failure the sandbox is preserved and its path is included in the error message.

Resume executions skip sandboxing and run directly in `projectDir` so the agent can pick up its previous session state.

### Question file mechanism

The agent preamble instructs the agent to write a JSON file to `$CLAUDOMATOR_QUESTION_FILE` and exit immediately when it needs user input. After `execOnce()` returns successfully (exit code 0), `Run()` attempts to read that file:

- If the file exists: it is consumed (`os.Remove`), and `Run()` returns `&BlockedError{QuestionJSON: ..., SessionID: e.SessionID}`.
- If the file is absent: `Run()` returns `nil` (success).
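
The post-run check can be sketched as follows. `checkQuestionFile` and `demoQuestionFile` are hypothetical helpers, the `Error()` text is invented for the sketch, and treating read errors other than "not found" as failures is an assumption.

```go
package main

import (
    "errors"
    "fmt"
    "os"
    "path/filepath"
)

// BlockedError mirrors the struct documented in this package; the
// Error method text is an assumption.
type BlockedError struct {
    QuestionJSON string
    SessionID    string
}

func (e *BlockedError) Error() string { return "task blocked: agent asked a question" }

// checkQuestionFile consumes the question file if it exists and turns
// it into a *BlockedError; a missing file means plain success.
func checkQuestionFile(path, sessionID string) error {
    data, err := os.ReadFile(path)
    if errors.Is(err, os.ErrNotExist) {
        return nil
    }
    if err != nil {
        return fmt.Errorf("read question file: %w", err)
    }
    _ = os.Remove(path) // consume the file so a later run starts clean
    return &BlockedError{QuestionJSON: string(data), SessionID: sessionID}
}

// demoQuestionFile simulates an agent that did or did not ask a
// question, using a throwaway temp directory.
func demoQuestionFile(agentAsked bool) error {
    dir, err := os.MkdirTemp("", "question-demo-")
    if err != nil {
        return err
    }
    defer os.RemoveAll(dir)
    path := filepath.Join(dir, "question.json")
    if agentAsked {
        q := `{"text": "Which branch?", "options": ["main", "dev"]}`
        if err := os.WriteFile(path, []byte(q), 0o600); err != nil {
            return err
        }
    }
    return checkQuestionFile(path, "sess-1")
}

func main() {
    fmt.Println(demoQuestionFile(false)) // prints "<nil>"
    var blocked *BlockedError
    if errors.As(demoQuestionFile(true), &blocked) {
        fmt.Println("blocked:", blocked.QuestionJSON)
    }
}
```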

---

## 6. GeminiRunner

```go
type GeminiRunner struct {
    BinaryPath string // defaults to "gemini"
    Logger     *slog.Logger
    LogDir     string
    APIURL     string
}
```

`GeminiRunner` follows the same pattern as `ClaudeRunner` with these differences:

| Aspect | ClaudeRunner | GeminiRunner |
|---|---|---|
| Binary | `claude` | `gemini` |
| Flag structure | `claude -p <instructions> --session-id … --output-format stream-json …` | `gemini <instructions> --output-format stream-json …` (instructions are the first positional argument) |
| Session/resume | `--session-id` for new; `--resume` for resume | Session ID stored on `exec` but resume flag handling is a stub (not yet implemented) |
| Rate-limit retries | `runWithBackoff` (up to 3 retries) | Single attempt; no retry loop |
| Sandbox | Full clone + teardown + push | Not implemented; runs directly in `projectDir` |
| Stream parsing | `parseStream()` | `parseStream()` (shared implementation; assumes compatible JSONL format) |
| Question file | Same mechanism | Same mechanism |

`GeminiRunner` implements `LogPather` identically to `ClaudeRunner`.

---

## 7. Classifier

```go
type Classifier struct {
    GeminiBinaryPath string // defaults to "gemini"
}

type Classification struct {
    AgentType string `json:"agent_type"`
    Model     string `json:"model"`
    Reason    string `json:"reason"`
}

type SystemStatus struct {
    ActiveTasks map[string]int  // agentType → number of active tasks
    RateLimited map[string]bool // agentType → true if currently rate-limited
}
```

`Classify()` is called once per task, after load-balanced agent selection but before the subprocess starts. Its sole output is the `Model` field; `AgentType` is already fixed by the load balancer.

**Decision factors baked into the prompt:**

- The already-chosen agent type (Claude or Gemini).
- The task `Name` and `Instructions`.
- A hard-coded model tier list:
  - Claude: `haiku-4-5` (cheap/fast) → `sonnet-4-6` (default) → `opus-4-6` (hard tasks only)
  - Gemini: `gemini-2.5-flash-lite` (cheap) → `gemini-2.5-flash` (default) → `gemini-2.5-pro` (hard tasks only)

**Implementation**: `Classify()` invokes `gemini --prompt <prompt> --model gemini-2.5-flash-lite --output-format json`. It then strips markdown code fences and "Loaded cached credentials" noise before unmarshalling the JSON response into a `Classification`. If classification fails, the pool logs the error and proceeds with whatever model was already on `t.Agent.Model`.
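
The clean-up step before unmarshalling might be approximated as below. This is a sketch, not the package's parser: `parseClassification` is a hypothetical name, and dropping whole lines that are fences or contain the credentials notice is an assumed strategy.

```go
package main

import (
    "encoding/json"
    "fmt"
    "strings"
)

type Classification struct {
    AgentType string `json:"agent_type"`
    Model     string `json:"model"`
    Reason    string `json:"reason"`
}

// fence is the three-backtick markdown fence marker.
var fence = strings.Repeat("`", 3)

// parseClassification drops fence lines and the credentials notice,
// then unmarshals what is left.
func parseClassification(raw string) (Classification, error) {
    var kept []string
    for _, line := range strings.Split(raw, "\n") {
        t := strings.TrimSpace(line)
        if t == "" || strings.HasPrefix(t, fence) || strings.Contains(t, "Loaded cached credentials") {
            continue
        }
        kept = append(kept, line)
    }
    var c Classification
    if err := json.Unmarshal([]byte(strings.Join(kept, "\n")), &c); err != nil {
        return Classification{}, fmt.Errorf("parse classification: %w", err)
    }
    return c, nil
}

func main() {
    raw := "Loaded cached credentials.\n" + fence + "json\n" +
        `{"agent_type":"claude","model":"sonnet-4-6","reason":"default tier"}` + "\n" + fence + "\n"

    model := "haiku-4-5" // stands in for the model already on t.Agent.Model
    if c, err := parseClassification(raw); err != nil {
        fmt.Println("classification failed, keeping", model)
    } else {
        model = c.Model
    }
    fmt.Println(model) // prints "sonnet-4-6"
}
```

The `main` function mirrors the fallback described above: a parse failure leaves the existing model untouched.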

---

## 8. QuestionRegistry

```go
type QuestionRegistry struct {
    mu        sync.Mutex
    questions map[string]*PendingQuestion // keyed by toolUseID
}

type PendingQuestion struct {
    TaskID    string
    ToolUseID string
    Input     json.RawMessage
    AnswerCh  chan string // buffered(1); closed when answer delivered
}
```

The registry tracks questions that are waiting for a human answer. It is populated via the stream-parsing path (`streamAndParseWithQuestions`) when an agent emits an `AskUserQuestion` tool_use event in the JSONL stream.

| Method | Description |
|---|---|
| `Register(taskID, toolUseID, input)` | Adds a `PendingQuestion`; returns the buffered answer channel. |
| `Answer(toolUseID, answer)` | Delivers an answer, removes the entry, returns `false` if not found. |
| `Get(toolUseID)` | Returns the pending question or `nil`. |
| `PendingForTask(taskID)` | Returns all pending questions for a task (used by the API to list them). |
| `Remove(toolUseID)` | Removes without answering (used on task cancellation). |
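
A minimal sketch of `Register` and `Answer`, assuming the answer is sent on the buffered channel and the channel is then closed, as the struct comment suggests. `NewQuestionRegistry` is a hypothetical constructor, and the remaining methods are omitted.

```go
package main

import (
    "encoding/json"
    "fmt"
    "sync"
)

type PendingQuestion struct {
    TaskID    string
    ToolUseID string
    Input     json.RawMessage
    AnswerCh  chan string // buffered(1)
}

type QuestionRegistry struct {
    mu        sync.Mutex
    questions map[string]*PendingQuestion // keyed by toolUseID
}

// NewQuestionRegistry is a hypothetical constructor for this sketch.
func NewQuestionRegistry() *QuestionRegistry {
    return &QuestionRegistry{questions: make(map[string]*PendingQuestion)}
}

// Register stores a pending question and returns its answer channel.
func (r *QuestionRegistry) Register(taskID, toolUseID string, input json.RawMessage) <-chan string {
    ch := make(chan string, 1)
    r.mu.Lock()
    defer r.mu.Unlock()
    r.questions[toolUseID] = &PendingQuestion{
        TaskID: taskID, ToolUseID: toolUseID, Input: input, AnswerCh: ch,
    }
    return ch
}

// Answer delivers the answer and removes the entry. It returns false if
// no question with that ID is pending.
func (r *QuestionRegistry) Answer(toolUseID, answer string) bool {
    r.mu.Lock()
    q, ok := r.questions[toolUseID]
    delete(r.questions, toolUseID)
    r.mu.Unlock()
    if !ok {
        return false
    }
    q.AnswerCh <- answer // never blocks: capacity 1, single sender
    close(q.AnswerCh)
    return true
}

func main() {
    reg := NewQuestionRegistry()
    ch := reg.Register("task-1", "tool-1", json.RawMessage(`{"question":"Proceed?"}`))
    fmt.Println(reg.Answer("tool-1", "yes")) // prints "true"
    fmt.Println(<-ch)                        // prints "yes"
    fmt.Println(reg.Answer("tool-1", "no"))  // prints "false"
}
```

Removing the entry under the lock before sending means a second `Answer` call for the same ID cannot reach the closed channel.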

**How a blocked task is resumed:**

1. The API receives a `POST /api/tasks/{id}/answer` with the user's answer.
2. It calls `QuestionRegistry.Answer(toolUseID, answer)`, which unblocks any goroutine waiting on `AnswerCh`.
3. Separately (for the file-based path), the API creates a resume `Execution` with `ResumeSessionID` and `ResumeAnswer`, then calls `Pool.SubmitResume()`.
4. The pool runs `executeResume()`, which invokes `runner.Run()` with `--resume <session>` and the answer as `-p`.

---

## 9. Rate Limiter

Rate limiting is implemented at two levels:

### Per-invocation retry (within `ClaudeRunner.Run`)

`runWithBackoff(ctx, maxRetries=3, baseDelay=5s, fn)` calls the subprocess, detects rate-limit errors (`isRateLimitError`), and retries with exponential backoff:

```
attempt 0: immediate
attempt 1: 5 s delay  (or Retry-After header value if present)
attempt 2: 10 s delay
attempt 3: 20 s delay  (every delay is capped at 5 min)
```

`isRateLimitError` matches: `"rate limit"`, `"too many requests"`, `"429"`, `"overloaded"`.

Quota-exhausted errors (`isQuotaExhausted`) — strings like `"hit your limit"` or `"status: rejected"` — are **not** retried; they propagate immediately to `handleRunResult`.
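
The retry schedule can be sketched as below. This is an approximation under assumptions: matching is done case-insensitively, `Retry-After` handling is left out, and `backoffDelay`, `delaySeconds`, and `countAttempts` are hypothetical helpers added for illustration.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"
)

const maxBackoff = 5 * time.Minute

// isRateLimitError matches the substrings listed above. Lower-casing
// the message first is an assumption.
func isRateLimitError(err error) bool {
    if err == nil {
        return false
    }
    msg := strings.ToLower(err.Error())
    for _, s := range []string{"rate limit", "too many requests", "429", "overloaded"} {
        if strings.Contains(msg, s) {
            return true
        }
    }
    return false
}

// backoffDelay returns base for attempt 1 and doubles on each further
// attempt, capped at maxBackoff.
func backoffDelay(attempt int, base time.Duration) time.Duration {
    d := base << (attempt - 1)
    if d <= 0 || d > maxBackoff {
        d = maxBackoff
    }
    return d
}

// runWithBackoff calls fn once and then retries up to maxRetries times,
// but only while fn keeps returning rate-limit errors.
func runWithBackoff(ctx context.Context, maxRetries int, base time.Duration, fn func() error) error {
    var err error
    for attempt := 0; attempt <= maxRetries; attempt++ {
        if attempt > 0 {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(backoffDelay(attempt, base)):
            }
        }
        if err = fn(); !isRateLimitError(err) {
            return err // success or a non-retryable error
        }
    }
    return err
}

// delaySeconds reports the documented schedule (5 s base) in seconds.
func delaySeconds(attempt int) int {
    return int(backoffDelay(attempt, 5*time.Second) / time.Second)
}

// countAttempts runs a function that is rate limited for its first
// `failures` calls and reports how many calls were made in total.
func countAttempts(failures int) (int, error) {
    calls := 0
    err := runWithBackoff(context.Background(), 3, time.Millisecond, func() error {
        calls++
        if calls <= failures {
            return errors.New("429 too many requests")
        }
        return nil
    })
    return calls, err
}

func main() {
    fmt.Println(delaySeconds(1), delaySeconds(2), delaySeconds(3)) // prints "5 10 20"
    fmt.Println(countAttempts(2))                                  // prints "3 <nil>"
}
```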

### Per-agent-type pool cooldown

When `handleRunResult` detects a rate-limit or quota error it records a cooldown deadline in `p.rateLimited[agentType]`:

- Transient rate limit: 1 minute (or `Retry-After` value from the error message).
- Quota exhausted: 5 hours.

During agent selection in `execute()`, the load balancer skips agents whose cooldown deadline has not passed yet. If all agents are on cooldown, the agent with the fewest active tasks is chosen anyway (best-effort fallback).
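
The selection rule, including the best-effort fallback, can be sketched as follows. `pickAgent` here takes plain maps instead of a `SystemStatus`, `demoPick` is a hypothetical helper, and breaking ties alphabetically is an assumption made only to keep the result deterministic.

```go
package main

import (
    "fmt"
    "sort"
    "time"
)

// pickAgent prefers the least-loaded agent that is not cooling down.
// If every agent is cooling down, it returns the least-loaded one anyway.
func pickAgent(active map[string]int, cooldownUntil map[string]time.Time, now time.Time) string {
    names := make([]string, 0, len(active))
    for name := range active {
        names = append(names, name)
    }
    sort.Strings(names) // assumption: alphabetical tie-break

    best, bestAny := "", ""
    for _, name := range names {
        if bestAny == "" || active[name] < active[bestAny] {
            bestAny = name
        }
        if now.Before(cooldownUntil[name]) {
            continue // still rate limited
        }
        if best == "" || active[name] < active[best] {
            best = name
        }
    }
    if best != "" {
        return best
    }
    return bestAny
}

// demoPick builds a two-agent snapshot and returns the selected agent.
func demoPick(claudeActive, geminiActive int, claudeCooling, geminiCooling bool) string {
    now := time.Now()
    cooldown := map[string]time.Time{}
    if claudeCooling {
        cooldown["claude"] = now.Add(time.Minute) // transient rate limit
    }
    if geminiCooling {
        cooldown["gemini"] = now.Add(5 * time.Hour) // quota exhausted
    }
    active := map[string]int{"claude": claudeActive, "gemini": geminiActive}
    return pickAgent(active, cooldown, now)
}

func main() {
    fmt.Println(demoPick(2, 1, false, false)) // prints "gemini"
    fmt.Println(demoPick(0, 3, true, false))  // prints "gemini"
    fmt.Println(demoPick(0, 3, true, true))   // prints "claude"
}
```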

---

## 10. BlockedError

```go
type BlockedError struct {
    QuestionJSON string // raw JSON: {"text": "...", "options": [...]}
    SessionID    string // Claude session to resume once the user answers
}
```

`BlockedError` is returned by `Runner.Run()` when the agent wrote a question to `$CLAUDOMATOR_QUESTION_FILE` and exited. It is **not** a fatal error — it is a signal that the task requires human input before it can continue.

**Pool handling** (`handleRunResult`):

```go
var blockedErr *BlockedError
if errors.As(err, &blockedErr) {
    exec.Status = "BLOCKED"
    store.UpdateTaskState(t.ID, task.StateBlocked)
    store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON)
}
```

The pool does **not** treat this outcome as a failure, but it still sends a `Result` to `resultCh` with `Err` set to the `BlockedError`. The API layer reads `exec.Status == "BLOCKED"` and exposes the question to the caller. Once the user answers, the API calls `Pool.SubmitResume()` and the task transitions back to RUNNING.

---

## 11. Result Struct and handleRunResult

```go
type Result struct {
    TaskID    string
    Execution *storage.Execution
    Err       error
}
```

`handleRunResult` is shared between `execute()` and `executeResume()`. It inspects the error returned by `Runner.Run()` and maps it to an execution status and task state:

| Condition | exec.Status | task.State |
|---|---|---|
| `*BlockedError` | `BLOCKED` | `StateBlocked` |
| `ctx.Err() == DeadlineExceeded` | `TIMED_OUT` | `StateTimedOut` |
| `ctx.Err() == Canceled` | `CANCELLED` | `StateCancelled` |
| `isQuotaExhausted(err)` | `BUDGET_EXCEEDED` | `StateBudgetExceeded` |
| any other non-nil error | `FAILED` | `StateFailed` |
| nil, root task, subtasks exist | `BLOCKED` | `StateBlocked` (waits for subtasks) |
| nil, root task, no subtasks | `READY` | `StateReady` |
| nil, subtask | `COMPLETED` | `StateCompleted` |

After a subtask completes, `maybeUnblockParent()` checks whether all sibling subtasks are also COMPLETED; if so, the parent transitions from BLOCKED to READY.
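
The table can be read as an ordered switch. The sketch below returns only the `exec.Status` string and assumes the rows are checked top to bottom; `outcomeStatus`, `demoErr`, and `errDeadline` are hypothetical, and `isQuotaExhausted` is reduced to the two substrings quoted in the rate limiter section.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "strings"
)

// BlockedError mirrors the documented struct; the Error text is assumed.
type BlockedError struct {
    QuestionJSON string
    SessionID    string
}

func (e *BlockedError) Error() string { return "task blocked: agent asked a question" }

// isQuotaExhausted is reduced to the two quoted substrings.
func isQuotaExhausted(err error) bool {
    msg := strings.ToLower(err.Error())
    return strings.Contains(msg, "hit your limit") || strings.Contains(msg, "status: rejected")
}

// outcomeStatus maps a run outcome to an execution status, checking the
// conditions in the order of the table rows.
func outcomeStatus(ctxErr, runErr error, isSubtask, hasSubtasks bool) string {
    var blocked *BlockedError
    switch {
    case errors.As(runErr, &blocked):
        return "BLOCKED"
    case runErr != nil && errors.Is(ctxErr, context.DeadlineExceeded):
        return "TIMED_OUT"
    case runErr != nil && errors.Is(ctxErr, context.Canceled):
        return "CANCELLED"
    case runErr != nil && isQuotaExhausted(runErr):
        return "BUDGET_EXCEEDED"
    case runErr != nil:
        return "FAILED"
    case isSubtask:
        return "COMPLETED"
    case hasSubtasks:
        return "BLOCKED" // root task waits for its subtasks
    default:
        return "READY"
    }
}

// Helpers for the usage example.
var errDeadline = context.DeadlineExceeded

func demoErr(msg string) error { return errors.New(msg) }

func main() {
    fmt.Println(outcomeStatus(nil, &BlockedError{}, false, false))               // prints "BLOCKED"
    fmt.Println(outcomeStatus(errDeadline, demoErr("signal: killed"), false, false)) // prints "TIMED_OUT"
    fmt.Println(outcomeStatus(nil, nil, false, false))                           // prints "READY"
}
```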

---

## Diagrams

### Sequence: Pool.Submit → goroutine → Runner.Run → state update

```mermaid
sequenceDiagram
    participant Caller
    participant Pool
    participant Dispatch as dispatch goroutine
    participant Worker as execute goroutine
    participant Classifier
    participant Runner
    participant Store

    Caller->>Pool: Submit(ctx, task)
    Pool->>Dispatch: workCh ← workItem
    Dispatch->>Dispatch: wait for free slot (doneCh)
    Dispatch->>Worker: go execute(ctx, task)

    Worker->>Pool: snapshot activePerAgent / rateLimited
    Pool-->>Worker: SystemStatus
    Worker->>Worker: pickAgent(status) → agentType
    Worker->>Classifier: Classify(taskName, instructions, status, agentType)
    Classifier-->>Worker: Classification{Model}

    Worker->>Store: waitForDependencies (poll every 5s)
    Store-->>Worker: all deps COMPLETED

    Worker->>Store: CreateExecution(exec)
    Worker->>Store: UpdateTaskState(RUNNING)

    Worker->>Runner: Run(ctx, task, exec)
    Note over Runner: subprocess runs,<br/>stream parsed
    Runner-->>Worker: error or nil

    Worker->>Pool: handleRunResult(ctx, task, exec, err, agentType)
    Pool->>Store: UpdateTaskState(new state)
    Pool->>Store: UpdateExecution(exec)
    Pool->>Caller: resultCh ← Result{TaskID, Execution, Err}

    Worker->>Dispatch: doneCh ← token (slot freed)
```

### Flowchart: Question-handling flow

```mermaid
flowchart TD
    A([Agent subprocess running]) --> B{Need user input?}
    B -- No --> C([Task completes normally])
    B -- Yes --> D["Write JSON to\n$CLAUDOMATOR_QUESTION_FILE\n{text, options}"]
    D --> E[Agent exits with code 0]
    E --> F[Runner reads question file]
    F --> G["Return &BlockedError\n{QuestionJSON, SessionID}"]
    G --> H[Pool.handleRunResult detects BlockedError]
    H --> I[exec.Status = BLOCKED]
    H --> J[Store.UpdateTaskState → StateBlocked]
    H --> K[Store.UpdateTaskQuestion → QuestionJSON]
    K --> L[API exposes question to user]
    L --> M[User submits answer via API]
    M --> N["API creates Execution:\nResumeSessionID = original SessionID\nResumeAnswer = user answer"]
    N --> O[Pool.SubmitResume → workCh]
    O --> P[dispatch launches executeResume goroutine]
    P --> Q["Runner.Run with\n--resume ResumeSessionID\n-p ResumeAnswer"]
    Q --> R([Claude session continues from where it left off])
```