diff options
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/executor.go | 15 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 59 |
2 files changed, 74 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f85f1ff..c07171b 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -634,6 +634,21 @@ func (p *Pool) RecoverStaleQueued(ctx context.Context) { } } +// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its +// subtasks are already COMPLETED. This handles the case where the server was +// restarted after subtasks finished but before maybeUnblockParent could fire. +// Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued. +func (p *Pool) RecoverStaleBlocked() { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked}) + if err != nil { + p.logger.Error("RecoverStaleBlocked: list tasks", "error", err) + return + } + for _, t := range tasks { + p.maybeUnblockParent(t.ID) + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 610ed3b..878a32d 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -680,6 +680,65 @@ func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { } } +func TestPool_RecoverStaleBlocked_UnblocksWhenAllSubtasksCompleted(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + // Parent task stuck in BLOCKED state (server restarted after subtasks completed). + parent := makeTask("parent-stale-blocked") + parent.State = task.StateBlocked + store.CreateTask(parent) + + // All subtasks completed. + for i := 0; i < 3; i++ { + sub := makeTask(fmt.Sprintf("sub-%d", i)) + sub.ParentTaskID = parent.ID + sub.State = task.StateCompleted + store.CreateTask(sub) + } + + pool.RecoverStaleBlocked() + + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateReady { + t.Errorf("parent state: want READY, got %q", got.State) + } +} + +func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + parent := makeTask("parent-still-blocked") + parent.State = task.StateBlocked + store.CreateTask(parent) + + sub1 := makeTask("sub-done") + sub1.ParentTaskID = parent.ID + sub1.State = task.StateCompleted + store.CreateTask(sub1) + + sub2 := makeTask("sub-running") + sub2.ParentTaskID = parent.ID + sub2.State = task.StateRunning + store.CreateTask(sub2) + + pool.RecoverStaleBlocked() + + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateBlocked { + t.Errorf("parent state: want BLOCKED, got %q", got.State) + } +} + func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} |
