summary refs log tree commit diff
path: root/internal
diff options
context:
space:
mode:
author Peter Stone <thepeterstone@gmail.com> 2026-03-18 07:24:31 +0000
committer Peter Stone <thepeterstone@gmail.com> 2026-03-18 07:55:27 +0000
commit e1be377c851f1e7ce594fa3de6c429354bcedcce (patch)
tree d67c9d1460d8a419cb8235cfe547aee800095a90 /internal
parent 86842903e4cae3a60b9732797cfc5dccddcc22e5 (diff)
fix: address round 3 review feedback for container execution
- Fix push failure swallowing and ensure workspace preservation on push error
- Fix wrong session ID in --resume flag and BlockedError
- Implement safer shell quoting for instructions in buildInnerCmd
- Capture and propagate actual Claude session ID from stream init message
- Clean up redundant image resolution and stale TODOs
- Mark ADR-005 as Superseded
- Consolidate RepositoryURL to Task level (removed from AgentConfig)
- Add unit test for session ID extraction in parseStream
Diffstat (limited to 'internal')
-rw-r--r-- internal/executor/container.go 47
-rw-r--r-- internal/executor/container_test.go 20
-rw-r--r-- internal/executor/helpers.go 11
-rw-r--r-- internal/executor/stream_test.go 25
4 files changed, 62 insertions, 41 deletions
diff --git a/internal/executor/container.go b/internal/executor/container.go
index 32a1ea3..d21aea3 100644
--- a/internal/executor/container.go
+++ b/internal/executor/container.go
@@ -88,11 +88,7 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
// 2. Clone repo into workspace if not resuming
if !isResume {
r.Logger.Info("cloning repository", "url", repoURL, "workspace", workspace)
- // git clone requires the target to be empty or non-existent.
- // Since we just created workspace as a temp dir, it's empty.
- // But git clone wants to CREATE the dir if it's the target, or clone INTO it.
if out, err := exec.CommandContext(ctx, "git", "clone", repoURL, workspace).CombinedOutput(); err != nil {
- // If it's a local path and not a repo, we might need to init it (legacy support from ADR-005)
r.Logger.Warn("git clone failed, attempting fallback init", "url", repoURL, "error", err)
if initErr := r.fallbackGitInit(repoURL, workspace); initErr != nil {
return fmt.Errorf("git clone and fallback init failed: %w\n%s", err, string(out))
@@ -126,7 +122,6 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
defer stderrFile.Close()
// 4. Run container
- // TODO: Support Resume/BLOCKED by re-attaching to preserved workspace.
// Write API keys to a temporary env file to avoid exposure in 'ps' or 'docker inspect'
envFile := filepath.Join(workspace, ".claudomator-env")
@@ -142,15 +137,7 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
}
args := r.buildDockerArgs(workspace, e.TaskID)
- innerCmd := r.buildInnerCmd(t, e.ID, isResume)
-
- image = t.Agent.ContainerImage
- if image == "" {
- image = r.Image
- }
- if image == "" {
- image = "claudomator-agent:latest"
- }
+ innerCmd := r.buildInnerCmd(t, e, isResume)
fullArgs := append(args, image)
fullArgs = append(fullArgs, innerCmd...)
@@ -177,12 +164,13 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
// Stream stdout to the log file and parse cost/errors.
var costUSD float64
+ var sessionID string
var streamErr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
- costUSD, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
+ costUSD, sessionID, streamErr = parseStream(stdoutR, stdoutFile, r.Logger)
stdoutR.Close()
}()
@@ -190,6 +178,9 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
wg.Wait()
e.CostUSD = costUSD
+ if sessionID != "" {
+ e.SessionID = sessionID
+ }
// Check whether the agent left a question before exiting.
questionFile := filepath.Join(logDir, "question.json")
@@ -204,7 +195,7 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
success = true // We consider BLOCKED as a "success" for workspace preservation
return &BlockedError{
QuestionJSON: questionJSON,
- SessionID: e.ID, // For container runner, we use exec ID as session ID
+ SessionID: e.SessionID,
SandboxDir: workspace,
}
}
@@ -219,12 +210,16 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
// 5. Post-execution: push changes if successful
if waitErr == nil && streamErr == nil {
- success = true // Set success BEFORE push, so workspace is preserved on push failure but cleared on "no changes"
r.Logger.Info("pushing changes back to remote", "url", repoURL)
// We assume the sandbox has committed changes (the agent image should enforce this)
if out, err := exec.CommandContext(ctx, "git", "-C", workspace, "push", "origin", "HEAD").CombinedOutput(); err != nil {
r.Logger.Warn("git push failed or no changes", "error", err, "output", string(out))
+ // Do not swallow a push failure: success is only set after a clean push.
+ // Review feedback: "If the remote is missing or the push fails, the task is marked FAILED and the host-side workspace is preserved".
+ // So the push error is returned to the caller here.
+ return fmt.Errorf("git push failed: %w\n%s", err, string(out))
}
+ success = true
}
if waitErr != nil {
@@ -251,22 +246,24 @@ func (r *ContainerRunner) buildDockerArgs(workspace, taskID string) []string {
}
}
-func (r *ContainerRunner) buildInnerCmd(t *task.Task, execID string, isResume bool) []string {
+func (r *ContainerRunner) buildInnerCmd(t *task.Task, e *storage.Execution, isResume bool) []string {
// Claude CLI uses -p for prompt text. To pass a file, we use a shell to cat it.
- promptCmd := "cat /workspace/.claudomator-instructions.txt"
+ // We use a shell variable to capture the expansion to avoid quoting issues with instructions contents.
+ // The outer single quotes around the sh -c argument prevent host-side expansion.
if t.Agent.Type == "gemini" {
- return []string{"sh", "-c", "gemini -p \"$(" + promptCmd + ")\""}
+ return []string{"sh", "-c", "INST=$(cat /workspace/.claudomator-instructions.txt); gemini -p \"$INST\""}
}
// Claude
- claudeArgs := []string{"claude", "-p", "\"$(" + promptCmd + ")\""}
- if isResume {
- claudeArgs = append(claudeArgs, "--resume", execID)
+ var claudeCmd strings.Builder
+ claudeCmd.WriteString("INST=$(cat /workspace/.claudomator-instructions.txt); claude -p \"$INST\"")
+ if isResume && e.ResumeSessionID != "" {
+ claudeCmd.WriteString(fmt.Sprintf(" --resume %s", e.ResumeSessionID))
}
- claudeArgs = append(claudeArgs, "--output-format", "stream-json", "--verbose", "--permission-mode", "bypassPermissions")
+ claudeCmd.WriteString(" --output-format stream-json --verbose --permission-mode bypassPermissions")
- return []string{"sh", "-c", strings.Join(claudeArgs, " ")}
+ return []string{"sh", "-c", claudeCmd.String()}
}
func (r *ContainerRunner) fallbackGitInit(repoURL, workspace string) error {
diff --git a/internal/executor/container_test.go b/internal/executor/container_test.go
index fbb4d7d..0e36def 100644
--- a/internal/executor/container_test.go
+++ b/internal/executor/container_test.go
@@ -33,6 +33,7 @@ func TestContainerRunner_BuildDockerArgs(t *testing.T) {
"-e", "CLAUDOMATOR_DROP_DIR=/data/drops",
}
+
if len(args) != len(expected) {
t.Fatalf("expected %d args, got %d", len(expected), len(args))
}
@@ -48,34 +49,37 @@ func TestContainerRunner_BuildInnerCmd(t *testing.T) {
t.Run("claude-fresh", func(t *testing.T) {
tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}}
- cmd := runner.buildInnerCmd(tk, "exec-456", false)
+ exec := &storage.Execution{}
+ cmd := runner.buildInnerCmd(tk, exec, false)
cmdStr := strings.Join(cmd, " ")
if strings.Contains(cmdStr, "--resume") {
t.Errorf("unexpected --resume flag in fresh run: %q", cmdStr)
}
- if !strings.Contains(cmdStr, "cat /workspace/.claudomator-instructions.txt") {
+ if !strings.Contains(cmdStr, "INST=$(cat /workspace/.claudomator-instructions.txt); claude -p \"$INST\"") {
t.Errorf("expected cat instructions in sh command, got %q", cmdStr)
}
})
t.Run("claude-resume", func(t *testing.T) {
tk := &task.Task{Agent: task.AgentConfig{Type: "claude"}}
- cmd := runner.buildInnerCmd(tk, "exec-456", true)
+ exec := &storage.Execution{ResumeSessionID: "orig-session-123"}
+ cmd := runner.buildInnerCmd(tk, exec, true)
cmdStr := strings.Join(cmd, " ")
- if !strings.Contains(cmdStr, "--resume exec-456") {
- t.Errorf("expected --resume flag in resume run: %q", cmdStr)
+ if !strings.Contains(cmdStr, "--resume orig-session-123") {
+ t.Errorf("expected --resume flag with correct session ID, got %q", cmdStr)
}
})
t.Run("gemini", func(t *testing.T) {
tk := &task.Task{Agent: task.AgentConfig{Type: "gemini"}}
- cmd := runner.buildInnerCmd(tk, "exec-456", false)
+ exec := &storage.Execution{}
+ cmd := runner.buildInnerCmd(tk, exec, false)
cmdStr := strings.Join(cmd, " ")
- if !strings.Contains(cmdStr, "gemini") {
- t.Errorf("expected gemini command, got %q", cmdStr)
+ if !strings.Contains(cmdStr, "gemini -p \"$INST\"") {
+ t.Errorf("expected gemini command with safer quoting, got %q", cmdStr)
}
})
}
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
index 5ffde8e..36cd050 100644
--- a/internal/executor/helpers.go
+++ b/internal/executor/helpers.go
@@ -24,12 +24,13 @@ func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e
// (costUSD, error). error is non-nil if the stream signals task failure:
// - result message has is_error:true
// - a tool_result was denied due to missing permissions
-func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error) {
+func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, string, error) {
tee := io.TeeReader(r, w)
scanner := bufio.NewScanner(tee)
scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB buffer for large lines
var totalCost float64
+ var sessionID string
var streamErr error
for scanner.Scan() {
@@ -41,6 +42,12 @@ func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error)
msgType, _ := msg["type"].(string)
switch msgType {
+ case "system":
+ if subtype, ok := msg["subtype"].(string); ok && subtype == "init" {
+ if sid, ok := msg["session_id"].(string); ok {
+ sessionID = sid
+ }
+ }
case "rate_limit_event":
if info, ok := msg["rate_limit_info"].(map[string]interface{}); ok {
status, _ := info["status"].(string)
@@ -81,7 +88,7 @@ func parseStream(r io.Reader, w io.Writer, logger *slog.Logger) (float64, error)
}
}
- return totalCost, streamErr
+ return totalCost, sessionID, streamErr
}
// permissionDenialError inspects a "user" stream message for tool_result entries
diff --git a/internal/executor/stream_test.go b/internal/executor/stream_test.go
index 10eb858..11a6178 100644
--- a/internal/executor/stream_test.go
+++ b/internal/executor/stream_test.go
@@ -12,7 +12,7 @@ func streamLine(json string) string { return json + "\n" }
func TestParseStream_ResultIsError_ReturnsError(t *testing.T) {
input := streamLine(`{"type":"result","subtype":"error_during_execution","is_error":true,"result":"something went wrong"}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err == nil {
t.Fatal("expected error when result.is_error=true, got nil")
}
@@ -27,7 +27,7 @@ func TestParseStream_PermissionDenied_ReturnsError(t *testing.T) {
input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"Claude requested permissions to write to /foo/bar.go, but you haven't granted it yet.","tool_use_id":"tu_abc"}]}}`) +
streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"I need permission","total_cost_usd":0.1}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err == nil {
t.Fatal("expected error for permission denial, got nil")
}
@@ -40,7 +40,7 @@ func TestParseStream_Success_ReturnsNilError(t *testing.T) {
input := streamLine(`{"type":"assistant","message":{"content":[{"type":"text","text":"Done."}]}}`) +
streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"All tests pass.","total_cost_usd":0.05}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("expected nil error for success stream, got: %v", err)
}
@@ -49,7 +49,7 @@ func TestParseStream_Success_ReturnsNilError(t *testing.T) {
func TestParseStream_ExtractsCostFromResultMessage(t *testing.T) {
input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","total_cost_usd":1.2345}`)
- cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ cost, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -62,7 +62,7 @@ func TestParseStream_ExtractsCostFromLegacyCostUSD(t *testing.T) {
// Some versions emit cost_usd at the top level rather than total_cost_usd.
input := streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"done","cost_usd":0.99}`)
- cost, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ cost, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -78,8 +78,21 @@ func TestParseStream_NonToolResultIsError_DoesNotFail(t *testing.T) {
input := streamLine(`{"type":"user","message":{"role":"user","content":[{"type":"tool_result","is_error":true,"content":"exit status 1","tool_use_id":"tu_xyz"}]}}`) +
streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"Fixed it.","total_cost_usd":0.2}`)
- _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ _, _, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
if err != nil {
t.Fatalf("non-permission tool errors should not fail the task, got: %v", err)
}
}
+
+func TestParseStream_ExtractsSessionID(t *testing.T) {
+ input := streamLine(`{"type":"system","subtype":"init","session_id":"sess-999"}`) +
+ streamLine(`{"type":"result","subtype":"success","is_error":false,"result":"ok","total_cost_usd":0.01}`)
+
+ _, sid, err := parseStream(strings.NewReader(input), io.Discard, slog.New(slog.NewTextHandler(io.Discard, nil)))
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if sid != "sess-999" {
+ t.Errorf("want session ID sess-999, got %q", sid)
+ }
+}