diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 29 |
1 files changed, 29 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 41377b2..f445ef3 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -318,6 +318,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) } + p.maybeUnblockParent(t.ParentTaskID) } } @@ -564,6 +565,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) } + p.maybeUnblockParent(t.ParentTaskID) } } @@ -650,6 +652,33 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta return © } +// maybeUnblockParent transitions the parent task from BLOCKED to READY if all +// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED +// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED. +func (p *Pool) maybeUnblockParent(parentID string) { + parent, err := p.store.GetTask(parentID) + if err != nil { + p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err) + return + } + if parent.State != task.StateBlocked { + return + } + subtasks, err := p.store.ListSubtasks(parentID) + if err != nil { + p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err) + return + } + for _, sub := range subtasks { + if sub.State != task.StateCompleted { + return + } + } + if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil { + p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err) + } +} + // waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED, // or until a dependency enters a terminal failure state or the context is cancelled. func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { |
