summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorClaudomator Agent <agent@claudomator>2026-03-09 08:04:07 +0000
committerClaudomator Agent <agent@claudomator>2026-03-09 08:04:07 +0000
commitc8e3b467afdfcee9c5047902662d49d33c862764 (patch)
treee39df5351d6624edc531a76ada0c0c15fd9b862a /internal/executor/executor.go
parent441ed9eef3d9691cd9269772857307b84a7f5700 (diff)
executor: unblock parent task when all subtasks complete
Add maybeUnblockParent helper that transitions a BLOCKED parent task to READY once every subtask is in the COMPLETED state. Called in both execute() and executeResume() immediately after a subtask is marked COMPLETED. Any non-COMPLETED sibling (RUNNING, FAILED, etc.) keeps the parent BLOCKED. Tests added: - TestPool_Submit_LastSubtask_UnblocksParent - TestPool_Submit_NotLastSubtask_ParentStaysBlocked - TestPool_Submit_ParentNotBlocked_NoTransition
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go29
1 files changed, 29 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 41377b2..f445ef3 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -318,6 +318,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
}
+ p.maybeUnblockParent(t.ParentTaskID)
}
}
@@ -564,6 +565,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
}
+ p.maybeUnblockParent(t.ParentTaskID)
}
}
@@ -650,6 +652,33 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta
return &copy
}
+// maybeUnblockParent transitions the parent task from BLOCKED to READY if all
+// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED
+// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED.
+func (p *Pool) maybeUnblockParent(parentID string) {
+ parent, err := p.store.GetTask(parentID)
+ if err != nil {
+ p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err)
+ return
+ }
+ if parent.State != task.StateBlocked {
+ return
+ }
+ subtasks, err := p.store.ListSubtasks(parentID)
+ if err != nil {
+ p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err)
+ return
+ }
+ for _, sub := range subtasks {
+ if sub.State != task.StateCompleted {
+ return
+ }
+ }
+ if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil {
+ p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err)
+ }
+}
+
// waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED,
// or until a dependency enters a terminal failure state or the context is cancelled.
func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {