package executor

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// ContainerRunner executes an agent inside a container.
type ContainerRunner struct {
	Image             string // default image if not specified in task
	Logger            *slog.Logger
	LogDir            string
	APIURL            string
	DropsDir          string
	SSHAuthSock       string // optional path to host SSH agent
	ClaudeBinary      string // optional path to claude binary in container
	GeminiBinary      string // optional path to gemini binary in container
	ClaudeConfigDir   string // optional host dir holding .credentials.json and .claude.json; both are copied into the per-run agent home
	CredentialSyncCmd string // optional path to sync-credentials script for auth-error auto-recovery
	Store             Store  // optional; used to look up stories and projects for story-aware cloning

	// Command allows mocking exec.CommandContext for tests.
	Command func(ctx context.Context, name string, arg ...string) *exec.Cmd
}

// isAuthError reports whether err's message contains one of the known
// Claude CLI authentication failure markers.
func isAuthError(err error) bool {
	if err == nil {
		return false
	}
	s := err.Error()
	return strings.Contains(s, "Not logged in") ||
		strings.Contains(s, "OAuth token has expired") ||
		strings.Contains(s, "authentication_error") ||
		strings.Contains(s, "Please run /login")
}

// command builds an *exec.Cmd via r.Command when set (tests), otherwise via
// exec.CommandContext.
func (r *ContainerRunner) command(ctx context.Context, name string, arg ...string) *exec.Cmd {
	if r.Command != nil {
		return r.Command(ctx, name, arg...)
	}
	return exec.CommandContext(ctx, name, arg...)
}

// ExecLogDir returns the per-execution log directory under r.LogDir, or ""
// when no LogDir is configured.
func (r *ContainerRunner) ExecLogDir(execID string) string {
	if r.LogDir == "" {
		return ""
	}
	return filepath.Join(r.LogDir, execID)
}

