summaryrefslogtreecommitdiff
path: root/internal/executor/container.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/container.go')
-rw-r--r--internal/executor/container.go549
1 files changed, 549 insertions, 0 deletions
diff --git a/internal/executor/container.go b/internal/executor/container.go
new file mode 100644
index 0000000..61ac29c
--- /dev/null
+++ b/internal/executor/container.go
@@ -0,0 +1,549 @@
+package executor
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+// ContainerRunner executes an agent inside a container.
+type ContainerRunner struct {
+ Image string // default image if not specified in task
+ Logger *slog.Logger
+ LogDir string
+ APIURL string
+ DropsDir string
+ SSHAuthSock string // optional path to host SSH agent
+ ClaudeBinary string // optional path to claude binary in container
+ GeminiBinary string // optional path to gemini binary in container
+ ClaudeConfigDir string // host path to ~/.claude; mounted into container for auth credentials
+ CredentialSyncCmd string // optional path to sync-credentials script for auth-error auto-recovery
+ Store Store // optional; used to look up stories and projects for story-aware cloning
+ // Command allows mocking exec.CommandContext for tests.
+ Command func(ctx context.Context, name string, arg ...string) *exec.Cmd
+}
+
+func isAuthError(err error) bool {
+ if err == nil {
+ return false
+ }
+ s := err.Error()
+ return strings.Contains(s, "Not logged in") ||
+ strings.Contains(s, "OAuth token has expired") ||
+ strings.Contains(s, "authentication_error") ||
+ strings.Contains(s, "Please run /login")
+}
+
+func (r *ContainerRunner) command(ctx context.Context, name string, arg ...string) *exec.Cmd {
+ if r.Command != nil {
+ return r.Command(ctx, name, arg...)
+ }
+ return exec.CommandContext(ctx, name, arg...)
+}
+
+func (r *ContainerRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
+// ensureStoryBranch checks whether branchName exists in remoteURL and creates
+// it from main if not. Uses localPath as a reference clone for speed if set.
+func (r *ContainerRunner) ensureStoryBranch(ctx context.Context, remoteURL, branchName, localPath string) error {
+ // Check if branch already exists.
+ out, err := r.command(ctx, "git", "ls-remote", "--heads", remoteURL, branchName).CombinedOutput()
+ if err == nil && len(strings.TrimSpace(string(out))) > 0 {
+ return nil // already exists
+ }
+
+ r.Logger.Info("story branch missing, creating from main", "branch", branchName, "remote", remoteURL)
+
+ // Clone into a temp dir so we can create the branch.
+ tmp, err := os.MkdirTemp("", "claudomator-branchsetup-*")
+ if err != nil {
+ return fmt.Errorf("mktemp for branch setup: %w", err)
+ }
+ defer os.RemoveAll(tmp)
+
+ // Remove the dir git clone expects to create.
+ if err := os.Remove(tmp); err != nil {
+ return fmt.Errorf("removing tmp dir before clone: %w", err)
+ }
+
+ var cloneArgs []string
+ if localPath != "" {
+ cloneArgs = []string{"clone", "--reference", localPath, remoteURL, tmp}
+ } else {
+ cloneArgs = []string{"clone", remoteURL, tmp}
+ }
+ if out, err := r.command(ctx, "git", cloneArgs...).CombinedOutput(); err != nil {
+ return fmt.Errorf("git clone for branch setup: %w\n%s", err, string(out))
+ }
+ if out, err := r.command(ctx, "git", "-C", tmp, "checkout", "-b", branchName).CombinedOutput(); err != nil {
+ return fmt.Errorf("git checkout -b %q: %w\n%s", branchName, err, string(out))
+ }
+ if out, err := r.command(ctx, "git", "-C", tmp, "push", "origin", branchName).CombinedOutput(); err != nil {
+ return fmt.Errorf("git push %q: %w\n%s", branchName, err, string(out))
+ }
+ r.Logger.Info("story branch created and pushed", "branch", branchName)
+ return nil
+}
+
+func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ var err error
+ repoURL := t.RepositoryURL
+ if repoURL == "" {
+ return fmt.Errorf("task %s has no repository_url", t.ID)
+ }
+
+ // 1. Setup workspace on host
+ var workspace string
+ isResume := false
+ if e.SandboxDir != "" {
+ if _, err = os.Stat(e.SandboxDir); err == nil {
+ workspace = e.SandboxDir
+ isResume = true
+ r.Logger.Info("resuming in preserved workspace", "path", workspace)
+ }
+ }
+
+ if workspace == "" {
+ workspace, err = os.MkdirTemp("", "claudomator-workspace-*")
+ if err != nil {
+ return fmt.Errorf("creating workspace: %w", err)
+ }
+ // chmod applied after clone; see step 2.
+ }
+
+ // Note: workspace is only removed on success. On failure, it's preserved for debugging.
+ // If the task becomes BLOCKED, it's also preserved for resumption.
+ success := false
+ isBlocked := false
+ defer func() {
+ if success && !isBlocked {
+ os.RemoveAll(workspace)
+ } else {
+ r.Logger.Warn("preserving workspace", "path", workspace, "success", success, "blocked", isBlocked)
+ }
+ }()
+
+ // Resolve story branch and project local path if this is a story task.
+ var storyBranch string
+ var storyLocalPath string
+ if t.StoryID != "" && r.Store != nil {
+ if story, err := r.Store.GetStory(t.StoryID); err == nil && story != nil {
+ storyBranch = story.BranchName
+ if story.ProjectID != "" {
+ if proj, err := r.Store.GetProject(story.ProjectID); err == nil && proj != nil {
+ storyLocalPath = proj.LocalPath
+ }
+ }
+ }
+ }
+ // Fall back to task-level BranchName (e.g. set explicitly by executor or tests).
+ if storyBranch == "" {
+ storyBranch = t.BranchName
+ }
+
+ // 2. Ensure story branch exists in the remote before cloning.
+ // If the branch is missing (e.g. story approved before fix, or branch push failed),
+ // create it from main using the project local path as a reference repo.
+ if storyBranch != "" && !isResume {
+ if err := r.ensureStoryBranch(ctx, repoURL, storyBranch, storyLocalPath); err != nil {
+ r.Logger.Warn("ensureStoryBranch failed (will attempt checkout anyway)", "branch", storyBranch, "error", err)
+ }
+ }
+
+ // 3. Clone repo into workspace if not resuming.
+ // git clone requires the target directory to not exist; remove the MkdirTemp-created dir first.
+ if !isResume {
+ if err := os.Remove(workspace); err != nil {
+ return fmt.Errorf("removing workspace before clone: %w", err)
+ }
+ r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace)
+ var cloneArgs []string
+ if storyLocalPath != "" {
+ cloneArgs = []string{"clone", "--reference", storyLocalPath, repoURL, workspace}
+ } else {
+ cloneArgs = []string{"clone", repoURL, workspace}
+ }
+ if out, err := r.command(ctx, "git", cloneArgs...).CombinedOutput(); err != nil {
+ return fmt.Errorf("git clone failed: %w\n%s", err, string(out))
+ }
+ if storyBranch != "" {
+ r.Logger.Info("checking out story branch", "branch", storyBranch)
+ if out, err := r.command(ctx, "git", "-C", workspace, "checkout", storyBranch).CombinedOutput(); err != nil {
+ return fmt.Errorf("git checkout story branch %q failed: %w\n%s", storyBranch, err, string(out))
+ }
+ }
+ if err = os.Chmod(workspace, 0755); err != nil {
+ return fmt.Errorf("chmod cloned workspace: %w", err)
+ }
+ }
+ e.SandboxDir = workspace
+
+ // Set up a writable $HOME staging dir so any agent tool (claude, gemini, etc.)
+ // can freely create subdirs (session-env, .gemini, .cache, …) without hitting
+ // a non-existent or read-only home. We copy only the claude credentials into it.
+ agentHome := filepath.Join(workspace, ".agent-home")
+ if err := os.MkdirAll(filepath.Join(agentHome, ".claude"), 0755); err != nil {
+ return fmt.Errorf("creating agent home staging dir: %w", err)
+ }
+ if err := os.MkdirAll(filepath.Join(agentHome, ".gemini"), 0755); err != nil {
+ return fmt.Errorf("creating .gemini dir: %w", err)
+ }
+ if r.ClaudeConfigDir != "" {
+ // credentials
+ if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".credentials.json")); readErr == nil {
+ _ = os.WriteFile(filepath.Join(agentHome, ".claude", ".credentials.json"), srcData, 0600)
+ }
+ // settings (used by claude CLI; copy so it can write updates without hitting the host)
+ if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".claude.json")); readErr == nil {
+ _ = os.WriteFile(filepath.Join(agentHome, ".claude.json"), srcData, 0644)
+ }
+ }
+
+ // Pre-flight: verify credentials were actually copied before spinning up a container.
+ if r.ClaudeConfigDir != "" {
+ credsPath := filepath.Join(agentHome, ".claude", ".credentials.json")
+ settingsPath := filepath.Join(agentHome, ".claude.json")
+ if _, err := os.Stat(credsPath); os.IsNotExist(err) {
+ return fmt.Errorf("credentials not found at %s — run sync-credentials", r.ClaudeConfigDir)
+ }
+ if _, err := os.Stat(settingsPath); os.IsNotExist(err) {
+ return fmt.Errorf("claude settings (.claude.json) not found at %s — run sync-credentials", r.ClaudeConfigDir)
+ }
+ }
+
+ // Run container (with auth retry on failure).
+ runErr := r.runContainer(ctx, t, e, workspace, agentHome, isResume, storyBranch)
+ if runErr != nil && isAuthError(runErr) && r.CredentialSyncCmd != "" {
+ r.Logger.Warn("auth failure detected, syncing credentials and retrying once", "taskID", t.ID)
+ syncOut, syncErr := r.command(ctx, r.CredentialSyncCmd).CombinedOutput()
+ if syncErr != nil {
+ r.Logger.Warn("sync-credentials failed", "error", syncErr, "output", string(syncOut))
+ }
+ // Re-copy credentials into agentHome with fresh files.
+ if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".credentials.json")); readErr == nil {
+ _ = os.WriteFile(filepath.Join(agentHome, ".claude", ".credentials.json"), srcData, 0600)
+ }
+ if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".claude.json")); readErr == nil {
+ _ = os.WriteFile(filepath.Join(agentHome, ".claude.json"), srcData, 0644)
+ }
+ runErr = r.runContainer(ctx, t, e, workspace, agentHome, isResume, storyBranch)
+ }
+
+ if runErr == nil {
+ success = true
+ }
+ var blockedErr *BlockedError
+ if errors.As(runErr, &blockedErr) {
+ isBlocked = true
+ success = true // preserve workspace for resumption
+ }
+ return runErr
+}
+
+// runContainer runs the docker container for the given task and handles log setup,
+// environment files, instructions, and post-execution git operations.
+func (r *ContainerRunner) runContainer(ctx context.Context, t *task.Task, e *storage.Execution, workspace, agentHome string, isResume bool, storyBranch string) error {
+ repoURL := t.RepositoryURL
+
+ image := t.Agent.ContainerImage
+ if image == "" {
+ image = r.Image
+ }
+ if image == "" {
+ image = "claudomator-agent:latest"
+ }
+
+ // 3. Prepare logs
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ logDir = filepath.Join(workspace, ".claudomator-logs")
+ }
+ if err := os.MkdirAll(logDir, 0700); err != nil {
+ return fmt.Errorf("creating log dir: %w", err)
+ }
+ e.StdoutPath = filepath.Join(logDir, "stdout.log")
+ e.StderrPath = filepath.Join(logDir, "stderr.log")
+ e.ArtifactDir = logDir
+
+ stdoutFile, err := os.Create(e.StdoutPath)
+ if err != nil {
+ return fmt.Errorf("creating stdout log: %w", err)
+ }
+ defer stdoutFile.Close()
+
+ stderrFile, err := os.Create(e.StderrPath)
+ if err != nil {
+ return fmt.Errorf("creating stderr log: %w", err)
+ }
+ defer stderrFile.Close()
+
+ // 4. Run container
+
+ // Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect'
+ envFile := filepath.Join(workspace, ".claudomator-env")
+ envContent := fmt.Sprintf("ANTHROPIC_API_KEY=%s\nGOOGLE_API_KEY=%s\nGEMINI_API_KEY=%s\n", os.Getenv("ANTHROPIC_API_KEY"), os.Getenv("GOOGLE_API_KEY"), os.Getenv("GEMINI_API_KEY"))
+ if err := os.WriteFile(envFile, []byte(envContent), 0600); err != nil {
+ return fmt.Errorf("writing env file: %w", err)
+ }
+
+ // Inject custom instructions via file to avoid CLI length limits
+ instructionsFile := filepath.Join(workspace, ".claudomator-instructions.txt")
+ if err := os.WriteFile(instructionsFile, []byte(t.Agent.Instructions), 0644); err != nil {
+ return fmt.Errorf("writing instructions: %w", err)
+ }
+
+ args := r.buildDockerArgs(workspace, agentHome, e.TaskID)
+ innerCmd := r.buildInnerCmd(t, e, isResume)
+
+ fullArgs := append(args, image)
+ fullArgs = append(fullArgs, innerCmd...)
+
+ r.Logger.Info("starting container", "image", image, "taskID", t.ID)
+ cmd := r.command(ctx, "docker", fullArgs...)
+ cmd.Stderr = stderrFile
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+
+ // Use os.Pipe for stdout so we can parse it in real-time
+ var stdoutR, stdoutW *os.File
+ stdoutR, stdoutW, err = os.Pipe()
+ if err != nil {
+ return fmt.Errorf("creating stdout pipe: %w", err)
+ }
+ cmd.Stdout = stdoutW
+
+ if err := cmd.Start(); err != nil {
+ stdoutW.Close()
+ stdoutR.Close()
+ return fmt.Errorf("starting container: %w", err)
+ }
+ stdoutW.Close()
+
+ // Watch for context cancellation to kill the process group (Issue 1)
+ done := make(chan struct{})
+ defer close(done)
+ go func() {
+ select {
+ case <-ctx.Done():
+ r.Logger.Info("killing container process group due to context cancellation", "taskID", t.ID)
+ syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+ case <-done:
+ }
+ }()
+
+ // Stream stdout to the log file and parse cost/errors.
+ var costUSD float64
+ var sessionID string
+ var streamErr error
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
+ stdoutR.Close()
+ }()
+
+ waitErr := cmd.Wait()
+ wg.Wait()
+
+ e.CostUSD = costUSD
+ if sessionID != "" {
+ e.SessionID = sessionID
+ }
+
+ // Check whether the agent left a question before exiting.
+ questionFile := filepath.Join(logDir, "question.json")
+ if data, readErr := os.ReadFile(questionFile); readErr == nil {
+ os.Remove(questionFile) // consumed
+ questionJSON := strings.TrimSpace(string(data))
+ if isCompletionReport(questionJSON) {
+ r.Logger.Info("treating question file as completion report", "taskID", e.TaskID)
+ e.Summary = extractQuestionText(questionJSON)
+ } else {
+ if e.SessionID == "" {
+ r.Logger.Warn("missing session ID; resume will start fresh", "taskID", e.TaskID)
+ }
+ return &BlockedError{
+ QuestionJSON: questionJSON,
+ SessionID: e.SessionID,
+ SandboxDir: workspace,
+ }
+ }
+ }
+
+ // Read agent summary if written.
+ summaryFile := filepath.Join(logDir, "summary.txt")
+ if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil {
+ os.Remove(summaryFile) // consumed
+ e.Summary = strings.TrimSpace(string(summaryData))
+ }
+
+ // 5. Post-execution: push changes if successful
+ if waitErr == nil && streamErr == nil {
+ // Check if there are any commits to push (HEAD ahead of origin/HEAD).
+ // If origin/HEAD doesn't exist (e.g. fresh clone with no commits), we attempt push anyway.
+ hasCommits := true
+ if out, err := r.command(ctx, "git", "-C", workspace, "rev-list", "origin/HEAD..HEAD").CombinedOutput(); err == nil {
+ if len(strings.TrimSpace(string(out))) == 0 {
+ hasCommits = false
+ }
+ }
+
+ if hasCommits {
+ pushRef := "HEAD"
+ if storyBranch != "" {
+ pushRef = storyBranch
+ }
+ r.Logger.Info("pushing changes back to remote", "url", repoURL, "ref", pushRef)
+ if out, err := r.command(ctx, "git", "-C", workspace, "push", "origin", pushRef).CombinedOutput(); err != nil {
+ r.Logger.Warn("git push failed", "error", err, "output", string(out))
+ return fmt.Errorf("git push failed: %w\n%s", err, string(out))
+ }
+ } else {
+ // No commits pushed — check whether the agent left uncommitted work behind.
+ // If so, fail loudly: the work would be silently lost when the sandbox is deleted.
+ if err := detectUncommittedChanges(workspace); err != nil {
+ return err
+ }
+ r.Logger.Info("no new commits to push", "taskID", t.ID)
+ }
+ }
+
+ if waitErr != nil {
+ // Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError)
+ // can inspect agent-specific messages (e.g. Gemini TerminalQuotaError).
+ stderrTail := readFileTail(e.StderrPath, 4096)
+ if stderrTail != "" {
+ return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail)
+ }
+ return fmt.Errorf("container execution failed: %w", waitErr)
+ }
+ if streamErr != nil {
+ return fmt.Errorf("stream parsing failed: %w", streamErr)
+ }
+
+ return nil
+}
+
+func (r *ContainerRunner) buildDockerArgs(workspace, claudeHome, taskID string) []string {
+ // --env-file takes a HOST path.
+ hostEnvFile := filepath.Join(workspace, ".claudomator-env")
+
+ // Replace localhost with host.docker.internal so the container can reach the host API.
+ apiURL := strings.ReplaceAll(r.APIURL, "localhost", "host.docker.internal")
+
+ args := []string{
+ "run", "--rm",
+ // Allow container to reach the host via host.docker.internal.
+ "--add-host=host.docker.internal:host-gateway",
+ // Run as the current process UID:GID so the container can read host-owned files.
+ fmt.Sprintf("--user=%d:%d", os.Getuid(), os.Getgid()),
+ "-v", workspace + ":/workspace",
+ "-v", claudeHome + ":/home/agent",
+ "-w", "/workspace",
+ "--env-file", hostEnvFile,
+ "-e", "HOME=/home/agent",
+ "-e", "CLAUDOMATOR_API_URL=" + apiURL,
+ "-e", "CLAUDOMATOR_TASK_ID=" + taskID,
+ "-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir,
+ }
+ if r.SSHAuthSock != "" {
+ args = append(args, "-v", r.SSHAuthSock+":/tmp/ssh-auth.sock", "-e", "SSH_AUTH_SOCK=/tmp/ssh-auth.sock")
+ }
+ return args
+}
+
+func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string {
+ // Claude CLI uses -p for prompt text. To pass a file, we use a shell to cat it.
+ // We use a shell variable to capture the expansion to avoid quoting issues with instructions contents.
+ // The outer single quotes around the sh -c argument prevent host-side expansion.
+
+ claudeBin := r.ClaudeBinary
+ if claudeBin == "" {
+ claudeBin = "claude"
+ }
+ geminiBin := r.GeminiBinary
+ if geminiBin == "" {
+ geminiBin = "gemini"
+ }
+
+ if t.Agent.Type == "gemini" {
+ return []string{"sh", "-c", fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", geminiBin)}
+ }
+
+ // Claude
+ var claudeCmd strings.Builder
+ claudeCmd.WriteString(fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", claudeBin))
+ if isResume && e.ResumeSessionID != "" {
+ claudeCmd.WriteString(fmt.Sprintf(" --resume %s", e.ResumeSessionID))
+ }
+ claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions")
+
+ return []string{"sh", "-c", claudeCmd.String()}
+}
+
+// scaffoldPrefixes are files/dirs written by the harness into the workspace before the agent
+// runs. They are not part of the repo and must not trigger the uncommitted-changes check.
+var scaffoldPrefixes = []string{
+ ".claudomator-env",
+ ".claudomator-instructions.txt",
+ ".agent-home",
+}
+
+func isScaffold(path string) bool {
+ for _, p := range scaffoldPrefixes {
+ if path == p || strings.HasPrefix(path, p+"/") {
+ return true
+ }
+ }
+ return false
+}
+
+// detectUncommittedChanges returns an error if the workspace contains modified or
+// untracked source files that the agent forgot to commit. Scaffold files written by
+// the harness (.claudomator-env, .claudomator-instructions.txt, .agent-home/) are
+// excluded from the check.
+func detectUncommittedChanges(workspace string) error {
+ // Modified or staged tracked files
+ diffOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", workspace,
+ "diff", "--name-only", "HEAD").CombinedOutput()
+ if err == nil {
+ for _, line := range strings.Split(strings.TrimSpace(string(diffOut)), "\n") {
+ if line != "" && !isScaffold(line) {
+ return fmt.Errorf("agent left uncommitted changes (work would be lost on sandbox deletion):\n%s\nInstructions must include: git add -A && git commit && git push origin main", strings.TrimSpace(string(diffOut)))
+ }
+ }
+ }
+
+ // Untracked new source files (excludes gitignored files)
+ lsOut, err := exec.Command("git", "-c", "safe.directory=*", "-C", workspace,
+ "ls-files", "--others", "--exclude-standard").CombinedOutput()
+ if err == nil {
+ var dirty []string
+ for _, line := range strings.Split(strings.TrimSpace(string(lsOut)), "\n") {
+ if line != "" && !isScaffold(line) {
+ dirty = append(dirty, line)
+ }
+ }
+ if len(dirty) > 0 {
+ return fmt.Errorf("agent left untracked files not committed (work would be lost on sandbox deletion):\n%s\nInstructions must include: git add -A && git commit && git push origin main", strings.Join(dirty, "\n"))
+ }
+ }
+
+ return nil
+}
+