From f8b5f2580e730a8affbccec8b5bde9b96b1f9fc2 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Thu, 5 Mar 2026 23:03:02 +0000 Subject: executor: persist log paths at execution create time, not just at end Add LogPather interface; ClaudeRunner implements it via ExecLogDir(). Pool pre-populates stdout_path/stderr_path/artifact_dir on the execution record before CreateExecution, so paths are in the DB from the moment a task starts running. ClaudeRunner.Run() skips path assignment when already set by the pool. Also update scripts/debug-execution to derive paths from the known convention (/executions//) as a fallback for historical records that predate this change. Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/claude.go | 24 ++++++++++--- internal/executor/executor.go | 18 ++++++++++ internal/executor/executor_test.go | 72 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 4 deletions(-) (limited to 'internal') diff --git a/internal/executor/claude.go b/internal/executor/claude.go index 815c21f..7cbcc6c 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -27,6 +27,15 @@ type ClaudeRunner struct { APIURL string // base URL of the Claudomator API, passed to subprocesses } +// ExecLogDir returns the log directory for the given execution ID. +// Implements LogPather so the pool can persist paths before execution starts. +func (r *ClaudeRunner) ExecLogDir(execID string) string { + if r.LogDir == "" { + return "" + } + return filepath.Join(r.LogDir, execID) +} + func (r *ClaudeRunner) binaryPath() string { if r.BinaryPath != "" { return r.BinaryPath @@ -46,13 +55,20 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi } // Setup log directory once; retries overwrite the log files. - logDir := filepath.Join(r.LogDir, e.ID) + // Use pre-set paths if the pool already populated them via LogPather; + // otherwise fall back to computing from LogDir + execID. + logDir := r.ExecLogDir(e.ID) + if logDir == "" { + logDir = e.ID // fallback: use execID as relative dir (tests without LogDir set) + } if err := os.MkdirAll(logDir, 0700); err != nil { return fmt.Errorf("creating log dir: %w", err) } - e.StdoutPath = filepath.Join(logDir, "stdout.log") - e.StderrPath = filepath.Join(logDir, "stderr.log") - e.ArtifactDir = logDir + if e.StdoutPath == "" { + e.StdoutPath = filepath.Join(logDir, "stdout.log") + e.StderrPath = filepath.Join(logDir, "stderr.log") + e.ArtifactDir = logDir + } attempt := 0 return runWithBackoff(ctx, 3, 5*time.Second, func() error { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 51f468e..eb23c02 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "path/filepath" "sync" "time" @@ -12,6 +13,13 @@ import ( "github.com/google/uuid" ) +// LogPather is an optional interface runners can implement to provide the log +// directory for an execution before it starts. The pool uses this to persist +// log paths at CreateExecution time rather than waiting until execution ends. +type LogPather interface { + ExecLogDir(execID string) string +} + // Runner executes a single task and returns the result. type Runner interface { Run(ctx context.Context, t *task.Task, exec *storage.Execution) error @@ -115,6 +123,16 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { Status: "RUNNING", } + // Pre-populate log paths so they're available in the DB immediately — + // before the subprocess starts — enabling live tailing and debugging. + if lp, ok := p.runner.(LogPather); ok { + if logDir := lp.ExecLogDir(execID); logDir != "" { + exec.StdoutPath = filepath.Join(logDir, "stdout.log") + exec.StderrPath = filepath.Join(logDir, "stderr.log") + exec.ArtifactDir = logDir + } + } + // Record execution start. if err := p.store.CreateExecution(exec); err != nil { p.logger.Error("failed to create execution record", "error", err) diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 18a79bb..b3e6dae 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -206,6 +207,77 @@ func TestPool_AtCapacity(t *testing.T) { <-pool.Results() // drain } +// logPatherMockRunner is a mockRunner that also implements LogPather, +// and captures the StdoutPath seen when Run() is called. +type logPatherMockRunner struct { + mockRunner + logDir string + capturedPath string +} + +func (m *logPatherMockRunner) ExecLogDir(execID string) string { + return filepath.Join(m.logDir, execID) +} + +func (m *logPatherMockRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error { + m.mu.Lock() + m.capturedPath = e.StdoutPath + m.mu.Unlock() + return m.mockRunner.Run(ctx, t, e) +} + +// TestPool_Execute_LogPathsPreSetBeforeRun verifies that when the runner +// implements LogPather, log paths are set on the execution before Run() is +// called — so they land in the DB at CreateExecution time, not just at +// UpdateExecution time. +func TestPool_Execute_LogPathsPreSetBeforeRun(t *testing.T) { + store := testStore(t) + runner := &logPatherMockRunner{logDir: t.TempDir()} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runner, store, logger) + + tk := makeTask("lp-1") + store.CreateTask(tk) + if err := pool.Submit(context.Background(), tk); err != nil { + t.Fatalf("submit: %v", err) + } + result := <-pool.Results() + + runner.mu.Lock() + captured := runner.capturedPath + runner.mu.Unlock() + + if captured == "" { + t.Fatal("StdoutPath was empty when Run() was called; expected pre-set path") + } + if !strings.HasSuffix(captured, "stdout.log") { + t.Errorf("expected stdout.log suffix, got: %s", captured) + } + // Path in the returned execution record should match. + if result.Execution.StdoutPath != captured { + t.Errorf("execution StdoutPath %q != captured %q", result.Execution.StdoutPath, captured) + } +} + +// TestPool_Execute_NoLogPather_PathsEmptyBeforeRun verifies that a runner +// without LogPather doesn't panic and paths remain empty until Run() sets them. +func TestPool_Execute_NoLogPather_PathsEmptyBeforeRun(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} // does NOT implement LogPather + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runner, store, logger) + + tk := makeTask("nolp-1") + store.CreateTask(tk) + if err := pool.Submit(context.Background(), tk); err != nil { + t.Fatalf("submit: %v", err) + } + result := <-pool.Results() + if result.Err != nil { + t.Fatalf("unexpected error: %v", result.Err) + } +} + func TestPool_ConcurrentExecution(t *testing.T) { store := testStore(t) runner := &mockRunner{delay: 50 * time.Millisecond} -- cgit v1.2.3