// Run executes task t in a container and records results on e.
//
// It reuses e.SandboxDir as the workspace when that directory still exists
// (resume). Otherwise it clones t.RepositoryURL into a fresh temp dir and
// checks out the story branch, creating it if the remote lacks it. It then
// stages agent credentials and runs the container. On an auth failure, when
// CredentialSyncCmd is set, it syncs credentials and retries once.
//
// The workspace is deleted only on a clean, non-blocked success. On failure
// it is kept for debugging; when the agent returns a *BlockedError it is kept
// for resumption. Its path is always recorded in e.SandboxDir.
func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	var err error
	repoURL := t.RepositoryURL
	if repoURL == "" {
		return fmt.Errorf("task %s has no repository_url", t.ID)
	}

	// 1. Setup workspace on host
	var workspace string
	isResume := false
	if e.SandboxDir != "" {
		if _, err = os.Stat(e.SandboxDir); err == nil {
			workspace = e.SandboxDir
			isResume = true
			r.Logger.Info("resuming in preserved workspace", "path", workspace)
		}
	}
	if workspace == "" {
		workspace, err = os.MkdirTemp("", "claudomator-workspace-*")
		if err != nil {
			return fmt.Errorf("creating workspace: %w", err)
		}
		// chmod applied after clone; see step 2.
	}

	// Note: workspace is only removed on success. On failure, it's preserved for debugging.
	// If the task becomes BLOCKED, it's also preserved for resumption.
	success := false
	isBlocked := false
	defer func() {
		if success && !isBlocked {
			_ = os.RemoveAll(workspace) // best-effort cleanup
		} else {
			r.Logger.Warn("preserving workspace", "path", workspace, "success", success, "blocked", isBlocked)
		}
	}()

	// Resolve story branch and project local path if this is a story task.
	var storyBranch string
	var storyLocalPath string
	if t.StoryID != "" && r.Store != nil {
		if story, err := r.Store.GetStory(t.StoryID); err == nil && story != nil {
			storyBranch = story.BranchName
			if story.ProjectID != "" {
				if proj, err := r.Store.GetProject(story.ProjectID); err == nil && proj != nil {
					storyLocalPath = proj.LocalPath
				}
			}
		}
	}
	// Fall back to task-level BranchName (e.g. set explicitly by executor or tests).
	if storyBranch == "" {
		storyBranch = t.BranchName
	}

	// 2. Clone repo into workspace if not resuming.
	// git clone requires the target directory to not exist; remove the MkdirTemp-created dir first.
	if !isResume {
		if err := os.Remove(workspace); err != nil {
			return fmt.Errorf("removing workspace before clone: %w", err)
		}
		r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace)
		var cloneArgs []string
		if storyLocalPath != "" {
			// Borrow objects from the project's local checkout to speed up the clone.
			cloneArgs = []string{"clone", "--reference", storyLocalPath, repoURL, workspace}
		} else {
			cloneArgs = []string{"clone", repoURL, workspace}
		}
		if out, err := r.command(ctx, "git", cloneArgs...).CombinedOutput(); err != nil {
			return fmt.Errorf("git clone failed: %w\n%s", err, string(out))
		}
		if storyBranch != "" {
			r.Logger.Info("checking out story branch", "branch", storyBranch)
			if out, err := r.command(ctx, "git", "-C", workspace, "checkout", storyBranch).CombinedOutput(); err != nil {
				// Branch doesn't exist in the remote yet — create it from HEAD and push.
				r.Logger.Warn("story branch not found, creating from HEAD", "branch", storyBranch)
				if out2, err2 := r.command(ctx, "git", "-C", workspace, "checkout", "-b", storyBranch).CombinedOutput(); err2 != nil {
					return fmt.Errorf("git checkout story branch %q failed: %w\n%s\ncreate attempt: %s", storyBranch, err, string(out), string(out2))
				}
				if out2, err2 := r.command(ctx, "git", "-C", workspace, "push", "origin", storyBranch).CombinedOutput(); err2 != nil {
					// Non-fatal: the post-run push will try again.
					r.Logger.Warn("push of auto-created story branch failed", "branch", storyBranch, "error", err2, "output", string(out2))
				}
			}
		}
		if err = os.Chmod(workspace, 0755); err != nil {
			return fmt.Errorf("chmod cloned workspace: %w", err)
		}
	}
	e.SandboxDir = workspace

	// Set up a writable $HOME staging dir so any agent tool (claude, gemini, etc.)
	// can freely create subdirs (session-env, .gemini, .cache, …) without hitting
	// a non-existent or read-only home. We copy only the claude credentials into it.
	agentHome := filepath.Join(workspace, ".agent-home")
	if err := os.MkdirAll(filepath.Join(agentHome, ".claude"), 0755); err != nil {
		return fmt.Errorf("creating agent home staging dir: %w", err)
	}
	if err := os.MkdirAll(filepath.Join(agentHome, ".gemini"), 0755); err != nil {
		return fmt.Errorf("creating .gemini dir: %w", err)
	}
	r.stageClaudeCredentials(agentHome)

	// Pre-flight: verify credentials were actually copied before spinning up a container.
	if r.ClaudeConfigDir != "" {
		credsPath := filepath.Join(agentHome, ".claude", ".credentials.json")
		settingsPath := filepath.Join(agentHome, ".claude.json")
		if _, err := os.Stat(credsPath); os.IsNotExist(err) {
			return fmt.Errorf("credentials not found at %s — run sync-credentials", r.ClaudeConfigDir)
		}
		if _, err := os.Stat(settingsPath); os.IsNotExist(err) {
			return fmt.Errorf("claude settings (.claude.json) not found at %s — run sync-credentials", r.ClaudeConfigDir)
		}
	}

	// Run container (with auth retry on failure).
	runErr := r.runContainer(ctx, t, e, workspace, agentHome, isResume, storyBranch)
	if runErr != nil && isAuthError(runErr) && r.CredentialSyncCmd != "" {
		r.Logger.Warn("auth failure detected, syncing credentials and retrying once", "taskID", t.ID)
		syncOut, syncErr := r.command(ctx, r.CredentialSyncCmd).CombinedOutput()
		if syncErr != nil {
			// Retry anyway: the host credentials may have been refreshed by other means.
			r.Logger.Warn("sync-credentials failed", "error", syncErr, "output", string(syncOut))
		}
		// Re-copy credentials into agentHome with fresh files.
		r.stageClaudeCredentials(agentHome)
		runErr = r.runContainer(ctx, t, e, workspace, agentHome, isResume, storyBranch)
	}

	if runErr == nil {
		success = true
	}
	var blockedErr *BlockedError
	if errors.As(runErr, &blockedErr) {
		isBlocked = true
		success = true // preserve workspace for resumption
	}
	return runErr
}

