diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-15 23:32:14 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-15 23:32:14 +0000 |
| commit | b91636be171bc8e43a3dba1b54f78e2048356ab1 (patch) | |
| tree | aa1e6bad56a09a438d31b8262bdef8ed5a61ad76 /internal/executor | |
| parent | 52f6bdee9297b48938242d3ac843cc054d7dbcaa (diff) | |
fix: promote stale BLOCKED parent tasks to READY on server startup
When the server restarts after all subtasks complete, the parent task
was left stuck in BLOCKED state because maybeUnblockParent only fires
during a live executor run. RecoverStaleBlocked() scans all BLOCKED
tasks on startup and re-evaluates them using the existing logic.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/executor.go | 15 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 59 |
2 files changed, 74 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f85f1ff..c07171b 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -634,6 +634,21 @@ func (p *Pool) RecoverStaleQueued(ctx context.Context) { } } +// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its +// subtasks are already COMPLETED. This handles the case where the server was +// restarted after subtasks finished but before maybeUnblockParent could fire. +// Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued. +func (p *Pool) RecoverStaleBlocked() { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked}) + if err != nil { + p.logger.Error("RecoverStaleBlocked: list tasks", "error", err) + return + } + for _, t := range tasks { + p.maybeUnblockParent(t.ID) + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 610ed3b..878a32d 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -680,6 +680,65 @@ func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { } } +func TestPool_RecoverStaleBlocked_UnblocksWhenAllSubtasksCompleted(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + // Parent task stuck in BLOCKED state (server restarted after subtasks completed). + parent := makeTask("parent-stale-blocked") + parent.State = task.StateBlocked + store.CreateTask(parent) + + // All subtasks completed. + for i := 0; i < 3; i++ { + sub := makeTask(fmt.Sprintf("sub-%d", i)) + sub.ParentTaskID = parent.ID + sub.State = task.StateCompleted + store.CreateTask(sub) + } + + pool.RecoverStaleBlocked() + + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateReady { + t.Errorf("parent state: want READY, got %q", got.State) + } +} + +func TestPool_RecoverStaleBlocked_KeepsBlockedWhenSubtaskIncomplete(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + parent := makeTask("parent-still-blocked") + parent.State = task.StateBlocked + store.CreateTask(parent) + + sub1 := makeTask("sub-done") + sub1.ParentTaskID = parent.ID + sub1.State = task.StateCompleted + store.CreateTask(sub1) + + sub2 := makeTask("sub-running") + sub2.ParentTaskID = parent.ID + sub2.State = task.StateRunning + store.CreateTask(sub2) + + pool.RecoverStaleBlocked() + + got, err := store.GetTask(parent.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateBlocked { + t.Errorf("parent state: want BLOCKED, got %q", got.State) + } +} + func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} |
