diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-18 23:56:34 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-18 23:56:34 +0000 |
| commit | 599a26d556df52b364b5b540762a521d22eb5b7b (patch) | |
| tree | 740c141c52764604fc8d4c036733e5f47368b26a /internal/executor/container.go | |
| parent | 0db05b0fa6de318f164a1d73ddc55db9c59f1fc3 (diff) | |
| parent | 7df4f06ae0e3ae80bd967bf53cbec36e58b4a3bd (diff) | |
Merge feat/container-execution into master
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/container.go')
| -rw-r--r-- | internal/executor/container.go | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/internal/executor/container.go b/internal/executor/container.go new file mode 100644 index 0000000..c43e201 --- /dev/null +++ b/internal/executor/container.go @@ -0,0 +1,380 @@ +package executor + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// ContainerRunner executes an agent inside a container. +type ContainerRunner struct { + Image string // default image if not specified in task + Logger *slog.Logger + LogDir string + APIURL string + DropsDir string + SSHAuthSock string // optional path to host SSH agent + ClaudeBinary string // optional path to claude binary in container + GeminiBinary string // optional path to gemini binary in container + ClaudeConfigDir string // host path to ~/.claude; mounted into container for auth credentials + // Command allows mocking exec.CommandContext for tests. + Command func(ctx context.Context, name string, arg ...string) *exec.Cmd +} + +func (r *ContainerRunner) command(ctx context.Context, name string, arg ...string) *exec.Cmd { + if r.Command != nil { + return r.Command(ctx, name, arg...) + } + return exec.CommandContext(ctx, name, arg...) +} + +func (r *ContainerRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + +func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + var err error + repoURL := t.RepositoryURL + if repoURL == "" { + repoURL = t.Agent.RepositoryURL + } + if repoURL == "" { + // Fallback to project_dir if repository_url is not set (legacy support). + // Prefer the 'local' bare remote so that git push succeeds after execution + // (pushing to a non-bare working copy on a checked-out branch is rejected by git). + if t.Agent.ProjectDir != "" { + repoURL = t.Agent.ProjectDir + if out, err2 := exec.Command("git", "-C", t.Agent.ProjectDir, "remote", "get-url", "local").Output(); err2 == nil { + repoURL = strings.TrimSpace(string(out)) + } + } else { + return fmt.Errorf("task %s has no repository_url or project_dir", t.ID) + } + } + + image := t.Agent.ContainerImage + if image == "" { + image = r.Image + } + if image == "" { + image = "claudomator-agent:latest" + } + + // 1. Setup workspace on host + var workspace string + isResume := false + if e.SandboxDir != "" { + if _, err = os.Stat(e.SandboxDir); err == nil { + workspace = e.SandboxDir + isResume = true + r.Logger.Info("resuming in preserved workspace", "path", workspace) + } + } + + if workspace == "" { + workspace, err = os.MkdirTemp("", "claudomator-workspace-*") + if err != nil { + return fmt.Errorf("creating workspace: %w", err) + } + // chmod applied after clone; see step 2. + } + + // Note: workspace is only removed on success. On failure, it's preserved for debugging. + // If the task becomes BLOCKED, it's also preserved for resumption. + success := false + isBlocked := false + defer func() { + if success && !isBlocked { + os.RemoveAll(workspace) + } else { + r.Logger.Warn("preserving workspace", "path", workspace, "success", success, "blocked", isBlocked) + } + }() + + // 2. Clone repo into workspace if not resuming. + // git clone requires the target directory to not exist; remove the MkdirTemp-created dir first. + if !isResume { + if err := os.Remove(workspace); err != nil { + return fmt.Errorf("removing workspace before clone: %w", err) + } + r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace) + if out, err := r.command(ctx, "git", "clone", repoURL, workspace).CombinedOutput(); err != nil { + return fmt.Errorf("git clone failed: %w\n%s", err, string(out)) + } + if err = os.Chmod(workspace, 0755); err != nil { + return fmt.Errorf("chmod cloned workspace: %w", err) + } + } + e.SandboxDir = workspace + + // 3. Prepare logs + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = filepath.Join(workspace, ".claudomator-logs") + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + + stdoutFile, err := os.Create(e.StdoutPath) + if err != nil { + return fmt.Errorf("creating stdout log: %w", err) + } + defer stdoutFile.Close() + + stderrFile, err := os.Create(e.StderrPath) + if err != nil { + return fmt.Errorf("creating stderr log: %w", err) + } + defer stderrFile.Close() + + // 4. Run container + + // Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect' + envFile := filepath.Join(workspace, ".claudomator-env") + envContent := fmt.Sprintf("ANTHROPIC_API_KEY=%s\nGOOGLE_API_KEY=%s\nGEMINI_API_KEY=%s\n", os.Getenv("ANTHROPIC_API_KEY"), os.Getenv("GOOGLE_API_KEY"), os.Getenv("GEMINI_API_KEY")) + if err := os.WriteFile(envFile, []byte(envContent), 0600); err != nil { + return fmt.Errorf("writing env file: %w", err) + } + + // Inject custom instructions via file to avoid CLI length limits + instructionsFile := filepath.Join(workspace, ".claudomator-instructions.txt") + if err := os.WriteFile(instructionsFile, []byte(t.Agent.Instructions), 0644); err != nil { + return fmt.Errorf("writing instructions: %w", err) + } + + // Set up a writable $HOME staging dir so any agent tool (claude, gemini, etc.) + // can freely create subdirs (session-env, .gemini, .cache, …) without hitting + // a non-existent or read-only home. We copy only the claude credentials into it. + agentHome := filepath.Join(workspace, ".agent-home") + if err := os.MkdirAll(filepath.Join(agentHome, ".claude"), 0755); err != nil { + return fmt.Errorf("creating agent home staging dir: %w", err) + } + if err := os.MkdirAll(filepath.Join(agentHome, ".gemini"), 0755); err != nil { + return fmt.Errorf("creating .gemini dir: %w", err) + } + if r.ClaudeConfigDir != "" { + // credentials + if srcData, readErr := os.ReadFile(filepath.Join(r.ClaudeConfigDir, ".credentials.json")); readErr == nil { + _ = os.WriteFile(filepath.Join(agentHome, ".claude", ".credentials.json"), srcData, 0600) + } + // settings (used by claude CLI; copy so it can write updates without hitting the host) + if srcData, readErr := os.ReadFile(filepath.Join(filepath.Dir(r.ClaudeConfigDir), ".claude.json")); readErr == nil { + _ = os.WriteFile(filepath.Join(agentHome, ".claude.json"), srcData, 0644) + } + } + + args := r.buildDockerArgs(workspace, agentHome, e.TaskID) + innerCmd := r.buildInnerCmd(t, e, isResume) + + fullArgs := append(args, image) + fullArgs = append(fullArgs, innerCmd...) + + r.Logger.Info("starting container", "image", image, "taskID", t.ID) + cmd := r.command(ctx, "docker", fullArgs...) + cmd.Stderr = stderrFile + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // Use os.Pipe for stdout so we can parse it in real-time + var stdoutR, stdoutW *os.File + stdoutR, stdoutW, err = os.Pipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + cmd.Stdout = stdoutW + + if err := cmd.Start(); err != nil { + stdoutW.Close() + stdoutR.Close() + return fmt.Errorf("starting container: %w", err) + } + stdoutW.Close() + + // Watch for context cancellation to kill the process group (Issue 1) + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-ctx.Done(): + r.Logger.Info("killing container process group due to context cancellation", "taskID", t.ID) + syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + case <-done: + } + }() + + // Stream stdout to the log file and parse cost/errors. + var costUSD float64 + var sessionID string + var streamErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) + stdoutR.Close() + }() + + waitErr := cmd.Wait() + wg.Wait() + + e.CostUSD = costUSD + if sessionID != "" { + e.SessionID = sessionID + } + + // Check whether the agent left a question before exiting. + questionFile := filepath.Join(logDir, "question.json") + if data, readErr := os.ReadFile(questionFile); readErr == nil { + os.Remove(questionFile) // consumed + questionJSON := strings.TrimSpace(string(data)) + if isCompletionReport(questionJSON) { + r.Logger.Info("treating question file as completion report", "taskID", e.TaskID) + e.Summary = extractQuestionText(questionJSON) + } else { + isBlocked = true + success = true // We consider BLOCKED as a "success" for workspace preservation + if e.SessionID == "" { + r.Logger.Warn("missing session ID; resume will start fresh", "taskID", e.TaskID) + } + return &BlockedError{ + QuestionJSON: questionJSON, + SessionID: e.SessionID, + SandboxDir: workspace, + } + } + } + + // Read agent summary if written. + summaryFile := filepath.Join(logDir, "summary.txt") + if summaryData, readErr := os.ReadFile(summaryFile); readErr == nil { + os.Remove(summaryFile) // consumed + e.Summary = strings.TrimSpace(string(summaryData)) + } + + // 5. Post-execution: push changes if successful + if waitErr == nil && streamErr == nil { + // Check if there are any commits to push (HEAD ahead of origin/HEAD). + // If origin/HEAD doesn't exist (e.g. fresh clone with no commits), we attempt push anyway. + hasCommits := true + if out, err := r.command(ctx, "git", "-C", workspace, "rev-list", "origin/HEAD..HEAD").CombinedOutput(); err == nil { + if len(strings.TrimSpace(string(out))) == 0 { + hasCommits = false + } + } + + if hasCommits { + r.Logger.Info("pushing changes back to remote", "url", repoURL) + if out, err := r.command(ctx, "git", "-C", workspace, "push", "origin", "HEAD").CombinedOutput(); err != nil { + r.Logger.Warn("git push failed", "error", err, "output", string(out)) + return fmt.Errorf("git push failed: %w\n%s", err, string(out)) + } + } else { + r.Logger.Info("no new commits to push", "taskID", t.ID) + } + success = true + } + + if waitErr != nil { + return fmt.Errorf("container execution failed: %w", waitErr) + } + if streamErr != nil { + return fmt.Errorf("stream parsing failed: %w", streamErr) + } + + return nil +} + +func (r *ContainerRunner) buildDockerArgs(workspace, claudeHome, taskID string) []string { + // --env-file takes a HOST path. + hostEnvFile := filepath.Join(workspace, ".claudomator-env") + + // Replace localhost with host.docker.internal so the container can reach the host API. + apiURL := strings.ReplaceAll(r.APIURL, "localhost", "host.docker.internal") + + args := []string{ + "run", "--rm", + // Allow container to reach the host via host.docker.internal. + "--add-host=host.docker.internal:host-gateway", + // Run as the current process UID:GID so the container can read host-owned files. + fmt.Sprintf("--user=%d:%d", os.Getuid(), os.Getgid()), + "-v", workspace + ":/workspace", + "-v", claudeHome + ":/home/agent", + "-w", "/workspace", + "--env-file", hostEnvFile, + "-e", "HOME=/home/agent", + "-e", "CLAUDOMATOR_API_URL=" + apiURL, + "-e", "CLAUDOMATOR_TASK_ID=" + taskID, + "-e", "CLAUDOMATOR_DROP_DIR=" + r.DropsDir, + } + if r.SSHAuthSock != "" { + args = append(args, "-v", r.SSHAuthSock+":/tmp/ssh-auth.sock", "-e", "SSH_AUTH_SOCK=/tmp/ssh-auth.sock") + } + return args +} + +func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string { + // Claude CLI uses -p for prompt text. To pass a file, we use a shell to cat it. + // We use a shell variable to capture the expansion to avoid quoting issues with instructions contents. + // The outer single quotes around the sh -c argument prevent host-side expansion. + + claudeBin := r.ClaudeBinary + if claudeBin == "" { + claudeBin = "claude" + } + geminiBin := r.GeminiBinary + if geminiBin == "" { + geminiBin = "gemini" + } + + if t.Agent.Type == "gemini" { + return []string{"sh", "-c", fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", geminiBin)} + } + + // Claude + var claudeCmd strings.Builder + claudeCmd.WriteString(fmt.Sprintf("INST=$(cat /workspace/.claudomator-instructions.txt); %s -p \"$INST\"", claudeBin)) + if isResume && e.ResumeSessionID != "" { + claudeCmd.WriteString(fmt.Sprintf(" --resume %s", e.ResumeSessionID)) + } + claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions") + + return []string{"sh", "-c", claudeCmd.String()} +} + + +func (r *ContainerRunner) fallbackGitInit(repoURL, workspace string) error { + // Ensure directory exists + if err := os.MkdirAll(workspace, 0755); err != nil { + return err + } + // If it's a local directory but not a repo, init it. + cmds := [][]string{ + gitSafe("-C", workspace, "init"), + gitSafe("-C", workspace, "add", "-A"), + gitSafe("-C", workspace, "commit", "--allow-empty", "-m", "chore: initial commit"), + } + // If it was a local path, maybe we should have copied it? + // git clone handle local paths fine if they are repos. + // This fallback is only if it's NOT a repo. + for _, args := range cmds { + if out, err := r.command(context.Background(), "git", args...).CombinedOutput(); err != nil { + return fmt.Errorf("git init failed: %w\n%s", err, out) + } + } + return nil +} |
