From 2710eb8a3a58abbea95bd487797abbb3e67f0d0a Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Thu, 26 Mar 2026 05:45:19 +0000 Subject: fix: resolve dep-chain deadlock; broadcast task_started for UI visibility With maxPerAgent=1, tasks with DependsOn were entering waitForDependencies while holding the per-agent slot, preventing the dependency from ever running. Fix: check deps before taking the slot. If not ready, requeue without holding activePerAgent. Also accept StateReady (leaf tasks) as a satisfied dependency, not just StateCompleted. Add startedCh to pool and broadcast task_started WebSocket event when a task transitions to RUNNING, so the UI immediately shows the running state during the clone phase instead of waiting for completion. Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/executor.go | 106 ++++++++++++++++++++++++++++++------------ 1 file changed, 76 insertions(+), 30 deletions(-) (limited to 'internal/executor') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index b8979a1..9c4aac1 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -77,6 +77,7 @@ type Pool struct { consecutiveFailures map[string]int // agentType -> count drained map[string]bool // agentType -> true if halted pending human ack resultCh chan *Result + startedCh chan string // task IDs that just transitioned to RUNNING workCh chan workItem // internal bounded queue; Submit enqueues here doneCh chan struct{} // signals when a worker slot is freed Questions *QuestionRegistry @@ -108,6 +109,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * consecutiveFailures: make(map[string]int), drained: make(map[string]bool), resultCh: make(chan *Result, maxConcurrent*2), + startedCh: make(chan string, maxConcurrent*2), workCh: make(chan workItem, maxConcurrent*10+100), doneCh: make(chan struct{}, maxConcurrent), Questions: NewQuestionRegistry(), @@ -151,6 +153,11 @@ func (p *Pool) Submit(ctx context.Context, t *task.Task) error { } } +// Started returns a channel that emits task IDs when they transition to RUNNING. +func (p *Pool) Started() <-chan string { + return p.startedCh +} + // Results returns the channel for reading execution results. func (p *Pool) Results() <-chan *Result { return p.resultCh @@ -257,6 +264,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { p.logger.Error("failed to update task state", "error", err) } + select { + case p.startedCh <- t.ID: + default: + } var cancel context.CancelFunc if t.Timeout.Duration > 0 { @@ -759,6 +770,40 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} }) return } + // Check dependencies before taking the per-agent slot to avoid deadlock: + // if a dependent task holds the slot while waiting for its dependency to run, + // the dependency can never start (maxPerAgent=1). + p.mu.Unlock() + if len(t.DependsOn) > 0 { + ready, depErr := p.checkDepsReady(t) + if depErr != nil { + // A dependency hit a terminal failure — fail this task immediately. + now := time.Now().UTC() + exec := &storage.Execution{ + ID: uuid.New().String(), + TaskID: t.ID, + StartTime: now, + EndTime: now, + Status: "FAILED", + ErrorMsg: depErr.Error(), + } + if createErr := p.store.CreateExecution(exec); createErr != nil { + p.logger.Error("failed to create execution record", "error", createErr) + } + if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) + } + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: depErr} + return + } + if !ready { + // Dependencies not yet done — requeue without holding the slot. + time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} }) + return + } + } + p.mu.Lock() + if p.activePerAgent[agentType] >= p.maxPerAgent { p.mu.Unlock() time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} }) @@ -810,35 +855,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { return } - // Wait for all dependencies to complete before starting execution. - if len(t.DependsOn) > 0 { - if err := p.waitForDependencies(ctx, t); err != nil { - now := time.Now().UTC() - exec := &storage.Execution{ - ID: uuid.New().String(), - TaskID: t.ID, - StartTime: now, - EndTime: now, - Status: "FAILED", - ErrorMsg: err.Error(), - } - if createErr := p.store.CreateExecution(exec); createErr != nil { - p.logger.Error("failed to create execution record", "error", createErr) - } - if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) - } - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() - p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} - return - } - } - execID := uuid.New().String() exec := &storage.Execution{ ID: execID, @@ -864,6 +880,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { p.logger.Error("failed to update task state", "error", err) } + select { + case p.startedCh <- t.ID: + default: + } // Apply task timeout and register cancel so callers can stop this task. var cancel context.CancelFunc @@ -988,6 +1008,32 @@ var terminalFailureStates = map[task.State]bool{ task.StateBudgetExceeded: true, } +// depDoneStates are task states that satisfy a DependsOn dependency. +var depDoneStates = map[task.State]bool{ + task.StateCompleted: true, + task.StateReady: true, // leaf tasks finish at READY +} + +// checkDepsReady does a single synchronous check of t.DependsOn. +// Returns (true, nil) if all deps are done, (false, nil) if any are still pending, +// or (false, err) if a dep entered a terminal failure state. +func (p *Pool) checkDepsReady(t *task.Task) (bool, error) { + for _, depID := range t.DependsOn { + dep, err := p.store.GetTask(depID) + if err != nil { + return false, fmt.Errorf("dependency %q not found: %w", depID, err) + } + if depDoneStates[dep.State] { + continue + } + if terminalFailureStates[dep.State] { + return false, fmt.Errorf("dependency %q ended in state %s", depID, dep.State) + } + return false, nil // still pending + } + return true, nil +} + // withFailureHistory returns a shallow copy of t with prior failed execution // error messages prepended to SystemPromptAppend so the agent knows what went // wrong in previous attempts. @@ -1062,7 +1108,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { if err != nil { return fmt.Errorf("dependency %q not found: %w", depID, err) } - if dep.State == task.StateCompleted { + if depDoneStates[dep.State] { continue } if terminalFailureStates[dep.State] { -- cgit v1.2.3