From 5814e7d6bdec659bb8ca10cc18447a821c59ad4c Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Wed, 18 Mar 2026 00:52:49 +0000 Subject: fix: comprehensive addressing of container execution review feedback - Fix Critical Bug 1: Only remove workspace on success, preserve on failure/BLOCKED. - Fix Critical Bug 2: Use correct Claude flag (--resume) and pass instructions via file. - Fix Critical Bug 3: Actually mount and use the instructions file in the container. - Address Design Issue 4: Implement Resume/BLOCKED detection and host-side workspace re-use. - Address Design Issue 5: Consolidate RepositoryURL to Task level and fix API fallback. - Address Design Issue 6: Make agent images configurable per runner type via CLI flags. - Address Design Issue 7: Secure API keys via .claudomator-env file and --env-file flag. - Address Code Quality 8: Add unit tests for ContainerRunner arg construction. - Address Code Quality 9: Fix indentation regression in app.js. - Address Code Quality 10: Clean up orphaned Claude/Gemini runner files and move helpers. - Fix tests: Update server_test.go and executor_test.go to work with new model. 
--- internal/executor/claude.go | 714 ----------------------------- internal/executor/claude_test.go | 882 ------------------------------------ internal/executor/container.go | 172 +++++-- internal/executor/container_test.go | 65 +++ internal/executor/executor_test.go | 11 +- internal/executor/gemini.go | 228 ---------- internal/executor/gemini_test.go | 179 -------- internal/executor/helpers.go | 165 +++++++ 8 files changed, 380 insertions(+), 2036 deletions(-) delete mode 100644 internal/executor/claude.go delete mode 100644 internal/executor/claude_test.go create mode 100644 internal/executor/container_test.go delete mode 100644 internal/executor/gemini.go delete mode 100644 internal/executor/gemini_test.go create mode 100644 internal/executor/helpers.go (limited to 'internal/executor') diff --git a/internal/executor/claude.go b/internal/executor/claude.go deleted file mode 100644 index 6346aa8..0000000 --- a/internal/executor/claude.go +++ /dev/null @@ -1,714 +0,0 @@ -package executor - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "syscall" - "time" - - "github.com/thepeterstone/claudomator/internal/storage" - "github.com/thepeterstone/claudomator/internal/task" -) - -// ClaudeRunner spawns the `claude` CLI in non-interactive mode. -type ClaudeRunner struct { - BinaryPath string // defaults to "claude" - Logger *slog.Logger - LogDir string // base directory for execution logs - APIURL string // base URL of the Claudomator API, passed to subprocesses - DropsDir string // path to the drops directory, passed to subprocesses -} - -// BlockedError is returned by Run when the agent wrote a question file and exited. -// The pool transitions the task to BLOCKED and stores the question for the user. 
-type BlockedError struct { - QuestionJSON string // raw JSON from the question file - SessionID string // claude session to resume once the user answers - SandboxDir string // preserved sandbox path; resume must run here so Claude finds its session files -} - -func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) } - -// ExecLogDir returns the log directory for the given execution ID. -// Implements LogPather so the pool can persist paths before execution starts. -func (r *ClaudeRunner) ExecLogDir(execID string) string { - if r.LogDir == "" { - return "" - } - return filepath.Join(r.LogDir, execID) -} - -func (r *ClaudeRunner) binaryPath() string { - if r.BinaryPath != "" { - return r.BinaryPath - } - return "claude" -} - -// Run executes a claude -p invocation, streaming output to log files. -// It retries up to 3 times on rate-limit errors using exponential backoff. -// If the agent writes a question file and exits, Run returns *BlockedError. -// -// When project_dir is set and this is not a resume execution, Run clones the -// project into a temp sandbox, runs the agent there, then merges committed -// changes back to project_dir. On failure the sandbox is preserved and its -// path is included in the error. -func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - projectDir := t.Agent.ProjectDir - - // Validate project_dir exists when set. - if projectDir != "" { - if _, err := os.Stat(projectDir); err != nil { - return fmt.Errorf("project_dir %q: %w", projectDir, err) - } - } - - // Setup log directory once; retries overwrite the log files. 
- logDir := r.ExecLogDir(e.ID) - if logDir == "" { - logDir = e.ID // fallback for tests without LogDir set - } - if err := os.MkdirAll(logDir, 0700); err != nil { - return fmt.Errorf("creating log dir: %w", err) - } - if e.StdoutPath == "" { - e.StdoutPath = filepath.Join(logDir, "stdout.log") - e.StderrPath = filepath.Join(logDir, "stderr.log") - e.ArtifactDir = logDir - } - - // Pre-assign session ID so we can resume after a BLOCKED state. - // For resume executions, the claude session continues under the original - // session ID (the one passed to --resume). Using the new exec's own UUID - // would cause a second block-and-resume cycle to pass the wrong --resume - // argument. - if e.SessionID == "" { - if e.ResumeSessionID != "" { - e.SessionID = e.ResumeSessionID - } else { - e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) - } - } - - // For new (non-resume) executions with a project_dir, clone into a sandbox. - // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude - // finds its session files under the same project slug. If no sandbox was - // preserved (e.g. task had no project_dir), fall back to project_dir. - var sandboxDir string - var startHEAD string - effectiveWorkingDir := projectDir - if e.ResumeSessionID != "" { - if e.SandboxDir != "" { - if _, statErr := os.Stat(e.SandboxDir); statErr == nil { - effectiveWorkingDir = e.SandboxDir - } else { - // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot). - // Clone a fresh sandbox so the task can run rather than fail immediately. 
- r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir) - e.SandboxDir = "" - if projectDir != "" { - var err error - sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) - if err != nil { - return fmt.Errorf("setting up sandbox: %w", err) - } - - effectiveWorkingDir = sandboxDir - r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir) - } - } - } - } else if projectDir != "" { - var err error - sandboxDir, err = setupSandbox(t.Agent.ProjectDir, r.Logger) - if err != nil { - return fmt.Errorf("setting up sandbox: %w", err) - } - - effectiveWorkingDir = sandboxDir - r.Logger.Info("sandbox created", "sandbox", sandboxDir, "project_dir", projectDir) - } - - if effectiveWorkingDir != "" { - // Capture the initial HEAD so we can identify new commits later. - headOut, _ := exec.Command("git", gitSafe("-C", effectiveWorkingDir, "rev-parse", "HEAD")...).Output() - startHEAD = strings.TrimSpace(string(headOut)) - } - - questionFile := filepath.Join(logDir, "question.json") - args := r.buildArgs(t, e, questionFile) - - attempt := 0 - err := runWithBackoff(ctx, 3, 5*time.Second, func() error { - if attempt > 0 { - delay := 5 * time.Second * (1 << (attempt - 1)) - r.Logger.Warn("rate-limited by Claude API, retrying", - "attempt", attempt, - "delay", delay, - ) - } - attempt++ - return r.execOnce(ctx, args, effectiveWorkingDir, projectDir, e) - }) - if err != nil { - if sandboxDir != "" { - return fmt.Errorf("%w (sandbox preserved at %s)", err, sandboxDir) - } - return err - } - - // Check whether the agent left a question before exiting. - data, readErr := os.ReadFile(questionFile) - if readErr == nil { - os.Remove(questionFile) // consumed - questionJSON := strings.TrimSpace(string(data)) - // If the agent wrote a completion report instead of a real question, - // extract the text as the summary and fall through to normal completion. 
- if isCompletionReport(questionJSON) { - r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) - e.Summary = extractQuestionText(questionJSON) - } else { - // Preserve sandbox on BLOCKED — agent may have partial work and its - // Claude session files are stored under the sandbox's project slug. - // The resume execution must run in the same directory. - return &BlockedError{QuestionJSON: questionJSON, SessionID: e.SessionID, SandboxDir: sandboxDir} - } - } - - // Read agent summary if written. - summaryFile := filepath.Join(logDir, "summary.txt") - if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { - os.Remove(summaryFile) // consumed - e.Summary = strings.TrimSpace(string(summaryData)) - } - - // Merge sandbox back to project_dir and clean up. - if sandboxDir != "" { - if mergeErr := teardownSandbox(projectDir, sandboxDir, startHEAD, r.Logger, e); mergeErr != nil { - return fmt.Errorf("sandbox teardown: %w (sandbox preserved at %s)", mergeErr, sandboxDir) - } - } - return nil -} - -// isCompletionReport returns true when a question-file JSON looks like a -// completion report rather than a real user question. Heuristic: no options -// (or empty options) and no "?" anywhere in the text. -func isCompletionReport(questionJSON string) bool { - var q struct { - Text string `json:"text"` - Options []string `json:"options"` - } - if err := json.Unmarshal([]byte(questionJSON), &q); err != nil { - return false - } - return len(q.Options) == 0 && !strings.Contains(q.Text, "?") -} - -// extractQuestionText returns the "text" field from a question-file JSON, or -// the raw string if parsing fails. 
-func extractQuestionText(questionJSON string) string { - var q struct { - Text string `json:"text"` - } - if err := json.Unmarshal([]byte(questionJSON), &q); err != nil { - return questionJSON - } - return strings.TrimSpace(q.Text) -} - -// gitSafe returns git arguments that prepend "-c safe.directory=*" so that -// commands succeed regardless of the repository owner. This is needed when -// claudomator operates on project directories owned by a different OS user. -func gitSafe(args ...string) []string { - return append([]string{"-c", "safe.directory=*"}, args...) -} - -// sandboxCloneSource returns the URL to clone the sandbox from. It prefers a -// remote named "local" (a local bare repo that accepts pushes cleanly), then -// falls back to "origin", then to the working copy path itself. -func sandboxCloneSource(projectDir string) string { - // Prefer "local" remote, but only if it points to a local path (accepts pushes). - if out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", "local")...).Output(); err == nil { - u := strings.TrimSpace(string(out)) - if u != "" && (strings.HasPrefix(u, "/") || strings.HasPrefix(u, "file://")) { - return u - } - } - // Fall back to "origin" — any URL scheme is acceptable for cloning. - if out, err := exec.Command("git", gitSafe("-C", projectDir, "remote", "get-url", "origin")...).Output(); err == nil { - if u := strings.TrimSpace(string(out)); u != "" { - return u - } - } - return projectDir -} - -// setupSandbox prepares a temporary git clone of projectDir. -// If projectDir is not a git repo it is initialised with an initial commit first. -func setupSandbox(projectDir string, logger *slog.Logger) (string, error) { - // Ensure projectDir is a git repo; initialise if not. 
- if err := exec.Command("git", gitSafe("-C", projectDir, "rev-parse", "--git-dir")...).Run(); err != nil { - cmds := [][]string{ - gitSafe("-C", projectDir, "init"), - gitSafe("-C", projectDir, "add", "-A"), - gitSafe("-C", projectDir, "commit", "--allow-empty", "-m", "chore: initial commit"), - } - for _, args := range cmds { - if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { //nolint:gosec - return "", fmt.Errorf("git init %s: %w\n%s", projectDir, err, out) - } - } - } - - src := sandboxCloneSource(projectDir) - - tempDir, err := os.MkdirTemp("", "claudomator-sandbox-*") - if err != nil { - return "", fmt.Errorf("creating sandbox dir: %w", err) - } - // git clone requires the target to not exist; remove the placeholder first. - if err := os.Remove(tempDir); err != nil { - return "", fmt.Errorf("removing temp dir placeholder: %w", err) - } - out, err := exec.Command("git", gitSafe("clone", "--no-hardlinks", src, tempDir)...).CombinedOutput() - if err != nil { - return "", fmt.Errorf("git clone: %w\n%s", err, out) - } - return tempDir, nil -} - -// teardownSandbox verifies the sandbox is clean and pushes new commits to the -// canonical bare repo. If the push is rejected because another task pushed -// concurrently, it fetches and rebases then retries once. -// -// The working copy (projectDir) is NOT updated automatically — it is the -// developer's workspace and is pulled manually. This avoids permission errors -// from mixed-owner .git/objects directories. -func teardownSandbox(projectDir, sandboxDir, startHEAD string, logger *slog.Logger, execRecord *storage.Execution) error { - // Automatically commit uncommitted changes. - out, err := exec.Command("git", "-C", sandboxDir, "status", "--porcelain").Output() - if err != nil { - return fmt.Errorf("git status: %w", err) - } - if len(strings.TrimSpace(string(out))) > 0 { - logger.Info("autocommitting uncommitted changes", "sandbox", sandboxDir) - - // Run build before autocommitting. 
- if _, err := os.Stat(filepath.Join(sandboxDir, "Makefile")); err == nil { - logger.Info("running 'make build' before autocommit", "sandbox", sandboxDir) - if buildOut, buildErr := exec.Command("make", "-C", sandboxDir, "build").CombinedOutput(); buildErr != nil { - return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) - } - } else if _, err := os.Stat(filepath.Join(sandboxDir, "gradlew")); err == nil { - logger.Info("running './gradlew build' before autocommit", "sandbox", sandboxDir) - cmd := exec.Command("./gradlew", "build") - cmd.Dir = sandboxDir - if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { - return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) - } - } else if _, err := os.Stat(filepath.Join(sandboxDir, "go.mod")); err == nil { - logger.Info("running 'go build ./...' before autocommit", "sandbox", sandboxDir) - cmd := exec.Command("go", "build", "./...") - cmd.Dir = sandboxDir - if buildOut, buildErr := cmd.CombinedOutput(); buildErr != nil { - return fmt.Errorf("build failed before autocommit: %w\n%s", buildErr, buildOut) - } - } - - cmds := [][]string{ - gitSafe("-C", sandboxDir, "add", "-A"), - gitSafe("-C", sandboxDir, "commit", "-m", "chore: autocommit uncommitted changes"), - } - for _, args := range cmds { - if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { - return fmt.Errorf("autocommit failed (%v): %w\n%s", args, err, out) - } - } - } - - // Capture commits before pushing/deleting. - // Use startHEAD..HEAD to find all commits made during this execution. - logRange := "origin/HEAD..HEAD" - if startHEAD != "" && startHEAD != "HEAD" { - logRange = startHEAD + "..HEAD" - } - - logCmd := exec.Command("git", gitSafe("-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s")...) 
- logOut, logErr := logCmd.CombinedOutput() - if logErr == nil { - lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") - logger.Debug("captured commits", "count", len(lines), "range", logRange) - for _, line := range lines { - if line == "" { - continue - } - parts := strings.SplitN(line, "|", 2) - if len(parts) == 2 { - execRecord.Commits = append(execRecord.Commits, task.GitCommit{ - Hash: parts[0], - Message: parts[1], - }) - } - } - } else { - logger.Warn("failed to capture commits", "err", logErr, "range", logRange, "output", string(logOut)) - } - - // Check whether there are any new commits to push. - ahead, err := exec.Command("git", gitSafe("-C", sandboxDir, "rev-list", "--count", logRange)...).Output() - if err != nil { - logger.Warn("could not determine commits ahead of origin; proceeding", "err", err, "range", logRange) - } - if strings.TrimSpace(string(ahead)) == "0" { - os.RemoveAll(sandboxDir) - return nil - } - - // Push from sandbox → bare repo (sandbox's origin is the bare repo). - if out, err := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err != nil { - // If rejected due to concurrent push, fetch+rebase and retry once. 
- if strings.Contains(string(out), "fetch first") || strings.Contains(string(out), "non-fast-forward") { - logger.Info("push rejected (concurrent task); rebasing and retrying", "sandbox", sandboxDir) - if out2, err2 := exec.Command("git", "-C", sandboxDir, "pull", "--rebase", "origin", "master").CombinedOutput(); err2 != nil { - return fmt.Errorf("git rebase before retry push: %w\n%s", err2, out2) - } - // Re-capture commits after rebase (hashes might have changed) - execRecord.Commits = nil - logOut, logErr = exec.Command("git", "-C", sandboxDir, "log", logRange, "--pretty=format:%H|%s").Output() - if logErr == nil { - lines := strings.Split(strings.TrimSpace(string(logOut)), "\n") - for _, line := range lines { - parts := strings.SplitN(line, "|", 2) - if len(parts) == 2 { - execRecord.Commits = append(execRecord.Commits, task.GitCommit{ - Hash: parts[0], - Message: parts[1], - }) - } - } - } - - if out3, err3 := exec.Command("git", "-C", sandboxDir, "push", "origin", "HEAD").CombinedOutput(); err3 != nil { - return fmt.Errorf("git push to origin (after rebase): %w\n%s", err3, out3) - } - } else { - return fmt.Errorf("git push to origin: %w\n%s", err, out) - } - } - - logger.Info("sandbox pushed to bare repo", "sandbox", sandboxDir) - os.RemoveAll(sandboxDir) - return nil -} - -// execOnce runs the claude subprocess once, streaming output to e's log paths. -func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { - cmd := exec.CommandContext(ctx, r.binaryPath(), args...) 
- cmd.Env = append(os.Environ(), - "CLAUDOMATOR_API_URL="+r.APIURL, - "CLAUDOMATOR_TASK_ID="+e.TaskID, - "CLAUDOMATOR_PROJECT_DIR="+projectDir, - "CLAUDOMATOR_QUESTION_FILE="+filepath.Join(e.ArtifactDir, "question.json"), - "CLAUDOMATOR_SUMMARY_FILE="+filepath.Join(e.ArtifactDir, "summary.txt"), - "CLAUDOMATOR_DROP_DIR="+r.DropsDir, - ) - // Put the subprocess in its own process group so we can SIGKILL the entire - // group (MCP servers, bash children, etc.) on cancellation. - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if workingDir != "" { - cmd.Dir = workingDir - } - - stdoutFile, err := os.Create(e.StdoutPath) - if err != nil { - return fmt.Errorf("creating stdout log: %w", err) - } - defer stdoutFile.Close() - - stderrFile, err := os.Create(e.StderrPath) - if err != nil { - return fmt.Errorf("creating stderr log: %w", err) - } - defer stderrFile.Close() - - // Use os.Pipe for stdout so we own the read-end lifetime. - // cmd.StdoutPipe() would add the read-end to closeAfterWait, causing - // cmd.Wait() to close it before our goroutine finishes reading. - stdoutR, stdoutW, err := os.Pipe() - if err != nil { - return fmt.Errorf("creating stdout pipe: %w", err) - } - cmd.Stdout = stdoutW // *os.File — not added to closeAfterStart/Wait - cmd.Stderr = stderrFile - - if err := cmd.Start(); err != nil { - stdoutW.Close() - stdoutR.Close() - return fmt.Errorf("starting claude: %w", err) - } - // Close our write-end immediately; the subprocess holds its own copy. - // The goroutine below gets EOF when the subprocess exits. - stdoutW.Close() - - // killDone is closed when cmd.Wait() returns, stopping the pgid-kill goroutine. - // - // Safety: this goroutine cannot block indefinitely. The select has two arms: - // • ctx.Done() — fires if the caller cancels (e.g. timeout, user cancel). - // The goroutine sends SIGKILL and exits immediately. - // • killDone — closed by close(killDone) below, immediately after cmd.Wait() - // returns. 
This fires when the process exits for any reason (natural exit, - // SIGKILL from the ctx arm, or any other signal). The goroutine exits without - // doing anything. - // - // Therefore: for a task that completes normally with a long-lived (non-cancelled) - // context, the killDone arm fires and the goroutine exits. There is no path where - // this goroutine outlives execOnce(). - killDone := make(chan struct{}) - go func() { - select { - case <-ctx.Done(): - // SIGKILL the entire process group to reap orphan children. - syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) - case <-killDone: - } - }() - - // Stream stdout to the log file and parse cost/errors. - // wg ensures costUSD and streamErr are fully written before we read them after cmd.Wait(). - var costUSD float64 - var streamErr error - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) - stdoutR.Close() - }() - - waitErr := cmd.Wait() - close(killDone) // stop the pgid-kill goroutine - wg.Wait() // drain remaining stdout before reading costUSD/streamErr - - e.CostUSD = costUSD - - if waitErr != nil { - if exitErr, ok := waitErr.(*exec.ExitError); ok { - e.ExitCode = exitErr.ExitCode() - } - // If the stream captured a rate-limit or quota message, return it - // so callers can distinguish it from a generic exit-status failure. - if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) { - return streamErr - } - if tail := tailFile(e.StderrPath, 20); tail != "" { - return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail) - } - return fmt.Errorf("claude exited with error: %w", waitErr) - } - - e.ExitCode = 0 - if streamErr != nil { - return streamErr - } - return nil -} - -func (r *ClaudeRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { - // Resume execution: the agent already has context; just deliver the answer. 
- if e.ResumeSessionID != "" { - args := []string{ - "-p", e.ResumeAnswer, - "--resume", e.ResumeSessionID, - "--output-format", "stream-json", - "--verbose", - } - permMode := t.Agent.PermissionMode - if permMode == "" { - permMode = "bypassPermissions" - } - args = append(args, "--permission-mode", permMode) - if t.Agent.Model != "" { - args = append(args, "--model", t.Agent.Model) - } - return args - } - - instructions := t.Agent.Instructions - allowedTools := t.Agent.AllowedTools - - if !t.Agent.SkipPlanning { - instructions = withPlanningPreamble(instructions) - // Ensure Bash is available so the agent can POST subtasks and ask questions. - hasBash := false - for _, tool := range allowedTools { - if tool == "Bash" { - hasBash = true - break - } - } - if !hasBash { - allowedTools = append(allowedTools, "Bash") - } - } - - args := []string{ - "-p", instructions, - "--session-id", e.SessionID, - "--output-format", "stream-json", - "--verbose", - } - - if t.Agent.Model != "" { - args = append(args, "--model", t.Agent.Model) - } - if t.Agent.MaxBudgetUSD > 0 { - args = append(args, "--max-budget-usd", fmt.Sprintf("%.2f", t.Agent.MaxBudgetUSD)) - } - // Default to bypassPermissions — claudomator runs tasks unattended, so - // prompting for write access would always stall execution. Tasks that need - // a more restrictive mode can set permission_mode explicitly. - permMode := t.Agent.PermissionMode - if permMode == "" { - permMode = "bypassPermissions" - } - args = append(args, "--permission-mode", permMode) - if t.Agent.SystemPromptAppend != "" { - args = append(args, "--append-system-prompt", t.Agent.SystemPromptAppend) - } - for _, tool := range allowedTools { - args = append(args, "--allowedTools", tool) - } - for _, tool := range t.Agent.DisallowedTools { - args = append(args, "--disallowedTools", tool) - } - for _, f := range t.Agent.ContextFiles { - args = append(args, "--add-dir", f) - } - args = append(args, t.Agent.AdditionalArgs...) 
- - return args -} - -// parseStream reads streaming JSON from claude, writes to w, and returns -// (costUSD, error). error is non-nil if the stream signals task failure: -// - result message has is_error:true -// - a tool_result was denied due to missing permissions -func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { - tee := io.TeeReader(r, w) - scanner := bufio.NewScanner(tee) - scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines - - var totalCost float64 - var streamErr error - - for scanner.Scan() { - line := scanner.Bytes() - var msg map[string]interface{} - if err := json.Unmarshal(line, &msg); err != nil { - continue - } - - msgType, _ := msg["type"].(string) - switch msgType { - case "rate_limit_event": - if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok { - status, _ := info["status"].(string) - if status == "rejected" { - streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg) - // Immediately break since we can't continue anyway - break - } - } - case "assistant": - if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" { - streamErr = fmt.Errorf("claude rate limit reached: %v", msg) - } - case "result": - if isErr, _ := msg["is_error"].(bool); isErr { - result, _ := msg["result"].(string) - if result != "" { - streamErr = fmt.Errorf("claude task failed: %s", result) - } else { - streamErr = fmt.Errorf("claude task failed (is_error=true in result)") - } - } - // Prefer total_cost_usd from result message; fall through to legacy check below. - if cost, ok := msg["total_cost_usd"].(float64); ok { - totalCost = cost - } - case "user": - // Detect permission-denial tool_results. These occur when permission_mode - // is not bypassPermissions and claude exits 0 without completing its task. - if err := permissionDenialError(msg); err != nil && streamErr == nil { - streamErr = err - } - } - - // Legacy cost field used by older claude versions. 
- if cost, ok := msg["cost_usd"].(float64); ok { - totalCost = cost - } - } - - return totalCost, streamErr -} - -// permissionDenialError inspects a "user" stream message for tool_result entries -// that were denied due to missing permissions. Returns an error if found. -func permissionDenialError(msg map[string]interface{}) error { - message, ok := msg["message"].(map[string]interface{}) - if !ok { - return nil - } - content, ok := message["content"].([]interface{}) - if !ok { - return nil - } - for _, item := range content { - itemMap, ok := item.(map[string]interface{}) - if !ok { - continue - } - if itemMap["type"] != "tool_result" { - continue - } - if isErr, _ := itemMap["is_error"].(bool); !isErr { - continue - } - text, _ := itemMap["content"].(string) - if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") { - return fmt.Errorf("permission denied by host: %s", text) - } - } - return nil -} - -// tailFile returns the last n lines of the file at path, or empty string if -// the file cannot be read. Used to surface subprocess stderr on failure. 
-func tailFile(path string, n int) string { - f, err := os.Open(path) - if err != nil { - return "" - } - defer f.Close() - - var lines []string - scanner := bufio.NewScanner(f) - for scanner.Scan() { - lines = append(lines, scanner.Text()) - if len(lines) > n { - lines = lines[1:] - } - } - return strings.Join(lines, "\n") -} diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go deleted file mode 100644 index e76fbf2..0000000 --- a/internal/executor/claude_test.go +++ /dev/null @@ -1,882 +0,0 @@ -package executor - -import ( - "context" - "errors" - "fmt" - "io" - "log/slog" - "os" - "os/exec" - "path/filepath" - "runtime" - "strings" - "testing" - "time" - - "github.com/thepeterstone/claudomator/internal/storage" - "github.com/thepeterstone/claudomator/internal/task" -) - -func TestClaudeRunner_BuildArgs_BasicTask(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "fix the bug", - Model: "sonnet", - SkipPlanning: true, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - argMap := make(map[string]bool) - for _, a := range args { - argMap[a] = true - } - for _, want := range []string{"-p", "fix the bug", "--output-format", "stream-json", "--verbose", "--model", "sonnet"} { - if !argMap[want] { - t.Errorf("missing arg %q in %v", want, args) - } - } -} - -func TestClaudeRunner_BuildArgs_FullConfig(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "implement feature", - Model: "opus", - MaxBudgetUSD: 5.0, - PermissionMode: "bypassPermissions", - SystemPromptAppend: "Follow TDD", - AllowedTools: []string{"Bash", "Edit"}, - DisallowedTools: []string{"Write"}, - ContextFiles: []string{"/src"}, - AdditionalArgs: []string{"--verbose"}, - SkipPlanning: true, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - // Check key args are 
present. - argMap := make(map[string]bool) - for _, a := range args { - argMap[a] = true - } - - requiredArgs := []string{ - "-p", "implement feature", "--output-format", "stream-json", - "--model", "opus", "--max-budget-usd", "5.00", - "--permission-mode", "bypassPermissions", - "--append-system-prompt", "Follow TDD", - "--allowedTools", "Bash", "Edit", - "--disallowedTools", "Write", - "--add-dir", "/src", - "--verbose", - } - for _, req := range requiredArgs { - if !argMap[req] { - t.Errorf("missing arg %q in %v", req, args) - } - } -} - -func TestClaudeRunner_BuildArgs_DefaultsToBypassPermissions(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "do work", - SkipPlanning: true, - // PermissionMode intentionally not set - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - found := false - for i, a := range args { - if a == "--permission-mode" && i+1 < len(args) && args[i+1] == "bypassPermissions" { - found = true - } - } - if !found { - t.Errorf("expected --permission-mode bypassPermissions when PermissionMode is empty, args: %v", args) - } -} - -func TestClaudeRunner_BuildArgs_RespectsExplicitPermissionMode(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "do work", - PermissionMode: "default", - SkipPlanning: true, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - for i, a := range args { - if a == "--permission-mode" && i+1 < len(args) { - if args[i+1] != "default" { - t.Errorf("expected --permission-mode default, got %q", args[i+1]) - } - return - } - } - t.Errorf("--permission-mode flag not found in args: %v", args) -} - -func TestClaudeRunner_BuildArgs_AlwaysIncludesVerbose(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "do something", - SkipPlanning: true, - }, - 
} - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - found := false - for _, a := range args { - if a == "--verbose" { - found = true - break - } - } - if !found { - t.Errorf("--verbose missing from args: %v", args) - } -} - -func TestClaudeRunner_BuildArgs_PreamblePrepended(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "fix the bug", - SkipPlanning: false, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - // The -p value should start with the preamble and end with the original instructions. - if len(args) < 2 || args[0] != "-p" { - t.Fatalf("expected -p as first arg, got: %v", args) - } - if !strings.HasPrefix(args[1], "## Runtime Environment") { - t.Errorf("instructions should start with planning preamble, got prefix: %q", args[1][:min(len(args[1]), 20)]) - } - if !strings.Contains(args[1], "$CLAUDOMATOR_PROJECT_DIR") { - t.Errorf("preamble should mention $CLAUDOMATOR_PROJECT_DIR") - } - if !strings.HasSuffix(args[1], "fix the bug") { - t.Errorf("instructions should end with original instructions") - } -} - -func TestClaudeRunner_BuildArgs_PreambleAddsBash(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "do work", - AllowedTools: []string{"Read"}, - SkipPlanning: false, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - // Bash should be appended to allowed tools. 
- foundBash := false - for i, a := range args { - if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" { - foundBash = true - } - } - if !foundBash { - t.Errorf("Bash should be added to --allowedTools when preamble is active: %v", args) - } -} - -func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) { - r := &ClaudeRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "do work", - AllowedTools: []string{"Bash", "Read"}, - SkipPlanning: false, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - // Count Bash occurrences in --allowedTools values. - bashCount := 0 - for i, a := range args { - if a == "--allowedTools" && i+1 < len(args) && args[i+1] == "Bash" { - bashCount++ - } - } - if bashCount != 1 { - t.Errorf("Bash should appear exactly once in --allowedTools, got %d: %v", bashCount, args) - } -} - -// TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession verifies that when a -// resume execution is itself blocked again, the stored SessionID is the original -// resumed session, not the new execution's own UUID. Without this, a second -// block-and-resume cycle passes the wrong --resume session ID and fails. -func TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession(t *testing.T) { - logDir := t.TempDir() - r := &ClaudeRunner{ - BinaryPath: "true", // exits 0, no output - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: logDir, - } - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "continue", - SkipPlanning: true, - }, - } - exec := &storage.Execution{ - ID: "resume-exec-uuid", - TaskID: "task-1", - ResumeSessionID: "original-session-uuid", - ResumeAnswer: "yes", - } - - // Run completes successfully (binary is "true"). - _ = r.Run(context.Background(), tk, exec) - - // SessionID must be the original session (ResumeSessionID), not the new - // exec's own ID. 
If it were exec.ID, a second blocked-then-resumed cycle - // would use the wrong --resume argument and fail. - if exec.SessionID != "original-session-uuid" { - t.Errorf("SessionID after resume Run: want %q, got %q", "original-session-uuid", exec.SessionID) - } -} - -func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { - r := &ClaudeRunner{ - BinaryPath: "true", // would succeed if it ran - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: t.TempDir(), - } - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - ProjectDir: "/nonexistent/path/does/not/exist", - SkipPlanning: true, - }, - } - exec := &storage.Execution{ID: "test-exec"} - - err := r.Run(context.Background(), tk, exec) - - if err == nil { - t.Fatal("expected error for inaccessible working_dir, got nil") - } - if !strings.Contains(err.Error(), "project_dir") { - t.Errorf("expected 'project_dir' in error, got: %v", err) - } -} - -func TestClaudeRunner_BinaryPath_Default(t *testing.T) { - r := &ClaudeRunner{} - if r.binaryPath() != "claude" { - t.Errorf("want 'claude', got %q", r.binaryPath()) - } -} - -func TestClaudeRunner_BinaryPath_Custom(t *testing.T) { - r := &ClaudeRunner{BinaryPath: "/usr/local/bin/claude"} - if r.binaryPath() != "/usr/local/bin/claude" { - t.Errorf("want custom path, got %q", r.binaryPath()) - } -} - -// TestExecOnce_NoGoroutineLeak_OnNaturalExit verifies that execOnce does not -// leave behind any goroutines when the subprocess exits normally (no context -// cancellation). Both the pgid-kill goroutine and the parseStream goroutine -// must have exited before execOnce returns. 
-func TestExecOnce_NoGoroutineLeak_OnNaturalExit(t *testing.T) { - logDir := t.TempDir() - r := &ClaudeRunner{ - BinaryPath: "true", // exits immediately with status 0, produces no output - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: logDir, - } - e := &storage.Execution{ - ID: "goroutine-leak-test", - TaskID: "task-id", - StdoutPath: filepath.Join(logDir, "stdout.log"), - StderrPath: filepath.Join(logDir, "stderr.log"), - ArtifactDir: logDir, - } - - // Let any goroutines from test infrastructure settle before sampling. - runtime.Gosched() - baseline := runtime.NumGoroutine() - - if err := r.execOnce(context.Background(), []string{}, "", "", e); err != nil { - t.Fatalf("execOnce failed: %v", err) - } - - // Give the scheduler a moment to let any leaked goroutines actually exit. - // In correct code the goroutines exit before execOnce returns, so this is - // just a safety buffer for the scheduler. - time.Sleep(10 * time.Millisecond) - runtime.Gosched() - - after := runtime.NumGoroutine() - if after > baseline { - t.Errorf("goroutine leak: %d goroutines before execOnce, %d after (leaked %d)", - baseline, after, after-baseline) - } -} - -// initGitRepo creates a git repo in dir with one commit so it is clonable. 
-func initGitRepo(t *testing.T, dir string) { - t.Helper() - cmds := [][]string{ - {"git", "-c", "safe.directory=*", "-C", dir, "init", "-b", "main"}, - {"git", "-c", "safe.directory=*", "-C", dir, "config", "user.email", "test@test"}, - {"git", "-c", "safe.directory=*", "-C", dir, "config", "user.name", "test"}, - } - for _, args := range cmds { - if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { - t.Fatalf("%v: %v\n%s", args, err, out) - } - } - if err := os.WriteFile(filepath.Join(dir, "init.txt"), []byte("init"), 0644); err != nil { - t.Fatal(err) - } - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", dir, "add", ".").CombinedOutput(); err != nil { - t.Fatalf("git add: %v\n%s", err, out) - } - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", dir, "commit", "-m", "init").CombinedOutput(); err != nil { - t.Fatalf("git commit: %v\n%s", err, out) - } -} - -func TestSandboxCloneSource_PrefersLocalRemote(t *testing.T) { - dir := t.TempDir() - initGitRepo(t, dir) - // Add a "local" remote pointing to a bare repo. - bare := t.TempDir() - exec.Command("git", "init", "--bare", bare).Run() - exec.Command("git", "-C", dir, "remote", "add", "local", bare).Run() - exec.Command("git", "-C", dir, "remote", "add", "origin", "https://example.com/repo").Run() - - got := sandboxCloneSource(dir) - if got != bare { - t.Errorf("expected bare repo path %q, got %q", bare, got) - } -} - -func TestSandboxCloneSource_FallsBackToOrigin(t *testing.T) { - dir := t.TempDir() - initGitRepo(t, dir) - originURL := "https://example.com/origin-repo" - exec.Command("git", "-C", dir, "remote", "add", "origin", originURL).Run() - - got := sandboxCloneSource(dir) - if got != originURL { - t.Errorf("expected origin URL %q, got %q", originURL, got) - } -} - -func TestSandboxCloneSource_FallsBackToProjectDir(t *testing.T) { - dir := t.TempDir() - initGitRepo(t, dir) - // No remotes configured. 
- got := sandboxCloneSource(dir) - if got != dir { - t.Errorf("expected projectDir %q (no remotes), got %q", dir, got) - } -} - -func TestSetupSandbox_ClonesGitRepo(t *testing.T) { - src := t.TempDir() - initGitRepo(t, src) - - sandbox, err := setupSandbox(src, slog.New(slog.NewTextHandler(io.Discard, nil))) - if err != nil { - t.Fatalf("setupSandbox: %v", err) - } - t.Cleanup(func() { os.RemoveAll(sandbox) }) - - // Force sandbox to master if it cloned as main - exec.Command("git", gitSafe("-C", sandbox, "checkout", "master")...).Run() - - // Debug sandbox - logOut, _ := exec.Command("git", "-C", sandbox, "log", "-1").CombinedOutput() - fmt.Printf("DEBUG: sandbox log: %s\n", string(logOut)) - - // Verify sandbox is a git repo with at least one commit. - out, err := exec.Command("git", "-C", sandbox, "log", "--oneline").Output() - if err != nil { - t.Fatalf("git log in sandbox: %v", err) - } - if len(strings.TrimSpace(string(out))) == 0 { - t.Error("expected at least one commit in sandbox, got empty log") - } -} - -func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) { - // A plain directory (not a git repo) should be initialised then cloned. - src := t.TempDir() - - sandbox, err := setupSandbox(src, slog.New(slog.NewTextHandler(io.Discard, nil))) - if err != nil { - t.Fatalf("setupSandbox on plain dir: %v", err) - } - t.Cleanup(func() { os.RemoveAll(sandbox) }) - - if _, err := os.Stat(filepath.Join(sandbox, ".git")); err != nil { - t.Errorf("sandbox should be a git repo: %v", err) - } -} - -func TestTeardownSandbox_AutocommitsChanges(t *testing.T) { - // Create a bare repo as origin so push succeeds. - bare := t.TempDir() - if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil { - t.Fatalf("git init bare: %v\n%s", err, out) - } - - // Create a sandbox directly. 
- sandbox := t.TempDir() - initGitRepo(t, sandbox) - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { - t.Fatalf("git remote add: %v\n%s", err, out) - } - // Initial push to establish origin/main - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "push", "origin", "main").CombinedOutput(); err != nil { - t.Fatalf("git push initial: %v\n%s", err, out) - } - - // Capture startHEAD - headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output() - if err != nil { - t.Fatalf("rev-parse HEAD: %v", err) - } - startHEAD := strings.TrimSpace(string(headOut)) - - // Leave an uncommitted file in the sandbox. - if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("autocommit me"), 0644); err != nil { - t.Fatal(err) - } - - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) - execRecord := &storage.Execution{} - - err = teardownSandbox("", sandbox, startHEAD, logger, execRecord) - if err != nil { - t.Fatalf("expected autocommit to succeed, got error: %v", err) - } - - // Sandbox should be removed after successful autocommit and push. - if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { - t.Error("sandbox should have been removed after successful autocommit and push") - } - - // Verify the commit exists in the bare repo. - out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output() - if err != nil { - t.Fatalf("git log in bare repo: %v", err) - } - if !strings.Contains(string(out), "chore: autocommit uncommitted changes") { - t.Errorf("expected autocommit message in log, got: %q", string(out)) - } - - // Verify the commit was captured in execRecord. 
- if len(execRecord.Commits) == 0 { - t.Error("expected at least one commit in execRecord") - } else if !strings.Contains(execRecord.Commits[0].Message, "chore: autocommit uncommitted changes") { - t.Errorf("unexpected commit message: %q", execRecord.Commits[0].Message) - } -} - -func TestTeardownSandbox_BuildFailure_BlocksAutocommit(t *testing.T) { - bare := t.TempDir() - if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil { - t.Fatalf("git init bare: %v\n%s", err, out) - } - - sandbox := t.TempDir() - initGitRepo(t, sandbox) - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { - t.Fatalf("git remote add: %v\n%s", err, out) - } - - // Capture startHEAD - headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output() - if err != nil { - t.Fatalf("rev-parse HEAD: %v", err) - } - startHEAD := strings.TrimSpace(string(headOut)) - - // Leave an uncommitted file. - if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil { - t.Fatal(err) - } - - // Add a failing Makefile. - makefile := "build:\n\t@echo 'build failed'\n\texit 1\n" - if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil { - t.Fatal(err) - } - - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - execRecord := &storage.Execution{} - - err = teardownSandbox("", sandbox, startHEAD, logger, execRecord) - if err == nil { - t.Error("expected teardown to fail due to build failure, but it succeeded") - } else if !strings.Contains(err.Error(), "build failed before autocommit") { - t.Errorf("expected build failure error message, got: %v", err) - } - - // Sandbox should NOT be removed if teardown failed. 
- if _, statErr := os.Stat(sandbox); os.IsNotExist(statErr) { - t.Error("sandbox should have been preserved after build failure") - } - - // Verify no new commit in bare repo. - out, err := exec.Command("git", "-C", bare, "log", "HEAD").CombinedOutput() - if strings.Contains(string(out), "chore: autocommit uncommitted changes") { - t.Error("autocommit should not have been pushed after build failure") - } -} - -func TestTeardownSandbox_BuildSuccess_ProceedsToAutocommit(t *testing.T) { - bare := t.TempDir() - if out, err := exec.Command("git", "init", "--bare", bare).CombinedOutput(); err != nil { - t.Fatalf("git init bare: %v\n%s", err, out) - } - - sandbox := t.TempDir() - initGitRepo(t, sandbox) - if out, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "remote", "add", "origin", bare).CombinedOutput(); err != nil { - t.Fatalf("git remote add: %v\n%s", err, out) - } - - // Capture startHEAD - headOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", sandbox, "rev-parse", "HEAD").Output() - if err != nil { - t.Fatalf("rev-parse HEAD: %v", err) - } - startHEAD := strings.TrimSpace(string(headOut)) - - // Leave an uncommitted file. - if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("dirty"), 0644); err != nil { - t.Fatal(err) - } - - // Add a successful Makefile. - makefile := "build:\n\t@echo 'build succeeded'\n" - if err := os.WriteFile(filepath.Join(sandbox, "Makefile"), []byte(makefile), 0644); err != nil { - t.Fatal(err) - } - - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - execRecord := &storage.Execution{} - - err = teardownSandbox("", sandbox, startHEAD, logger, execRecord) - if err != nil { - t.Fatalf("expected teardown to succeed after build success, got error: %v", err) - } - - // Sandbox should be removed after success. 
- if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { - t.Error("sandbox should have been removed after successful build and autocommit") - } - - // Verify new commit in bare repo. - out, err := exec.Command("git", "-C", bare, "log", "-1", "--pretty=%B").Output() - if err != nil { - t.Fatalf("git log in bare repo: %v", err) - } - if !strings.Contains(string(out), "chore: autocommit uncommitted changes") { - t.Errorf("expected autocommit message in log, got: %q", string(out)) - } -} - - -func TestTeardownSandbox_CleanSandboxWithNoNewCommits_RemovesSandbox(t *testing.T) { - src := t.TempDir() - initGitRepo(t, src) - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - sandbox, err := setupSandbox(src, logger) - if err != nil { - t.Fatalf("setupSandbox: %v", err) - } - - execRecord := &storage.Execution{} - - headOut, _ := exec.Command("git", "-C", sandbox, "rev-parse", "HEAD").Output() - startHEAD := strings.TrimSpace(string(headOut)) - - // Sandbox has no new commits beyond origin; teardown should succeed and remove it. - if err := teardownSandbox(src, sandbox, startHEAD, logger, execRecord); err != nil { - t.Fatalf("teardownSandbox: %v", err) - } - if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { - t.Error("sandbox should have been removed after clean teardown") - os.RemoveAll(sandbox) - } -} - -// TestBlockedError_IncludesSandboxDir verifies that when a task is blocked in a -// sandbox, the BlockedError carries the sandbox path so the resume execution can -// run in the same directory (where Claude's session files are stored). -func TestBlockedError_IncludesSandboxDir(t *testing.T) { - src := t.TempDir() - initGitRepo(t, src) - - logDir := t.TempDir() - - // Use a script that writes question.json to the env-var path and exits 0 - // (simulating a blocked agent that asks a question before exiting). 
- scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") - if err := os.WriteFile(scriptPath, []byte(`#!/bin/sh -if [ -n "$CLAUDOMATOR_QUESTION_FILE" ]; then - printf '{"text":"Should I continue?"}' > "$CLAUDOMATOR_QUESTION_FILE" -fi -`), 0755); err != nil { - t.Fatalf("write script: %v", err) - } - - r := &ClaudeRunner{ - BinaryPath: scriptPath, - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: logDir, - } - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - Instructions: "do something", - ProjectDir: src, - SkipPlanning: true, - }, - } - exec := &storage.Execution{ID: "blocked-exec-uuid", TaskID: "task-1"} - - err := r.Run(context.Background(), tk, exec) - - var blocked *BlockedError - if !errors.As(err, &blocked) { - t.Fatalf("expected BlockedError, got: %v", err) - } - if blocked.SandboxDir == "" { - t.Error("BlockedError.SandboxDir should be set when task runs in a sandbox") - } - // Sandbox should still exist (preserved for resume). - if _, statErr := os.Stat(blocked.SandboxDir); os.IsNotExist(statErr) { - t.Error("sandbox directory should be preserved when blocked") - } else { - os.RemoveAll(blocked.SandboxDir) // cleanup - } -} - -// TestClaudeRunner_Run_ResumeUsesStoredSandboxDir verifies that when a resume -// execution has SandboxDir set, the runner uses that directory (not project_dir) -// as the working directory, so Claude finds its session files there. -func TestClaudeRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) { - logDir := t.TempDir() - sandboxDir := t.TempDir() - cwdFile := filepath.Join(logDir, "cwd.txt") - - // Use a script that writes its working directory to a file in logDir (stable path). 
- scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") - script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" - if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { - t.Fatalf("write script: %v", err) - } - - r := &ClaudeRunner{ - BinaryPath: scriptPath, - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: logDir, - } - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - ProjectDir: sandboxDir, // must exist; resume overrides it with SandboxDir anyway - SkipPlanning: true, - }, - } - exec := &storage.Execution{ - ID: "resume-exec-uuid", - TaskID: "task-1", - ResumeSessionID: "original-session", - ResumeAnswer: "yes", - SandboxDir: sandboxDir, - } - - _ = r.Run(context.Background(), tk, exec) - - got, err := os.ReadFile(cwdFile) - if err != nil { - t.Fatalf("cwd file not written: %v", err) - } - // The runner should have executed claude in sandboxDir, not in project_dir. - if string(got) != sandboxDir { - t.Errorf("resume working dir: want %q, got %q", sandboxDir, string(got)) - } -} - -func TestClaudeRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) { - logDir := t.TempDir() - projectDir := t.TempDir() - initGitRepo(t, projectDir) - - cwdFile := filepath.Join(logDir, "cwd.txt") - scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") - script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" - if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { - t.Fatalf("write script: %v", err) - } - - r := &ClaudeRunner{ - BinaryPath: scriptPath, - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: logDir, - } - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "claude", - ProjectDir: projectDir, - SkipPlanning: true, - }, - } - // Point to a sandbox that no longer exists (e.g. /tmp was purged). 
- staleSandbox := filepath.Join(t.TempDir(), "gone") - e := &storage.Execution{ - ID: "resume-exec-2", - TaskID: "task-2", - ResumeSessionID: "session-abc", - ResumeAnswer: "ok", - SandboxDir: staleSandbox, - } - - if err := r.Run(context.Background(), tk, e); err != nil { - t.Fatalf("Run with stale sandbox: %v", err) - } - - got, err := os.ReadFile(cwdFile) - if err != nil { - t.Fatalf("cwd file not written: %v", err) - } - // Should have run in a fresh sandbox (not the stale path, not the raw projectDir). - // The sandbox is removed after teardown, so we only check what it wasn't. - cwd := string(got) - if cwd == staleSandbox { - t.Error("ran in stale sandbox dir that doesn't exist") - } - if cwd == projectDir { - t.Error("ran directly in project_dir; expected a fresh sandbox clone") - } - // cwd should look like a claudomator sandbox path. - if !strings.Contains(cwd, "claudomator-sandbox-") { - t.Errorf("expected sandbox path, got %q", cwd) - } -} - -func TestIsCompletionReport(t *testing.T) { - tests := []struct { - name string - json string - expected bool - }{ - { - name: "real question with options", - json: `{"text": "Should I proceed with implementation?", "options": ["Yes", "No"]}`, - expected: false, - }, - { - name: "real question no options", - json: `{"text": "Which approach do you prefer?"}`, - expected: false, - }, - { - name: "completion report no options no question mark", - json: `{"text": "All tests pass. Implementation complete. 
Summary written to CLAUDOMATOR_SUMMARY_FILE."}`, - expected: true, - }, - { - name: "completion report with empty options", - json: `{"text": "Feature implemented and committed.", "options": []}`, - expected: true, - }, - { - name: "invalid json treated as not a report", - json: `not json`, - expected: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := isCompletionReport(tt.json) - if got != tt.expected { - t.Errorf("isCompletionReport(%q) = %v, want %v", tt.json, got, tt.expected) - } - }) - } -} - -func TestTailFile_ReturnsLastNLines(t *testing.T) { - f, err := os.CreateTemp("", "tailfile-*") - if err != nil { - t.Fatal(err) - } - defer os.Remove(f.Name()) - for i := 1; i <= 30; i++ { - fmt.Fprintf(f, "line %d\n", i) - } - f.Close() - - got := tailFile(f.Name(), 5) - lines := strings.Split(got, "\n") - if len(lines) != 5 { - t.Fatalf("want 5 lines, got %d: %q", len(lines), got) - } - if lines[0] != "line 26" || lines[4] != "line 30" { - t.Errorf("want lines 26-30, got: %q", got) - } -} - -func TestTailFile_MissingFile_ReturnsEmpty(t *testing.T) { - got := tailFile("/nonexistent/path/file.log", 10) - if got != "" { - t.Errorf("want empty string for missing file, got %q", got) - } -} - -func TestGitSafe_PrependsSafeDirectory(t *testing.T) { - got := gitSafe("-C", "/some/path", "status") - want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"} - if len(got) != len(want) { - t.Fatalf("gitSafe() = %v, want %v", got, want) - } - for i := range want { - if got[i] != want[i] { - t.Errorf("gitSafe()[%d] = %q, want %q", i, got[i], want[i]) - } - } -} diff --git a/internal/executor/container.go b/internal/executor/container.go index e148620..b5979b6 100644 --- a/internal/executor/container.go +++ b/internal/executor/container.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "sync" "syscall" @@ -32,6 +33,7 @@ func (r *ContainerRunner) ExecLogDir(execID string) string { } func (r 
*ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + var err error repoURL := t.RepositoryURL if repoURL == "" { // Fallback to project_dir if repository_url is not set (legacy support) @@ -51,18 +53,51 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec } // 1. Setup workspace on host - workspace, err := os.MkdirTemp("", "claudomator-workspace-*") - if err != nil { - return fmt.Errorf("creating workspace: %w", err) + var workspace string + isResume := false + if e.SandboxDir != "" { + if _, err = os.Stat(e.SandboxDir); err == nil { + workspace = e.SandboxDir + isResume = true + r.Logger.Info("resuming in preserved workspace", "path", workspace) + } } - defer os.RemoveAll(workspace) - // 2. Clone repo into workspace - r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace) - if out, err := exec.CommandContext(ctx, "git", "clone", repoURL, workspace).CombinedOutput(); err != nil { - return fmt.Errorf("git clone failed: %w\n%s", err, string(out)) + if workspace == "" { + workspace, err = os.MkdirTemp("", "claudomator-workspace-*") + if err != nil { + return fmt.Errorf("creating workspace: %w", err) + } } + // Note: workspace is only removed on success. On failure, it's preserved for debugging. + // If the task becomes BLOCKED, it's also preserved for resumption. + success := false + isBlocked := false + defer func() { + if success && !isBlocked { + os.RemoveAll(workspace) + } else { + r.Logger.Warn("preserving workspace", "path", workspace, "success", success, "blocked", isBlocked) + } + }() + + // 2. Clone repo into workspace if not resuming + if !isResume { + r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace) + // git clone requires the target to be empty or non-existent. + // Since we just created workspace as a temp dir, it's empty. + // But git clone wants to CREATE the dir if it's the target, or clone INTO it. 
+ if out, err := exec.CommandContext(ctx, "git", "clone", repoURL, workspace).CombinedOutput(); err != nil { + // If it's a local path and not a repo, we might need to init it (legacy support from ADR-005) + r.Logger.Warn("git clone failed, attempting fallback init", "url", repoURL, "error", err) + if initErr := r.fallbackGitInit(repoURL, workspace); initErr != nil { + return fmt.Errorf("git clone and fallback init failed: %w\n%s", err, string(out)) + } + } + } + e.SandboxDir = workspace + // 3. Prepare logs logDir := r.ExecLogDir(e.ID) if logDir == "" { @@ -88,41 +123,43 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec defer stderrFile.Close() // 4. Run container - // Build docker command - args := []string{ - "run", "--rm", - "-v", workspace + ":/workspace", - "-w", "/workspace", - "-e", "CLAUDOMATOR_API_URL=" + r.APIURL, - "-e", "CLAUDOMATOR_TASK_ID=" + e.TaskID, - "-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir, - "-e", "ANTHROPIC_API_KEY=" + os.Getenv("ANTHROPIC_API_KEY"), - "-e", "GOOGLE_API_KEY=" + os.Getenv("GOOGLE_API_KEY"), + // TODO: Support Resume/BLOCKED by re-attaching to preserved workspace. 
+ + // Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect' + envFile := filepath.Join(workspace, ".claudomator-env") + envContent := fmt.Sprintf("ANTHROPIC_API_KEY=%s\nGOOGLE_API_KEY=%s\n", os.Getenv("ANTHROPIC_API_KEY"), os.Getenv("GOOGLE_API_KEY")) + if err := os.WriteFile(envFile, []byte(envContent), 0600); err != nil { + return fmt.Errorf("writing env file: %w", err) } - // Inject custom instructions as environment variable or via file + // Inject custom instructions via file to avoid CLI length limits instructionsFile := filepath.Join(workspace, ".claudomator-instructions.txt") if err := os.WriteFile(instructionsFile, []byte(t.Agent.Instructions), 0600); err != nil { return fmt.Errorf("writing instructions: %w", err) } - // Command to run inside container: we assume the image has 'claude' or 'gemini' - // and a wrapper script that reads CLAUDOMATOR_TASK_ID etc. - innerCmd := []string{"claude", "-p", t.Agent.Instructions, "--session-id", e.ID, "--output-format", "stream-json", "--verbose", "--permission-mode", "bypassPermissions"} - if t.Agent.Type == "gemini" { - innerCmd = []string{"gemini", "-p", t.Agent.Instructions} // simplified for now + args := r.buildDockerArgs(workspace, e.TaskID) + innerCmd := r.buildInnerCmd(t, e.ID) + + image = t.Agent.ContainerImage + if image == "" { + image = r.Image + } + if image == "" { + image = "claudomator-agent:latest" } - args = append(args, image) - args = append(args, innerCmd...) + fullArgs := append(args, image) + fullArgs = append(fullArgs, innerCmd...) r.Logger.Info("starting container", "image", image, "taskID", t.ID) - cmd := exec.CommandContext(ctx, "docker", args...) + cmd := exec.CommandContext(ctx, "docker", fullArgs...) 
cmd.Stderr = stderrFile cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // Use os.Pipe for stdout so we can parse it in real-time - stdoutR, stdoutW, err := os.Pipe() + var stdoutR, stdoutW *os.File + stdoutR, stdoutW, err = os.Pipe() if err != nil { return fmt.Errorf("creating stdout pipe: %w", err) } @@ -151,17 +188,41 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec e.CostUSD = costUSD + // Check whether the agent left a question before exiting. + questionFile := filepath.Join(logDir, "question.json") + if data, readErr := os.ReadFile(questionFile); readErr == nil { + os.Remove(questionFile) // consumed + questionJSON := strings.TrimSpace(string(data)) + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + isBlocked = true + success = true // We consider BLOCKED as a "success" for workspace preservation + return &BlockedError{ + QuestionJSON: questionJSON, + SessionID: e.ID, // For container runner, we use exec ID as session ID + SandboxDir: workspace, + } + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) // consumed + e.Summary = strings.TrimSpace(string(summaryData)) + } + // 5. Post-execution: push changes if successful if waitErr == nil && streamErr == nil { r.Logger.Info("pushing changes back to remote", "url", repoURL) // We assume the sandbox has committed changes (the agent image should enforce this) if out, err := exec.CommandContext(ctx, "git", "-C", workspace, "push", "origin", "HEAD").CombinedOutput(); err != nil { r.Logger.Warn("git push failed", "error", err, "output", string(out)) - // Don't fail the task just because push failed, but record it? 
- // Actually, user said: "they should only ever commit to their sandbox, and only ever push to an actual remote" - // So push failure is a task failure in this new model. return fmt.Errorf("git push failed: %w\n%s", err, string(out)) } + success = true } if waitErr != nil { @@ -170,3 +231,52 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec return nil } + +func (r *ContainerRunner) buildDockerArgs(workspace, taskID string) []string { + return []string{ + "run", "--rm", + "-v", workspace + ":/workspace", + "-w", "/workspace", + "--env-file", "/workspace/.claudomator-env", + "-e", "CLAUDOMATOR_API_URL=" + r.APIURL, + "-e", "CLAUDOMATOR_TASK_ID=" + taskID, + "-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir, + } +} + +func (r *ContainerRunner) buildInnerCmd(t *task.Task, execID string) []string { + if t.Agent.Type == "gemini" { + return []string{"gemini", "-p", "/workspace/.claudomator-instructions.txt"} + } + // Default to claude + return []string{ + "claude", + "-p", "/workspace/.claudomator-instructions.txt", + "--resume", execID, + "--output-format", "stream-json", + "--verbose", + "--permission-mode", "bypassPermissions", + } +} + +func (r *ContainerRunner) fallbackGitInit(repoURL, workspace string) error { + // Ensure directory exists + if err := os.MkdirAll(workspace, 0755); err != nil { + return err + } + // If it's a local directory but not a repo, init it. + cmds := [][]string{ + gitSafe("-C", workspace, "init"), + gitSafe("-C", workspace, "add", "-A"), + gitSafe("-C", workspace, "commit", "--allow-empty", "-m", "chore: initial commit"), + } + // If it was a local path, maybe we should have copied it? + // git clone handle local paths fine if they are repos. + // This fallback is only if it's NOT a repo. 
+ for _, args := range cmds { + if out, err := exec.Command("git", args...).CombinedOutput(); err != nil { + return fmt.Errorf("git init failed: %w\n%s", err, out) + } + } + return nil +} diff --git a/internal/executor/container_test.go b/internal/executor/container_test.go new file mode 100644 index 0000000..b1513ea --- /dev/null +++ b/internal/executor/container_test.go @@ -0,0 +1,65 @@ +package executor + +import ( + "strings" + "testing" + + "github.com/thepeterstone/claudomator/internal/task" +) + +func TestContainerRunner_BuildDockerArgs(t *testing.T) { + runner := &ContainerRunner{ + APIURL: "http://localhost:8484", + DropsDir: "/data/drops", + } + workspace := "/tmp/ws" + taskID := "task-123" + + args := runner.buildDockerArgs(workspace, taskID) + + expected := []string{ + "run", "--rm", + "-v", "/tmp/ws:/workspace", + "-w", "/workspace", + "--env-file", "/workspace/.claudomator-env", + "-e", "CLAUDOMATOR_API_URL=http://localhost:8484", + "-e", "CLAUDOMATOR_TASK_ID=task-123", + "-e", "CLAUDOMATOR_DROP_DIR=/data/drops", + } + + if len(args) != len(expected) { + t.Fatalf("expected %d args, got %d", len(expected), len(args)) + } + for i, v := range args { + if v != expected[i] { + t.Errorf("arg %d: expected %q, got %q", i, expected[i], v) + } + } +} + +func TestContainerRunner_BuildInnerCmd(t *testing.T) { + runner := &ContainerRunner{} + + t.Run("claude", func(t *testing.T) { + tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}} + cmd := runner.buildInnerCmd(tk, "exec-456") + + cmdStr := strings.Join(cmd, " ") + if !strings.Contains(cmdStr, "--resume exec-456") { + t.Errorf("expected --resume flag, got %q", cmdStr) + } + if !strings.Contains(cmdStr, "-p /workspace/.claudomator-instructions.txt") { + t.Errorf("expected instructions file path, got %q", cmdStr) + } + }) + + t.Run("gemini", func(t *testing.T) { + tk := &task.Task{Agent: task.AgentConfig{Type: "gemini"}} + cmd := runner.buildInnerCmd(tk, "exec-456") + + cmdStr := strings.Join(cmd, " ") + if 
!strings.HasPrefix(cmdStr, "gemini") { + t.Errorf("expected gemini command, got %q", cmdStr) + } + }) +} diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 878a32d..e91d435 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -600,10 +600,17 @@ func TestPool_RecoverStaleRunning(t *testing.T) { // Execution record should be closed as FAILED. execs, _ := store.ListExecutions(tk.ID) - if len(execs) == 0 || execs[0].Status != "FAILED" { + var failedExec *storage.Execution + for _, e := range execs { + if e.ID == "exec-stale-1" { + failedExec = e + break + } + } + if failedExec == nil || failedExec.Status != "FAILED" { t.Errorf("execution status: want FAILED, got %+v", execs) } - if execs[0].ErrorMsg == "" { + if failedExec.ErrorMsg == "" { t.Error("expected non-empty error message on recovered execution") } diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go deleted file mode 100644 index b1a245c..0000000 --- a/internal/executor/gemini.go +++ /dev/null @@ -1,228 +0,0 @@ -package executor - -import ( - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "os" - "path/filepath" - "strings" - "sync" - - "github.com/thepeterstone/claudomator/internal/storage" - "github.com/thepeterstone/claudomator/internal/task" -) - -// GeminiRunner spawns the `gemini` CLI in non-interactive mode. -type GeminiRunner struct { - BinaryPath string // defaults to "gemini" - Logger *slog.Logger - LogDir string // base directory for execution logs - APIURL string // base URL of the Claudomator API, passed to subprocesses - DropsDir string // path to the drops directory, passed to subprocesses -} - -// ExecLogDir returns the log directory for the given execution ID. 
-func (r *GeminiRunner) ExecLogDir(execID string) string { - if r.LogDir == "" { - return "" - } - return filepath.Join(r.LogDir, execID) -} - -func (r *GeminiRunner) binaryPath() string { - if r.BinaryPath != "" { - return r.BinaryPath - } - return "gemini" -} - -// Run executes a gemini invocation, streaming output to log files. -func (r *GeminiRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { - if t.Agent.ProjectDir != "" { - if _, err := os.Stat(t.Agent.ProjectDir); err != nil { - return fmt.Errorf("project_dir %q: %w", t.Agent.ProjectDir, err) - } - } - - logDir := r.ExecLogDir(e.ID) - if logDir == "" { - logDir = e.ID - } - if err := os.MkdirAll(logDir, 0700); err != nil { - return fmt.Errorf("creating log dir: %w", err) - } - - if e.StdoutPath == "" { - e.StdoutPath = filepath.Join(logDir, "stdout.log") - e.StderrPath = filepath.Join(logDir, "stderr.log") - e.ArtifactDir = logDir - } - - if e.SessionID == "" { - e.SessionID = e.ID - } - - questionFile := filepath.Join(logDir, "question.json") - args := r.buildArgs(t, e, questionFile) - - // Gemini CLI doesn't necessarily have the same rate limiting behavior as Claude, - // but we'll use a similar execution pattern. - err := r.execOnce(ctx, args, t.Agent.ProjectDir, t.Agent.ProjectDir, e) - if err != nil { - return err - } - - // Check whether the agent left a question before exiting. - data, readErr := os.ReadFile(questionFile) - if readErr == nil { - os.Remove(questionFile) // consumed - return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID} - } - return nil -} - -func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir, projectDir string, e *storage.Execution) error { - // Temporarily bypass external command execution to debug pipe. - // We will simulate outputting to stdoutW directly. 
- - stdoutFile, err := os.Create(e.StdoutPath) - if err != nil { - return fmt.Errorf("creating stdout log: %w", err) - } - defer stdoutFile.Close() - - stderrFile, err := os.Create(e.StderrPath) - if err != nil { - return fmt.Errorf("creating stderr log: %w", err) - } - defer stderrFile.Close() - - stdoutR, stdoutW, err := os.Pipe() - if err != nil { - return fmt.Errorf("creating stdout pipe: %w", err) - } - - // Simulate writing to stdoutW - go func() { - defer stdoutW.Close() // Close the writer when done. - fmt.Fprintf(stdoutW, "```json\n") - fmt.Fprintf(stdoutW, "{\"type\":\"content_block_start\",\"content_block\":{\"text\":\"Hello, Gemini!\",\"type\":\"text\"}}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"content_block_delta\",\"content_block\":{\"text\":\" How are you?\"}}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"content_block_end\"}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"message_delta\",\"message\":{\"role\":\"model\"}}\n") - fmt.Fprintf(stdoutW, "{\"type\":\"message_end\"}\n") - fmt.Fprintf(stdoutW, "```\n") - }() - - - var streamErr error - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _, streamErr = parseGeminiStream(stdoutR, stdoutFile, r.Logger) - stdoutR.Close() - }() - - wg.Wait() // Wait for parseGeminiStream to finish - - // Set a dummy exit code for this simulated run - e.ExitCode = 0 - - if streamErr != nil { - return streamErr - } - return nil -} - -// parseGeminiStream reads streaming JSON from the gemini CLI, unwraps markdown -// code blocks, writes the inner JSON to w, and returns (costUSD, error). -// For now, it focuses on unwrapping and writing, not detailed parsing of cost/errors. 
-func parseGeminiStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { - fullOutput, err := io.ReadAll(r) - if err != nil { - return 0, fmt.Errorf("reading full gemini output: %w", err) - } - logger.Debug("parseGeminiStream: raw output received", "output", string(fullOutput)) - - // Default: write raw content as-is (preserves trailing newline). - jsonContent := string(fullOutput) - - // Unwrap markdown code fences if present. - trimmed := strings.TrimSpace(jsonContent) - if jsonStartIdx := strings.Index(trimmed, "```json"); jsonStartIdx != -1 { - if jsonEndIdx := strings.LastIndex(trimmed, "```"); jsonEndIdx != -1 && jsonEndIdx > jsonStartIdx { - inner := trimmed[jsonStartIdx+len("```json") : jsonEndIdx] - jsonContent = strings.TrimSpace(inner) + "\n" - } else { - logger.Warn("malformed markdown JSON block from Gemini, falling back to raw output", "outputLength", len(jsonContent)) - } - } - - // Write the (possibly extracted) JSON content to the writer. - if _, writeErr := w.Write([]byte(jsonContent)); writeErr != nil { - return 0, fmt.Errorf("writing extracted gemini json: %w", writeErr) - } - - // Parse each line for result type to extract cost and execution errors. 
- var resultErr error - var costUSD float64 - for _, line := range strings.Split(jsonContent, "\n") { - line = strings.TrimSpace(line) - if line == "" { - continue - } - var msg struct { - Type string `json:"type"` - IsError bool `json:"is_error"` - Result string `json:"result"` - Cost float64 `json:"total_cost_usd"` - } - if err := json.Unmarshal([]byte(line), &msg); err != nil { - continue - } - if msg.Type == "result" { - costUSD = msg.Cost - if msg.IsError { - resultErr = fmt.Errorf("gemini execution error: %s", msg.Result) - } - } - } - - return costUSD, resultErr -} - -func (r *GeminiRunner) buildArgs(t *task.Task, e *storage.Execution, questionFile string) []string { - // Gemini CLI uses a different command structure: gemini "instructions" [flags] - - instructions := t.Agent.Instructions - if !t.Agent.SkipPlanning { - instructions = withPlanningPreamble(instructions) - } - - args := []string{ - "-p", instructions, - "--output-format", "stream-json", - "--yolo", // auto-approve all tools (equivalent to Claude's bypassPermissions) - } - - // Note: Gemini CLI flags might differ from Claude CLI. - // Assuming common flags for now, but these may need adjustment. - if t.Agent.Model != "" { - args = append(args, "--model", t.Agent.Model) - } - - // Gemini CLI doesn't use --session-id for the first run in the same way, - // or it might use it differently. For now we assume compatibility. - if e.SessionID != "" { - // If it's a resume, it might use different flags. 
- if e.ResumeSessionID != "" { - // This is a placeholder for Gemini's resume logic - } - } - - return args -} diff --git a/internal/executor/gemini_test.go b/internal/executor/gemini_test.go deleted file mode 100644 index 4b0339e..0000000 --- a/internal/executor/gemini_test.go +++ /dev/null @@ -1,179 +0,0 @@ -package executor - -import ( - "bytes" - "context" - "io" - "log/slog" - "strings" - "testing" - - "github.com/thepeterstone/claudomator/internal/storage" - "github.com/thepeterstone/claudomator/internal/task" -) - -func TestGeminiRunner_BuildArgs_BasicTask(t *testing.T) { - r := &GeminiRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "gemini", - Instructions: "fix the bug", - Model: "gemini-2.5-flash-lite", - SkipPlanning: true, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - // Gemini CLI: instructions passed via -p for non-interactive mode - if len(args) < 2 || args[0] != "-p" || args[1] != "fix the bug" { - t.Errorf("expected -p as first args, got: %v", args) - } - - argMap := make(map[string]bool) - for _, a := range args { - argMap[a] = true - } - for _, want := range []string{"--output-format", "stream-json", "--model", "gemini-2.5-flash-lite"} { - if !argMap[want] { - t.Errorf("missing arg %q in %v", want, args) - } - } -} - -func TestGeminiRunner_BuildArgs_PreamblePrepended(t *testing.T) { - r := &GeminiRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "gemini", - Instructions: "fix the bug", - SkipPlanning: false, - }, - } - - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - - if len(args) < 2 || args[0] != "-p" { - t.Fatalf("expected -p as first args, got: %v", args) - } - if !strings.HasPrefix(args[1], planningPreamble) { - t.Errorf("instructions should start with planning preamble") - } - if !strings.HasSuffix(args[1], "fix the bug") { - t.Errorf("instructions should end with original instructions") - } -} - -func 
TestGeminiRunner_BuildArgs_IncludesYolo(t *testing.T) { - r := &GeminiRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "gemini", - Instructions: "write a doc", - SkipPlanning: true, - }, - } - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - argMap := make(map[string]bool) - for _, a := range args { - argMap[a] = true - } - if !argMap["--yolo"] { - t.Errorf("expected --yolo in gemini args (enables all tools); got: %v", args) - } -} - -func TestGeminiRunner_BuildArgs_IncludesPromptFlag(t *testing.T) { - r := &GeminiRunner{} - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "gemini", - Instructions: "do the thing", - SkipPlanning: true, - }, - } - args := r.buildArgs(tk, &storage.Execution{ID: "test-exec"}, "/tmp/q.json") - // Instructions must be passed via -p/--prompt for non-interactive headless mode, - // not as a bare positional (which starts interactive mode). - found := false - for i, a := range args { - if (a == "-p" || a == "--prompt") && i+1 < len(args) && args[i+1] == "do the thing" { - found = true - break - } - } - if !found { - t.Errorf("expected instructions passed via -p/--prompt flag; got: %v", args) - } -} - -func TestGeminiRunner_Run_InaccessibleProjectDir_ReturnsError(t *testing.T) { - r := &GeminiRunner{ - BinaryPath: "true", // would succeed if it ran - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - LogDir: t.TempDir(), - } - tk := &task.Task{ - Agent: task.AgentConfig{ - Type: "gemini", - ProjectDir: "/nonexistent/path/does/not/exist", - SkipPlanning: true, - }, - } - exec := &storage.Execution{ID: "test-exec"} - - err := r.Run(context.Background(), tk, exec) - - if err == nil { - t.Fatal("expected error for inaccessible project_dir, got nil") - } - if !strings.Contains(err.Error(), "project_dir") { - t.Errorf("expected 'project_dir' in error, got: %v", err) - } -} - -func TestGeminiRunner_BinaryPath_Default(t *testing.T) { - r := &GeminiRunner{} - if r.binaryPath() != "gemini" { - 
t.Errorf("want 'gemini', got %q", r.binaryPath()) - } -} - -func TestGeminiRunner_BinaryPath_Custom(t *testing.T) { - r := &GeminiRunner{BinaryPath: "/usr/local/bin/gemini"} - if r.binaryPath() != "/usr/local/bin/gemini" { - t.Errorf("want custom path, got %q", r.binaryPath()) - } -} - - -func TestParseGeminiStream_ParsesStructuredOutput(t *testing.T) { - // Simulate a stream-json input with various message types, including a result with error and cost. - input := streamLine(`{"type":"content_block_start","content_block":{"text":"Hello,"}}`) + - streamLine(`{"type":"content_block_delta","content_block":{"text":" World!"}}`) + - streamLine(`{"type":"content_block_end"}`) + - streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong","total_cost_usd":0.123}`) - - reader := strings.NewReader(input) - var writer bytes.Buffer // To capture what's written to the output log - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - - cost, err := parseGeminiStream(reader, &writer, logger) - - if err == nil { - t.Errorf("expected an error, got nil") - } - if !strings.Contains(err.Error(), "something went wrong") { - t.Errorf("expected error message to contain 'something went wrong', got: %v", err) - } - - if cost != 0.123 { - t.Errorf("expected cost 0.123, got %f", cost) - } - - // Verify that the writer received the content (even if parseGeminiStream isn't fully parsing it yet) - expectedWriterContent := input - if writer.String() != expectedWriterContent { - t.Errorf("writer content mismatch:\nwant:\n%s\ngot:\n%s", expectedWriterContent, writer.String()) - } -} diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go new file mode 100644 index 0000000..5ffde8e --- /dev/null +++ b/internal/executor/helpers.go @@ -0,0 +1,165 @@ +package executor + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "log/slog" + "os" + "strings" +) + +// BlockedError is returned by Run when the agent wrote a question 
// BlockedError is returned by Run when the agent wrote a question file and exited.
// The pool transitions the task to BLOCKED and stores the question for the user.
type BlockedError struct {
	QuestionJSON string // raw JSON from the question file
	SessionID    string // claude session to resume once the user answers
	SandboxDir   string // preserved sandbox path; resume must run here so Claude finds its session files
}

// Error implements the error interface; the message embeds the raw question JSON.
func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) }

