diff options
Diffstat (limited to 'internal/executor/container.go')
| -rw-r--r-- | internal/executor/container.go | 549 |
1 files changed, 549 insertions, 0 deletions
diff --git a/internal/executor/container.go b/internal/executor/container.go new file mode 100644 index 0000000..61ac29c --- /dev/null +++ b/internal/executor/container.go @@ -0,0 +1,549 @@ +package executor + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// ContainerRunner executes an agent inside a container. +type ContainerRunner struct { + Image string // default image if not specified in task + Logger *slog.Logger + LogDir string + APIURL string + DropsDir string + SSHAuthSock string // optional path to host SSH agent + ClaudeBinary string // optional path to claude binary in container + GeminiBinary string // optional path to gemini binary in container + ClaudeConfigDir string // host path to ~/.claude; mounted into container for auth credentials + CredentialSyncCmd string // optional path to sync-credentials script for auth-error auto-recovery + Store Store // optional; used to look up stories and projects for story-aware cloning + // Command allows mocking exec.CommandContext for tests. + Command func(ctx context.Context, name string, arg ...string) *exec.Cmd +} + +func isAuthError(err error) bool { + if err == nil { + return false + } + s := err.Error() + return strings.Contains(s, "Not logged in") || + strings.Contains(s, "OAuth token has expired") || + strings.Contains(s, "authentication_error") || + strings.Contains(s, "Please run /login") +} + +func (r *ContainerRunner) command(ctx context.Context, name string, arg ...string) *exec.Cmd { + if r.Command != nil { + return r.Command(ctx, name, arg...) + } + return exec.CommandContext(ctx, name, arg...) +} + +func (r *ContainerRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +// ensureStoryBranch checks whether branchName exists in remoteURL and creates +// it from main if not. Uses localPath as a reference clone for speed if set. +func (r *ContainerRunner) ensureStoryBranch(ctx context.Context, remoteURL, branchName, localPath string) error { + // Check if branch already exists. + out, err := r.command(ctx, "git", "ls-remote", "--heads", remoteURL, branchName).CombinedOutput() + if err == nil && len(strings.TrimSpace(string(out))) > 0 { + return nil // already exists + } + + r.Logger.Info("story branch missing, creating from main", "branch", branchName, "remote", remoteURL) + + // Clone into a temp dir so we can create the branch. + tmp, err := os.MkdirTemp("", "claudomator-branchsetup-*") + if err != nil { + return fmt.Errorf("mktemp for branch setup: %w", err) + } + defer os.RemoveAll(tmp) + + // Remove the dir git clone expects to create. + if err := os.Remove(tmp); err != nil { + return fmt.Errorf("removing tmp dir before clone: %w", err) + } + + var cloneArgs []string + if localPath != "" { + cloneArgs = []string{"clone", "--reference", localPath, remoteURL, tmp} + } else { + cloneArgs = []string{"clone", remoteURL, tmp} + } + if out, err := r.command(ctx, "git", cloneArgs...).CombinedOutput(); err != nil { + return fmt.Errorf("git clone for branch setup: %w\n%s", err, string(out)) + } + if out, err := r.command(ctx, "git", "-C", tmp, "checkout", "-b", branchName).CombinedOutput(); err != nil { + return fmt.Errorf("git checkout -b %q: %w\n%s", branchName, err, string(out)) + } + if out, err := r.command(ctx, "git", "-C", tmp, "push", "origin", branchName).CombinedOutput(); err != nil { + return fmt.Errorf("git push %q: %w\n%s", branchName, err, string(out)) + } + r.Logger.Info("story branch created and pushed", "branch", branchName) + return nil +} + +func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + var err error + repoURL := t.RepositoryURL + if repoURL == "" { + return fmt.Errorf("task %s has no repository_url", t.ID) + } + + // 1. Setup workspace on host + var workspace string + isResume := false + if e.SandboxDir != "" { + if _, err = os.Stat(e.SandboxDir); err == nil { + workspace = e.SandboxDir + isResume = true + r.Logger.Info("resuming in preserved workspace", "path", workspace) + } + } + + if workspace == "" { + workspace, err = os.MkdirTemp("", "claudomator-workspace-*") + if err != nil { + return fmt.Errorf("creating workspace: %w", err) + } + // chmod applied after clone; see step 2. + } + + // Note: workspace is only removed on success. On failure, it's preserved for debugging. + // If the task becomes BLOCKED, it's also preserved for resumption. + success := false + isBlocked := false + defer func() { + if success && !isBlocked { + os.RemoveAll(workspace) + } else { + r.Logger.Warn("preserving workspace", "path", workspace, "success", success, "blocked", isBlocked) + } + }() + + // Resolve story branch and project local path if this is a story task. + var storyBranch string + var storyLocalPath string + if t.StoryID != "" && r.Store != nil { + if story, err := r.Store.GetStory(t.StoryID); err == nil && story != nil { + storyBranch = story.BranchName + if story.ProjectID != "" { + if proj, err := r.Store.GetProject(story.ProjectID); err == nil && proj != nil { + storyLocalPath = proj.LocalPath + } + } + } + } + // Fall back to task-level BranchName (e.g. set explicitly by executor or tests). + if storyBranch == "" { + storyBranch = t.BranchName + } + + // 2. Ensure story branch exists in the remote before cloning. + // If the branch is missing (e.g. story approved before fix, or branch push failed), + // create it from main using the project local path as a reference repo. + if storyBranch != "" && !isResume { + if err := r.ensureStoryBranch(ctx, repoURL, storyBranch, storyLocalPath); err != nil { + r.Logger.Warn("ensureStoryBranch failed (will attempt checkout anyway)", "branch", storyBranch, "error", err) + } + } + + // 3. Clone repo into workspace if not resuming. + // git clone requires the target directory to not exist; remove the MkdirTemp-created dir first. + if !isResume { + if err := os.Remove(workspace); err != nil { + return fmt.Errorf("removing workspace before clone: %w", err) + } + r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace) + var cloneArgs []string + if storyLocalPath != "" { + cloneArgs = []string{"clone", "--reference", storyLocalPath, repoURL, workspace} + } else { + cloneArgs = []string{"clone", repoURL, workspace} + } + if out, err := r.command(ctx, "git", cloneArgs...).CombinedOutput(); err != nil { + return fmt.Errorf("git clone failed: %w\n%s", err, string(out)) + } + if storyBranch != "" { + r.Logger.Info("checking out story branch", "branch", storyBranch) + if out, err := r.command(ctx, "git", "-C", workspace, "checkout", storyBranch).CombinedOutput(); err != nil { + return fmt.Errorf("git checkout story branch %q failed: %w\n%s", storyBranch, err, string(out)) + } + } + if err = os.Chmod(workspace, 0755); err != nil { + return fmt.Errorf("chmod cloned workspace: %w", err) + } + } + e.SandboxDir = workspace + + // Set up a writable $HOME staging dir so any agent tool (claude, gemini, etc.) + // can freely create subdirs (session-env, .gemini, .cache, …) without hitting + // a non-existent or read-only home. We copy only the claude credentials into it. + agentHome := filepath.Join(workspace, ".agent-home") + if err := os.MkdirAll(filepath.Join(agentHome, ".claude"), 0755); err != nil { + return fmt.Errorf("creating agent home staging dir: %w", err) + } + if err := os.MkdirAll(filepath.Join(agentHome, ".gemini"), 0755); err != nil { + return fmt.Errorf("creating .gemini dir: %w", err) + } + if r.ClaudeConfigDir != "" { + // credentials + if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".credentials.json")); readErr == nil { + _ = os.WriteFile(filepath.Join(agentHome, ".claude", ".credentials.json"), srcData, 0600) + } + // settings (used by claude CLI; copy so it can write updates without hitting the host) + if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".claude.json")); readErr == nil { + _ = os.WriteFile(filepath.Join(agentHome, ".claude.json"), srcData, 0644) + } + } + + // Pre-flight: verify credentials were actually copied before spinning up a container. + if r.ClaudeConfigDir != "" { + credsPath := filepath.Join(agentHome, ".claude", ".credentials.json") + settingsPath := filepath.Join(agentHome, ".claude.json") + if _, err := os.Stat(credsPath); os.IsNotExist(err) { + return fmt.Errorf("credentials not found at %s — run sync-credentials", r.ClaudeConfigDir) + } + if _, err := os.Stat(settingsPath); os.IsNotExist(err) { + return fmt.Errorf("claude settings (.claude.json) not found at %s — run sync-credentials", r.ClaudeConfigDir) + } + } + + // Run container (with auth retry on failure). + runErr := r.runContainer(ctx, t, e, workspace, agentHome, isResume, storyBranch) + if runErr != nil && isAuthError(runErr) && r.CredentialSyncCmd != "" { + r.Logger.Warn("auth failure detected, syncing credentials and retrying once", "taskID", t.ID) + syncOut, syncErr := r.command(ctx, r.CredentialSyncCmd).CombinedOutput() + if syncErr != nil { + r.Logger.Warn("sync-credentials failed", "error", syncErr, "output", string(syncOut)) + } + // Re-copy credentials into agentHome with fresh files. + if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".credentials.json")); readErr == nil { + _ = os.WriteFile(filepath.Join(agentHome, ".claude", ".credentials.json"), srcData, 0600) + } + if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".claude.json")); readErr == nil { + _ = os.WriteFile(filepath.Join(agentHome, ".claude.json"), srcData, 0644) + } + runErr = r.runContainer(ctx, t, e, workspace, agentHome, isResume, storyBranch) + } + + if runErr == nil { + success = true + } + var blockedErr *BlockedError + if errors.As(runErr, &blockedErr) { + isBlocked = true + success = true // preserve workspace for resumption + } + return runErr +} + +// runContainer runs the docker container for the given task and handles log setup, +// environment files, instructions, and post-execution git operations. +func (r *ContainerRunner) runContainer(ctx context.Context, t *task.Task, e *storage.Execution, workspace, agentHome string, isResume bool, storyBranch string) error { + repoURL := t.RepositoryURL + + image := t.Agent.ContainerImage + if image == "" { + image = r.Image + } + if image == "" { + image = "claudomator-agent:latest" + } + + // 3. Prepare logs + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = filepath.Join(workspace, ".claudomator-logs") + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + // 4. Run container + + // Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect' + envFile := filepath.Join(workspace, ".claudomator-env") + envContent := fmt.Sprintf("ANTHROPIC_API_KEY=%s\nGOOGLE_API_KEY=%s\nGEMINI_API_KEY=%s\n", os.Getenv("ANTHROPIC_API_KEY"), os.Getenv("GOOGLE_API_KEY"), os.Getenv("GEMINI_API_KEY")) + if err := os.WriteFile(envFile, []byte(envContent), 0600); err != nil { + return fmt.Errorf("writing env file: %w", err) + } + + // Inject custom instructions via file to avoid CLI length limits + instructionsFile := filepath.Join(workspace, ".claudomator-instructions.txt") + if err := os.WriteFile(instructionsFile, []byte(t.Agent.Instructions), 0644); err != nil { + return fmt.Errorf("writing instructions: %w", err) + } + + args := r.buildDockerArgs(workspace, agentHome, e.TaskID) + innerCmd := r.buildInnerCmd(t, e, isResume) + + fullArgs := append(args, image) + fullArgs = append(fullArgs, innerCmd...) + + r.Logger.Info("starting container", "image", image, "taskID", t.ID) + cmd := r.command(ctx, "docker", fullArgs...) + cmd.Stderr = stderrFile + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // Use os.Pipe for stdout so we can parse it in real-time + var stdoutR, stdoutW *os.File + stdoutR, stdoutW, err = os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting container: %w", err) + } + stdoutW.Close() + + // Watch for context cancellation to kill the process group (Issue 1) + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-ctx.Done(): + r.Logger.Info("killing container process group due to context cancellation", "taskID", t.ID) + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-done: + } + }() + + // Stream stdout to the log file and parse cost/errors. + var costUSD float64 + var sessionID string + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + wg.Wait() + + e.CostUSD = costUSD + if sessionID != "" { + e.SessionID = sessionID + } + + // Check whether the agent left a question before exiting. + questionFile := filepath.Join(logDir, "question.json") + if data, readErr := os.ReadFile(questionFile); readErr == nil { + os.Remove(questionFile) // consumed + questionJSON := strings.TrimSpace(string(data)) + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + if e.SessionID == "" { + r.Logger.Warn("missing session ID; resume will start fresh", "taskID", e.TaskID) + } + return &BlockedError{ + QuestionJSON: questionJSON, + SessionID: e.SessionID, + SandboxDir: workspace, + } + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) // consumed + e.Summary = strings.TrimSpace(string(summaryData)) + } + + // 5. Post-execution: push changes if successful + if waitErr == nil && streamErr == nil { + // Check if there are any commits to push (HEAD ahead of origin/HEAD). + // If origin/HEAD doesn't exist (e.g. fresh clone with no commits), we attempt push anyway. + hasCommits := true + if out, err := r.command(ctx, "git", "-C", workspace, "rev-list", "origin/HEAD..HEAD").CombinedOutput(); err == nil { + if len(strings.TrimSpace(string(out))) == 0 { + hasCommits = false + } + } + + if hasCommits { + pushRef := "HEAD" + if storyBranch != "" { + pushRef = storyBranch + } + r.Logger.Info("pushing changes back to remote", "url", repoURL, "ref", pushRef) + if out, err := r.command(ctx, "git", "-C", workspace, "push", "origin", pushRef).CombinedOutput(); err != nil { + r.Logger.Warn("git push failed", "error", err, "output", string(out)) + return fmt.Errorf("git push failed: %w\n%s", err, string(out)) + } + } else { + // No commits pushed — check whether the agent left uncommitted work behind. + // If so, fail loudly: the work would be silently lost when the sandbox is deleted. + if err := detectUncommittedChanges(workspace); err != nil { + return err + } + r.Logger.Info("no new commits to push", "taskID", t.ID) + } + } + + if waitErr != nil { + // Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError) + // can inspect agent-specific messages (e.g. Gemini TerminalQuotaError). + stderrTail := readFileTail(e.StderrPath, 4096) + if stderrTail != "" { + return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail) + } + return fmt.Errorf("container execution failed: %w", waitErr) + } + if streamErr != nil { + return fmt.Errorf("stream parsing failed: %w", streamErr) + } + + return nil +} + +func (r *ContainerRunner) buildDockerArgs(workspace, claudeHome, taskID string) []string { + // --env-file takes a HOST path. + hostEnvFile := filepath.Join(workspace, ".claudomator-env") + + // Replace localhost with host.docker.internal so the container can reach the host API. + apiURL := strings.ReplaceAll(r.APIURL, "localhost", "host.docker.internal") + + args := []string{ + "run", "--rm", + // Allow container to reach the host via host.docker.internal. + "--add-host=host.docker.internal:host-gateway", + // Run as the current process UID:GID so the container can read host-owned files. + fmt.Sprintf("--user=%d:%d", os.Getuid(), os.Getgid()), + "-v", workspace + ":/workspace", + "-v", claudeHome + ":/home/agent", + "-w", "/workspace", + "--env-file", hostEnvFile, + "-e", "HOME=/home/agent", + "-e", "CLAUDOMATOR_API_URL=" + apiURL, + "-e", "CLAUDOMATOR_TASK_ID=" + taskID, + "-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir, + } + if r.SSHAuthSock != "" { + args = append(args, "-v", r.SSHAuthSock+":/tmp/ssh-auth.sock", "-e", "SSH_AUTH_SOCK=/tmp/ssh-auth.sock") + } + return args +} + +func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string { + // Claude CLI uses -p for prompt text. To pass a file, we use a shell to cat it. + // We use a shell variable to capture the expansion to avoid quoting issues with instructions contents. + // The outer single quotes around the sh -c argument prevent host-side expansion. + + claudeBin := r.ClaudeBinary + if claudeBin == "" { + claudeBin = "claude" + } + geminiBin := r.GeminiBinary + if geminiBin == "" { + geminiBin = "gemini" + } + + if t.Agent.Type == "gemini" { + return []string{"sh", "-c", fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", geminiBin)} + } + + // Claude + var claudeCmd strings.Builder + claudeCmd.WriteString(fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", claudeBin)) + if isResume && e.ResumeSessionID != "" { + claudeCmd.WriteString(fmt.Sprintf(" --resume %s", e.ResumeSessionID)) + } + claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions") + + return []string{"sh", "-c", claudeCmd.String()} +} + +// scaffoldPrefixes are files/dirs written by the harness into the workspace before the agent +// runs. They are not part of the repo and must not trigger the uncommitted-changes check. +var scaffoldPrefixes = []string{ + ".claudomator-env", + ".claudomator-instructions.txt", + ".agent-home", +} + +func isScaffold(path string) bool { + for _, p := range scaffoldPrefixes { + if path == p || strings.HasPrefix(path, p+"/") { + return true + } + } + return false +} + +// detectUncommittedChanges returns an error if the workspace contains modified or +// untracked source files that the agent forgot to commit. Scaffold files written by +// the harness (.claudomator-env, .claudomator-instructions.txt, .agent-home/) are +// excluded from the check. +func detectUncommittedChanges(workspace string) error { + // Modified or staged tracked files + diffOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", workspace, + "diff", "--name-only", "HEAD").CombinedOutput() + if err == nil { + for _, line := range strings.Split(strings.TrimSpace(string(diffOut)), "\n") { + if line != "" && !isScaffold(line) { + return fmt.Errorf("agent left uncommitted changes (work would be lost on sandbox deletion):\n%s\nInstructions must include: git add -A && git commit && git push origin main", strings.TrimSpace(string(diffOut))) + } + } + } + + // Untracked new source files (excludes gitignored files) + lsOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", workspace, + "ls-files", "--others", "--exclude-standard").CombinedOutput() + if err == nil { + var dirty []string + for _, line := range strings.Split(strings.TrimSpace(string(lsOut)), "\n") { + if line != "" && !isScaffold(line) { + dirty = append(dirty, line) + } + } + if len(dirty) > 0 { + return fmt.Errorf("agent left untracked files not committed (work would be lost on sandbox deletion):\n%s\nInstructions must include: git add -A && git commit && git push origin main", strings.Join(dirty, "\n")) + } + } + + return nil +} + |
