summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/claude.go24
-rw-r--r--internal/executor/executor.go18
-rw-r--r--internal/executor/executor_test.go72
3 files changed, 110 insertions, 4 deletions
diff --git a/internal/executor/claude.go b/internal/executor/claude.go
index 815c21f..7cbcc6c 100644
--- a/internal/executor/claude.go
+++ b/internal/executor/claude.go
@@ -27,6 +27,15 @@ type ClaudeRunner struct {
APIURL string // base URL of the Claudomator API, passed to subprocesses
}
+// ExecLogDir returns the log directory for the given execution ID.
+// Implements LogPather so the pool can persist paths before execution starts.
+func (r *ClaudeRunner) ExecLogDir(execID string) string {
+ if r.LogDir == "" {
+ return ""
+ }
+ return filepath.Join(r.LogDir, execID)
+}
+
func (r *ClaudeRunner) binaryPath() string {
if r.BinaryPath != "" {
return r.BinaryPath
@@ -46,13 +55,20 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi
}
// Setup log directory once; retries overwrite the log files.
- logDir := filepath.Join(r.LogDir, e.ID)
+ // Use pre-set paths if the pool already populated them via LogPather;
+ // otherwise fall back to computing from LogDir + execID.
+ logDir := r.ExecLogDir(e.ID)
+ if logDir == "" {
+ logDir = e.ID // fallback: use execID as relative dir (tests without LogDir set)
+ }
if err := os.MkdirAll(logDir, 0700); err != nil {
return fmt.Errorf("creating log dir: %w", err)
}
- e.StdoutPath = filepath.Join(logDir, "stdout.log")
- e.StderrPath = filepath.Join(logDir, "stderr.log")
- e.ArtifactDir = logDir
+ if e.StdoutPath == "" {
+ e.StdoutPath = filepath.Join(logDir, "stdout.log")
+ e.StderrPath = filepath.Join(logDir, "stderr.log")
+ e.ArtifactDir = logDir
+ }
attempt := 0
return runWithBackoff(ctx, 3, 5*time.Second, func() error {
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 51f468e..eb23c02 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
+ "path/filepath"
"sync"
"time"
@@ -12,6 +13,13 @@ import (
"github.com/google/uuid"
)
+// LogPather is an optional interface runners can implement to provide the log
+// directory for an execution before it starts. The pool uses this to persist
+// log paths at CreateExecution time rather than waiting until execution ends.
+type LogPather interface {
+ ExecLogDir(execID string) string
+}
+
// Runner executes a single task and returns the result.
type Runner interface {
Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
@@ -115,6 +123,16 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
Status: "RUNNING",
}
+ // Pre-populate log paths so they're available in the DB immediately —
+ // before the subprocess starts — enabling live tailing and debugging.
+ if lp, ok := p.runner.(LogPather); ok {
+ if logDir := lp.ExecLogDir(execID); logDir != "" {
+ exec.StdoutPath = filepath.Join(logDir, "stdout.log")
+ exec.StderrPath = filepath.Join(logDir, "stderr.log")
+ exec.ArtifactDir = logDir
+ }
+ }
+
// Record execution start.
if err := p.store.CreateExecution(exec); err != nil {
p.logger.Error("failed to create execution record", "error", err)
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 18a79bb..b3e6dae 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"path/filepath"
+ "strings"
"sync"
"testing"
"time"
@@ -206,6 +207,77 @@ func TestPool_AtCapacity(t *testing.T) {
<-pool.Results() // drain
}
+// logPatherMockRunner is a mockRunner that also implements LogPather,
+// and captures the StdoutPath seen when Run() is called.
+type logPatherMockRunner struct {
+ mockRunner
+ logDir string
+ capturedPath string
+}
+
+func (m *logPatherMockRunner) ExecLogDir(execID string) string {
+ return filepath.Join(m.logDir, execID)
+}
+
+func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
+ m.mu.Lock()
+ m.capturedPath = e.StdoutPath
+ m.mu.Unlock()
+ return m.mockRunner.Run(ctx, t, e)
+}
+
+// TestPool_Execute_LogPathsPreSetBeforeRun verifies that when the runner
+// implements LogPather, log paths are set on the execution before Run() is
+// called — so they land in the DB at CreateExecution time, not just at
+// UpdateExecution time.
+func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) {
+ store := testStore(t)
+ runner := &logPatherMockRunner{logDir: t.TempDir()}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runner, store, logger)
+
+ tk := makeTask("lp-1")
+ store.CreateTask(tk)
+ if err := pool.Submit(context.Background(), tk); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+ result := <-pool.Results()
+
+ runner.mu.Lock()
+ captured := runner.capturedPath
+ runner.mu.Unlock()
+
+ if captured == "" {
+ t.Fatal("StdoutPath was empty when Run() was called; expected pre-set path")
+ }
+ if !strings.HasSuffix(captured, "stdout.log") {
+ t.Errorf("expected stdout.log suffix, got: %s", captured)
+ }
+ // Path in the returned execution record should match.
+ if result.Execution.StdoutPath != captured {
+ t.Errorf("execution StdoutPath %q != captured %q", result.Execution.StdoutPath, captured)
+ }
+}
+
+// TestPool_Execute_NoLogPather_PathsEmptyBeforeRun verifies that a runner
+// without LogPather doesn't panic and paths remain empty until Run() sets them.
+func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{} // does NOT implement LogPather
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runner, store, logger)
+
+ tk := makeTask("nolp-1")
+ store.CreateTask(tk)
+ if err := pool.Submit(context.Background(), tk); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+ result := <-pool.Results()
+ if result.Err != nil {
+ t.Fatalf("unexpected error: %v", result.Err)
+ }
+}
+
func TestPool_ConcurrentExecution(t *testing.T) {
store := testStore(t)
runner := &mockRunner{delay: 50 * time.Millisecond}