summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go68
1 files changed, 64 insertions, 4 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c8130cc..68ebdf3 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -78,6 +78,33 @@ func (p *Pool) ActiveCount() int {
}
func (p *Pool) execute(ctx context.Context, t *task.Task) {
+ defer func() {
+ p.mu.Lock()
+ p.active--
+ p.mu.Unlock()
+ }()
+
+ // Wait for all dependencies to complete before starting execution.
+ if len(t.DependsOn) > 0 {
+ if err := p.waitForDependencies(ctx, t); err != nil {
+ now := time.Now().UTC()
+ exec := &storage.Execution{
+ ID: uuid.New().String(),
+ TaskID: t.ID,
+ StartTime: now,
+ EndTime: now,
+ Status: "FAILED",
+ ErrorMsg: err.Error(),
+ }
+ if createErr := p.store.CreateExecution(exec); createErr != nil {
+ p.logger.Error("failed to create execution record", "error", createErr)
+ }
+ p.store.UpdateTaskState(t.ID, task.StateFailed)
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+ return
+ }
+ }
+
execID := uuid.New().String()
exec := &storage.Execution{
ID: execID,
@@ -130,9 +157,42 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.logger.Error("failed to update execution", "error", updateErr)
}
- p.mu.Lock()
- p.active--
- p.mu.Unlock()
-
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
+
+// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
+var terminalFailureStates = map[task.State]bool{
+ task.StateFailed: true,
+ task.StateTimedOut: true,
+ task.StateCancelled: true,
+ task.StateBudgetExceeded: true,
+}
+
+// waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED,
+// or until a dependency enters a terminal failure state or the context is cancelled.
+func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
+ for {
+ allDone := true
+ for _, depID := range t.DependsOn {
+ dep, err := p.store.GetTask(depID)
+ if err != nil {
+ return fmt.Errorf("dependency %q not found: %w", depID, err)
+ }
+ if dep.State == task.StateCompleted {
+ continue
+ }
+ if terminalFailureStates[dep.State] {
+ return fmt.Errorf("dependency %q ended in state %s", depID, dep.State)
+ }
+ allDone = false
+ }
+ if allDone {
+ return nil
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(5 * time.Second):
+ }
+ }
+}