From 441ed9eef3d9691cd9269772857307b84a7f5700 Mon Sep 17 00:00:00 2001 From: Claudomator Agent Date: Mon, 9 Mar 2026 07:41:37 +0000 Subject: executor: BLOCKED→READY for top-level tasks with subtasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a top-level task (ParentTaskID == "") finishes successfully, check for subtasks before deciding the next state: - subtasks exist → BLOCKED (waiting for subtasks to complete) - no subtasks → READY (existing behavior, unchanged) This applies to both execute() and executeResume(). Adds ListSubtasks to the Store interface. Tests: - TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked - TestPool_Submit_TopLevel_NoSubtasks_GoesReady Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/executor.go | 35 +++++++++++++++++---- internal/executor/executor_test.go | 64 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 6 deletions(-) (limited to 'internal') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index baeb399..41377b2 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -20,6 +20,7 @@ import ( type Store interface { GetTask(id string) (*task.Task, error) ListTasks(filter storage.TaskFilter) ([]*task.Task, error) + ListSubtasks(parentID string) ([]*task.Task, error) ListExecutions(taskID string) ([]*storage.Execution, error) CreateExecution(e *storage.Execution) error UpdateExecution(e *storage.Execution) error @@ -297,9 +298,20 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex } } else { if t.ParentTaskID == "" { - exec.Status = "READY" - if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + subtasks, subErr := p.store.ListSubtasks(t.ID) + if subErr != nil { + p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) + } + if subErr == nil && len(subtasks) > 0 { + exec.Status = "BLOCKED" + if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) + } + } else { + exec.Status = "READY" + if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + } } } else { exec.Status = "COMPLETED" @@ -532,9 +544,20 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } } else { if t.ParentTaskID == "" { - exec.Status = "READY" - if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + subtasks, subErr := p.store.ListSubtasks(t.ID) + if subErr != nil { + p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) + } + if subErr == nil && len(subtasks) > 0 { + exec.Status = "BLOCKED" + if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) + } + } else { + exec.Status = "READY" + if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + } } } else { exec.Status = "COMPLETED" diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 1adba7e..91f7636 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -620,6 +620,70 @@ func TestPool_RateLimited_StaleEntryCleaned(t *testing.T) { } } +// TestPool_Submit_TopLevel_NoSubtasks_GoesReady verifies that a top-level task +// with no subtasks still transitions to READY after successful execution. +func TestPool_Submit_TopLevel_NoSubtasks_GoesReady(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + tk := makeTask("no-subtasks-1") // no ParentTaskID, no subtasks + store.CreateTask(tk) + + if err := pool.Submit(context.Background(), tk); err != nil { + t.Fatalf("submit: %v", err) + } + + result := <-pool.Results() + if result.Err != nil { + t.Errorf("expected no error, got: %v", result.Err) + } + if result.Execution.Status != "READY" { + t.Errorf("status: want READY, got %q", result.Execution.Status) + } + got, _ := store.GetTask(tk.ID) + if got.State != task.StateReady { + t.Errorf("task state: want READY, got %v", got.State) + } +} + +// TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked verifies that when a +// top-level task finishes successfully but has subtasks, it transitions to +// BLOCKED (waiting for subtasks) rather than READY. +func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + parent := makeTask("parent-with-subtasks") + store.CreateTask(parent) + + // Create a subtask in the store but do NOT submit it. + sub := makeTask("sub-of-parent") + sub.ParentTaskID = parent.ID + store.CreateTask(sub) + + if err := pool.Submit(context.Background(), parent); err != nil { + t.Fatalf("submit: %v", err) + } + + result := <-pool.Results() + if result.Err != nil { + t.Errorf("expected no error, got: %v", result.Err) + } + if result.Execution.Status != "BLOCKED" { + t.Errorf("status: want BLOCKED, got %q", result.Execution.Status) + } + got, _ := store.GetTask(parent.ID) + if got.State != task.StateBlocked { + t.Errorf("task state: want BLOCKED, got %v", got.State) + } +} + func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} -- cgit v1.2.3