summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/classifier.go109
-rw-r--r--internal/executor/classifier_test.go49
-rw-r--r--internal/executor/claude.go34
-rw-r--r--internal/executor/claude_test.go27
-rw-r--r--internal/executor/executor.go148
-rw-r--r--internal/executor/executor_test.go61
-rw-r--r--internal/executor/gemini.go192
-rw-r--r--internal/executor/gemini_test.go103
-rw-r--r--internal/executor/preamble.go9
9 files changed, 675 insertions, 57 deletions
diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go
new file mode 100644
index 0000000..79ebc27
--- /dev/null
+++ b/internal/executor/classifier.go
@@ -0,0 +1,109 @@
+package executor
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os/exec"
+ "strings"
+)
+
// Classification is the classifier's decision for a task: which agent and
// model should run it, plus a short human-readable justification.
type Classification struct {
	AgentType string `json:"agent_type"` // "claude" or "gemini"
	Model     string `json:"model"`      // model identifier for the chosen agent
	Reason    string `json:"reason"`     // brief explanation from the model
}

// SystemStatus is a snapshot of pool load handed to the classifier so it can
// steer work away from busy or rate-limited agents.
type SystemStatus struct {
	ActiveTasks map[string]int  // agent type → number of currently running tasks
	RateLimited map[string]bool // agent type → whether a rate limit is in effect
}

