diff options
| author | Claudomator Agent <agent@claudomator> | 2026-03-09 07:41:37 +0000 |
|---|---|---|
| committer | Claudomator Agent <agent@claudomator> | 2026-03-09 07:41:37 +0000 |
| commit | 441ed9eef3d9691cd9269772857307b84a7f5700 (patch) | |
| tree | 6143f175382ab74f9de9c664a7d6f351fdc78553 /internal/executor/executor.go | |
| parent | 96d1c439ce27be80b751ea0085f53602606473d1 (diff) | |
executor: BLOCKED→READY for top-level tasks with subtasks
When a top-level task (ParentTaskID == "") finishes successfully,
check for subtasks before deciding the next state:
- subtasks exist → BLOCKED (waiting for subtasks to complete)
- no subtasks → READY (existing behavior, unchanged)
This applies to both execute() and executeResume().
Adds ListSubtasks to the Store interface.
Tests:
- TestPool_Submit_TopLevel_WithSubtasks_GoesBlocked
- TestPool_Submit_TopLevel_NoSubtasks_GoesReady
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 35 |
1 files changed, 29 insertions, 6 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index baeb399..41377b2 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -20,6 +20,7 @@ import ( type Store interface { GetTask(id string) (*task.Task, error) ListTasks(filter storage.TaskFilter) ([]*task.Task, error) + ListSubtasks(parentID string) ([]*task.Task, error) ListExecutions(taskID string) ([]*storage.Execution, error) CreateExecution(e *storage.Execution) error UpdateExecution(e *storage.Execution) error @@ -297,9 +298,20 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex } } else { if t.ParentTaskID == "" { - exec.Status = "READY" - if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + subtasks, subErr := p.store.ListSubtasks(t.ID) + if subErr != nil { + p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) + } + if subErr == nil && len(subtasks) > 0 { + exec.Status = "BLOCKED" + if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) + } + } else { + exec.Status = "READY" + if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + } } } else { exec.Status = "COMPLETED" @@ -532,9 +544,20 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } } else { if t.ParentTaskID == "" { - exec.Status = "READY" - if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + subtasks, subErr := p.store.ListSubtasks(t.ID) + if subErr != nil { + p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) + } + if subErr == nil && len(subtasks) > 0 { + exec.Status = "BLOCKED" + if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) + } + } else { + exec.Status = "READY" + if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) + } } } else { exec.Status = "COMPLETED" |
