summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorClaudomator Agent <agent@claudomator>2026-03-09 07:41:37 +0000
committerClaudomator Agent <agent@claudomator>2026-03-09 07:41:37 +0000
commit441ed9eef3d9691cd9269772857307b84a7f5700 (patch)
tree6143f175382ab74f9de9c664a7d6f351fdc78553 /internal/executor/executor.go
parent96d1c439ce27be80b751ea0085f53602606473d1 (diff)
executor: BLOCKED→READY for top-level tasks with subtasks
When a top-level task (ParentTaskID == "") finishes successfully, check for subtasks before deciding the next state: - subtasks exist → BLOCKED (waiting for subtasks to complete) - no subtasks → READY (existing behavior, unchanged) This applies to both execute() and executeResume(). Adds ListSubtasks to the Store interface. Tests: - TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked - TestPool_Submit_TopLevel_NoSubtasks_GoesReady Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go35
1 files changed, 29 insertions, 6 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index baeb399..41377b2 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -20,6 +20,7 @@ import (
type Store interface {
GetTask(id string) (*task.Task, error)
ListTasks(filter storage.TaskFilter) ([]*task.Task, error)
+ ListSubtasks(parentID string) ([]*task.Task, error)
ListExecutions(taskID string) ([]*storage.Execution, error)
CreateExecution(e *storage.Execution) error
UpdateExecution(e *storage.Execution) error
@@ -297,9 +298,20 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
}
} else {
if t.ParentTaskID == "" {
- exec.Status = "READY"
- if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ subtasks, subErr := p.store.ListSubtasks(t.ID)
+ if subErr != nil {
+ p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
+ }
+ if subErr == nil && len(subtasks) > 0 {
+ exec.Status = "BLOCKED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
+ }
+ } else {
+ exec.Status = "READY"
+ if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ }
}
} else {
exec.Status = "COMPLETED"
@@ -532,9 +544,20 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
} else {
if t.ParentTaskID == "" {
- exec.Status = "READY"
- if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ subtasks, subErr := p.store.ListSubtasks(t.ID)
+ if subErr != nil {
+ p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
+ }
+ if subErr == nil && len(subtasks) > 0 {
+ exec.Status = "BLOCKED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
+ }
+ } else {
+ exec.Status = "READY"
+ if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
+ }
}
} else {
exec.Status = "COMPLETED"