summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/executor/executor.go35
-rw-r--r--internal/executor/executor_test.go64
2 files changed, 93 insertions, 6 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index baeb399..41377b2 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -20,6 +20,7 @@ import (
type Store interface {
GetTask(id string) (*task.Task, error)
ListTasks(filter storage.TaskFilter) ([]*task.Task, error)
+ ListSubtasks(parentID string) ([]*task.Task, error)
ListExecutions(taskID string) ([]*storage.Execution, error)
CreateExecution(e *storage.Execution) error
UpdateExecution(e *storage.Execution) error
@@ -297,9 +298,20 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
}
} else {
if t.ParentTaskID == "" {
- exec.Status = "READY"
- if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ subtasks, subErr := p.store.ListSubtasks(t.ID)
+ if subErr != nil {
+ p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
+ }
+ if subErr == nil && len(subtasks) > 0 {
+ exec.Status = "BLOCKED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
+ }
+ } else {
+ exec.Status = "READY"
+ if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ }
}
} else {
exec.Status = "COMPLETED"
@@ -532,9 +544,20 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
} else {
if t.ParentTaskID == "" {
- exec.Status = "READY"
- if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ subtasks, subErr := p.store.ListSubtasks(t.ID)
+ if subErr != nil {
+ p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
+ }
+ if subErr == nil && len(subtasks) > 0 {
+ exec.Status = "BLOCKED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
+ }
+ } else {
+ exec.Status = "READY"
+ if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ }
}
} else {
exec.Status = "COMPLETED"
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 1adba7e..91f7636 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -620,6 +620,70 @@ func TestPool_RateLimited_StaleEntryCleaned(t *testing.T) {
}
}
+// TestPool_Submit_TopLevel_NoSubtasks_GoesReady verifies that a top-level task
+// with no subtasks still transitions to READY after successful execution.
+func TestPool_Submit_TopLevel_NoSubtasks_GoesReady(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ tk := makeTask("no-subtasks-1") // no ParentTaskID, no subtasks
+ store.CreateTask(tk)
+
+ if err := pool.Submit(context.Background(), tk); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+
+ result := <-pool.Results()
+ if result.Err != nil {
+ t.Errorf("expected no error, got: %v", result.Err)
+ }
+ if result.Execution.Status != "READY" {
+ t.Errorf("status: want READY, got %q", result.Execution.Status)
+ }
+ got, _ := store.GetTask(tk.ID)
+ if got.State != task.StateReady {
+ t.Errorf("task state: want READY, got %v", got.State)
+ }
+}
+
+// TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked verifies that when a
+// top-level task finishes successfully but has subtasks, it transitions to
+// BLOCKED (waiting for subtasks) rather than READY.
+func TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ runners := map[string]Runner{"claude": runner}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, runners, store, logger)
+
+ parent := makeTask("parent-with-subtasks")
+ store.CreateTask(parent)
+
+ // Create a subtask in the store but do NOT submit it.
+ sub := makeTask("sub-of-parent")
+ sub.ParentTaskID = parent.ID
+ store.CreateTask(sub)
+
+ if err := pool.Submit(context.Background(), parent); err != nil {
+ t.Fatalf("submit: %v", err)
+ }
+
+ result := <-pool.Results()
+ if result.Err != nil {
+ t.Errorf("expected no error, got: %v", result.Err)
+ }
+ if result.Execution.Status != "BLOCKED" {
+ t.Errorf("status: want BLOCKED, got %q", result.Execution.Status)
+ }
+ got, _ := store.GetTask(parent.ID)
+ if got.State != task.StateBlocked {
+ t.Errorf("task state: want BLOCKED, got %v", got.State)
+ }
+}
+
func TestPool_UnsupportedAgent(t *testing.T) {
store := testStore(t)
runners := map[string]Runner{"claude": &mockRunner{}}