From f527972f4d8311a09e639ede6c4da4ca669cfd5e Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Tue, 3 Mar 2026 21:15:06 +0000 Subject: Executor: dependency waiting and planning preamble - Pool.waitForDependencies polls depends_on task states before running - ClaudeRunner prepends planningPreamble to task instructions to prompt a plan-then-implement approach - Rate-limit test helper updated to match new ClaudeRunner signature Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/executor.go | 68 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 4 deletions(-) (limited to 'internal/executor/executor.go') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c8130cc..68ebdf3 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -78,6 +78,33 @@ func (p *Pool) ActiveCount() int { } func (p *Pool) execute(ctx context.Context, t *task.Task) { + defer func() { + p.mu.Lock() + p.active-- + p.mu.Unlock() + }() + + // Wait for all dependencies to complete before starting execution. + if len(t.DependsOn) > 0 { + if err := p.waitForDependencies(ctx, t); err != nil { + now := time.Now().UTC() + exec := &storage.Execution{ + ID: uuid.New().String(), + TaskID: t.ID, + StartTime: now, + EndTime: now, + Status: "FAILED", + ErrorMsg: err.Error(), + } + if createErr := p.store.CreateExecution(exec); createErr != nil { + p.logger.Error("failed to create execution record", "error", createErr) + } + p.store.UpdateTaskState(t.ID, task.StateFailed) + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} + return + } + } + execID := uuid.New().String() exec := &storage.Execution{ ID: execID, @@ -130,9 +157,42 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.logger.Error("failed to update execution", "error", updateErr) } - p.mu.Lock() - p.active-- - p.mu.Unlock() - p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } + +// terminalFailureStates are dependency states that cause the waiting task to fail immediately. +var terminalFailureStates = map[task.State]bool{ + task.StateFailed: true, + task.StateTimedOut: true, + task.StateCancelled: true, + task.StateBudgetExceeded: true, +} + +// waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED, +// or until a dependency enters a terminal failure state or the context is cancelled. +func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { + for { + allDone := true + for _, depID := range t.DependsOn { + dep, err := p.store.GetTask(depID) + if err != nil { + return fmt.Errorf("dependency %q not found: %w", depID, err) + } + if dep.State == task.StateCompleted { + continue + } + if terminalFailureStates[dep.State] { + return fmt.Errorf("dependency %q ended in state %s", depID, dep.State) + } + allDone = false + } + if allDone { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } +} -- cgit v1.2.3