// Classifier selects an agent/model for a task by invoking the gemini CLI
// with a classification prompt.
type Classifier struct {
	GeminiBinaryPath string // path to the gemini binary; defaults to "gemini"
}
+
// classificationPrompt is the Sprintf template used to request a routing
// decision. Its three %s verbs are filled with the task name, the task
// instructions, and a rendered SystemStatus; the model is instructed to reply
// with a bare JSON object matching the Classification struct.
const classificationPrompt = `
You are a task classifier for Claudomator.
Given a task description and system status, select the best agent (claude or gemini) and model to use.

Agent Types:
- claude: Best for complex coding, reasoning, and tool use.
- gemini: Best for large context, fast reasoning, and multimodal tasks.

Available Models:
Claude:
- claude-3-5-sonnet-latest (balanced)
- claude-3-5-sonnet-20241022 (stable)
- claude-3-opus-20240229 (most powerful, expensive)
- claude-3-5-haiku-20241022 (fast, cheap)

Gemini:
- gemini-2.0-flash-lite (fastest, most efficient, best for simple tasks)
- gemini-2.0-flash (fast, multimodal)
- gemini-1.5-flash (fast, balanced)
- gemini-1.5-pro (more powerful, larger context)

Selection Criteria:
- Agent: Prefer the one with least running tasks and no active rate limit.
- Model: Select based on task complexity. Use powerful models (opus, pro) for complex reasoning/coding, flash-lite/flash/haiku for simple tasks.

Task:
Name: %s
Instructions: %s

System Status:
%s

Respond with ONLY a JSON object:
{
  "agent_type": "claude" | "gemini",
  "model": "model-name",
  "reason": "brief reason"
}
`
+
+func (c *Classifier) Classify(ctx context.Context, taskName, instructions string, status SystemStatus) (*Classification, error) {
+ statusStr := ""
+ for agent, active := range status.ActiveTasks {
+ statusStr += fmt.Sprintf("- Agent %s: %d active tasks, Rate Limited: %t\n", agent, active, status.RateLimited[agent])
+ }
+
+ prompt := fmt.Sprintf(classificationPrompt,
+ taskName, instructions, statusStr,
+ )
+
+ binary := c.GeminiBinaryPath
+ if binary == "" {
+ binary = "gemini"
+ }
+
+ // Use a minimal model for classification to be fast and cheap.
+ args := []string{
+ "--prompt", prompt,
+ "--model", "gemini-2.0-flash-lite",
+ "--output-format", "json",
+ }
+
+ cmd := exec.CommandContext(ctx, binary, args...)
+ out, err := cmd.Output()
+ if err != nil {
+ if exitErr, ok := err.(*exec.ExitError); ok {
+ return nil, fmt.Errorf("classifier failed (%v): %s", err, string(exitErr.Stderr))
+ }
+ return nil, fmt.Errorf("classifier failed: %w", err)
+ }
+
+ var cls Classification
+ // Gemini might wrap the JSON in markdown code blocks.
+ cleanOut := strings.TrimSpace(string(out))
+ cleanOut = strings.TrimPrefix(cleanOut, "```json")
+ cleanOut = strings.TrimSuffix(cleanOut, "```")
+ cleanOut = strings.TrimSpace(cleanOut)
+
+ if err := json.Unmarshal([]byte(cleanOut), &cls); err != nil {
+ return nil, fmt.Errorf("failed to parse classification JSON: %w\nOutput: %s", err, cleanOut)
+ }
+
+ return &cls, nil
+}
diff --git a/internal/executor/classifier_test.go b/internal/executor/classifier_test.go
new file mode 100644
index 0000000..4de44ca
--- /dev/null
+++ b/internal/executor/classifier_test.go
@@ -0,0 +1,49 @@
+package executor
+
import (
	"context"
	"os"
	"path/filepath"
	"testing"
)
+
+// TestClassifier_Classify_Mock tests the classifier with a mocked gemini binary.
+func TestClassifier_Classify_Mock(t *testing.T) {
+ // Create a temporary mock binary.
+ mockBinary := filepathJoin(t.TempDir(), "mock-gemini")
+ mockContent := `#!/bin/sh
+echo '{"agent_type": "gemini", "model": "gemini-2.0-flash", "reason": "test reason"}'
+`
+ if err := os.WriteFile(mockBinary, []byte(mockContent), 0755); err != nil {
+ t.Fatal(err)
+ }
+
+ c := &Classifier{GeminiBinaryPath: mockBinary}
+ status := SystemStatus{
+ ActiveTasks: map[string]int{"claude": 5, "gemini": 1},
+ RateLimited: map[string]bool{"claude": false, "gemini": false},
+ }
+
+ cls, err := c.Classify(context.Background(), "Test Task", "Test Instructions", status)
+ if err != nil {
+ t.Fatalf("Classify failed: %v", err)
+ }
+
+ if cls.AgentType != "gemini" {
+ t.Errorf("expected gemini, got %s", cls.AgentType)
+ }
+ if cls.Model != "gemini-2.0-flash" {
+ t.Errorf("expected gemini-2.0-flash, got %s", cls.Model)
+ }
+}
+
+func filepathJoin(elems ...string) string {
+ var path string
+ for i, e := range elems {
+ if i == 0 {
+ path = e
+ } else {
+ path = path + string(os.PathSeparator) + e
+ }
+ }
+ return path
+}
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index aa715da..e504369 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -61,7 +61,7 @@ func (r *ClaudeRunner) binaryPath() string {
// changes back to project_dir. On failure the sandbox is preserved and its
// path is included in the error.
func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
- projectDir := t.Claude.ProjectDir
+ projectDir := t.Agent.ProjectDir
// Validate project_dir exists when set.
if projectDir != "" {
@@ -319,21 +319,21 @@ func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFil
"--output-format", "stream-json",
"--verbose",
}
- permMode := t.Claude.PermissionMode
+ permMode := t.Agent.PermissionMode
if permMode == "" {
permMode = "bypassPermissions"
}
args = append(args, "--permission-mode", permMode)
- if t.Claude.Model != "" {
- args = append(args, "--model", t.Claude.Model)
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
}
return args
}
- instructions := t.Claude.Instructions
- allowedTools := t.Claude.AllowedTools
+ instructions := t.Agent.Instructions
+ allowedTools := t.Agent.AllowedTools
- if !t.Claude.SkipPlanning {
+ if !t.Agent.SkipPlanning {
instructions = withPlanningPreamble(instructions)
// Ensure Bash is available so the agent can POST subtasks and ask questions.
hasBash := false
@@ -355,33 +355,33 @@ func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFil
"--verbose",
}
- if t.Claude.Model != "" {
- args = append(args, "--model", t.Claude.Model)
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
}
- if t.Claude.MaxBudgetUSD > 0 {
- args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Claude.MaxBudgetUSD))
+ if t.Agent.MaxBudgetUSD > 0 {
+ args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD))
}
// Default to bypassPermissions — claudomator runs tasks unattended, so
// prompting for write access would always stall execution. Tasks that need
// a more restrictive mode can set permission_mode explicitly.
- permMode := t.Claude.PermissionMode
+ permMode := t.Agent.PermissionMode
if permMode == "" {
permMode = "bypassPermissions"
}
args = append(args, "--permission-mode", permMode)
- if t.Claude.SystemPromptAppend != "" {
- args = append(args, "--append-system-prompt", t.Claude.SystemPromptAppend)
+ if t.Agent.SystemPromptAppend != "" {
+ args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend)
}
for _, tool := range allowedTools {
args = append(args, "--allowedTools", tool)
}
- for _, tool := range t.Claude.DisallowedTools {
+ for _, tool := range t.Agent.DisallowedTools {
args = append(args, "--disallowedTools", tool)
}
- for _, f := range t.Claude.ContextFiles {
+ for _, f := range t.Agent.ContextFiles {
args = append(args, "--add-dir", f)
}
- args = append(args, t.Claude.AdditionalArgs...)
+ args = append(args, t.Agent.AdditionalArgs...)
return args
}
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
index 31dcf52..b5380f4 100644
--- a/internal/executor/claude_test.go
+++ b/internal/executor/claude_test.go
@@ -14,7 +14,8 @@ import (
func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "fix the bug",
Model: "sonnet",
SkipPlanning: true,
@@ -37,7 +38,8 @@ func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) {
func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "implement feature",
Model: "opus",
MaxBudgetUSD: 5.0,
@@ -79,7 +81,8 @@ func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) {
func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "do work",
SkipPlanning: true,
// PermissionMode intentionally not set
@@ -102,7 +105,8 @@ func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) {
func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "do work",
PermissionMode: "default",
SkipPlanning: true,
@@ -125,7 +129,8 @@ func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) {
func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "do something",
SkipPlanning: true,
},
@@ -148,7 +153,8 @@ func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) {
func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "fix the bug",
SkipPlanning: false,
},
@@ -171,7 +177,8 @@ func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) {
func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "do work",
AllowedTools: []string{"Read"},
SkipPlanning: false,
@@ -195,7 +202,8 @@ func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) {
func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) {
r := &ClaudeRunner{}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
Instructions: "do work",
AllowedTools: []string{"Bash", "Read"},
SkipPlanning: false,
@@ -223,7 +231,8 @@ func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) {
LogDir: t.TempDir(),
}
tk := &task.Task{
- Claude: task.ClaudeConfig{
+ Agent: task.AgentConfig{
+ Type: "claude",
ProjectDir: "/nonexistent/path/does/not/exist",
SkipPlanning: true,
},
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 0245899..d1c8e72 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -36,18 +36,21 @@ type workItem struct {
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
maxConcurrent int
- runner Runner
+ runners map[string]Runner
store *storage.DB
logger *slog.Logger
depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
- mu sync.Mutex
- active int
- cancels map[string]context.CancelFunc // taskID → cancel
- resultCh chan *Result
- workCh chan workItem // internal bounded queue; Submit enqueues here
- doneCh chan struct{} // signals when a worker slot is freed
- Questions *QuestionRegistry
+ mu sync.Mutex
+ active int
+ activePerAgent map[string]int
+ rateLimited map[string]time.Time // agentType -> until
+ cancels map[string]context.CancelFunc // taskID → cancel
+ resultCh chan *Result
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
+ Questions *QuestionRegistry
+ Classifier *Classifier
}
// Result is emitted when a task execution completes.
@@ -57,16 +60,18 @@ type Result struct {
Err error
}
-func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.Logger) *Pool {
+func NewPool(maxConcurrent int, runners map[string]Runner, store *storage.DB, logger *slog.Logger) *Pool {
if maxConcurrent < 1 {
maxConcurrent = 1
}
p := &Pool{
maxConcurrent: maxConcurrent,
- runner: runner,
+ runners: runners,
store: store,
logger: logger,
depPollInterval: 5 * time.Second,
+ activePerAgent: make(map[string]int),
+ rateLimited: make(map[string]time.Time),
cancels: make(map[string]context.CancelFunc),
resultCh: make(chan *Result, maxConcurrent*2),
workCh: make(chan workItem, maxConcurrent*10+100),
@@ -147,10 +152,32 @@ func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Exe
}
}
+func (p *Pool) getRunner(t *task.Task) (Runner, error) {
+ agentType := t.Agent.Type
+ if agentType == "" {
+ agentType = "claude" // Default for backward compatibility
+ }
+ runner, ok := p.runners[agentType]
+ if !ok {
+ return nil, fmt.Errorf("unsupported agent type: %q", agentType)
+ }
+ return runner, nil
+}
+
func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
+ agentType := t.Agent.Type
+ if agentType == "" {
+ agentType = "claude"
+ }
+
+ p.mu.Lock()
+ p.activePerAgent[agentType]++
+ p.mu.Unlock()
+
defer func() {
p.mu.Lock()
p.active--
+ p.activePerAgent[agentType]--
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -158,8 +185,15 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
}
}()
+ runner, err := p.getRunner(t)
+ if err != nil {
+ p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+ return
+ }
+
// Pre-populate log paths.
- if lp, ok := p.runner.(LogPather); ok {
+ if lp, ok := runner.(LogPather); ok {
if logDir := lp.ExecLogDir(exec.ID); logDir != "" {
exec.StdoutPath = filepath.Join(logDir, "stdout.log")
exec.StderrPath = filepath.Join(logDir, "stderr.log")
@@ -182,12 +216,30 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
} else {
ctx, cancel = context.WithCancel(ctx)
}
- defer cancel()
+ p.mu.Lock()
+ p.cancels[t.ID] = cancel
+ p.mu.Unlock()
+ defer func() {
+ cancel()
+ p.mu.Lock()
+ delete(p.cancels, t.ID)
+ p.mu.Unlock()
+ }()
- err := p.runner.Run(ctx, t, exec)
+ err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
if err != nil {
+ if isRateLimitError(err) {
+ p.mu.Lock()
+ retryAfter := parseRetryAfter(err.Error())
+ if retryAfter == 0 {
+ retryAfter = 1 * time.Minute
+ }
+ p.rateLimited[agentType] = time.Now().Add(retryAfter)
+ p.mu.Unlock()
+ }
+
var blockedErr *BlockedError
if errors.As(err, &blockedErr) {
exec.Status = "BLOCKED"
@@ -234,9 +286,45 @@ func (p *Pool) ActiveCount() int {
}
func (p *Pool) execute(ctx context.Context, t *task.Task) {
+ // 1. Classification
+ if p.Classifier != nil {
+ p.mu.Lock()
+ activeTasks := make(map[string]int)
+ rateLimited := make(map[string]bool)
+ now := time.Now()
+ for agent := range p.runners {
+ activeTasks[agent] = p.activePerAgent[agent]
+ rateLimited[agent] = now.Before(p.rateLimited[agent])
+ }
+ status := SystemStatus{
+ ActiveTasks: activeTasks,
+ RateLimited: rateLimited,
+ }
+ p.mu.Unlock()
+
+ cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status)
+ if err == nil {
+ p.logger.Info("task classified", "taskID", t.ID, "agent", cls.AgentType, "model", cls.Model, "reason", cls.Reason)
+ t.Agent.Type = cls.AgentType
+ t.Agent.Model = cls.Model
+ } else {
+ p.logger.Error("classification failed", "error", err, "taskID", t.ID)
+ }
+ }
+
+ agentType := t.Agent.Type
+ if agentType == "" {
+ agentType = "claude"
+ }
+
+ p.mu.Lock()
+ p.activePerAgent[agentType]++
+ p.mu.Unlock()
+
defer func() {
p.mu.Lock()
p.active--
+ p.activePerAgent[agentType]--
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -244,6 +332,26 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}()
+ runner, err := p.getRunner(t)
+ if err != nil {
+ p.logger.Error("failed to get runner", "error", err, "taskID", t.ID)
+ now := time.Now().UTC()
+ exec := &storage.Execution{
+ ID: uuid.New().String(),
+ TaskID: t.ID,
+ StartTime: now,
+ EndTime: now,
+ Status: "FAILED",
+ ErrorMsg: err.Error(),
+ }
+ if createErr := p.store.CreateExecution(exec); createErr != nil {
+ p.logger.Error("failed to create execution record", "error", createErr)
+ }
+ p.store.UpdateTaskState(t.ID, task.StateFailed)
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+ return
+ }
+
// Wait for all dependencies to complete before starting execution.
if len(t.DependsOn) > 0 {
if err := p.waitForDependencies(ctx, t); err != nil {
@@ -275,7 +383,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
// Pre-populate log paths so they're available in the DB immediately —
// before the subprocess starts — enabling live tailing and debugging.
- if lp, ok := p.runner.(LogPather); ok {
+ if lp, ok := runner.(LogPather); ok {
if logDir := lp.ExecLogDir(execID); logDir != "" {
exec.StdoutPath = filepath.Join(logDir, "stdout.log")
exec.StderrPath = filepath.Join(logDir, "stderr.log")
@@ -309,10 +417,20 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}()
// Run the task.
- err := p.runner.Run(ctx, t, exec)
+ err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
if err != nil {
+ if isRateLimitError(err) {
+ p.mu.Lock()
+ retryAfter := parseRetryAfter(err.Error())
+ if retryAfter == 0 {
+ retryAfter = 1 * time.Minute
+ }
+ p.rateLimited[agentType] = time.Now().Add(retryAfter)
+ p.mu.Unlock()
+ }
+
var blockedErr *BlockedError
if errors.As(err, &blockedErr) {
exec.Status = "BLOCKED"
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 414f852..028e5cf 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -64,7 +64,7 @@ func makeTask(id string) *task.Task {
now := time.Now().UTC()
return &task.Task{
ID: id, Name: "Test " + id,
- Claude: task.ClaudeConfig{Instructions: "test"},
+ Agent: task.AgentConfig{Type: "claude", Instructions: "test"},
Priority: task.PriorityNormal,
Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
Tags: []string{},
@@ -77,8 +77,9 @@ func makeTask(id string) *task.Task {
func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("ps-1") // no ParentTaskID → top-level
store.CreateTask(tk)
@@ -104,8 +105,9 @@ func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) {
func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("sub-1")
tk.ParentTaskID = "parent-99" // subtask
@@ -132,8 +134,9 @@ func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) {
func TestPool_Submit_Failure(t *testing.T) {
store := testStore(t)
runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("pf-1")
store.CreateTask(tk)
@@ -151,8 +154,9 @@ func TestPool_Submit_Failure(t *testing.T) {
func TestPool_Submit_Timeout(t *testing.T) {
store := testStore(t)
runner := &mockRunner{delay: 5 * time.Second}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("pt-1")
tk.Timeout.Duration = 50 * time.Millisecond
@@ -168,8 +172,9 @@ func TestPool_Submit_Timeout(t *testing.T) {
func TestPool_Submit_Cancellation(t *testing.T) {
store := testStore(t)
runner := &mockRunner{delay: 5 * time.Second}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
ctx, cancel := context.WithCancel(context.Background())
tk := makeTask("pc-1")
@@ -188,8 +193,9 @@ func TestPool_Submit_Cancellation(t *testing.T) {
func TestPool_Cancel_StopsRunningTask(t *testing.T) {
store := testStore(t)
runner := &mockRunner{delay: 5 * time.Second}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("cancel-1")
store.CreateTask(tk)
@@ -209,8 +215,9 @@ func TestPool_Cancel_StopsRunningTask(t *testing.T) {
func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
if ok := pool.Cancel("nonexistent"); ok {
t.Error("Cancel returned true for unknown task")
@@ -222,8 +229,9 @@ func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
func TestPool_QueuedWhenAtCapacity(t *testing.T) {
store := testStore(t)
runner := &mockRunner{delay: 100 * time.Millisecond}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(1, runner, store, logger)
+ pool := NewPool(1, runners, store, logger)
tk1 := makeTask("queue-1")
store.CreateTask(tk1)
@@ -273,8 +281,9 @@ func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.
func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) {
store := testStore(t)
runner := &logPatherMockRunner{logDir: t.TempDir()}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("lp-1")
store.CreateTask(tk)
@@ -304,8 +313,9 @@ func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) {
func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) {
store := testStore(t)
runner := &mockRunner{} // does NOT implement LogPather
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runner, store, logger)
+ pool := NewPool(2, runners, store, logger)
tk := makeTask("nolp-1")
store.CreateTask(tk)
@@ -321,8 +331,9 @@ func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) {
func TestPool_ConcurrentExecution(t *testing.T) {
store := testStore(t)
runner := &mockRunner{delay: 50 * time.Millisecond}
+ runners := map[string]Runner{"claude": runner}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(3, runner, store, logger)
+ pool := NewPool(3, runners, store, logger)
for i := 0; i < 3; i++ {
tk := makeTask(fmt.Sprintf("cc-%d", i))
@@ -343,3 +354,29 @@ func TestPool_ConcurrentExecution(t *testing.T) {
t.Errorf("calls: want 3, got %d", runner.callCount())
}
}
+
+func TestPool_UnsupportedAgent(t *testing.T) {
+ store := testStore(t)
+ runners := map[string]Runner{"claude": &mockRunner{}}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ tk := makeTask("bad-agent")
+ tk.Agent.Type = "super-ai"
+ store.CreateTask(tk)
+
+ if err := pool.Submit(context.Background(), tk); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+
+ result := <-pool.Results()
+ if result.Err == nil {
+ t.Fatal("expected error for unsupported agent")
+ }
+ if !strings.Contains(result.Err.Error(), "unsupported agent type") {
+ t.Errorf("expected 'unsupported agent type' in error, got: %v", result.Err)
+ }
+ if result.Execution.Status != "FAILED" {
+ t.Errorf("status: want FAILED, got %q", result.Execution.Status)
+ }
+}
diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go
new file mode 100644
index 0000000..956d8b5
--- /dev/null
+++ b/internal/executor/gemini.go
@@ -0,0 +1,192 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
// GeminiRunner spawns the `gemini` CLI in non-interactive mode. It mirrors
// the Claude runner's shape so the executor pool can treat both uniformly.
type GeminiRunner struct {
	BinaryPath string       // gemini executable; defaults to "gemini" (resolved via PATH)
	Logger     *slog.Logger // handed to the output stream parser
	LogDir     string       // base directory for execution logs; empty disables ExecLogDir
	APIURL     string       // base URL of the Claudomator API, passed to subprocesses
}
+
+// ExecLogDir returns the log directory for the given execution ID.
+func (r *GeminiRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+func (r *GeminiRunner) binaryPath() string {
+ if r.BinaryPath != "" {
+ return r.BinaryPath
+ }
+ return "gemini"
+}
+
// Run executes a gemini <instructions> invocation, streaming output to log files.
// After a successful run it checks for a question file left behind by the
// agent; if one exists, a *BlockedError carrying the question JSON and session
// ID is returned so the pool can mark the task BLOCKED and resume it later.
func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	// Fail fast when the configured project directory does not exist.
	if t.Agent.ProjectDir != "" {
		if _, err := os.Stat(t.Agent.ProjectDir); err != nil {
			return fmt.Errorf("project_dir %q: %w", t.Agent.ProjectDir, err)
		}
	}

	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		// No base LogDir configured — fall back to a directory named after the
		// execution ID, relative to the current working directory.
		logDir = e.ID
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	// Only populate log paths when the pool has not pre-set them already.
	if e.StdoutPath == "" {
		e.StdoutPath = filepath.Join(logDir, "stdout.log")
		e.StderrPath = filepath.Join(logDir, "stderr.log")
		e.ArtifactDir = logDir
	}

	// Default the session ID to the execution ID so a resume has a handle.
	if e.SessionID == "" {
		e.SessionID = e.ID
	}

	questionFile := filepath.Join(logDir, "question.json")
	args := r.buildArgs(t, e, questionFile)

	// Gemini CLI doesn't necessarily have the same rate limiting behavior as Claude,
	// but we'll use a similar execution pattern.
	err := r.execOnce(ctx, args, t.Agent.ProjectDir, e)
	if err != nil {
		return err
	}

	// Check whether the agent left a question before exiting.
	// NOTE(review): any ReadFile error — not just "file does not exist" — is
	// treated as "no question pending"; presumably intentional best-effort,
	// but confirm against the Claude runner's behavior.
	data, readErr := os.ReadFile(questionFile)
	if readErr == nil {
		os.Remove(questionFile) // consumed
		return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID}
	}
	return nil
}
+
// execOnce starts one gemini subprocess and blocks until it exits. Stdout is
// routed through an OS pipe so parseStream can extract cost data while the
// raw stream is mirrored into the stdout log file; stderr is written straight
// to its log file. The subprocess runs in its own process group so context
// cancellation can kill the whole process tree.
func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error {
	cmd := exec.CommandContext(ctx, r.binaryPath(), args...)
	// Expose the Claudomator API endpoint and question-file location so the
	// agent can create subtasks and leave blocking questions.
	cmd.Env = append(os.Environ(),
		"CLAUDOMATOR_API_URL="+r.APIURL,
		"CLAUDOMATOR_TASK_ID="+e.TaskID,
		"CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"),
	)
	// New process group — lets the cancellation goroutine below signal the
	// negative PID to kill the child and any processes it spawned.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if workingDir != "" {
		cmd.Dir = workingDir
	}

	stdoutFile, err := os.Create(e.StdoutPath)
	if err != nil {
		return fmt.Errorf("creating stdout log: %w", err)
	}
	defer stdoutFile.Close()

	stderrFile, err := os.Create(e.StderrPath)
	if err != nil {
		return fmt.Errorf("creating stderr log: %w", err)
	}
	defer stderrFile.Close()

	stdoutR, stdoutW, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	cmd.Stdout = stdoutW
	cmd.Stderr = stderrFile

	if err := cmd.Start(); err != nil {
		stdoutW.Close()
		stdoutR.Close()
		return fmt.Errorf("starting gemini: %w", err)
	}
	// Close the parent's copy of the write end so the reader goroutine sees
	// EOF once the child exits.
	stdoutW.Close()

	// Kill the entire process group if the context is canceled before exit;
	// killDone stops this goroutine on the normal path.
	killDone := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		case <-killDone:
		}
	}()

	var costUSD float64
	var streamErr error
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Reusing parseStream as the JSONL format should be compatible
		costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
		stdoutR.Close()
	}()

	// Wait order matters: reap the child, release the killer goroutine, then
	// wait for the parser to finish draining the pipe.
	waitErr := cmd.Wait()
	close(killDone)
	wg.Wait()

	e.CostUSD = costUSD

	if waitErr != nil {
		if exitErr, ok := waitErr.(*exec.ExitError); ok {
			e.ExitCode = exitErr.ExitCode()
		}
		return fmt.Errorf("gemini exited with error: %w", waitErr)
	}

	e.ExitCode = 0
	// A stream-parse failure on an otherwise-successful run is still an error.
	if streamErr != nil {
		return streamErr
	}
	return nil
}
+
+func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string {
+ // Gemini CLI uses a different command structure: gemini "instructions" [flags]
+
+ instructions := t.Agent.Instructions
+ if !t.Agent.SkipPlanning {
+ instructions = withPlanningPreamble(instructions)
+ }
+
+ args := []string{
+ instructions,
+ "--output-format", "stream-json",
+ }
+
+ // Note: Gemini CLI flags might differ from Claude CLI.
+ // Assuming common flags for now, but these may need adjustment.
+ if t.Agent.Model != "" {
+ args = append(args, "--model", t.Agent.Model)
+ }
+
+ // Gemini CLI doesn't use --session-id for the first run in the same way,
+ // or it might use it differently. For now we assume compatibility.
+ if e.SessionID != "" {
+ // If it's a resume, it might use different flags.
+ if e.ResumeSessionID != "" {
+ // This is a placeholder for Gemini's resume logic
+ }
+ }
+
+ return args
+}
diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go
new file mode 100644
index 0000000..42253da
--- /dev/null
+++ b/internal/executor/gemini_test.go
@@ -0,0 +1,103 @@
+package executor
+
+import (
+ "context"
+ "io"
+ "log/slog"
+ "strings"
+ "testing"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+func TestGeminiRunner_BuildArgs_BasicTask(t *testing.T) {
+ r := &GeminiRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "fix the bug",
+ Model: "gemini-2.0-flash",
+ SkipPlanning: true,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ // Gemini CLI: instructions is the first positional arg
+ if len(args) < 1 || args[0] != "fix the bug" {
+ t.Errorf("expected instructions as first arg, got: %v", args)
+ }
+
+ argMap := make(map[string]bool)
+ for _, a := range args {
+ argMap[a] = true
+ }
+ for _, want := range []string{"--output-format", "stream-json", "--model", "gemini-2.0-flash"} {
+ if !argMap[want] {
+ t.Errorf("missing arg %q in %v", want, args)
+ }
+ }
+}
+
+func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) {
+ r := &GeminiRunner{}
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ Instructions: "fix the bug",
+ SkipPlanning: false,
+ },
+ }
+
+ args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json")
+
+ if len(args) < 1 {
+ t.Fatalf("expected at least 1 arg, got: %v", args)
+ }
+ if !strings.HasPrefix(args[0], planningPreamble) {
+ t.Errorf("instructions should start with planning preamble")
+ }
+ if !strings.HasSuffix(args[0], "fix the bug") {
+ t.Errorf("instructions should end with original instructions")
+ }
+}
+
+func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) {
+ r := &GeminiRunner{
+ BinaryPath: "true", // would succeed if it ran
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: t.TempDir(),
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "gemini",
+ ProjectDir: "/nonexistent/path/does/not/exist",
+ SkipPlanning: true,
+ },
+ }
+ exec := &storage.Execution{ID: "test-exec"}
+
+ err := r.Run(context.Background(), tk, exec)
+
+ if err == nil {
+ t.Fatal("expected error for inaccessible project_dir, got nil")
+ }
+ if !strings.Contains(err.Error(), "project_dir") {
+ t.Errorf("expected 'project_dir' in error, got: %v", err)
+ }
+}
+
+func TestGeminiRunner_BinaryPath_Default(t *testing.T) {
+ r := &GeminiRunner{}
+ if r.binaryPath() != "gemini" {
+ t.Errorf("want 'gemini', got %q", r.binaryPath())
+ }
+}
+
+func TestGeminiRunner_BinaryPath_Custom(t *testing.T) {
+ r := &GeminiRunner{BinaryPath: "/usr/local/bin/gemini"}
+ if r.binaryPath() != "/usr/local/bin/gemini" {
+ t.Errorf("want custom path, got %q", r.binaryPath())
+ }
+}
diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go
index 71f8233..b20f7ea 100644
--- a/internal/executor/preamble.go
+++ b/internal/executor/preamble.go
@@ -23,11 +23,12 @@ Before doing any implementation work:
1. Estimate: will this task take more than 3 minutes of implementation effort?
-2. If YES — break it down into subtasks:
- - Create 3–7 discrete subtasks using the claudomator CLI, for example:
- claudomator create "Subtask name" --instructions "..." --working-dir "/path" --parent-id "$CLAUDOMATOR_TASK_ID" --server "$CLAUDOMATOR_API_URL"
- - Do NOT pass --start. Tasks will be queued and started in order by the operator.
+2. If YES — break it down:
+ - Create 3–7 discrete subtasks by POSTing to $CLAUDOMATOR_API_URL/api/tasks
+ - Each subtask POST body should be JSON with: name, agent.instructions, agent.working_dir (copy from current task), agent.model, agent.allowed_tools, and agent.skip_planning set to true
+ - Set parent_task_id to $CLAUDOMATOR_TASK_ID in each POST body
- After creating all subtasks, output a brief summary and STOP. Do not implement anything.
+ - You can also specify agent.type (either "claude" or "gemini") to choose the agent for subtasks.
3. If NO — proceed with the task instructions below.