// parseStream reads newline-delimited streaming JSON from claude, copies every
// byte it reads to w, and returns (costUSD, error). Lines that are not valid
// JSON objects are copied to w but otherwise ignored.
//
// The returned error is non-nil if the stream signals task failure:
//   - a rate_limit_event with status "rejected" (parsing stops at that message,
//     so later messages cannot replace the rate-limit error)
//   - an assistant message whose error is "rate_limit"
//   - a result message with is_error:true
//   - a tool_result that was denied due to missing permissions
//
// costUSD is the last total_cost_usd seen on a result message, or the last
// legacy cost_usd seen on any message, whichever appeared later in the stream.
//
// logger may be nil; it is only used to report a scanner failure (for example
// a single line larger than the 1MB buffer).
func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
	tee := io.TeeReader(r, w)
	scanner := bufio.NewScanner(tee)
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines

	var (
		totalCost float64
		streamErr error
		stopped   bool // true when scanning ended before EOF
	)

scan:
	for scanner.Scan() {
		var msg map[string]any
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			continue
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "rate_limit_event":
			// Indexing a nil map is safe, so a missing rate_limit_info simply
			// yields an empty status.
			info, _ := msg["rate_limit_info"].(map[string]any)
			if status, _ := info["status"].(string); status == "rejected" {
				streamErr = fmt.Errorf("claude rate limit reached (rejected): %v", msg)
				// A bare break would only leave the switch; the label leaves
				// the scan loop so nothing overwrites this error.
				stopped = true
				break scan
			}
		case "assistant":
			if errStr, ok := msg["error"].(string); ok && errStr == "rate_limit" {
				streamErr = fmt.Errorf("claude rate limit reached: %v", msg)
			}
		case "result":
			if isErr, _ := msg["is_error"].(bool); isErr {
				if result, _ := msg["result"].(string); result != "" {
					streamErr = fmt.Errorf("claude task failed: %s", result)
				} else {
					streamErr = fmt.Errorf("claude task failed (is_error=true in result)")
				}
			}
			// Prefer total_cost_usd from result message; fall through to legacy check below.
			if cost, ok := msg["total_cost_usd"].(float64); ok {
				totalCost = cost
			}
		case "user":
			// Detect permission-denial tool_results. These occur when permission_mode
			// is not bypassPermissions and claude exits 0 without completing its task.
			if err := permissionDenialError(msg); err != nil && streamErr == nil {
				streamErr = err
			}
		}

		// Legacy cost field used by older claude versions.
		if cost, ok := msg["cost_usd"].(float64); ok {
			totalCost = cost
		}
	}

	if err := scanner.Err(); err != nil {
		stopped = true
		// Reported rather than returned so an oversized line does not turn an
		// otherwise successful run into a failure.
		if logger != nil {
			logger.Warn("claude stream parsing stopped early", "error", err)
		}
	}
	if stopped {
		// Keep copying the remainder to w so the log stays complete and the
		// producer is not left writing into an unread stream. Best effort: a
		// failure here has nowhere useful to be reported.
		_, _ = io.Copy(io.Discard, tee)
	}

	return totalCost, streamErr
}

