diff options
| -rw-r--r-- | internal/executor/executor.go | 29 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 121 |
2 files changed, 150 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 41377b2..f445ef3 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -318,6 +318,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) } + p.maybeUnblockParent(t.ParentTaskID) } } @@ -564,6 +565,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err) } + p.maybeUnblockParent(t.ParentTaskID) } } @@ -650,6 +652,33 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta return © } +// maybeUnblockParent transitions the parent task from BLOCKED to READY if all +// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED +// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED. +func (p *Pool) maybeUnblockParent(parentID string) { + parent, err := p.store.GetTask(parentID) + if err != nil { + p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err) + return + } + if parent.State != task.StateBlocked { + return + } + subtasks, err := p.store.ListSubtasks(parentID) + if err != nil { + p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err) + return + } + for _, sub := range subtasks { + if sub.State != task.StateCompleted { + return + } + } + if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil { + p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err) + } +} + // waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED, // or until a dependency enters a terminal failure state or the context is cancelled. func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 91f7636..9896ba1 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -684,6 +684,127 @@ func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) { } } +// TestPool_Submit_LastSubtask_UnblocksParent verifies that when the last +// remaining subtask completes, the parent task transitions from BLOCKED to READY. +func TestPool_Submit_LastSubtask_UnblocksParent(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + // Parent starts BLOCKED (waiting for subtasks). + parent := makeTask("unblock-parent-1") + parent.State = task.StateBlocked + store.CreateTask(parent) + + // First subtask already completed. + sub1 := makeTask("unblock-sub-1a") + sub1.ParentTaskID = parent.ID + sub1.State = task.StateCompleted + store.CreateTask(sub1) + + // Second (last) subtask — the one we submit. + sub2 := makeTask("unblock-sub-1b") + sub2.ParentTaskID = parent.ID + store.CreateTask(sub2) + + if err := pool.Submit(context.Background(), sub2); err != nil { + t.Fatalf("submit: %v", err) + } + + result := <-pool.Results() + if result.Err != nil { + t.Errorf("expected no error, got: %v", result.Err) + } + if result.Execution.Status != "COMPLETED" { + t.Errorf("subtask status: want COMPLETED, got %q", result.Execution.Status) + } + + // Parent must now be READY. + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get parent: %v", err) + } + if got.State != task.StateReady { + t.Errorf("parent state: want READY, got %v", got.State) + } +} + +// TestPool_Submit_NotLastSubtask_ParentStaysBlocked verifies that when a subtask +// completes but another sibling subtask is still running, the parent stays BLOCKED. +func TestPool_Submit_NotLastSubtask_ParentStaysBlocked(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + parent := makeTask("blocked-parent-2") + parent.State = task.StateBlocked + store.CreateTask(parent) + + // First subtask still RUNNING — not done yet. + sub1 := makeTask("blocked-sub-2a") + sub1.ParentTaskID = parent.ID + sub1.State = task.StateRunning + store.CreateTask(sub1) + + // Second subtask — the one we submit. + sub2 := makeTask("blocked-sub-2b") + sub2.ParentTaskID = parent.ID + store.CreateTask(sub2) + + if err := pool.Submit(context.Background(), sub2); err != nil { + t.Fatalf("submit: %v", err) + } + + <-pool.Results() + + // Parent must remain BLOCKED because sub1 is still RUNNING. + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get parent: %v", err) + } + if got.State != task.StateBlocked { + t.Errorf("parent state: want BLOCKED, got %v", got.State) + } +} + +// TestPool_Submit_ParentNotBlocked_NoTransition verifies that completing a subtask +// does not change the parent's state when the parent is not BLOCKED. +func TestPool_Submit_ParentNotBlocked_NoTransition(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + // Parent is already READY (not BLOCKED). + parent := makeTask("ready-parent-3") + parent.State = task.StateReady + store.CreateTask(parent) + + sub1 := makeTask("ready-sub-3a") + sub1.ParentTaskID = parent.ID + store.CreateTask(sub1) + + if err := pool.Submit(context.Background(), sub1); err != nil { + t.Fatalf("submit: %v", err) + } + + <-pool.Results() + + // Parent must remain READY — no spurious state transition. + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get parent: %v", err) + } + if got.State != task.StateReady { + t.Errorf("parent state: want READY, got %v", got.State) + } +} + func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} |
