summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go29
1 files changed, 29 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 41377b2..f445ef3 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -318,6 +318,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
}
+ p.maybeUnblockParent(t.ParentTaskID)
}
}
@@ -564,6 +565,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
}
+ p.maybeUnblockParent(t.ParentTaskID)
}
}
@@ -650,6 +652,33 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta
return &copy
}
+// maybeUnblockParent transitions the parent task from BLOCKED to READY if all
+// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED
+// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED.
+func (p *Pool) maybeUnblockParent(parentID string) {
+ parent, err := p.store.GetTask(parentID)
+ if err != nil {
+ p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err)
+ return
+ }
+ if parent.State != task.StateBlocked {
+ return
+ }
+ subtasks, err := p.store.ListSubtasks(parentID)
+ if err != nil {
+ p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err)
+ return
+ }
+ for _, sub := range subtasks {
+ if sub.State != task.StateCompleted {
+ return
+ }
+ }
+ if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil {
+ p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err)
+ }
+}
+
// waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED,
// or until a dependency enters a terminal failure state or the context is cancelled.
func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {