summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go15
-rw-r--r--internal/executor/executor_test.go59
2 files changed, 74 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index f85f1ff..c07171b 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -634,6 +634,21 @@ func (p *Pool) RecoverStaleQueued(ctx context.Context) {
}
}
+// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its
+// subtasks are already COMPLETED. This handles the case where the server was
+// restarted after subtasks finished but before maybeUnblockParent could fire.
+// Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued.
+func (p *Pool) RecoverStaleBlocked() {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked})
+ if err != nil {
+ p.logger.Error("RecoverStaleBlocked: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.maybeUnblockParent(t.ID)
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 610ed3b..878a32d 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -680,6 +680,65 @@ func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) {
}
}
+func TestPool_RecoverStaleBlocked_UnblocksWhenAllSubtasksCompleted(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ // Parent task stuck in BLOCKED state (server restarted after subtasks completed).
+ parent := makeTask("parent-stale-blocked")
+ parent.State = task.StateBlocked
+ store.CreateTask(parent)
+
+ // All subtasks completed.
+ for i := 0; i < 3; i++ {
+ sub := makeTask(fmt.Sprintf("sub-%d", i))
+ sub.ParentTaskID = parent.ID
+ sub.State = task.StateCompleted
+ store.CreateTask(sub)
+ }
+
+ pool.RecoverStaleBlocked()
+
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("parent state: want READY, got %q", got.State)
+ }
+}
+
+func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ parent := makeTask("parent-still-blocked")
+ parent.State = task.StateBlocked
+ store.CreateTask(parent)
+
+ sub1 := makeTask("sub-done")
+ sub1.ParentTaskID = parent.ID
+ sub1.State = task.StateCompleted
+ store.CreateTask(sub1)
+
+ sub2 := makeTask("sub-running")
+ sub2.ParentTaskID = parent.ID
+ sub2.State = task.StateRunning
+ store.CreateTask(sub2)
+
+ pool.RecoverStaleBlocked()
+
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if got.State != task.StateBlocked {
+ t.Errorf("parent state: want BLOCKED, got %q", got.State)
+ }
+}
+
func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}