diff options
Diffstat (limited to 'internal/executor')
 internal/executor/classifier.go      | 109  (new file, -rw-r--r--)
 internal/executor/classifier_test.go |  49  (new file, -rw-r--r--)
 internal/executor/claude.go          |  34  (-rw-r--r--)
 internal/executor/claude_test.go     |  27  (-rw-r--r--)
 internal/executor/executor.go        | 148  (-rw-r--r--)
 internal/executor/executor_test.go   |  61  (-rw-r--r--)
 internal/executor/gemini.go          | 192  (new file, -rw-r--r--)
 internal/executor/gemini_test.go     | 103  (new file, -rw-r--r--)
 internal/executor/preamble.go        |   9  (-rw-r--r--)
9 files changed, 675 insertions, 57 deletions
diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go new file mode 100644 index 0000000..79ebc27 --- /dev/null +++ b/internal/executor/classifier.go @@ -0,0 +1,109 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "os/exec" + "strings" +) + +type Classification struct { + AgentType string `json:"agent_type"` + Model string `json:"model"` + Reason string `json:"reason"` +} + +type SystemStatus struct { + ActiveTasks map[string]int + RateLimited map[string]bool +} + +type Classifier struct { + GeminiBinaryPath string +} + +const classificationPrompt = ` +You are a task classifier for Claudomator. +Given a task description and system status, select the best agent (claude or gemini) and model to use. + +Agent Types: +- claude: Best for complex coding, reasoning, and tool use. +- gemini: Best for large context, fast reasoning, and multimodal tasks. + +Available Models: +Claude: +- claude-3-5-sonnet-latest (balanced) +- claude-3-5-sonnet-20241022 (stable) +- claude-3-opus-20240229 (most powerful, expensive) +- claude-3-5-haiku-20241022 (fast, cheap) + +Gemini: +- gemini-2.0-flash-lite (fastest, most efficient, best for simple tasks) +- gemini-2.0-flash (fast, multimodal) +- gemini-1.5-flash (fast, balanced) +- gemini-1.5-pro (more powerful, larger context) + +Selection Criteria: +- Agent: Prefer the one with least running tasks and no active rate limit. +- Model: Select based on task complexity. Use powerful models (opus, pro) for complex reasoning/coding, flash-lite/flash/haiku for simple tasks. 
+ +Task: +Name: %s +Instructions: %s + +System Status: +%s + +Respond with ONLY a JSON object: +{ + "agent_type": "claude" | "gemini", + "model": "model-name", + "reason": "brief reason" +} +` + +func (c *Classifier) Classify(ctx context.Context, taskName, instructions string, status SystemStatus) (*Classification, error) { + statusStr := "" + for agent, active := range status.ActiveTasks { + statusStr += fmt.Sprintf("- Agent %s: %d active tasks, Rate Limited: %t\n", agent, active, status.RateLimited[agent]) + } + + prompt := fmt.Sprintf(classificationPrompt, + taskName, instructions, statusStr, + ) + + binary := c.GeminiBinaryPath + if binary == "" { + binary = "gemini" + } + + // Use a minimal model for classification to be fast and cheap. + args := []string{ + "--prompt", prompt, + "--model", "gemini-2.0-flash-lite", + "--output-format", "json", + } + + cmd := exec.CommandContext(ctx, binary, args...) + out, err := cmd.Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + return nil, fmt.Errorf("classifier failed (%v): %s", err, string(exitErr.Stderr)) + } + return nil, fmt.Errorf("classifier failed: %w", err) + } + + var cls Classification + // Gemini might wrap the JSON in markdown code blocks. + cleanOut := strings.TrimSpace(string(out)) + cleanOut = strings.TrimPrefix(cleanOut, "```json") + cleanOut = strings.TrimSuffix(cleanOut, "```") + cleanOut = strings.TrimSpace(cleanOut) + + if err := json.Unmarshal([]byte(cleanOut), &cls); err != nil { + return nil, fmt.Errorf("failed to parse classification JSON: %w\nOutput: %s", err, cleanOut) + } + + return &cls, nil +} diff --git a/internal/executor/classifier_test.go b/internal/executor/classifier_test.go new file mode 100644 index 0000000..4de44ca --- /dev/null +++ b/internal/executor/classifier_test.go @@ -0,0 +1,49 @@ +package executor + +import ( + "context" + "os" + "testing" +) + +// TestClassifier_Classify_Mock tests the classifier with a mocked gemini binary. 
+func TestClassifier_Classify_Mock(t *testing.T) { + // Create a temporary mock binary. + mockBinary := filepathJoin(t.TempDir(), "mock-gemini") + mockContent := `#!/bin/sh +echo '{"agent_type": "gemini", "model": "gemini-2.0-flash", "reason": "test reason"}' +` + if err := os.WriteFile(mockBinary, []byte(mockContent), 0755); err != nil { + t.Fatal(err) + } + + c := &Classifier{GeminiBinaryPath: mockBinary} + status := SystemStatus{ + ActiveTasks: map[string]int{"claude": 5, "gemini": 1}, + RateLimited: map[string]bool{"claude": false, "gemini": false}, + } + + cls, err := c.Classify(context.Background(), "Test Task", "Test Instructions", status) + if err != nil { + t.Fatalf("Classify failed: %v", err) + } + + if cls.AgentType != "gemini" { + t.Errorf("expected gemini, got %s", cls.AgentType) + } + if cls.Model != "gemini-2.0-flash" { + t.Errorf("expected gemini-2.0-flash, got %s", cls.Model) + } +} + +func filepathJoin(elems ...string) string { + var path string + for i, e := range elems { + if i == 0 { + path = e + } else { + path = path + string(os.PathSeparator) + e + } + } + return path +} diff --git a/internal/executor/claude.go b/internal/executor/claude.go index aa715da..e504369 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -61,7 +61,7 @@ func (r *ClaudeRunner) binaryPath() string { // changes back to project_dir. On failure the sandbox is preserved and its // path is included in the error. func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - projectDir := t.Claude.ProjectDir + projectDir := t.Agent.ProjectDir // Validate project_dir exists when set. 
if projectDir != "" { @@ -319,21 +319,21 @@ func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFil "--output-format", "stream-json", "--verbose", } - permMode := t.Claude.PermissionMode + permMode := t.Agent.PermissionMode if permMode == "" { permMode = "bypassPermissions" } args = append(args, "--permission-mode", permMode) - if t.Claude.Model != "" { - args = append(args, "--model", t.Claude.Model) + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) } return args } - instructions := t.Claude.Instructions - allowedTools := t.Claude.AllowedTools + instructions := t.Agent.Instructions + allowedTools := t.Agent.AllowedTools - if !t.Claude.SkipPlanning { + if !t.Agent.SkipPlanning { instructions = withPlanningPreamble(instructions) // Ensure Bash is available so the agent can POST subtasks and ask questions. hasBash := false @@ -355,33 +355,33 @@ func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFil "--verbose", } - if t.Claude.Model != "" { - args = append(args, "--model", t.Claude.Model) + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) } - if t.Claude.MaxBudgetUSD > 0 { - args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Claude.MaxBudgetUSD)) + if t.Agent.MaxBudgetUSD > 0 { + args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) } // Default to bypassPermissions — claudomator runs tasks unattended, so // prompting for write access would always stall execution. Tasks that need // a more restrictive mode can set permission_mode explicitly. 
- permMode := t.Claude.PermissionMode + permMode := t.Agent.PermissionMode if permMode == "" { permMode = "bypassPermissions" } args = append(args, "--permission-mode", permMode) - if t.Claude.SystemPromptAppend != "" { - args = append(args, "--append-system-prompt", t.Claude.SystemPromptAppend) + if t.Agent.SystemPromptAppend != "" { + args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) } for _, tool := range allowedTools { args = append(args, "--allowedTools", tool) } - for _, tool := range t.Claude.DisallowedTools { + for _, tool := range t.Agent.DisallowedTools { args = append(args, "--disallowedTools", tool) } - for _, f := range t.Claude.ContextFiles { + for _, f := range t.Agent.ContextFiles { args = append(args, "--add-dir", f) } - args = append(args, t.Claude.AdditionalArgs...) + args = append(args, t.Agent.AdditionalArgs...) return args } diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go index 31dcf52..b5380f4 100644 --- a/internal/executor/claude_test.go +++ b/internal/executor/claude_test.go @@ -14,7 +14,8 @@ import ( func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "fix the bug", Model: "sonnet", SkipPlanning: true, @@ -37,7 +38,8 @@ func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) { func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "implement feature", Model: "opus", MaxBudgetUSD: 5.0, @@ -79,7 +81,8 @@ func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) { func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "do work", SkipPlanning: true, // PermissionMode intentionally not 
set @@ -102,7 +105,8 @@ func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) { func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "do work", PermissionMode: "default", SkipPlanning: true, @@ -125,7 +129,8 @@ func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) { func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "do something", SkipPlanning: true, }, @@ -148,7 +153,8 @@ func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) { func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "fix the bug", SkipPlanning: false, }, @@ -171,7 +177,8 @@ func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) { func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "do work", AllowedTools: []string{"Read"}, SkipPlanning: false, @@ -195,7 +202,8 @@ func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) { func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) { r := &ClaudeRunner{} tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", Instructions: "do work", AllowedTools: []string{"Bash", "Read"}, SkipPlanning: false, @@ -223,7 +231,8 @@ func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { LogDir: t.TempDir(), } tk := &task.Task{ - Claude: task.ClaudeConfig{ + Agent: task.AgentConfig{ + Type: "claude", ProjectDir: "/nonexistent/path/does/not/exist", SkipPlanning: 
true, }, diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 0245899..d1c8e72 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -36,18 +36,21 @@ type workItem struct { // Pool manages a bounded set of concurrent task workers. type Pool struct { maxConcurrent int - runner Runner + runners map[string]Runner store *storage.DB logger *slog.Logger depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s - mu sync.Mutex - active int - cancels map[string]context.CancelFunc // taskID → cancel - resultCh chan *Result - workCh chan workItem // internal bounded queue; Submit enqueues here - doneCh chan struct{} // signals when a worker slot is freed - Questions *QuestionRegistry + mu sync.Mutex + active int + activePerAgent map[string]int + rateLimited map[string]time.Time // agentType -> until + cancels map[string]context.CancelFunc // taskID → cancel + resultCh chan *Result + workCh chan workItem // internal bounded queue; Submit enqueues here + doneCh chan struct{} // signals when a worker slot is freed + Questions *QuestionRegistry + Classifier *Classifier } // Result is emitted when a task execution completes. 
@@ -57,16 +60,18 @@ type Result struct { Err error } -func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.Logger) *Pool { +func NewPool(maxConcurrent int, runners map[string]Runner, store *storage.DB, logger *slog.Logger) *Pool { if maxConcurrent < 1 { maxConcurrent = 1 } p := &Pool{ maxConcurrent: maxConcurrent, - runner: runner, + runners: runners, store: store, logger: logger, depPollInterval: 5 * time.Second, + activePerAgent: make(map[string]int), + rateLimited: make(map[string]time.Time), cancels: make(map[string]context.CancelFunc), resultCh: make(chan *Result, maxConcurrent*2), workCh: make(chan workItem, maxConcurrent*10+100), @@ -147,10 +152,32 @@ func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Exe } } +func (p *Pool) getRunner(t *task.Task) (Runner, error) { + agentType := t.Agent.Type + if agentType == "" { + agentType = "claude" // Default for backward compatibility + } + runner, ok := p.runners[agentType] + if !ok { + return nil, fmt.Errorf("unsupported agent type: %q", agentType) + } + return runner, nil +} + func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { + agentType := t.Agent.Type + if agentType == "" { + agentType = "claude" + } + + p.mu.Lock() + p.activePerAgent[agentType]++ + p.mu.Unlock() + defer func() { p.mu.Lock() p.active-- + p.activePerAgent[agentType]-- p.mu.Unlock() select { case p.doneCh <- struct{}{}: @@ -158,8 +185,15 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex } }() + runner, err := p.getRunner(t) + if err != nil { + p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} + return + } + // Pre-populate log paths. 
- if lp, ok := p.runner.(LogPather); ok { + if lp, ok := runner.(LogPather); ok { if logDir := lp.ExecLogDir(exec.ID); logDir != "" { exec.StdoutPath = filepath.Join(logDir, "stdout.log") exec.StderrPath = filepath.Join(logDir, "stderr.log") @@ -182,12 +216,30 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex } else { ctx, cancel = context.WithCancel(ctx) } - defer cancel() + p.mu.Lock() + p.cancels[t.ID] = cancel + p.mu.Unlock() + defer func() { + cancel() + p.mu.Lock() + delete(p.cancels, t.ID) + p.mu.Unlock() + }() - err := p.runner.Run(ctx, t, exec) + err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() if err != nil { + if isRateLimitError(err) { + p.mu.Lock() + retryAfter := parseRetryAfter(err.Error()) + if retryAfter == 0 { + retryAfter = 1 * time.Minute + } + p.rateLimited[agentType] = time.Now().Add(retryAfter) + p.mu.Unlock() + } + var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" @@ -234,9 +286,45 @@ func (p *Pool) ActiveCount() int { } func (p *Pool) execute(ctx context.Context, t *task.Task) { + // 1. 
Classification + if p.Classifier != nil { + p.mu.Lock() + activeTasks := make(map[string]int) + rateLimited := make(map[string]bool) + now := time.Now() + for agent := range p.runners { + activeTasks[agent] = p.activePerAgent[agent] + rateLimited[agent] = now.Before(p.rateLimited[agent]) + } + status := SystemStatus{ + ActiveTasks: activeTasks, + RateLimited: rateLimited, + } + p.mu.Unlock() + + cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status) + if err == nil { + p.logger.Info("task classified", "taskID", t.ID, "agent", cls.AgentType, "model", cls.Model, "reason", cls.Reason) + t.Agent.Type = cls.AgentType + t.Agent.Model = cls.Model + } else { + p.logger.Error("classification failed", "error", err, "taskID", t.ID) + } + } + + agentType := t.Agent.Type + if agentType == "" { + agentType = "claude" + } + + p.mu.Lock() + p.activePerAgent[agentType]++ + p.mu.Unlock() + defer func() { p.mu.Lock() p.active-- + p.activePerAgent[agentType]-- p.mu.Unlock() select { case p.doneCh <- struct{}{}: @@ -244,6 +332,26 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } }() + runner, err := p.getRunner(t) + if err != nil { + p.logger.Error("failed to get runner", "error", err, "taskID", t.ID) + now := time.Now().UTC() + exec := &storage.Execution{ + ID: uuid.New().String(), + TaskID: t.ID, + StartTime: now, + EndTime: now, + Status: "FAILED", + ErrorMsg: err.Error(), + } + if createErr := p.store.CreateExecution(exec); createErr != nil { + p.logger.Error("failed to create execution record", "error", createErr) + } + p.store.UpdateTaskState(t.ID, task.StateFailed) + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} + return + } + // Wait for all dependencies to complete before starting execution. 
if len(t.DependsOn) > 0 { if err := p.waitForDependencies(ctx, t); err != nil { @@ -275,7 +383,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { // Pre-populate log paths so they're available in the DB immediately — // before the subprocess starts — enabling live tailing and debugging. - if lp, ok := p.runner.(LogPather); ok { + if lp, ok := runner.(LogPather); ok { if logDir := lp.ExecLogDir(execID); logDir != "" { exec.StdoutPath = filepath.Join(logDir, "stdout.log") exec.StderrPath = filepath.Join(logDir, "stderr.log") @@ -309,10 +417,20 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { }() // Run the task. - err := p.runner.Run(ctx, t, exec) + err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() if err != nil { + if isRateLimitError(err) { + p.mu.Lock() + retryAfter := parseRetryAfter(err.Error()) + if retryAfter == 0 { + retryAfter = 1 * time.Minute + } + p.rateLimited[agentType] = time.Now().Add(retryAfter) + p.mu.Unlock() + } + var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 414f852..028e5cf 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -64,7 +64,7 @@ func makeTask(id string) *task.Task { now := time.Now().UTC() return &task.Task{ ID: id, Name: "Test " + id, - Claude: task.ClaudeConfig{Instructions: "test"}, + Agent: task.AgentConfig{Type: "claude", Instructions: "test"}, Priority: task.PriorityNormal, Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"}, Tags: []string{}, @@ -77,8 +77,9 @@ func makeTask(id string) *task.Task { func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) { store := testStore(t) runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, 
runners, store, logger) tk := makeTask("ps-1") // no ParentTaskID → top-level store.CreateTask(tk) @@ -104,8 +105,9 @@ func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) { func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) { store := testStore(t) runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) tk := makeTask("sub-1") tk.ParentTaskID = "parent-99" // subtask @@ -132,8 +134,9 @@ func TestPool_Submit_Subtask_GoesToCompleted(t *testing.T) { func TestPool_Submit_Failure(t *testing.T) { store := testStore(t) runner := &mockRunner{err: fmt.Errorf("boom"), exitCode: 1} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) tk := makeTask("pf-1") store.CreateTask(tk) @@ -151,8 +154,9 @@ func TestPool_Submit_Failure(t *testing.T) { func TestPool_Submit_Timeout(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 5 * time.Second} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) tk := makeTask("pt-1") tk.Timeout.Duration = 50 * time.Millisecond @@ -168,8 +172,9 @@ func TestPool_Submit_Timeout(t *testing.T) { func TestPool_Submit_Cancellation(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 5 * time.Second} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) ctx, cancel := 
context.WithCancel(context.Background()) tk := makeTask("pc-1") @@ -188,8 +193,9 @@ func TestPool_Submit_Cancellation(t *testing.T) { func TestPool_Cancel_StopsRunningTask(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 5 * time.Second} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) tk := makeTask("cancel-1") store.CreateTask(tk) @@ -209,8 +215,9 @@ func TestPool_Cancel_StopsRunningTask(t *testing.T) { func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) { store := testStore(t) runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) if ok := pool.Cancel("nonexistent"); ok { t.Error("Cancel returned true for unknown task") @@ -222,8 +229,9 @@ func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) { func TestPool_QueuedWhenAtCapacity(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 100 * time.Millisecond} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(1, runner, store, logger) + pool := NewPool(1, runners, store, logger) tk1 := makeTask("queue-1") store.CreateTask(tk1) @@ -273,8 +281,9 @@ func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage. 
func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) { store := testStore(t) runner := &logPatherMockRunner{logDir: t.TempDir()} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) tk := makeTask("lp-1") store.CreateTask(tk) @@ -304,8 +313,9 @@ func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) { func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { store := testStore(t) runner := &mockRunner{} // does NOT implement LogPather + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runner, store, logger) + pool := NewPool(2, runners, store, logger) tk := makeTask("nolp-1") store.CreateTask(tk) @@ -321,8 +331,9 @@ func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { func TestPool_ConcurrentExecution(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 50 * time.Millisecond} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(3, runner, store, logger) + pool := NewPool(3, runners, store, logger) for i := 0; i < 3; i++ { tk := makeTask(fmt.Sprintf("cc-%d", i)) @@ -343,3 +354,29 @@ func TestPool_ConcurrentExecution(t *testing.T) { t.Errorf("calls: want 3, got %d", runner.callCount()) } } + +func TestPool_UnsupportedAgent(t *testing.T) { + store := testStore(t) + runners := map[string]Runner{"claude": &mockRunner{}} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + tk := makeTask("bad-agent") + tk.Agent.Type = "super-ai" + store.CreateTask(tk) + + if err := pool.Submit(context.Background(), tk); err != nil { + 
t.Fatalf("submit: %v", err) + } + + result := <-pool.Results() + if result.Err == nil { + t.Fatal("expected error for unsupported agent") + } + if !strings.Contains(result.Err.Error(), "unsupported agent type") { + t.Errorf("expected 'unsupported agent type' in error, got: %v", result.Err) + } + if result.Execution.Status != "FAILED" { + t.Errorf("status: want FAILED, got %q", result.Execution.Status) + } +} diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go new file mode 100644 index 0000000..956d8b5 --- /dev/null +++ b/internal/executor/gemini.go @@ -0,0 +1,192 @@ +package executor + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// GeminiRunner spawns the `gemini` CLI in non-interactive mode. +type GeminiRunner struct { + BinaryPath string // defaults to "gemini" + Logger *slog.Logger + LogDir string // base directory for execution logs + APIURL string // base URL of the Claudomator API, passed to subprocesses +} + +// ExecLogDir returns the log directory for the given execution ID. +func (r *GeminiRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +func (r *GeminiRunner) binaryPath() string { + if r.BinaryPath != "" { + return r.BinaryPath + } + return "gemini" +} + +// Run executes a gemini <instructions> invocation, streaming output to log files. 
+func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + if t.Agent.ProjectDir != "" { + if _, err := os.Stat(t.Agent.ProjectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", t.Agent.ProjectDir, err) + } + } + + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = e.ID + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + if e.StdoutPath == "" { + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + } + + if e.SessionID == "" { + e.SessionID = e.ID + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + + // Gemini CLI doesn't necessarily have the same rate limiting behavior as Claude, + // but we'll use a similar execution pattern. + err := r.execOnce(ctx, args, t.Agent.ProjectDir, e) + if err != nil { + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) // consumed + return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID} + } + return nil +} + +func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir string, e *storage.Execution) error { + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) 
+ cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if workingDir != "" { + cmd.Dir = workingDir + } + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW + cmd.Stderr = stderrFile + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting gemini: %w", err) + } + stdoutW.Close() + + killDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } + }() + + var costUSD float64 + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Reusing parseStream as the JSONL format should be compatible + costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + close(killDone) + wg.Wait() + + e.CostUSD = costUSD + + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + e.ExitCode = exitErr.ExitCode() + } + return fmt.Errorf("gemini exited with error: %w", waitErr) + } + + e.ExitCode = 0 + if streamErr != nil { + return streamErr + } + return nil +} + +func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Gemini CLI uses a different command structure: gemini "instructions" [flags] + + instructions := t.Agent.Instructions + if !t.Agent.SkipPlanning { + instructions = withPlanningPreamble(instructions) + } + + args := 
[]string{ + instructions, + "--output-format", "stream-json", + } + + // Note: Gemini CLI flags might differ from Claude CLI. + // Assuming common flags for now, but these may need adjustment. + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + + // Gemini CLI doesn't use --session-id for the first run in the same way, + // or it might use it differently. For now we assume compatibility. + if e.SessionID != "" { + // If it's a resume, it might use different flags. + if e.ResumeSessionID != "" { + // This is a placeholder for Gemini's resume logic + } + } + + return args +} diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go new file mode 100644 index 0000000..42253da --- /dev/null +++ b/internal/executor/gemini_test.go @@ -0,0 +1,103 @@ +package executor + +import ( + "context" + "io" + "log/slog" + "strings" + "testing" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +func TestGeminiRunner_BuildArgs_BasicTask(t *testing.T) { + r := &GeminiRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "fix the bug", + Model: "gemini-2.0-flash", + SkipPlanning: true, + }, + } + + args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + // Gemini CLI: instructions is the first positional arg + if len(args) < 1 || args[0] != "fix the bug" { + t.Errorf("expected instructions as first arg, got: %v", args) + } + + argMap := make(map[string]bool) + for _, a := range args { + argMap[a] = true + } + for _, want := range []string{"--output-format", "stream-json", "--model", "gemini-2.0-flash"} { + if !argMap[want] { + t.Errorf("missing arg %q in %v", want, args) + } + } +} + +func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) { + r := &GeminiRunner{} + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + Instructions: "fix the bug", + SkipPlanning: false, + }, + } + + args 
:= r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") + + if len(args) < 1 { + t.Fatalf("expected at least 1 arg, got: %v", args) + } + if !strings.HasPrefix(args[0], planningPreamble) { + t.Errorf("instructions should start with planning preamble") + } + if !strings.HasSuffix(args[0], "fix the bug") { + t.Errorf("instructions should end with original instructions") + } +} + +func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) { + r := &GeminiRunner{ + BinaryPath: "true", // would succeed if it ran + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: t.TempDir(), + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "gemini", + ProjectDir: "/nonexistent/path/does/not/exist", + SkipPlanning: true, + }, + } + exec := &storage.Execution{ID: "test-exec"} + + err := r.Run(context.Background(), tk, exec) + + if err == nil { + t.Fatal("expected error for inaccessible project_dir, got nil") + } + if !strings.Contains(err.Error(), "project_dir") { + t.Errorf("expected 'project_dir' in error, got: %v", err) + } +} + +func TestGeminiRunner_BinaryPath_Default(t *testing.T) { + r := &GeminiRunner{} + if r.binaryPath() != "gemini" { + t.Errorf("want 'gemini', got %q", r.binaryPath()) + } +} + +func TestGeminiRunner_BinaryPath_Custom(t *testing.T) { + r := &GeminiRunner{BinaryPath: "/usr/local/bin/gemini"} + if r.binaryPath() != "/usr/local/bin/gemini" { + t.Errorf("want custom path, got %q", r.binaryPath()) + } +} diff --git a/internal/executor/preamble.go b/internal/executor/preamble.go index 71f8233..b20f7ea 100644 --- a/internal/executor/preamble.go +++ b/internal/executor/preamble.go @@ -23,11 +23,12 @@ Before doing any implementation work: 1. Estimate: will this task take more than 3 minutes of implementation effort? -2. If YES — break it down into subtasks: - - Create 3–7 discrete subtasks using the claudomator CLI, for example: - claudomator create "Subtask name" --instructions "..." 
--working-dir "/path" --parent-id "$CLAUDOMATOR_TASK_ID" --server "$CLAUDOMATOR_API_URL" - - Do NOT pass --start. Tasks will be queued and started in order by the operator. +2. If YES — break it down: + - Create 3–7 discrete subtasks by POSTing to $CLAUDOMATOR_API_URL/api/tasks + - Each subtask POST body should be JSON with: name, agent.instructions, agent.working_dir (copy from current task), agent.model, agent.allowed_tools, and agent.skip_planning set to true + - Set parent_task_id to $CLAUDOMATOR_TASK_ID in each POST body - After creating all subtasks, output a brief summary and STOP. Do not implement anything. + - You can also specify agent.type (either "claude" or "gemini") to choose the agent for subtasks. 3. If NO — proceed with the task instructions below. |