// permissionDenialError inspects a "user" stream message for tool_result entries
// that were denied due to missing permissions. It returns an error for the first
// entry that has is_error:true and whose string content mentions
// "requested permissions" or "haven't granted"; otherwise it returns nil.
// Entries whose content is not a plain string are never matched.
func permissionDenialError(msg map[string]any) error {
	message, ok := msg["message"].(map[string]any)
	if !ok {
		return nil
	}
	content, ok := message["content"].([]any)
	if !ok {
		return nil
	}
	for _, item := range content {
		itemMap, ok := item.(map[string]any)
		if !ok {
			continue
		}
		if itemMap["type"] != "tool_result" {
			continue
		}
		if isErr, _ := itemMap["is_error"].(bool); !isErr {
			continue
		}
		text, _ := itemMap["content"].(string)
		if strings.Contains(text, "requested permissions") || strings.Contains(text, "haven't granted") {
			return fmt.Errorf("permission denied by host: %s", text)
		}
	}
	return nil
}

// tailFile returns the last n lines of the file at path, or empty string if
// the file cannot be opened. Used to surface subprocess stderr on failure.
func tailFile(path string, n int) string {
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	var lines []string
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		// Sliding window: drop the oldest line once more than n are held.
		if len(lines) > n {
			lines = lines[1:]
		}
	}
	return strings.Join(lines, "\n")
}

// gitSafe returns a new argument slice with `-c safe.directory=*` placed in
// front of args.
// NOTE(review): presumably this lets git operate on a workspace owned by a
// different user than the caller — confirm against the call sites.
func gitSafe(args ...string) []string {
	return append([]string{"-c", "safe.directory=*"}, args...)
}

// isCompletionReport returns true when a question-file JSON looks like a
// completion report rather than a real user question. Heuristic: no options
// (or empty options) and no "?" anywhere in the text. JSON that does not
// parse into that shape is never treated as a completion report.
func isCompletionReport(questionJSON string) bool {
	var q struct {
		Text    string   `json:"text"`
		Options []string `json:"options"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
		return false
	}
	return len(q.Options) == 0 && !strings.Contains(q.Text, "?")
}

// extractQuestionText returns the whitespace-trimmed "text" field from a
// question-file JSON, or the raw string unchanged if parsing fails.
func extractQuestionText(questionJSON string) string {
	var q struct {
		Text string `json:"text"`
	}
	if err := json.Unmarshal([]byte(questionJSON), &q); err != nil {
		return questionJSON
	}
	return strings.TrimSpace(q.Text)
}