summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/cli/serve.go2
-rw-r--r--internal/executor/claude.go41
-rw-r--r--internal/executor/claude_test.go86
-rw-r--r--internal/executor/executor.go17
-rw-r--r--internal/executor/executor_test.go28
-rw-r--r--internal/executor/gemini.go3
6 files changed, 164 insertions, 13 deletions
diff --git a/internal/cli/serve.go b/internal/cli/serve.go
index fd9fda8..2d47630 100644
--- a/internal/cli/serve.go
+++ b/internal/cli/serve.go
@@ -76,7 +76,7 @@ func serve(addr string) error {
if cfg.GeminiBinaryPath != "" {
pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath}
}
- pool.RecoverStaleRunning()
+ pool.RecoverStaleRunning(context.Background())
pool.RecoverStaleQueued(context.Background())
srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath)
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index 626a854..5a5b35e 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -106,7 +106,23 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
effectiveWorkingDir := projectDir
if e.ResumeSessionID != "" {
if e.SandboxDir != "" {
- effectiveWorkingDir = e.SandboxDir
+ if _, statErr := os.Stat(e.SandboxDir); statErr == nil {
+ effectiveWorkingDir = e.SandboxDir
+ } else {
+ // Preserved sandbox was cleaned up (e.g. /tmp purge after reboot).
+ // Clone a fresh sandbox so the task can run rather than fail immediately.
+ r.Logger.Warn("preserved sandbox missing, cloning fresh", "sandbox", e.SandboxDir, "project_dir", projectDir)
+ e.SandboxDir = ""
+ if projectDir != "" {
+ var err error
+ sandboxDir, err = setupSandbox(projectDir)
+ if err != nil {
+ return fmt.Errorf("setting up sandbox: %w", err)
+ }
+ effectiveWorkingDir = sandboxDir
+ r.Logger.Info("fresh sandbox created for resume", "sandbox", sandboxDir, "project_dir", projectDir)
+ }
+ }
}
} else if projectDir != "" {
var err error
@@ -399,6 +415,9 @@ func (r *ClaudeRunner) execOnce(ctx context.Context, args []string, workingDir s
if isRateLimitError(streamErr) || isQuotaExhausted(streamErr) {
return streamErr
}
+ if tail := tailFile(e.StderrPath, 20); tail != "" {
+ return fmt.Errorf("claude exited with error: %w\nstderr:\n%s", waitErr, tail)
+ }
return fmt.Errorf("claude exited with error: %w", waitErr)
}
@@ -578,3 +597,23 @@ func permissionDenialError(msg map[string]interface{}) error {
}
return nil
}
+
+// tailFile returns the last n lines of the file at path, or empty string if
+// the file cannot be read. Used to surface subprocess stderr on failure.
+func tailFile(path string, n int) string {
+ f, err := os.Open(path)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+
+ var lines []string
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ lines = append(lines, scanner.Text())
+ if len(lines) > n {
+ lines = lines[1:]
+ }
+ }
+ return strings.Join(lines, "\n")
+}
diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go
index 7ab0802..9bb873f 100644
--- a/internal/executor/claude_test.go
+++ b/internal/executor/claude_test.go
@@ -3,6 +3,7 @@ package executor
import (
"context"
"errors"
+ "fmt"
"io"
"log/slog"
"os"
@@ -579,6 +580,63 @@ func TestClaudeRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) {
}
}
+func TestClaudeRunner_Run_StaleSandboxDir_ClonesAfresh(t *testing.T) {
+ logDir := t.TempDir()
+ projectDir := t.TempDir()
+ initGitRepo(t, projectDir)
+
+ cwdFile := filepath.Join(logDir, "cwd.txt")
+ scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh")
+ script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n"
+ if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil {
+ t.Fatalf("write script: %v", err)
+ }
+
+ r := &ClaudeRunner{
+ BinaryPath: scriptPath,
+ Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+ LogDir: logDir,
+ }
+ tk := &task.Task{
+ Agent: task.AgentConfig{
+ Type: "claude",
+ ProjectDir: projectDir,
+ SkipPlanning: true,
+ },
+ }
+ // Point to a sandbox that no longer exists (e.g. /tmp was purged).
+ staleSandbox := filepath.Join(t.TempDir(), "gone")
+ e := &storage.Execution{
+ ID: "resume-exec-2",
+ TaskID: "task-2",
+ ResumeSessionID: "session-abc",
+ ResumeAnswer: "ok",
+ SandboxDir: staleSandbox,
+ }
+
+ if err := r.Run(context.Background(), tk, e); err != nil {
+ t.Fatalf("Run with stale sandbox: %v", err)
+ }
+
+ got, err := os.ReadFile(cwdFile)
+ if err != nil {
+ t.Fatalf("cwd file not written: %v", err)
+ }
+	// Should have run in a fresh sandbox (not the stale path, not the raw projectDir).
+	// The sandbox is removed after teardown, so we assert on the recorded cwd string rather than the directory itself.
+ cwd := string(got)
+ if cwd == staleSandbox {
+ t.Error("ran in stale sandbox dir that doesn't exist")
+ }
+ if cwd == projectDir {
+ t.Error("ran directly in project_dir; expected a fresh sandbox clone")
+ }
+ // cwd should look like a claudomator sandbox path.
+ if !strings.Contains(cwd, "claudomator-sandbox-") {
+ t.Errorf("expected sandbox path, got %q", cwd)
+ }
+}
+
func TestIsCompletionReport(t *testing.T) {
tests := []struct {
name string
@@ -621,6 +679,34 @@ func TestIsCompletionReport(t *testing.T) {
}
}
+func TestTailFile_ReturnsLastNLines(t *testing.T) {
+ f, err := os.CreateTemp("", "tailfile-*")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+ for i := 1; i <= 30; i++ {
+ fmt.Fprintf(f, "line %d\n", i)
+ }
+ f.Close()
+
+ got := tailFile(f.Name(), 5)
+ lines := strings.Split(got, "\n")
+ if len(lines) != 5 {
+ t.Fatalf("want 5 lines, got %d: %q", len(lines), got)
+ }
+ if lines[0] != "line 26" || lines[4] != "line 30" {
+ t.Errorf("want lines 26-30, got: %q", got)
+ }
+}
+
+func TestTailFile_MissingFile_ReturnsEmpty(t *testing.T) {
+ got := tailFile("/nonexistent/path/file.log", 10)
+ if got != "" {
+ t.Errorf("want empty string for missing file, got %q", got)
+ }
+}
+
func TestGitSafe_PrependsSafeDirectory(t *testing.T) {
got := gitSafe("-C", "/some/path", "status")
want := []string{"-c", "safe.directory=*", "-C", "/some/path", "status"}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 475d150..7674fe6 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -567,9 +567,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous
-// server crash or restart) as FAILED. It also closes any open RUNNING execution
-// records for those tasks. Call this once on server startup.
-func (p *Pool) RecoverStaleRunning() {
+// server crash or restart) as FAILED, then immediately re-queues them for
+// retry. It also closes any open RUNNING execution records for those tasks.
+// Call this once on server startup.
+func (p *Pool) RecoverStaleRunning(ctx context.Context) {
tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning})
if err != nil {
p.logger.Error("RecoverStaleRunning: list tasks", "error", err)
@@ -593,6 +594,16 @@ func (p *Pool) RecoverStaleRunning() {
}
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID)
+ continue
+ }
+ // Re-queue so the task retries automatically. Submit expects QUEUED state.
+ if err := p.store.UpdateTaskState(t.ID, task.StateQueued); err != nil {
+ p.logger.Error("RecoverStaleRunning: set queued", "error", err, "taskID", t.ID)
+ continue
+ }
+ t.State = task.StateQueued
+ if err := p.Submit(ctx, t); err != nil {
+ p.logger.Error("RecoverStaleRunning: re-queue", "error", err, "taskID", t.ID)
}
}
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index f6d0179..a6c4ad8 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -596,15 +596,9 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
Status: "RUNNING",
})
- pool.RecoverStaleRunning()
+ pool.RecoverStaleRunning(context.Background())
- recovered, err := store.GetTask(tk.ID)
- if err != nil {
- t.Fatalf("get task: %v", err)
- }
- if recovered.State != task.StateFailed {
- t.Errorf("state: want FAILED, got %q", recovered.State)
- }
+ // Execution record should be closed as FAILED.
execs, _ := store.ListExecutions(tk.ID)
if len(execs) == 0 || execs[0].Status != "FAILED" {
t.Errorf("execution status: want FAILED, got %+v", execs)
@@ -612,6 +606,24 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
if execs[0].ErrorMsg == "" {
t.Error("expected non-empty error message on recovered execution")
}
+
+ // Task should be re-queued for retry and complete.
+ select {
+ case result := <-pool.Results():
+ if result.TaskID != tk.ID {
+ t.Errorf("unexpected task in results: %s", result.TaskID)
+ }
+ case <-time.After(2 * time.Second):
+ t.Fatal("timed out waiting for stale RUNNING task to be re-queued and run")
+ }
+ recovered, err := store.GetTask(tk.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ // Top-level tasks (no parent) go to READY after a successful run.
+ if recovered.State != task.StateReady {
+ t.Errorf("state after re-queue: want READY, got %q", recovered.State)
+ }
}
func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) {
diff --git a/internal/executor/gemini.go b/internal/executor/gemini.go
index c30cd66..2db3218 100644
--- a/internal/executor/gemini.go
+++ b/internal/executor/gemini.go
@@ -150,6 +150,9 @@ func (r *GeminiRunner) execOnce(ctx context.Context, args []string, workingDir s
if exitErr, ok := waitErr.(*exec.ExitError); ok {
e.ExitCode = exitErr.ExitCode()
}
+ if tail := tailFile(e.StderrPath, 20); tail != "" {
+ return fmt.Errorf("gemini exited with error: %w\nstderr:\n%s", waitErr, tail)
+ }
return fmt.Errorf("gemini exited with error: %w", waitErr)
}