// stageClaudeCredentials copies .credentials.json and .claude.json from
// r.ClaudeConfigDir into agentHome, so the agent can use and rewrite them
// without touching the host copies. It does nothing when ClaudeConfigDir is
// unset. This guard matters on the auth-retry path: without it,
// filepath.Join("", name) would read from the process working directory.
// Failures are logged, not returned; Run's pre-flight check reports missing
// files with an actionable message.
func (r *ContainerRunner) stageClaudeCredentials(agentHome string) {
	if r.ClaudeConfigDir == "" {
		return
	}
	files := []struct {
		src, dst string
		perm     os.FileMode
	}{
		{filepath.Join(r.ClaudeConfigDir, ".credentials.json"), filepath.Join(agentHome, ".claude", ".credentials.json"), 0600},
		// settings (used by claude CLI; copy so it can write updates without hitting the host)
		{filepath.Join(r.ClaudeConfigDir, ".claude.json"), filepath.Join(agentHome, ".claude.json"), 0644},
	}
	for _, f := range files {
		data, err := os.ReadFile(f.src)
		if err != nil {
			if !os.IsNotExist(err) {
				r.Logger.Warn("reading claude config failed", "path", f.src, "error", err)
			}
			continue
		}
		if err := os.WriteFile(f.dst, data, f.perm); err != nil {
			r.Logger.Warn("staging claude config into agent home failed", "path", f.dst, "error", err)
		}
	}
}

