diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-05-13 04:02:20 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-05-13 04:02:20 +0000 |
| commit | 68399a598924775a3ec22a39c2336ae497fb07f3 (patch) | |
| tree | 29ade8224eb51eca47a1d9d03bb4d0d3653a72aa /internal/executor/gemini.go | |
| parent | f01231cc45f41ce2dc37072e77428e467ef3fc15 (diff) | |
| parent | d970c0730ff0dc7d714d3261197d8ba52b5d21f4 (diff) | |
Merges 12 commits from github/main (formerly master) that were developed
independently. Key additions:
- LocalRunner: OpenAI-compatible local LLM execution (Ollama, LM Studio)
- Real GeminiRunner with full sandbox parity to ClaudeRunner
- llm.Client for enriching CI failures and elaboration via local model
- retry.ParseRetryAfter moved to shared package
- tokens_in/tokens_out columns in executions table
Conflict resolutions:
- Kept local main's VAPID/push, stories, projects, agent events schema
- Merged both sets of Config fields (local + LocalModel from github/main)
- Unified activePerAgent accounting (decActiveAgent helper)
- Removed duplicate helpers from claude.go (now in helpers.go)
- Fixed double-decrement bug in handleRunResult vs decActiveAgent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/gemini.go')
| -rw-r--r-- | internal/executor/gemini.go | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go new file mode 100644 index 0000000..3abec05 --- /dev/null +++ b/internal/executor/gemini.go @@ -0,0 +1,346 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// GeminiRunner spawns the `gemini` CLI in non-interactive mode. +type GeminiRunner struct { + BinaryPath string // defaults to "gemini" + Logger *slog.Logger + LogDir string // base directory for execution logs + APIURL string // base URL of the Claudomator API, passed to subprocesses +} + +// ExecLogDir returns the log directory for the given execution ID. +func (r *GeminiRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +func (r *GeminiRunner) binaryPath() string { + if r.BinaryPath != "" { + return r.BinaryPath + } + return "gemini" +} + +// Run executes the gemini CLI inside a sandboxed clone of project_dir. +// When project_dir is set, claudomator first clones it into a temp sandbox +// (preferring a `local` bare remote, then `origin`, then the working tree) +// and runs the agent there. On success the sandbox is autocommitted and +// pushed back to origin/master, then removed. On failure the sandbox is +// preserved and its path is included in the returned error so the user can +// inspect partial work. If the agent writes a question file before exiting, +// Run returns *BlockedError with SandboxDir populated so a resume execution +// can pick up in the same directory. +func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + projectDir := t.Agent.ProjectDir + + if projectDir != "" { + if _, err := os.Stat(projectDir); err != nil { + return fmt.Errorf("project_dir %q: %w", projectDir, err) + } + } + + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = e.ID + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + + if e.StdoutPath == "" { + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + } + + if e.SessionID == "" { + if e.ResumeSessionID != "" { + e.SessionID = e.ResumeSessionID + } else { + e.SessionID = e.ID + } + } + + // Sandbox setup: for new executions with a project_dir, clone into a sandbox. + // Resume executions reuse the preserved sandbox so any partial work survives. + // If the preserved sandbox is missing (e.g. /tmp was purged), clone fresh. + var sandboxDir string + var startHEAD string + effectiveWorkingDir := projectDir + if e.ResumeSessionID != "" { + if e.SandboxDir != "" { + if _, statErr := os.Stat(e.SandboxDir); statErr == nil { + effectiveWorkingDir = e.SandboxDir + } else { + r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) + e.SandboxDir = "" + if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(projectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + effectiveWorkingDir = sandboxDir + r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) + } + } + } + } else if projectDir != "" { + var err error + sandboxDir, err = setupSandbox(projectDir, r.Logger) + if err != nil { + return fmt.Errorf("setting up sandbox: %w", err) + } + effectiveWorkingDir = sandboxDir + r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) + } + + if effectiveWorkingDir != "" { + headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() + startHEAD = strings.TrimSpace(string(headOut)) + } + + questionFile := filepath.Join(logDir, "question.json") + args := r.buildArgs(t, e, questionFile) + + if err := r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e); err != nil { + if sandboxDir != "" { + return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) + } + return err + } + + // Check whether the agent left a question before exiting. + data, readErr := os.ReadFile(questionFile) + if readErr == nil { + os.Remove(questionFile) + questionJSON := strings.TrimSpace(string(data)) + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + // Preserve sandbox on BLOCKED so a resume can pick up in the same dir. + return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) + e.Summary = strings.TrimSpace(string(summaryData)) + } + + // Merge sandbox back to project_dir and clean up. + if sandboxDir != "" { + if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { + return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) + } + } + return nil +} + +func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { + cmd := exec.CommandContext(ctx, r.binaryPath(), args...) + cmd.Env = append(os.Environ(), + "CLAUDOMATOR_API_URL="+r.APIURL, + "CLAUDOMATOR_TASK_ID="+e.TaskID, + "CLAUDOMATOR_PROJECT_DIR="+projectDir, + "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), + "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if workingDir != "" { + cmd.Dir = workingDir + } + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW + cmd.Stderr = stderrFile + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting gemini: %w", err) + } + stdoutW.Close() + + killDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-killDone: + } + }() + + var streamCost float64 + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + streamCost, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + close(killDone) + wg.Wait() + + if streamCost > 0 { + e.CostUSD = streamCost + } + + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + e.ExitCode = exitErr.ExitCode() + } + if streamErr != nil { + return streamErr + } + if tail := tailFile(e.StderrPath, 20); tail != "" { + return fmt.Errorf("gemini exited with error: %w\nstderr:\n%s", waitErr, tail) + } + return fmt.Errorf("gemini exited with error: %w", waitErr) + } + + if streamErr != nil { + return streamErr + } + return nil +} + +// parseGeminiStream reads streaming JSON from the gemini CLI, strips markdown +// code fences if the output is wrapped in them, writes the inner stream-json +// to w, and returns (costUSD, error). If a `result` event has `is_error: true`, +// an error wrapping the result message is returned. +func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { + fullOutput, err := io.ReadAll(r) + if err != nil { + return 0, fmt.Errorf("reading full gemini output: %w", err) + } + logger.Debug("parseGeminiStream: raw output received", "output", string(fullOutput)) + + inner := stripGeminiFences(string(fullOutput), logger) + if _, writeErr := w.Write([]byte(inner)); writeErr != nil { + return 0, fmt.Errorf("writing gemini output: %w", writeErr) + } + + // Walk lines looking for a result event so we can surface errors and cost. + var ( + cost float64 + errMsg string + isError bool + ) + for _, raw := range strings.Split(inner, "\n") { + line := strings.TrimSpace(raw) + if line == "" { + continue + } + var evt struct { + Type string `json:"type"` + IsError bool `json:"is_error"` + Result string `json:"result"` + Cost float64 `json:"total_cost_usd"` + } + if err := json.Unmarshal([]byte(line), &evt); err != nil { + continue + } + if evt.Type == "result" { + if evt.Cost > 0 { + cost = evt.Cost + } + if evt.IsError { + isError = true + errMsg = evt.Result + } + } + } + if isError { + return cost, fmt.Errorf("gemini reported error: %s", errMsg) + } + return cost, nil +} + +// stripGeminiFences removes a surrounding ```json ... ``` markdown block if +// present, returning the trimmed inner content. If no markdown fence is +// found, the input is returned verbatim (no whitespace trimming) so callers +// that expect byte-exact pass-through behavior get it. +func stripGeminiFences(raw string, logger *slog.Logger) string { + trimmed := strings.TrimSpace(raw) + if start := strings.Index(trimmed, "```json"); start != -1 { + if end := strings.LastIndex(trimmed, "```"); end > start { + return strings.TrimSpace(trimmed[start+len("```json") : end]) + } + logger.Warn("malformed gemini markdown block (missing closing fence); using raw output", "len", len(trimmed)) + return trimmed + } + return raw +} + +func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { + // Gemini CLI uses a different command structure: gemini "instructions" [flags] + + instructions := t.Agent.Instructions + if !t.Agent.SkipPlanning { + instructions = withPlanningPreamble(instructions) + } + + args := []string{ + "-p", instructions, + "--output-format", "stream-json", + "--yolo", // auto-approve all tools (equivalent to Claude's bypassPermissions) + } + + // Note: Gemini CLI flags might differ from Claude CLI. + // Assuming common flags for now, but these may need adjustment. + if t.Agent.Model != "" { + args = append(args, "--model", t.Agent.Model) + } + + // Gemini CLI doesn't use --session-id for the first run in the same way, + // or it might use it differently. For now we assume compatibility. + if e.SessionID != "" { + // If it's a resume, it might use different flags. + if e.ResumeSessionID != "" { + // This is a placeholder for Gemini's resume logic + } + } + + return args +} |
