From c8e3b467afdfcee9c5047902662d49d33c862764 Mon Sep 17 00:00:00 2001 From: Claudomator Agent Date: Mon, 9 Mar 2026 08:04:07 +0000 Subject: executor: unblock parent task when all subtasks complete Add maybeUnblockParent helper that transitions a BLOCKED parent task to READY once every subtask is in the COMPLETED state. Called in both execute() and executeResume() immediately after a subtask is marked COMPLETED. Any non-COMPLETED sibling (RUNNING, FAILED, etc.) keeps the parent BLOCKED. Tests added: - TestPool_Submit_LastSubtask_UnblocksParent - TestPool_Submit_NotLastSubtask_ParentStaysBlocked - TestPool_Submit_ParentNotBlocked_NoTransition --- internal/executor/executor.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) (limited to 'internal/executor/executor.go') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 41377b2..f445ef3 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -318,6 +318,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) } + p.maybeUnblockParent(t.ParentTaskID) } } @@ -564,6 +565,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) } + p.maybeUnblockParent(t.ParentTaskID) } } @@ -650,6 +652,33 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta return © } +// maybeUnblockParent transitions the parent task from BLOCKED to READY if all +// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED +// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED. +func (p *Pool) maybeUnblockParent(parentID string) { + parent, err := p.store.GetTask(parentID) + if err != nil { + p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err) + return + } + if parent.State != task.StateBlocked { + return + } + subtasks, err := p.store.ListSubtasks(parentID) + if err != nil { + p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err) + return + } + for _, sub := range subtasks { + if sub.State != task.StateCompleted { + return + } + } + if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil { + p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err) + } +} + // waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED, // or until a dependency enters a terminal failure state or the context is cancelled. func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { -- cgit v1.2.3