From e1be377c851f1e7ce594fa3de6c429354bcedcce Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Wed, 18 Mar 2026 07:24:31 +0000 Subject: fix: address round 3 review feedback for container execution - Fix push failure swallowing and ensure workspace preservation on push error - Fix wrong session ID in --resume flag and BlockedError - Implement safer shell quoting for instructions in buildInnerCmd - Capture and propagate actual Claude session ID from stream init message - Clean up redundant image resolution and stale TODOs - Mark ADR-005 as Superseded - Consolidate RepositoryURL to Task level (removed from AgentConfig) - Add unit test for session ID extraction in parseStream --- internal/executor/container.go | 47 +++++++++++++++++-------------------- internal/executor/container_test.go | 20 +++++++++------- internal/executor/helpers.go | 11 +++++++-- internal/executor/stream_test.go | 25 +++++++++++++++----- 4 files changed, 62 insertions(+), 41 deletions(-) (limited to 'internal/executor') diff --git a/internal/executor/container.go b/internal/executor/container.go index 32a1ea3..d21aea3 100644 --- a/internal/executor/container.go +++ b/internal/executor/container.go @@ -88,11 +88,7 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec // 2. Clone repo into workspace if not resuming if !isResume { r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace) - // git clone requires the target to be empty or non-existent. - // Since we just created workspace as a temp dir, it's empty. - // But git clone wants to CREATE the dir if it's the target, or clone INTO it. if out, err := exec.CommandContext(ctx, "git", "clone", repoURL, workspace).CombinedOutput(); err != nil { - // If it's a local path and not a repo, we might need to init it (legacy support from ADR-005) r.Logger.Warn("git clone failed, attempting fallback init", "url", repoURL, "error", err) if initErr := r.fallbackGitInit(repoURL, workspace); initErr != nil { return fmt.Errorf("git clone and fallback init failed: %w\n%s", err, string(out)) @@ -126,7 +122,6 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec defer stderrFile.Close() // 4. Run container - // TODO: Support Resume/BLOCKED by re-attaching to preserved workspace. // Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect' envFile := filepath.Join(workspace, ".claudomator-env") @@ -142,15 +137,7 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec } args := r.buildDockerArgs(workspace, e.TaskID) - innerCmd := r.buildInnerCmd(t, e.ID, isResume) - - image = t.Agent.ContainerImage - if image == "" { - image = r.Image - } - if image == "" { - image = "claudomator-agent:latest" - } + innerCmd := r.buildInnerCmd(t, e, isResume) fullArgs := append(args, image) fullArgs = append(fullArgs, innerCmd...) @@ -177,12 +164,13 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec // Stream stdout to the log file and parse cost/errors. var costUSD float64 + var sessionID string var streamErr error var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) + costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger) stdoutR.Close() }() @@ -190,6 +178,9 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec wg.Wait() e.CostUSD = costUSD + if sessionID != "" { + e.SessionID = sessionID + } // Check whether the agent left a question before exiting. questionFile := filepath.Join(logDir, "question.json") @@ -204,7 +195,7 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec success = true // We consider BLOCKED as a "success" for workspace preservation return &BlockedError{ QuestionJSON: questionJSON, - SessionID: e.ID, // For container runner, we use exec ID as session ID + SessionID: e.SessionID, SandboxDir: workspace, } } @@ -219,12 +210,16 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec // 5. Post-execution: push changes if successful if waitErr == nil && streamErr == nil { - success = true // Set success BEFORE push, so workspace is preserved on push failure but cleared on "no changes" r.Logger.Info("pushing changes back to remote", "url", repoURL) // We assume the sandbox has committed changes (the agent image should enforce this) if out, err := exec.CommandContext(ctx, "git", "-C", workspace, "push", "origin", "HEAD").CombinedOutput(); err != nil { r.Logger.Warn("git push failed or no changes", "error", err, "output", string(out)) + // Only set success = true if we consider this "good enough". + // Review says: "If the remote is missing or the push fails, the task is marked FAILED and the host-side workspace is preserved" + // So we MUST return error here. + return fmt.Errorf("git push failed: %w\n%s", err, string(out)) } + success = true } if waitErr != nil { @@ -251,22 +246,24 @@ func (r *ContainerRunner) buildDockerArgs(workspace, taskID string) []string { } } -func (r *ContainerRunner) buildInnerCmd(t *task.Task, execID string, isResume bool) []string { +func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string { // Claude CLI uses -p for prompt text. To pass a file, we use a shell to cat it. - promptCmd := "cat /workspace/.claudomator-instructions.txt" + // We use a shell variable to capture the expansion to avoid quoting issues with instructions contents. + // The outer single quotes around the sh -c argument prevent host-side expansion. if t.Agent.Type == "gemini" { - return []string{"sh", "-c", "gemini -p \"$(" + promptCmd + ")\""} + return []string{"sh", "-c", "INST=$(cat /workspace/.claudomator-instructions.txt); gemini -p \"$INST\""} } // Claude - claudeArgs := []string{"claude", "-p", "\"$(" + promptCmd + ")\""} - if isResume { - claudeArgs = append(claudeArgs, "--resume", execID) + var claudeCmd strings.Builder + claudeCmd.WriteString("INST=$(cat /workspace/.claudomator-instructions.txt); claude -p \"$INST\"") + if isResume && e.ResumeSessionID != "" { + claudeCmd.WriteString(fmt.Sprintf(" --resume %s", e.ResumeSessionID)) } - claudeArgs = append(claudeArgs, "--output-format", "stream-json", "--verbose", "--permission-mode", "bypassPermissions") + claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions") - return []string{"sh", "-c", strings.Join(claudeArgs, " ")} + return []string{"sh", "-c", claudeCmd.String()} } func (r *ContainerRunner) fallbackGitInit(repoURL, workspace string) error { diff --git a/internal/executor/container_test.go b/internal/executor/container_test.go index fbb4d7d..0e36def 100644 --- a/internal/executor/container_test.go +++ b/internal/executor/container_test.go @@ -33,6 +33,7 @@ func TestContainerRunner_BuildDockerArgs(t *testing.T) { "-e", "CLAUDOMATOR_DROP_DIR=/data/drops", } + if len(args) != len(expected) { t.Fatalf("expected %d args, got %d", len(expected), len(args)) } @@ -48,34 +49,37 @@ func TestContainerRunner_BuildInnerCmd(t *testing.T) { t.Run("claude-fresh", func(t *testing.T) { tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}} - cmd := runner.buildInnerCmd(tk, "exec-456", false) + exec := &storage.Execution{} + cmd := runner.buildInnerCmd(tk, exec, false) cmdStr := strings.Join(cmd, " ") if strings.Contains(cmdStr, "--resume") { t.Errorf("unexpected --resume flag in fresh run: %q", cmdStr) } - if !strings.Contains(cmdStr, "cat /workspace/.claudomator-instructions.txt") { + if !strings.Contains(cmdStr, "INST=$(cat /workspace/.claudomator-instructions.txt); claude -p \"$INST\"") { t.Errorf("expected cat instructions in sh command, got %q", cmdStr) } }) t.Run("claude-resume", func(t *testing.T) { tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}} - cmd := runner.buildInnerCmd(tk, "exec-456", true) + exec := &storage.Execution{ResumeSessionID: "orig-session-123"} + cmd := runner.buildInnerCmd(tk, exec, true) cmdStr := strings.Join(cmd, " ") - if !strings.Contains(cmdStr, "--resume exec-456") { - t.Errorf("expected --resume flag in resume run: %q", cmdStr) + if !strings.Contains(cmdStr, "--resume orig-session-123") { + t.Errorf("expected --resume flag with correct session ID, got %q", cmdStr) } }) t.Run("gemini", func(t *testing.T) { tk := &task.Task{Agent: task.AgentConfig{Type: "gemini"}} - cmd := runner.buildInnerCmd(tk, "exec-456", false) + exec := &storage.Execution{} + cmd := runner.buildInnerCmd(tk, exec, false) cmdStr := strings.Join(cmd, " ") - if !strings.Contains(cmdStr, "gemini") { - t.Errorf("expected gemini command, got %q", cmdStr) + if !strings.Contains(cmdStr, "gemini -p \"$INST\"") { + t.Errorf("expected gemini command with safer quoting, got %q", cmdStr) } }) } diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go index 5ffde8e..36cd050 100644 --- a/internal/executor/helpers.go +++ b/internal/executor/helpers.go @@ -24,12 +24,13 @@ func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e // (costUSD, error). error is non-nil if the stream signals task failure: // - result message has is_error:true // - a tool_result was denied due to missing permissions -func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) { +func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, string, error) { tee := io.TeeReader(r, w) scanner := bufio.NewScanner(tee) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines var totalCost float64 + var sessionID string var streamErr error for scanner.Scan() { @@ -41,6 +42,12 @@ func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) msgType, _ := msg["type"].(string) switch msgType { + case "system": + if subtype, ok := msg["subtype"].(string); ok && subtype == "init" { + if sid, ok := msg["session_id"].(string); ok { + sessionID = sid + } + } case "rate_limit_event": if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok { status, _ := info["status"].(string) @@ -81,7 +88,7 @@ func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) } } - return totalCost, streamErr + return totalCost, sessionID, streamErr } // permissionDenialError inspects a "user" stream message for tool_result entries diff --git a/internal/executor/stream_test.go b/internal/executor/stream_test.go index 10eb858..11a6178 100644 --- a/internal/executor/stream_test.go +++ b/internal/executor/stream_test.go @@ -12,7 +12,7 @@ func streamLine(json string) string { return json + "\n" } func TestParseStream_ResultIsError_ReturnsError(t *testing.T) { input := streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong"}`) - _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) if err == nil { t.Fatal("expected error when result.is_error=true, got nil") } @@ -27,7 +27,7 @@ func TestParseStream_PermissionDenied_ReturnsError(t *testing.T) { input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"Claude requested permissions to write to /foo/bar.go, but you haven't granted it yet.","tool_use_id":"tu_abc"}]}}`) + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"I need permission","total_cost_usd":0.1}`) - _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) if err == nil { t.Fatal("expected error for permission denial, got nil") } @@ -40,7 +40,7 @@ func TestParseStream_Success_ReturnsNilError(t *testing.T) { input := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"Done."}]}}`) + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"All tests pass.","total_cost_usd":0.05}`) - _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) if err != nil { t.Fatalf("expected nil error for success stream, got: %v", err) } @@ -49,7 +49,7 @@ func TestParseStream_Success_ReturnsNilError(t *testing.T) { func TestParseStream_ExtractsCostFromResultMessage(t *testing.T) { input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","total_cost_usd":1.2345}`) - cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + cost, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -62,7 +62,7 @@ func TestParseStream_ExtractsCostFromLegacyCostUSD(t *testing.T) { // Some versions emit cost_usd at the top level rather than total_cost_usd. input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","cost_usd":0.99}`) - cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + cost, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -78,8 +78,21 @@ func TestParseStream_NonToolResultIsError_DoesNotFail(t *testing.T) { input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"exit status 1","tool_use_id":"tu_xyz"}]}}`) + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"Fixed it.","total_cost_usd":0.2}`) - _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) if err != nil { t.Fatalf("non-permission tool errors should not fail the task, got: %v", err) } } + +func TestParseStream_ExtractsSessionID(t *testing.T) { + input := streamLine(`{"type":"system","subtype":"init","session_id":"sess-999"}`) + + streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"ok","total_cost_usd":0.01}`) + + _, sid, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil))) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if sid != "sess-999" { + t.Errorf("want session ID sess-999, got %q", sid) + } +} -- cgit v1.2.3