summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-15 23:32:14 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-15 23:32:14 +0000
commitb91636be171bc8e43a3dba1b54f78e2048356ab1 (patch)
treeaa1e6bad56a09a438d31b8262bdef8ed5a61ad76 /internal/executor
parent52f6bdee9297b48938242d3ac843cc054d7dbcaa (diff)
fix: promote stale BLOCKED parent tasks to READY on server startup
When the server restarts after all subtasks complete, the parent task was left stuck in BLOCKED state because maybeUnblockParent only fires during a live executor run. RecoverStaleBlocked() scans all BLOCKED tasks on startup and re-evaluates them using the existing logic. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go15
-rw-r--r--internal/executor/executor_test.go59
2 files changed, 74 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index f85f1ff..c07171b 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -634,6 +634,21 @@ func (p *Pool) RecoverStaleQueued(ctx context.Context) {
}
}
+// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its
+// subtasks are already COMPLETED. This handles the case where the server was
+// restarted after subtasks finished but before maybeUnblockParent could fire.
+// Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued.
+func (p *Pool) RecoverStaleBlocked() {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked})
+ if err != nil {
+ p.logger.Error("RecoverStaleBlocked: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.maybeUnblockParent(t.ID)
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 610ed3b..878a32d 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -680,6 +680,65 @@ func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) {
}
}
+func TestPool_RecoverStaleBlocked_UnblocksWhenAllSubtasksCompleted(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ // Parent task stuck in BLOCKED state (server restarted after subtasks completed).
+ parent := makeTask("parent-stale-blocked")
+ parent.State = task.StateBlocked
+ store.CreateTask(parent)
+
+ // All subtasks completed.
+ for i := 0; i < 3; i++ {
+ sub := makeTask(fmt.Sprintf("sub-%d", i))
+ sub.ParentTaskID = parent.ID
+ sub.State = task.StateCompleted
+ store.CreateTask(sub)
+ }
+
+ pool.RecoverStaleBlocked()
+
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if got.State != task.StateReady {
+ t.Errorf("parent state: want READY, got %q", got.State)
+ }
+}
+
+func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ parent := makeTask("parent-still-blocked")
+ parent.State = task.StateBlocked
+ store.CreateTask(parent)
+
+ sub1 := makeTask("sub-done")
+ sub1.ParentTaskID = parent.ID
+ sub1.State = task.StateCompleted
+ store.CreateTask(sub1)
+
+ sub2 := makeTask("sub-running")
+ sub2.ParentTaskID = parent.ID
+ sub2.State = task.StateRunning
+ store.CreateTask(sub2)
+
+ pool.RecoverStaleBlocked()
+
+ got, err := store.GetTask(parent.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if got.State != task.StateBlocked {
+ t.Errorf("parent state: want BLOCKED, got %q", got.State)
+ }
+}
+
func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}