summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/executor/executor.go29
-rw-r--r--internal/executor/executor_test.go121
2 files changed, 150 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 41377b2..f445ef3 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -318,6 +318,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
}
+ p.maybeUnblockParent(t.ParentTaskID)
}
}
@@ -564,6 +565,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
}
+ p.maybeUnblockParent(t.ParentTaskID)
}
}
@@ -650,6 +652,33 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta
return &copy
}
+// maybeUnblockParent transitions the parent task from BLOCKED to READY if all
+// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED
+// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED.
+func (p *Pool) maybeUnblockParent(parentID string) {
+ parent, err := p.store.GetTask(parentID)
+ if err != nil {
+ p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err)
+ return
+ }
+ if parent.State != task.StateBlocked {
+ return
+ }
+ subtasks, err := p.store.ListSubtasks(parentID)
+ if err != nil {
+ p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err)
+ return
+ }
+ for _, sub := range subtasks {
+ if sub.State != task.StateCompleted {
+ return
+ }
+ }
+ if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil {
+ p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err)
+ }
+}
+
// waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED,
// or until a dependency enters a terminal failure state or the context is cancelled.
func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 91f7636..9896ba1 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -684,6 +684,127 @@ func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) {
}
}
+// TestPool_Submit_LastSubtask_UnblocksParent verifies that when the last
+// remaining subtask completes, the parent task transitions from BLOCKED to READY.
+func TestPool_Submit_LastSubtask_UnblocksParent(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ // Parent starts BLOCKED (waiting for subtasks).
+ parent := makeTask("unblock-parent-1")
+ parent.State = task.StateBlocked
+ store.CreateTask(parent)
+
+ // First subtask already completed.
+ sub1 := makeTask("unblock-sub-1a")
+ sub1.ParentTaskID = parent.ID
+ sub1.State = task.StateCompleted
+ store.CreateTask(sub1)
+
+ // Second (last) subtask — the one we submit.
+ sub2 := makeTask("unblock-sub-1b")
+ sub2.ParentTaskID = parent.ID
+ store.CreateTask(sub2)
+
+ if err := pool.Submit(context.Background(), sub2); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+
+ result := <-pool.Results()
+ if result.Err != nil {
+ t.Errorf("expected no error, got: %v", result.Err)
+ }
+ if result.Execution.Status != "COMPLETED" {
+ t.Errorf("subtask status: want COMPLETED, got %q", result.Execution.Status)
+ }
+
+ // Parent must now be READY.
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get parent: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("parent state: want READY, got %v", got.State)
+ }
+}
+
+// TestPool_Submit_NotLastSubtask_ParentStaysBlocked verifies that when a subtask
+// completes but another sibling subtask is still running, the parent stays BLOCKED.
+func TestPool_Submit_NotLastSubtask_ParentStaysBlocked(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ parent := makeTask("blocked-parent-2")
+ parent.State = task.StateBlocked
+ store.CreateTask(parent)
+
+ // First subtask still RUNNING — not done yet.
+ sub1 := makeTask("blocked-sub-2a")
+ sub1.ParentTaskID = parent.ID
+ sub1.State = task.StateRunning
+ store.CreateTask(sub1)
+
+ // Second subtask — the one we submit.
+ sub2 := makeTask("blocked-sub-2b")
+ sub2.ParentTaskID = parent.ID
+ store.CreateTask(sub2)
+
+ if err := pool.Submit(context.Background(), sub2); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+
+ <-pool.Results()
+
+ // Parent must remain BLOCKED because sub1 is still RUNNING.
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get parent: %v", err)
+ }
+ if got.State != task.StateBlocked {
+ t.Errorf("parent state: want BLOCKED, got %v", got.State)
+ }
+}
+
+// TestPool_Submit_ParentNotBlocked_NoTransition verifies that completing a subtask
+// does not change the parent's state when the parent is not BLOCKED.
+func TestPool_Submit_ParentNotBlocked_NoTransition(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ // Parent is already READY (not BLOCKED).
+ parent := makeTask("ready-parent-3")
+ parent.State = task.StateReady
+ store.CreateTask(parent)
+
+ sub1 := makeTask("ready-sub-3a")
+ sub1.ParentTaskID = parent.ID
+ store.CreateTask(sub1)
+
+ if err := pool.Submit(context.Background(), sub1); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+
+ <-pool.Results()
+
+ // Parent must remain READY — no spurious state transition.
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get parent: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("parent state: want READY, got %v", got.State)
+ }
+}
+
func TestPool_UnsupportedAgent(t *testing.T) {
store := testStore(t)
runners := map[string]Runner{"claude": &mockRunner{}}