// runContainer runs the docker container for the given task and handles log setup,
// environment files, instructions, and post-execution git operations.
//
// Container stdout is streamed through parseStream into e.StdoutPath; the
// parsed cost and session ID are recorded on e. If the agent left a
// question.json that is not a completion report, a *BlockedError is returned.
// On a clean exit, new commits are pushed (to storyBranch when set). If there
// are no new commits but the agent left uncommitted work, an error is returned.
func (r *ContainerRunner) runContainer(ctx context.Context, t *task.Task, e *storage.Execution, workspace, agentHome string, isResume bool, storyBranch string) error {
	repoURL := t.RepositoryURL
	image := t.Agent.ContainerImage
	if image == "" {
		image = r.Image
	}
	if image == "" {
		image = "claudomator-agent:latest"
	}

	// 3. Prepare logs
	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		logDir = filepath.Join(workspace, ".claudomator-logs")
	}
	if err := os.MkdirAll(logDir, 0700); err != nil {
		return fmt.Errorf("creating log dir: %w", err)
	}
	e.StdoutPath = filepath.Join(logDir, "stdout.log")
	e.StderrPath = filepath.Join(logDir, "stderr.log")
	e.ArtifactDir = logDir

	stdoutFile, err := os.Create(e.StdoutPath)
	if err != nil {
		return fmt.Errorf("creating stdout log: %w", err)
	}
	defer stdoutFile.Close()
	stderrFile, err := os.Create(e.StderrPath)
	if err != nil {
		return fmt.Errorf("creating stderr log: %w", err)
	}
	defer stderrFile.Close()

	// 4. Run container
	// Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect'
	envFile := filepath.Join(workspace, ".claudomator-env")
	envContent := fmt.Sprintf("ANTHROPIC_API_KEY=%s\nGOOGLE_API_KEY=%s\nGEMINI_API_KEY=%s\n",
		os.Getenv("ANTHROPIC_API_KEY"), os.Getenv("GOOGLE_API_KEY"), os.Getenv("GEMINI_API_KEY"))
	if err := os.WriteFile(envFile, []byte(envContent), 0600); err != nil {
		return fmt.Errorf("writing env file: %w", err)
	}
	// docker reads --env-file only when the container starts. Remove the file on
	// return so API keys do not linger in workspaces that Run preserves on
	// failure or for resumption (a resume rewrites it).
	defer os.Remove(envFile)

	// Inject custom instructions via file to avoid CLI length limits
	instructionsFile := filepath.Join(workspace, ".claudomator-instructions.txt")
	if err := os.WriteFile(instructionsFile, []byte(t.Agent.Instructions), 0644); err != nil {
		return fmt.Errorf("writing instructions: %w", err)
	}

	args := r.buildDockerArgs(workspace, agentHome, e.TaskID)
	innerCmd := r.buildInnerCmd(t, e, isResume)

	fullArgs := append(args, image)
	fullArgs = append(fullArgs, innerCmd...)

	r.Logger.Info("starting container", "image", image, "taskID", t.ID)
	cmd := r.command(ctx, "docker", fullArgs...)
	cmd.Stderr = stderrFile
	// Own process group so cancellation can kill docker and anything it spawned.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	// Use os.Pipe for stdout so we can parse it in real-time
	stdoutR, stdoutW, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	cmd.Stdout = stdoutW

	if err := cmd.Start(); err != nil {
		stdoutW.Close()
		stdoutR.Close()
		return fmt.Errorf("starting container: %w", err)
	}
	// The child holds its own copy of the write end; closing ours lets the
	// reader see EOF when the child exits.
	stdoutW.Close()

	// Watch for context cancellation to kill the process group (Issue 1)
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			r.Logger.Info("killing container process group due to context cancellation", "taskID", t.ID)
			// Best-effort: the group may already be gone.
			// NOTE(review): this kills the local docker CLI process group; confirm
			// whether the container itself also needs an explicit `docker kill`.
			_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
		case <-done:
		}
	}()

	// Stream stdout to the log file and parse cost/errors.
	var costUSD float64
	var sessionID string
	var streamErr error
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
		stdoutR.Close()
	}()

	waitErr := cmd.Wait()
	wg.Wait()

	e.CostUSD = costUSD
	if sessionID != "" {
		e.SessionID = sessionID
	}

	// Check whether the agent left a question before exiting.
	questionFile := filepath.Join(logDir, "question.json")
	if data, readErr := os.ReadFile(questionFile); readErr == nil {
		os.Remove(questionFile) // consumed
		questionJSON := strings.TrimSpace(string(data))
		if isCompletionReport(questionJSON) {
			r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
			e.Summary = extractQuestionText(questionJSON)
		} else {
			if e.SessionID == "" {
				r.Logger.Warn("missing session ID; resume will start fresh", "taskID", e.TaskID)
			}
			return &BlockedError{
				QuestionJSON: questionJSON,
				SessionID:    e.SessionID,
				SandboxDir:   workspace,
			}
		}
	}

	// Read agent summary if written.
	summaryFile := filepath.Join(logDir, "summary.txt")
	if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
		os.Remove(summaryFile) // consumed
		e.Summary = strings.TrimSpace(string(summaryData))
	}

	// 5. Post-execution: push changes if successful
	if waitErr == nil && streamErr == nil {
		// Check if there are any commits to push: HEAD ahead of the remote-tracking
		// ref of the branch we will push. For story tasks this must be
		// origin/<storyBranch>, not origin/HEAD. Otherwise a story branch that
		// already carries earlier tasks' commits always looks "ahead", and the
		// uncommitted-changes check below is never reached.
		// If the base ref doesn't exist (e.g. the auto-created branch was never
		// pushed), we attempt push anyway.
		baseRef := "origin/HEAD"
		if storyBranch != "" {
			baseRef = "origin/" + storyBranch
		}
		hasCommits := true
		if out, err := r.command(ctx, "git", "-C", workspace, "rev-list", baseRef+"..HEAD").CombinedOutput(); err == nil {
			if len(strings.TrimSpace(string(out))) == 0 {
				hasCommits = false
			}
		}

		if hasCommits {
			pushRef := "HEAD"
			if storyBranch != "" {
				pushRef = storyBranch
			}
			r.Logger.Info("pushing changes back to remote", "url", repoURL, "ref", pushRef)
			if out, err := r.command(ctx, "git", "-C", workspace, "push", "origin", pushRef).CombinedOutput(); err != nil {
				r.Logger.Warn("git push failed", "error", err, "output", string(out))
				return fmt.Errorf("git push failed: %w\n%s", err, string(out))
			}
		} else {
			// No commits pushed — check whether the agent left uncommitted work behind.
			// If so, fail loudly: the work would be silently lost when the sandbox is deleted.
			if err := detectUncommittedChanges(workspace); err != nil {
				return err
			}
			r.Logger.Info("no new commits to push", "taskID", t.ID)
		}
	}

	if waitErr != nil {
		// Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError)
		// can inspect agent-specific messages (e.g. Gemini TerminalQuotaError).
		stderrTail := readFileTail(e.StderrPath, 4096)
		if stderrTail != "" {
			return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail)
		}
		return fmt.Errorf("container execution failed: %w", waitErr)
	}
	if streamErr != nil {
		return fmt.Errorf("stream parsing failed: %w", streamErr)
	}

	return nil
}

// buildDockerArgs returns the `docker run` arguments that precede the image
// name. They mount workspace at /workspace and claudeHome at /home/agent
// (also set as HOME), run as the host process's UID:GID, load API keys from
// the workspace env file, and expose the API URL, task ID, and drops dir as
// environment variables.
func (r *ContainerRunner) buildDockerArgs(workspace, claudeHome, taskID string) []string {
	// --env-file takes a HOST path.
	hostEnvFile := filepath.Join(workspace, ".claudomator-env")

	// Replace localhost with host.docker.internal so the container can reach the host API.
	apiURL := strings.ReplaceAll(r.APIURL, "localhost", "host.docker.internal")

	args := []string{
		"run", "--rm",
		// Allow container to reach the host via host.docker.internal.
		"--add-host=host.docker.internal:host-gateway",
		// Run as the current process UID:GID so the container can read host-owned files.
		fmt.Sprintf("--user=%d:%d", os.Getuid(), os.Getgid()),
		"-v", workspace + ":/workspace",
		"-v", claudeHome + ":/home/agent",
		"-w", "/workspace",
		"--env-file", hostEnvFile,
		"-e", "HOME=/home/agent",
		"-e", "CLAUDOMATOR_API_URL=" + apiURL,
		"-e", "CLAUDOMATOR_TASK_ID=" + taskID,
		"-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir,
	}
	if r.SSHAuthSock != "" {
		args = append(args, "-v", r.SSHAuthSock+":/tmp/ssh-auth.sock", "-e", "SSH_AUTH_SOCK=/tmp/ssh-auth.sock")
	}
	return args
}

// buildInnerCmd returns the command run inside the container. It is an
// `sh -c` script that reads the instructions file into $INST and passes it
// to the agent CLI with -p. For Claude it also requests stream-json output
// and, when resuming with a known session, --resume.
//
// The script is handed to docker as a single argv element (no host shell is
// involved), so only the container's sh expands it. "$INST" is double-quoted
// so the instructions' contents are never re-parsed as shell syntax.
func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string {
	claudeBin := r.ClaudeBinary
	if claudeBin == "" {
		claudeBin = "claude"
	}
	geminiBin := r.GeminiBinary
	if geminiBin == "" {
		geminiBin = "gemini"
	}

	if t.Agent.Type == "gemini" {
		return []string{"sh", "-c", fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", geminiBin)}
	}

	// Claude
	var claudeCmd strings.Builder
	claudeCmd.WriteString(fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", claudeBin))
	if isResume && e.ResumeSessionID != "" {
		// Single-quote the session ID (escaping embedded quotes) because it is
		// not validated here and is interpolated into a shell script.
		quoted := "'" + strings.ReplaceAll(e.ResumeSessionID, "'", `'\''`) + "'"
		claudeCmd.WriteString(" --resume " + quoted)
	}
	claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions")
	return []string{"sh", "-c", claudeCmd.String()}
}

// scaffoldPrefixes are files/dirs written by the harness into the workspace before the agent
// runs. They are not part of the repo and must not trigger the uncommitted-changes check.
var scaffoldPrefixes = []string{
	".claudomator-env",
	".claudomator-instructions.txt",
	// Default log dir (stdout.log, stderr.log) used when ContainerRunner.LogDir is unset.
	".claudomator-logs",
	".agent-home",
}

// isScaffold reports whether path (workspace-relative, slash-separated as
// printed by git) is a scaffold entry or lies beneath a scaffold directory.
func isScaffold(path string) bool {
	for _, p := range scaffoldPrefixes {
		if path == p || strings.HasPrefix(path, p+"/") {
			return true
		}
	}
	return false
}

// nonScaffoldLines splits newline-separated git path output into paths and
// drops blank lines and scaffold entries.
func nonScaffoldLines(out string) []string {
	var paths []string
	for _, line := range strings.Split(strings.TrimSpace(out), "\n") {
		if line != "" && !isScaffold(line) {
			paths = append(paths, line)
		}
	}
	return paths
}

// detectUncommittedChanges returns an error if the workspace contains modified or
// untracked source files that the agent forgot to commit. Scaffold files written by
// the harness (see scaffoldPrefixes) are excluded from the check.
//
// This is a best-effort safety net: if a git command fails (e.g. the repo has
// no HEAD commit yet), that check is skipped rather than reported.
func detectUncommittedChanges(workspace string) error {
	// Modified or staged tracked files. Output (stdout only) is used so git
	// warnings on stderr are not mistaken for file paths.
	diffOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", workspace, "diff", "--name-only", "HEAD").Output()
	if err == nil {
		if dirty := nonScaffoldLines(string(diffOut)); len(dirty) > 0 {
			return fmt.Errorf("agent left uncommitted changes (work would be lost on sandbox deletion):\n%s\nInstructions must include: git add -A && git commit && git push origin main", strings.Join(dirty, "\n"))
		}
	}

	// Untracked new source files (excludes gitignored files)
	lsOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", workspace, "ls-files", "--others", "--exclude-standard").Output()
	if err == nil {
		if dirty := nonScaffoldLines(string(lsOut)); len(dirty) > 0 {
			return fmt.Errorf("agent left untracked files not committed (work would be lost on sandbox deletion):\n%s\nInstructions must include: git add -A && git commit && git push origin main", strings.Join(dirty, "\n"))
		}
	}

	return nil
}