Diffstat (limited to 'docs')
| -rw-r--r-- | docs/packages/executor.md | 447 |
| -rw-r--r-- | docs/packages/storage.md | 209 |
| -rw-r--r-- | docs/packages/task.md | 332 |
3 files changed, 988 insertions, 0 deletions
diff --git a/docs/packages/executor.md b/docs/packages/executor.md new file mode 100644 index 0000000..29d6a0c --- /dev/null +++ b/docs/packages/executor.md @@ -0,0 +1,447 @@ +# Package: executor + +`internal/executor` — Bounded goroutine pool that drives agent subprocesses. + +--- + +## 1. Overview + +The `executor` package is Claudomator's task-running engine. Its central type, `Pool`, maintains a bounded set of concurrent worker goroutines. Each goroutine picks a task from an internal work queue, selects the appropriate `Runner` implementation (Claude or Gemini), and launches the agent as an OS subprocess. When a subprocess finishes the pool classifies the outcome, updates storage, and emits a `Result` on a channel that the API layer consumes. + +Key responsibilities: + +- **Concurrency control** — never exceed `maxConcurrent` simultaneous workers. +- **Load-balanced agent selection** — prefer the agent type with the fewest active tasks; skip rate-limited agents. +- **Dynamic model selection** — the `Classifier` picks the cheapest model that fits the task's complexity. +- **Dependency gating** — poll storage until all `depends_on` tasks reach COMPLETED before starting a worker. +- **Question / block / resume cycle** — detect when an agent needs user input, park the task as BLOCKED, and re-run it with the user's answer when one arrives. +- **Rate-limit back-pressure** — exponential backoff within a run; per-agent cooldown across runs. +- **Sandbox isolation** — optional `project_dir` support clones the project into a temp directory, runs the agent there, then merges committed changes back. + +--- + +## 2. 
Pool + +### Struct fields + +```go +type Pool struct { + maxConcurrent int // global concurrency ceiling + runners map[string]Runner // "claude" → ClaudeRunner, "gemini" → GeminiRunner + store Store // storage interface (GetTask, UpdateTaskState, …) + logger *slog.Logger + depPollInterval time.Duration // how often waitForDependencies polls; default 5 s + + mu sync.Mutex + active int // number of goroutines currently in execute/executeResume + activePerAgent map[string]int // per-agent-type active count (used for load balancing) + rateLimited map[string]time.Time // agentType → cooldown deadline + cancels map[string]context.CancelFunc // taskID → cancellation function + + resultCh chan *Result // emits results; buffered at maxConcurrent*2 + workCh chan workItem // internal work queue; buffered at maxConcurrent*10+100 + doneCh chan struct{} // signals when a worker slot is freed; buffered at maxConcurrent + + Questions *QuestionRegistry + Classifier *Classifier +} +``` + +### Concurrency model + +The pool enforces a **global** ceiling (`maxConcurrent`) rather than per-agent-type limits. Within that ceiling, `activePerAgent` tracks how many goroutines are running for each agent type; this counter is the input to the load balancer, not an additional hard limit. + +A single long-lived `dispatch()` goroutine serialises the slot-allocation decision. It blocks on `doneCh` when all `maxConcurrent` slots are taken, then launches a worker goroutine as soon as any slot is freed. This prevents `Submit()` from blocking the caller while still honouring the concurrency ceiling. + +``` +workCh (buffered) ──► dispatch goroutine ──► execute goroutine (×N) + │ │ + └── waits on doneCh ◄─┘ (on exit) +``` + +### Dependency polling loop + +When a task has a non-empty `DependsOn` list, `waitForDependencies()` is called inside the worker goroutine (after agent selection but before subprocess launch). 
It polls `store.GetTask()` for each dependency at `depPollInterval` intervals (default 5 s). It returns: + +- `nil` — all dependencies reached COMPLETED. +- error — a dependency reached a terminal failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED) or the context was cancelled. + +--- + +## 3. Pool.Submit() + +```go +func (p *Pool) Submit(ctx context.Context, t *task.Task) error +func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error +``` + +`Submit()` is non-blocking: it sends a `workItem` to `workCh`. If `workCh` is full (capacity `maxConcurrent*10+100`) it returns an error immediately rather than blocking the caller. + +`SubmitResume()` is used to re-queue a BLOCKED or TIMED_OUT task. The provided `exec` must have `ResumeSessionID` and `ResumeAnswer` set; the pool routes it through `executeResume()` instead of `execute()`, skipping agent selection and classification. + +**Goroutine lifecycle** (normal path via `execute()`): + +1. `dispatch` dequeues the `workItem` and waits for a free slot. +2. `dispatch` increments `p.active` under the mutex and launches `go execute(ctx, t)`. +3. `execute` selects the agent, optionally classifies the model, and increments `activePerAgent`. +4. `execute` waits for dependencies, creates the execution record in storage, applies a timeout, and calls `runner.Run()`. +5. On return, `execute` calls `handleRunResult()`, decrements `p.active` and `activePerAgent`, and sends a token to `doneCh`. +6. `dispatch` unblocks and processes the next `workItem`. + +--- + +## 4. Runner Interface + +```go +type Runner interface { + Run(ctx context.Context, t *task.Task, exec *storage.Execution) error +} + +// Optional extension — lets the pool persist log paths before execution starts. +type LogPather interface { + ExecLogDir(execID string) string +} +``` + +`Run()` is responsible for the full lifecycle of a single subprocess invocation, including log file management and cost extraction. 
It returns: + +- `nil` — task succeeded. +- `*BlockedError` — agent wrote a question file; pool transitions task to BLOCKED. +- any other error — pool classifies as FAILED, TIMED_OUT, CANCELLED, or BUDGET_EXCEEDED based on context and error text. + +The pool selects a runner by looking up `t.Agent.Type` in the `runners` map. If `t.Agent.Type` is empty it falls back to `"claude"`. + +--- + +## 5. ClaudeRunner + +```go +type ClaudeRunner struct { + BinaryPath string // defaults to "claude" + Logger *slog.Logger + LogDir string // base directory for per-execution log directories + APIURL string // injected as CLAUDOMATOR_API_URL env var +} +``` + +### Subprocess invocation + +`Run()` calls `buildArgs()` to construct the `claude` CLI invocation, then passes it through `runWithBackoff()` (up to 3 retries on rate-limit errors, 5 s base delay, exponential backoff, capped at 5 min). + +**New execution flags:** + +| Flag | Value | +|---|---| +| `-p` | task instructions (optionally prefixed with planning preamble) | +| `--session-id` | execution UUID (doubles as the session UUID for resumability) | +| `--output-format stream-json` | machine-readable JSONL output | +| `--verbose` | include cost/token fields | +| `--model` | from `t.Agent.Model` (if set) | +| `--max-budget-usd` | from `t.Agent.MaxBudgetUSD` (if > 0) | +| `--permission-mode` | `bypassPermissions` by default; overridable per task | +| `--append-system-prompt` | from `t.Agent.SystemPromptAppend` | +| `--allowedTools` | repeated for each entry in `t.Agent.AllowedTools` | +| `--disallowedTools` | repeated for each entry in `t.Agent.DisallowedTools` | +| `--add-dir` | repeated for each entry in `t.Agent.ContextFiles` | + +**Resume execution flags** (when `exec.ResumeSessionID != ""`): + +| Flag | Value | +|---|---| +| `-p` | `exec.ResumeAnswer` (the user's answer) | +| `--resume` | `exec.ResumeSessionID` (original session to continue) | +| `--output-format stream-json` | | +| `--verbose` | | +| `--permission-mode` 
| same default/override as above | +| `--model` | from `t.Agent.Model` (if set) | + +**Environment variables** injected into every subprocess: + +``` +CLAUDOMATOR_API_URL — base URL for subtask creation and question answers +CLAUDOMATOR_TASK_ID — parent task ID (used in subtask POST bodies) +CLAUDOMATOR_QUESTION_FILE — path where the agent writes a question JSON +``` + +### Stream-JSON output parsing + +`execOnce()` attaches an `os.Pipe` to `cmd.Stdout` and spawns a goroutine that calls `parseStream()`. The goroutine tees the stream to the log file while scanning for JSONL events: + +| Event type | Action | +|---|---| +| `rate_limit_event` with `status: rejected` | sets `streamErr` to a rate-limit error | +| `assistant` with `error: rate_limit` | sets `streamErr` to a rate-limit error | +| `result` with `is_error: true` | sets `streamErr` to a task-failure error | +| `result` with `total_cost_usd` | captures total cost | +| `user` containing denied `tool_result` | sets `streamErr` to a permission-denial error | +| any message with `cost_usd` | updates running cost (legacy field) | + +`costUSD` and `streamErr` are read after `cmd.Wait()` returns and the goroutine drains. + +### Cost and token extraction + +`exec.CostUSD` is set from `total_cost_usd` (preferred) or `cost_usd` (legacy) in the stream. Token counts are not extracted at the runner level; they are available in the raw stdout log. + +### Project sandbox enforcement + +When `t.Agent.ProjectDir` is set on a new (non-resume) execution: + +1. `setupSandbox()` ensures `projectDir` is a git repository (initialising with an empty commit if needed), then `git clone --no-hardlinks` it into a `claudomator-sandbox-*` temp directory. It prefers a remote named `"local"` over `"origin"` as the clone source. +2. The subprocess runs with its working directory set to the sandbox. +3. 
On success, `teardownSandbox()` verifies the sandbox has no uncommitted changes, counts commits ahead of `origin/HEAD`, and pushes them to the bare repo. If the push is rejected due to a concurrent task, it fetches and rebases then retries once. +4. The sandbox is removed on success. On any failure the sandbox is preserved and its path is included in the error message. + +Resume executions skip sandboxing and run directly in `projectDir` so the agent can pick up its previous session state. + +### Question file mechanism + +The agent preamble instructs the agent to write a JSON file to `$CLAUDOMATOR_QUESTION_FILE` and exit immediately when it needs user input. After `execOnce()` returns successfully (exit code 0), `Run()` attempts to read that file: + +- If the file exists: it is consumed (`os.Remove`), and `Run()` returns `&BlockedError{QuestionJSON: ..., SessionID: e.SessionID}`. +- If the file is absent: `Run()` returns `nil` (success). + +--- + +## 6. GeminiRunner + +```go +type GeminiRunner struct { + BinaryPath string // defaults to "gemini" + Logger *slog.Logger + LogDir string + APIURL string +} +``` + +`GeminiRunner` follows the same pattern as `ClaudeRunner` with these differences: + +| Aspect | ClaudeRunner | GeminiRunner | +|---|---|---| +| Binary | `claude` | `gemini` | +| Flag structure | `claude -p <instructions> --session-id … --output-format stream-json …` | `gemini <instructions> --output-format stream-json …` (instructions are the first positional argument) | +| Session/resume | `--session-id` for new; `--resume` for resume | Session ID stored on `exec` but resume flag handling is a stub (not yet implemented) | +| Rate-limit retries | `runWithBackoff` (up to 3 retries) | Single attempt; no retry loop | +| Sandbox | Full clone + teardown + push | Not implemented; runs directly in `projectDir` | +| Stream parsing | `parseStream()` | `parseStream()` (shared implementation; assumes compatible JSONL format) | +| Question file | Same mechanism | Same 
mechanism | + +`GeminiRunner` implements `LogPather` identically to `ClaudeRunner`. + +--- + +## 7. Classifier + +```go +type Classifier struct { + GeminiBinaryPath string // defaults to "gemini" +} + +type Classification struct { + AgentType string `json:"agent_type"` + Model string `json:"model"` + Reason string `json:"reason"` +} + +type SystemStatus struct { + ActiveTasks map[string]int // agentType → number of active tasks + RateLimited map[string]bool // agentType → true if currently rate-limited +} +``` + +`Classify()` is called once per task, after load-balanced agent selection but before the subprocess starts. Its sole output is the `Model` field; `AgentType` is already fixed by the load balancer. + +**Decision factors baked into the prompt:** + +- The already-chosen agent type (Claude or Gemini). +- The task `Name` and `Instructions`. +- A hard-coded model tier list: + - Claude: `haiku-4-5` (cheap/fast) → `sonnet-4-6` (default) → `opus-4-6` (hard tasks only) + - Gemini: `gemini-2.5-flash-lite` (cheap) → `gemini-2.5-flash` (default) → `gemini-2.5-pro` (hard tasks only) + +**Implementation**: `Classify()` invokes `gemini --prompt <prompt> --model gemini-2.5-flash-lite --output-format json`. It then strips markdown code fences and "Loaded cached credentials" noise before unmarshalling the JSON response into a `Classification`. If classification fails, the pool logs the error and proceeds with whatever model was already on `t.Agent.Model`. + +--- + +## 8. QuestionRegistry + +```go +type QuestionRegistry struct { + mu sync.Mutex + questions map[string]*PendingQuestion // keyed by toolUseID +} + +type PendingQuestion struct { + TaskID string + ToolUseID string + Input json.RawMessage + AnswerCh chan string // buffered(1); closed when answer delivered +} +``` + +The registry tracks questions that are waiting for a human answer. 
It is populated via the stream-parsing path (`streamAndParseWithQuestions`) when an agent emits an `AskUserQuestion` tool_use event in the JSONL stream. + +| Method | Description | +|---|---| +| `Register(taskID, toolUseID, input)` | Adds a `PendingQuestion`; returns the buffered answer channel. | +| `Answer(toolUseID, answer)` | Delivers an answer, removes the entry, returns `false` if not found. | +| `Get(toolUseID)` | Returns the pending question or `nil`. | +| `PendingForTask(taskID)` | Returns all pending questions for a task (used by the API to list them). | +| `Remove(toolUseID)` | Removes without answering (used on task cancellation). | + +**How a blocked task is resumed:** + +1. The API receives a `POST /api/tasks/{id}/answer` with the user's answer. +2. It calls `QuestionRegistry.Answer(toolUseID, answer)`, which unblocks any goroutine waiting on `AnswerCh`. +3. Separately (for the file-based path), the API creates a resume `Execution` with `ResumeSessionID` and `ResumeAnswer`, then calls `Pool.SubmitResume()`. +4. The pool runs `executeResume()`, which invokes `runner.Run()` with `--resume <session>` and the answer as `-p`. + +--- + +## 9. Rate Limiter + +Rate limiting is implemented at two levels: + +### Per-invocation retry (within `ClaudeRunner.Run`) + +`runWithBackoff(ctx, maxRetries=3, baseDelay=5s, fn)` calls the subprocess, detects rate-limit errors (`isRateLimitError`), and retries with exponential backoff: + +``` +attempt 0: immediate +attempt 1: 5 s delay (or Retry-After header value if present) +attempt 2: 10 s delay +attempt 3: 20 s delay (capped at 5 min) +``` + +`isRateLimitError` matches: `"rate limit"`, `"too many requests"`, `"429"`, `"overloaded"`. + +Quota-exhausted errors (`isQuotaExhausted`) — strings like `"hit your limit"` or `"status: rejected"` — are **not** retried; they propagate immediately to `handleRunResult`. 
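The per-invocation retry schedule described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's actual `runWithBackoff`: the helper names `backoffDelay` and `looksRateLimited` are hypothetical, the `Retry-After` override is left out, and case-insensitive matching is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// maxDelay mirrors the 5 min cap mentioned in the schedule above.
const maxDelay = 5 * time.Minute

// backoffDelay is a hypothetical helper returning the wait before a given
// attempt: attempt 0 runs immediately, attempt n waits base * 2^(n-1),
// never exceeding maxDelay.
func backoffDelay(attempt int, base time.Duration) time.Duration {
	if attempt <= 0 {
		return 0
	}
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

// looksRateLimited checks for the substrings the text lists for
// isRateLimitError. Lower-casing the message first is an assumption.
func looksRateLimited(msg string) bool {
	m := strings.ToLower(msg)
	for _, s := range []string{"rate limit", "too many requests", "429", "overloaded"} {
		if strings.Contains(m, s) {
			return true
		}
	}
	return false
}

func main() {
	// Print the wait before each of the four attempts (initial run + 3 retries).
	for attempt := 0; attempt <= 3; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffDelay(attempt, 5*time.Second))
	}
	fmt.Println(looksRateLimited("HTTP 429: Too Many Requests"))
}
```

With a 5 s base the doubling reaches the cap only after several retries, so with `maxRetries=3` the cap would matter mainly when a larger base delay is used.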
+ +### Per-agent-type pool cooldown + +When `handleRunResult` detects a rate-limit or quota error it records a cooldown deadline in `p.rateLimited[agentType]`: + +- Transient rate limit: 1 minute (or `Retry-After` value from the error message). +- Quota exhausted: 5 hours. + +During agent selection in `execute()`, the load balancer skips agents whose cooldown deadline has not passed yet. If all agents are on cooldown, the agent with the fewest active tasks is chosen anyway (best-effort fallback). + +--- + +## 10. BlockedError + +```go +type BlockedError struct { + QuestionJSON string // raw JSON: {"text": "...", "options": [...]} + SessionID string // claude session to resume once the user answers +} +``` + +`BlockedError` is returned by `Runner.Run()` when the agent wrote a question to `$CLAUDOMATOR_QUESTION_FILE` and exited. It is **not** a fatal error — it is a signal that the task requires human input before it can continue. + +**Pool handling** (`handleRunResult`): + +```go +var blockedErr *BlockedError +if errors.As(err, &blockedErr) { + exec.Status = "BLOCKED" + store.UpdateTaskState(t.ID, task.StateBlocked) + store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) +} +``` + +The pool does **not** emit a failure result; it still sends a `Result` to `resultCh` with `Err` set to the `BlockedError`. The API layer reads `exec.Status == "BLOCKED"` and exposes the question to the caller. Once the user answers, the API calls `Pool.SubmitResume()` and the task transitions back to RUNNING. + +--- + +## 11. Result Struct and handleRunResult + +```go +type Result struct { + TaskID string + Execution *storage.Execution + Err error +} +``` + +`handleRunResult` is shared between `execute()` and `executeResume()`. 
It inspects the error returned by `Runner.Run()` and maps it to an execution status and task state: + +| Condition | exec.Status | task.State | +|---|---|---| +| `*BlockedError` | `BLOCKED` | `StateBlocked` | +| `ctx.Err() == DeadlineExceeded` | `TIMED_OUT` | `StateTimedOut` | +| `ctx.Err() == Canceled` | `CANCELLED` | `StateCancelled` | +| `isQuotaExhausted(err)` | `BUDGET_EXCEEDED` | `StateBudgetExceeded` | +| any other non-nil error | `FAILED` | `StateFailed` | +| nil, root task, subtasks exist | `BLOCKED` | `StateBlocked` (waits for subtasks) | +| nil, root task, no subtasks | `READY` | `StateReady` | +| nil, subtask | `COMPLETED` | `StateCompleted` | + +After a subtask completes, `maybeUnblockParent()` checks whether all sibling subtasks are also COMPLETED; if so, the parent transitions from BLOCKED to READY. + +--- + +## Diagrams + +### Sequence: Pool.Submit → goroutine → Runner.Run → state update + +```mermaid +sequenceDiagram + participant Caller + participant Pool + participant Dispatch as dispatch goroutine + participant Worker as execute goroutine + participant Classifier + participant Runner + participant Store + + Caller->>Pool: Submit(ctx, task) + Pool->>Dispatch: workCh ← workItem + Dispatch->>Dispatch: wait for free slot (doneCh) + Dispatch->>Worker: go execute(ctx, task) + + Worker->>Pool: snapshot activePerAgent / rateLimited + Pool-->>Worker: SystemStatus + Worker->>Worker: pickAgent(status) → agentType + Worker->>Classifier: Classify(taskName, instructions, status, agentType) + Classifier-->>Worker: Classification{Model} + + Worker->>Store: waitForDependencies (poll every 5s) + Store-->>Worker: all deps COMPLETED + + Worker->>Store: CreateExecution(exec) + Worker->>Store: UpdateTaskState(RUNNING) + + Worker->>Runner: Run(ctx, task, exec) + Note over Runner: subprocess runs,<br/>stream parsed + Runner-->>Worker: error or nil + + Worker->>Pool: handleRunResult(ctx, task, exec, err, agentType) + Pool->>Store: UpdateTaskState(new state) + 
Pool->>Store: UpdateExecution(exec) + Pool->>Caller: resultCh ← Result{TaskID, Execution, Err} + + Worker->>Dispatch: doneCh ← token (slot freed) +``` + +### Flowchart: Question-handling flow + +```mermaid +flowchart TD + A([Agent subprocess running]) --> B{Need user input?} + B -- No --> C([Task completes normally]) + B -- Yes --> D["Write JSON to\n$CLAUDOMATOR_QUESTION_FILE\n{text, options}"] + D --> E[Agent exits with code 0] + E --> F[Runner reads question file] + F --> G["Return &BlockedError\n{QuestionJSON, SessionID}"] + G --> H[Pool.handleRunResult detects BlockedError] + H --> I[exec.Status = BLOCKED] + H --> J[Store.UpdateTaskState → StateBlocked] + H --> K[Store.UpdateTaskQuestion → QuestionJSON] + K --> L[API exposes question to user] + L --> M[User submits answer via API] + M --> N["API creates Execution:\nResumeSessionID = original SessionID\nResumeAnswer = user answer"] + N --> O[Pool.SubmitResume → workCh] + O --> P[dispatch launches executeResume goroutine] + P --> Q["Runner.Run with\n--resume ResumeSessionID\n-p ResumeAnswer"] + Q --> R([Claude session continues from where it left off]) +``` diff --git a/docs/packages/storage.md b/docs/packages/storage.md new file mode 100644 index 0000000..2e53cea --- /dev/null +++ b/docs/packages/storage.md @@ -0,0 +1,209 @@ +# Package: `internal/storage` + +## Overview + +The `storage` package provides the SQLite persistence layer for Claudomator. It exposes a `DB` struct that manages all reads and writes to the database, runs schema migrations automatically on `Open`, and enforces valid state transitions when updating task states. + +Key characteristics: +- **Auto-migration**: `Open` creates all tables and indexes if they do not exist, then applies additive column migrations idempotently (duplicate-column errors are silently ignored). +- **WAL mode**: The database is opened with `?_journal_mode=WAL&_busy_timeout=5000` for concurrent read safety and reduced lock contention. 
+- **State enforcement**: `UpdateTaskState` performs the state change inside a transaction and calls `task.ValidTransition` before committing. +- **Cascading deletes**: `DeleteTask` removes a task, all its descendant subtasks, and all their execution records in a single transaction using a recursive CTE. + +--- + +## Schema + +### `tasks` table + +| Column | Type | Description | +|------------------|-----------|----------------------------------------------------------| +| `id` | `TEXT PK` | UUID; primary key. | +| `name` | `TEXT` | Human-readable task name. Not null. | +| `description` | `TEXT` | Optional longer description. | +| `config_json` | `TEXT` | JSON blob — serialised `AgentConfig` struct. | +| `priority` | `TEXT` | `"high"`, `"normal"`, or `"low"`. Default `"normal"`. | +| `timeout_ns` | `INTEGER` | Timeout as nanoseconds (`time.Duration`). Default `0`. | +| `retry_json` | `TEXT` | JSON blob — serialised `RetryConfig` struct. | +| `tags_json` | `TEXT` | JSON blob — serialised `[]string`. | +| `depends_on_json`| `TEXT` | JSON blob — serialised `[]string` of task IDs. | +| `parent_task_id` | `TEXT` | ID of parent task; null for root tasks. | +| `state` | `TEXT` | Current `State` value. Default `"PENDING"`. | +| `created_at` | `DATETIME`| Creation timestamp (UTC). | +| `updated_at` | `DATETIME`| Last-update timestamp (UTC). | +| `rejection_comment` | `TEXT` | Reviewer comment set by `RejectTask`; nullable. | +| `question_json` | `TEXT` | Pending agent question set by `UpdateTaskQuestion`; nullable. | + +**JSON blob columns**: `config_json`, `retry_json`, `tags_json`, `depends_on_json`. + +**Indexes**: `idx_tasks_state` (state), `idx_tasks_parent_task_id` (parent_task_id). + +### `executions` table + +| Column | Type | Description | +|---------------|-----------|---------------------------------------------------------------| +| `id` | `TEXT PK` | UUID; primary key. | +| `task_id` | `TEXT` | Foreign key → `tasks.id`. 
| +| `start_time` | `DATETIME`| When the execution started (UTC). | +| `end_time` | `DATETIME`| When the execution ended (UTC); null while running. | +| `exit_code` | `INTEGER` | Process exit code; 0 = success. | +| `status` | `TEXT` | Execution status string (mirrors task state at completion). | +| `stdout_path` | `TEXT` | Path to the stdout log file under `~/.claudomator/executions/`. | +| `stderr_path` | `TEXT` | Path to the stderr log file under `~/.claudomator/executions/`. | +| `artifact_dir`| `TEXT` | Directory containing output artifacts. | +| `cost_usd` | `REAL` | Total API spend for this execution in USD. | +| `error_msg` | `TEXT` | Error description if the execution failed. | +| `session_id` | `TEXT` | Claude `--session-id` value; used to resume interrupted runs. | + +**Indexes**: `idx_executions_status`, `idx_executions_task_id`, `idx_executions_start_time`. + +--- + +## `DB` Struct Methods + +### Lifecycle + +#### `Open(path string) (*DB, error)` +Opens the SQLite database at `path`, runs auto-migration, and returns a `*DB`. The connection string appends `?_journal_mode=WAL&_busy_timeout=5000`. + +#### `(*DB) Close() error` +Closes the underlying database connection. + +--- + +### Task CRUD + +#### `(*DB) CreateTask(t *task.Task) error` +Inserts a new task. `AgentConfig`, `RetryConfig`, `Tags`, and `DependsOn` are serialised as JSON blobs; `Timeout` is stored as nanoseconds. + +#### `(*DB) GetTask(id string) (*task.Task, error)` +Retrieves a single task by ID. Returns an error wrapping `sql.ErrNoRows` if not found. + +#### `(*DB) ListTasks(filter TaskFilter) ([]*task.Task, error)` +Returns tasks matching `filter`, ordered by `created_at DESC`. See [TaskFilter](#taskfilter). + +#### `(*DB) ListSubtasks(parentID string) ([]*task.Task, error)` +Returns all tasks whose `parent_task_id` equals `parentID`, ordered by `created_at ASC`. 
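The column encoding that `CreateTask` is described as using (JSON blobs for slices, nanoseconds for the timeout) can be sketched as below. This is only an illustration: `taskRow`, `encodeRow`, and `decodeTimeout` are hypothetical names, only three columns are shown, and normalising nil slices to `[]` is an assumption rather than confirmed behaviour of the storage package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// taskRow is a hypothetical, trimmed-down view of one row in the tasks table.
type taskRow struct {
	TagsJSON      string // tags_json column
	DependsOnJSON string // depends_on_json column
	TimeoutNS     int64  // timeout_ns column
}

// encodeRow serialises the slice fields as JSON and the timeout as nanoseconds.
func encodeRow(tags, dependsOn []string, timeout time.Duration) (taskRow, error) {
	// Assumption: store "[]" instead of "null" for missing slices.
	if tags == nil {
		tags = []string{}
	}
	if dependsOn == nil {
		dependsOn = []string{}
	}
	tagsJSON, err := json.Marshal(tags)
	if err != nil {
		return taskRow{}, err
	}
	depsJSON, err := json.Marshal(dependsOn)
	if err != nil {
		return taskRow{}, err
	}
	return taskRow{
		TagsJSON:      string(tagsJSON),
		DependsOnJSON: string(depsJSON),
		TimeoutNS:     timeout.Nanoseconds(),
	}, nil
}

// decodeTimeout converts the stored nanosecond count back to a time.Duration.
func decodeTimeout(ns int64) time.Duration {
	return time.Duration(ns)
}

func main() {
	row, err := encodeRow([]string{"auth", "bugfix"}, nil, 90*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(row.TagsJSON, row.DependsOnJSON, row.TimeoutNS, decodeTimeout(row.TimeoutNS))
}
```

Storing the timeout as an integer nanosecond count keeps the round trip lossless, since `time.Duration` is itself an `int64` of nanoseconds.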
+ +#### `(*DB) UpdateTask(id string, u TaskUpdate) error` +Replaces editable fields (`Name`, `Description`, `Config`, `Priority`, `TimeoutNS`, `Retry`, `Tags`, `DependsOn`) and resets `state` to `PENDING`. Returns an error if the task does not exist. + +#### `(*DB) DeleteTask(id string) error` +Deletes a task, all its descendant subtasks (via recursive CTE), and all their execution records in a single transaction. Returns an error if the task does not exist. + +--- + +### State Management + +#### `(*DB) UpdateTaskState(id string, newState task.State) error` +Atomically updates a task's state inside a transaction. Calls `task.ValidTransition` to reject illegal moves before applying the change. + +#### `(*DB) RejectTask(id, comment string) error` +Sets the task's state to `PENDING` and stores `comment` in `rejection_comment`. Does not go through the state-machine validator (direct update). + +#### `(*DB) ResetTaskForRetry(id string) (*task.Task, error)` +Validates that the current state can transition to `QUEUED`, clears `Agent.Type` and `Agent.Model` so the task can be re-classified, sets `state = QUEUED`, and persists the changes in a transaction. Returns the updated task. + +#### `(*DB) UpdateTaskQuestion(taskID, questionJSON string) error` +Stores a JSON-encoded agent question on the task. Pass an empty string to clear the question after it has been answered. + +--- + +### Execution CRUD + +#### `(*DB) CreateExecution(e *Execution) error` +Inserts a new execution record. + +#### `(*DB) GetExecution(id string) (*Execution, error)` +Retrieves a single execution by ID. + +#### `(*DB) ListExecutions(taskID string) ([]*Execution, error)` +Returns all executions for a task, ordered by `start_time DESC`. + +#### `(*DB) GetLatestExecution(taskID string) (*Execution, error)` +Returns the most recent execution for a task. + +#### `(*DB) UpdateExecution(e *Execution) error` +Updates a completed execution record (end time, exit code, status, cost, paths, session ID). 
+ +#### `(*DB) ListRecentExecutions(since time.Time, limit int, taskID string) ([]*RecentExecution, error)` +Returns executions since `since`, joined with the task name, ordered by `start_time DESC`. If `taskID` is non-empty only executions for that task are included. Returns `RecentExecution` values (see below). + +--- + +## `TaskFilter` + +```go +type TaskFilter struct { + State task.State + Limit int +} +``` + +| Field | Type | Description | +|---------|--------------|-------------------------------------------------------| +| `State` | `task.State` | If non-empty, only tasks in this state are returned. | +| `Limit` | `int` | Maximum number of results. `0` means no limit. | + +--- + +## `Execution` Struct + +```go +type Execution struct { + ID string + TaskID string + StartTime time.Time + EndTime time.Time + ExitCode int + Status string + StdoutPath string + StderrPath string + ArtifactDir string + CostUSD float64 + ErrorMsg string + SessionID string // persisted: claude --session-id for resume + ResumeSessionID string // in-memory only: set when creating a resume execution + ResumeAnswer string // in-memory only: human answer forwarded to agent +} +``` + +| Field | Persisted | Description | +|-------------------|-----------|-----------------------------------------------------------------------| +| `ID` | Yes | UUID; primary key. | +| `TaskID` | Yes | ID of the owning task. | +| `StartTime` | Yes | When execution started (UTC). | +| `EndTime` | Yes | When execution ended (UTC); zero while still running. | +| `ExitCode` | Yes | Process exit code (0 = success). | +| `Status` | Yes | Status string at completion. | +| `StdoutPath` | Yes | Absolute path to the stdout log file. | +| `StderrPath` | Yes | Absolute path to the stderr log file. | +| `ArtifactDir` | Yes | Directory containing output artifacts produced by the agent. | +| `CostUSD` | Yes | Total API spend for this run in USD. | +| `ErrorMsg` | Yes | Error description if the run failed. 
| +| `SessionID` | Yes | Claude session ID; stored to allow resuming an interrupted session. | +| `ResumeSessionID` | No | Session to resume; set in-memory when creating a resume execution. | +| `ResumeAnswer` | No | Human answer to a blocked question; forwarded to the agent in-memory. | + +--- + +## File Layout + +All runtime data lives under `~/.claudomator/`: + +``` +~/.claudomator/ +├── claudomator.db # SQLite database (WAL mode) +└── executions/ # Execution log files + ├── <execution-id>.stdout + └── <execution-id>.stderr +``` + +The paths are configured by `internal/config`: + +| Config field | Value | +|--------------|--------------------------------------| +| `DBPath` | `~/.claudomator/claudomator.db` | +| `LogDir` | `~/.claudomator/executions/` | + +Both directories are created automatically on first run if they do not exist. diff --git a/docs/packages/task.md b/docs/packages/task.md new file mode 100644 index 0000000..3502f8a --- /dev/null +++ b/docs/packages/task.md @@ -0,0 +1,332 @@ +# Package: `internal/task` + +## Overview + +The `task` package defines the core domain model for Claudomator. A **Task** represents a discrete unit of work to be executed by a Claude agent — described by natural-language instructions, optional budget and timeout constraints, retry policy, and scheduling metadata. Tasks are authored as YAML files and progress through a well-defined lifecycle from `PENDING` to a terminal state. 
+
+---
+
+## Task Struct
+
+```go
+type Task struct {
+	ID               string      `yaml:"id"`
+	ParentTaskID     string      `yaml:"parent_task_id"`
+	Name             string      `yaml:"name"`
+	Description      string      `yaml:"description"`
+	Agent            AgentConfig `yaml:"agent"`
+	Timeout          Duration    `yaml:"timeout"`
+	Retry            RetryConfig `yaml:"retry"`
+	Priority         Priority    `yaml:"priority"`
+	Tags             []string    `yaml:"tags"`
+	DependsOn        []string    `yaml:"depends_on"`
+	State            State       `yaml:"-"`
+	RejectionComment string      `yaml:"-"`
+	QuestionJSON     string      `yaml:"-"`
+	CreatedAt        time.Time   `yaml:"-"`
+	UpdatedAt        time.Time   `yaml:"-"`
+}
+```
+
+| Field | Type | Purpose | Default / Notes |
+|--------------------|---------------|----------------------------------------------------------------------|------------------------------------------|
+| `ID` | `string` | Unique identifier (UUID). | Auto-generated by `Parse` if empty. |
+| `ParentTaskID` | `string` | ID of the parent task; empty for top-level tasks. | Optional. |
+| `Name` | `string` | Human-readable name. **Required.** | Must be non-empty. |
+| `Description` | `string` | Longer explanation shown in the UI. | Optional. |
+| `Agent` | `AgentConfig` | Configuration passed to the Claude agent executor. | See [AgentConfig](#agentconfig-struct). |
+| `Timeout` | `Duration` | Maximum wall-clock time for a single execution attempt. | `0` means no limit. |
+| `Retry` | `RetryConfig` | Retry policy applied when a run fails. | See [RetryConfig](#retryconfig-struct). |
+| `Priority` | `Priority` | Scheduling priority: `high`, `normal`, or `low`. | Default: `normal`. |
+| `Tags` | `[]string` | Arbitrary labels for filtering and grouping. | Optional; defaults to `[]`. |
+| `DependsOn` | `[]string` | IDs of tasks that must reach `COMPLETED` before this one is queued. | Optional; defaults to `[]`. |
+| `State` | `State` | Current lifecycle state. Not read from YAML (`yaml:"-"`). | Set to `PENDING` by `Parse`. |
+| `RejectionComment` | `string` | Free-text comment left by a reviewer when rejecting a task. | Not in YAML; populated by storage layer. |
+| `QuestionJSON` | `string` | JSON-encoded question posted by the agent while `BLOCKED`. | Not in YAML; populated by storage layer. |
+| `CreatedAt` | `time.Time` | Timestamp set when the task is parsed. | Auto-set by `Parse`. |
+| `UpdatedAt` | `time.Time` | Timestamp updated on every state change. | Auto-set by `Parse`; updated by DB. |
+
+Fields tagged `yaml:"-"` are runtime-only and are never parsed from YAML task files.
+
+---
+
+## AgentConfig Struct
+
+```go
+type AgentConfig struct {
+	Type               string   `yaml:"type"`
+	Model              string   `yaml:"model"`
+	ContextFiles       []string `yaml:"context_files"`
+	Instructions       string   `yaml:"instructions"`
+	ProjectDir         string   `yaml:"project_dir"`
+	MaxBudgetUSD       float64  `yaml:"max_budget_usd"`
+	PermissionMode     string   `yaml:"permission_mode"`
+	AllowedTools       []string `yaml:"allowed_tools"`
+	DisallowedTools    []string `yaml:"disallowed_tools"`
+	SystemPromptAppend string   `yaml:"system_prompt_append"`
+	AdditionalArgs     []string `yaml:"additional_args"`
+	SkipPlanning       bool     `yaml:"skip_planning"`
+}
+```
+
+| Field | Type | Purpose |
+|----------------------|------------|---------------------------------------------------------------------------------------------------------------------------------|
+| `Type` | `string` | Agent type classifier. Cleared on retry by `ResetTaskForRetry` so the task can be re-classified. |
+| `Model` | `string` | Claude model ID (e.g. `"claude-sonnet-4-6"`). Cleared on retry alongside `Type`. |
+| `ContextFiles` | `[]string` | Paths to files pre-loaded as context for the agent. |
+| `Instructions` | `string` | Natural-language task instructions. **Required.** |
+| `ProjectDir` | `string` | Working directory the agent operates in. |
+| `MaxBudgetUSD` | `float64` | API spend cap in USD. `0` means no limit. Must be ≥ 0. |
+| `PermissionMode` | `string` | Claude permission mode. Valid values: `default`, `acceptEdits`, `bypassPermissions`, `plan`, `dontAsk`, `delegate`. |
+| `AllowedTools` | `[]string` | Explicit tool whitelist passed to the agent. |
+| `DisallowedTools` | `[]string` | Explicit tool blacklist passed to the agent. |
+| `SystemPromptAppend` | `string` | Text appended to the Claude system prompt. |
+| `AdditionalArgs` | `[]string` | Extra CLI arguments forwarded verbatim to the Claude executable. |
+| `SkipPlanning` | `bool` | When `true`, the agent skips the planning phase. |
+
+---
+
+## RetryConfig Struct
+
+```go
+type RetryConfig struct {
+	MaxAttempts int    `yaml:"max_attempts"`
+	Backoff     string `yaml:"backoff"` // "linear" or "exponential"
+}
+```
+
+| Field | Type | Purpose | Default |
+|---------------|----------|------------------------------------------------------------|-----------------|
+| `MaxAttempts` | `int` | Total number of attempts (first run + retries). Min: 1. | `1` |
+| `Backoff` | `string` | Wait strategy between retries: `linear` or `exponential`. | `"exponential"` |
+
+---
+
+## YAML Task File Format
+
+A task file may contain a single task or a batch (see next section). Every supported field is shown below with annotations.
+
+```yaml
+# Unique identifier. Omit to have Claudomator generate a UUID.
+id: "fix-login-bug"
+
+# Display name. Required.
+name: "Fix login redirect bug"
+
+# Optional longer description.
+description: "Users are redirected to /home instead of /dashboard after login."
+
+agent:
+  # Agent type for routing/classification. Optional; auto-assigned if blank.
+  type: "claude"
+
+  # Claude model ID. Optional; auto-assigned if blank.
+  model: "claude-sonnet-4-6"
+
+  # Task instructions passed to the agent. Required.
+  instructions: |
+    Fix the post-login redirect in src/auth/login.go so that users are
+    sent to /dashboard instead of /home. Add a regression test.
+
+  # Working directory for the agent process.
+  project_dir: "/workspace/myapp"
+
+  # Files loaded into the agent's context before execution starts.
+  context_files:
+    - "src/auth/login.go"
+    - "docs/adr/0003-auth.md"
+
+  # USD spending cap. 0 = no limit.
+  max_budget_usd: 1.00
+
+  # Claude permission mode.
+  # Values: default | acceptEdits | bypassPermissions | plan | dontAsk | delegate
+  permission_mode: "acceptEdits"
+
+  # Tool whitelist. Empty = all tools allowed.
+  allowed_tools:
+    - "Read"
+    - "Edit"
+    - "Bash"
+
+  # Tool blacklist.
+  disallowed_tools:
+    - "WebFetch"
+
+  # Appended to the agent's system prompt.
+  system_prompt_append: "Always write tests before implementation."
+
+  # Extra arguments forwarded verbatim to the Claude executable.
+  additional_args:
+    - "--verbose"
+
+  # Skip the planning phase.
+  skip_planning: false
+
+# Maximum wall-clock time per attempt. Uses Go duration syntax: "30m", "1h", "45s".
+# 0 or omitted means no limit.
+timeout: "30m"
+
+retry:
+  # Total attempts including the first run. Must be >= 1.
+  max_attempts: 3
+  # "linear" or "exponential"
+  backoff: "exponential"
+
+# Scheduling priority: high | normal | low
+priority: "normal"
+
+# Arbitrary labels for filtering.
+tags:
+  - "bug"
+  - "auth"
+
+# IDs of tasks that must reach COMPLETED before this task is queued.
+depends_on:
+  - "setup-test-db"
+```
+
+---
+
+## Batch File Format
+
+A batch file wraps multiple task definitions under a `tasks:` key. All top-level task fields are supported per entry.
+
+```yaml
+tasks:
+  - name: "Step 1 — scaffold"
+    agent:
+      instructions: "Create the initial project structure."
+    priority: "high"
+
+  - name: "Step 2 — implement"
+    agent:
+      instructions: "Implement the feature described in docs/feature.md."
+    depends_on:
+      - "step-1-id"
+
+  - name: "Step 3 — test"
+    agent:
+      instructions: "Write and run integration tests."
+    depends_on:
+      - "step-2-id"
+    retry:
+      max_attempts: 2
+      backoff: "linear"
+```
+
+`ParseFile` / `Parse` detect the batch format automatically: if unmarshaling into `BatchFile` succeeds and produces at least one task, the batch path is used; otherwise single-task parsing is attempted.
+
+---
+
+## State Constants
+
+```go
+type State string
+
+const (
+	StatePending        State = "PENDING"
+	StateQueued         State = "QUEUED"
+	StateRunning        State = "RUNNING"
+	StateReady          State = "READY"
+	StateCompleted      State = "COMPLETED"
+	StateFailed         State = "FAILED"
+	StateTimedOut       State = "TIMED_OUT"
+	StateCancelled      State = "CANCELLED"
+	StateBudgetExceeded State = "BUDGET_EXCEEDED"
+	StateBlocked        State = "BLOCKED"
+)
+```
+
+| Constant | String value | Description |
+|-----------------------|-------------------|-------------------------------------------------------------------------------------------------------|
+| `StatePending` | `PENDING` | Newly created; awaiting acceptance for scheduling. |
+| `StateQueued` | `QUEUED` | Accepted by the scheduler and waiting for an available executor. |
+| `StateRunning` | `RUNNING` | An agent is actively executing the task. |
+| `StateReady` | `READY` | Agent finished execution; output is awaiting human review/approval. |
+| `StateCompleted` | `COMPLETED` | Approved and finished. **Terminal** — no further transitions. |
+| `StateFailed` | `FAILED` | Execution failed (non-zero exit or unrecoverable error). Eligible for retry → `QUEUED`. |
+| `StateTimedOut` | `TIMED_OUT` | Execution exceeded the configured `timeout`. Eligible for retry or resume → `QUEUED`. |
+| `StateCancelled` | `CANCELLED` | Manually cancelled. Can be restarted by transitioning back to `QUEUED`. |
+| `StateBudgetExceeded` | `BUDGET_EXCEEDED` | The `max_budget_usd` cap was reached during execution. Eligible for retry → `QUEUED`. |
+| `StateBlocked` | `BLOCKED` | The running agent posted a question requiring a human response before it can continue. |
+
+---
+
+## State Machine
+
+### Valid Transitions
+
+| From | To | Condition |
+|---------------------|---------------------|-------------------------------------------------------------------|
+| `PENDING` | `QUEUED` | Task accepted for scheduling. |
+| `PENDING` | `CANCELLED` | Manually cancelled before queuing. |
+| `QUEUED` | `RUNNING` | An executor picks up the task. |
+| `QUEUED` | `CANCELLED` | Manually cancelled while waiting in queue. |
+| `RUNNING` | `READY` | Agent finished; output ready for human review. |
+| `RUNNING` | `COMPLETED` | Agent finished successfully with no review required. |
+| `RUNNING` | `FAILED` | Agent exited with an error. |
+| `RUNNING` | `TIMED_OUT` | Wall-clock timeout exceeded. |
+| `RUNNING` | `CANCELLED` | Manually cancelled mid-execution. |
+| `RUNNING` | `BUDGET_EXCEEDED` | API spend reached `max_budget_usd`. |
+| `RUNNING` | `BLOCKED` | Agent posted a question requiring human input before continuing. |
+| `READY` | `COMPLETED` | Reviewer approves the output. |
+| `READY` | `PENDING` | Reviewer rejects the output; task sent back for revision. |
+| `FAILED` | `QUEUED` | Retry: task re-queued for another attempt. |
+| `TIMED_OUT` | `QUEUED` | Retry or resume: task re-queued. |
+| `CANCELLED` | `QUEUED` | Restart: task re-queued from scratch. |
+| `BUDGET_EXCEEDED` | `QUEUED` | Retry with adjusted budget. |
+| `BLOCKED` | `QUEUED` | Answer provided; task re-queued to resume. |
+| `BLOCKED` | `READY` | Question resolved; output is ready for review. |
+| `COMPLETED` | *(none)* | Terminal state — no outgoing transitions. |
+
+---
+
+## Key Functions
+
+### `ParseFile`
+
+```go
+func ParseFile(path string) ([]Task, error)
+```
+
+Reads a YAML file at `path` and returns the parsed tasks. Supports both single-task and batch formats. Initialises defaults (UUID, priority, retry policy, initial state, timestamps) on every returned task.
+
+### `Parse`
+
+```go
+func Parse(data []byte) ([]Task, error)
+```
+
+Same as `ParseFile` but operates on raw YAML bytes instead of a file path.
+
+### `ValidTransition`
+
+```go
+func ValidTransition(from, to State) bool
+```
+
+Returns `true` if transitioning from `from` to `to` is permitted by the state machine. Used by `storage.DB.UpdateTaskState` to enforce valid transitions inside a transaction.
+
+### `Validate`
+
+```go
+func Validate(t *Task) error
+```
+
+Checks a task for constraint violations. Returns a `*ValidationError` (implementing `error`) listing all violations, or `nil` if the task is valid. All violations are collected before returning so callers receive the full list at once.
+
+---
+
+## Validation Rules
+
+| Rule | Error message |
+|------------------------------------------------------------------------------|--------------------------------------------------------------------|
+| `name` must be non-empty. | `name is required` |
+| `agent.instructions` must be non-empty. | `agent.instructions is required` |
+| `agent.max_budget_usd` must be ≥ 0. | `agent.max_budget_usd must be non-negative` |
+| `timeout` must be ≥ 0. | `timeout must be non-negative` |
+| `retry.max_attempts` must be ≥ 1. | `retry.max_attempts must be at least 1` |
+| `retry.backoff`, if set, must be `linear` or `exponential`. | `retry.backoff must be 'linear' or 'exponential'` |
+| `priority`, if set, must be `high`, `normal`, or `low`. | `invalid priority "…"; must be high, normal, or low` |
+| `agent.permission_mode`, if set, must be one of the six recognised values. | `invalid permission_mode "…"` |
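
Reviewer note: the Valid Transitions table documented in `task.md` maps naturally onto a lookup table. The following is a minimal sketch of how `ValidTransition` could be written, assuming a map keyed by the source state. The `validTransitions` variable is a hypothetical name, and the real implementation in `internal/task` may be structured differently.

```go
package main

// State mirrors the string-typed state constants documented in task.md.
type State string

const (
	StatePending        State = "PENDING"
	StateQueued         State = "QUEUED"
	StateRunning        State = "RUNNING"
	StateReady          State = "READY"
	StateCompleted      State = "COMPLETED"
	StateFailed         State = "FAILED"
	StateTimedOut       State = "TIMED_OUT"
	StateCancelled      State = "CANCELLED"
	StateBudgetExceeded State = "BUDGET_EXCEEDED"
	StateBlocked        State = "BLOCKED"
)

// validTransitions is a hypothetical encoding of the documented table:
// each key lists the states it may move to. COMPLETED has no entry
// because it is terminal.
var validTransitions = map[State][]State{
	StatePending: {StateQueued, StateCancelled},
	StateQueued:  {StateRunning, StateCancelled},
	StateRunning: {
		StateReady, StateCompleted, StateFailed, StateTimedOut,
		StateCancelled, StateBudgetExceeded, StateBlocked,
	},
	StateReady:          {StateCompleted, StatePending},
	StateFailed:         {StateQueued},
	StateTimedOut:       {StateQueued},
	StateCancelled:      {StateQueued},
	StateBudgetExceeded: {StateQueued},
	StateBlocked:        {StateQueued, StateReady},
}

// ValidTransition reports whether the table permits moving from -> to.
// Unknown or terminal source states yield false because the map lookup
// returns a nil slice.
func ValidTransition(from, to State) bool {
	for _, allowed := range validTransitions[from] {
		if allowed == to {
			return true
		}
	}
	return false
}
```

With this encoding, `ValidTransition(StateBlocked, StateReady)` is permitted, while any transition out of `StateCompleted` is rejected. Keeping the table as data means the documentation and the code can be compared row by row.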
