diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-03 21:15:06 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-03 21:15:06 +0000 |
| commit | f527972f4d8311a09e639ede6c4da4ca669cfd5e (patch) | |
| tree | 58ee5c97fd7dea71cdff0ffca7ccd36af562c640 /internal/executor/executor.go | |
| parent | 704d007a26cac804148a51d35e129beaea382fb0 (diff) | |
Executor: dependency waiting and planning preamble
- Pool.waitForDependencies polls depends_on task states before running
- ClaudeRunner prepends planningPreamble to task instructions to prompt
a plan-then-implement approach
- Rate-limit test helper updated to match new ClaudeRunner signature
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 68 |
1 files changed, 64 insertions, 4 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c8130cc..68ebdf3 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -78,6 +78,33 @@ func (p *Pool) ActiveCount() int { } func (p *Pool) execute(ctx context.Context, t *task.Task) { + defer func() { + p.mu.Lock() + p.active-- + p.mu.Unlock() + }() + + // Wait for all dependencies to complete before starting execution. + if len(t.DependsOn) > 0 { + if err := p.waitForDependencies(ctx, t); err != nil { + now := time.Now().UTC() + exec := &storage.Execution{ + ID: uuid.New().String(), + TaskID: t.ID, + StartTime: now, + EndTime: now, + Status: "FAILED", + ErrorMsg: err.Error(), + } + if createErr := p.store.CreateExecution(exec); createErr != nil { + p.logger.Error("failed to create execution record", "error", createErr) + } + p.store.UpdateTaskState(t.ID, task.StateFailed) + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} + return + } + } + execID := uuid.New().String() exec := &storage.Execution{ ID: execID, @@ -130,9 +157,42 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.logger.Error("failed to update execution", "error", updateErr) } - p.mu.Lock() - p.active-- - p.mu.Unlock() - p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } + +// terminalFailureStates are dependency states that cause the waiting task to fail immediately. +var terminalFailureStates = map[task.State]bool{ + task.StateFailed: true, + task.StateTimedOut: true, + task.StateCancelled: true, + task.StateBudgetExceeded: true, +} + +// waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED, +// or until a dependency enters a terminal failure state or the context is cancelled. +func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { + for { + allDone := true + for _, depID := range t.DependsOn { + dep, err := p.store.GetTask(depID) + if err != nil { + return fmt.Errorf("dependency %q not found: %w", depID, err) + } + if dep.State == task.StateCompleted { + continue + } + if terminalFailureStates[dep.State] { + return fmt.Errorf("dependency %q ended in state %s", depID, dep.State) + } + allDone = false + } + if allDone { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } +} |
