summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Peter Stone <thepeterstone@gmail.com> 2026-03-26 05:45:19 +0000
committer: Peter Stone <thepeterstone@gmail.com> 2026-03-26 05:45:19 +0000
commit: 2710eb8a3a58abbea95bd487797abbb3e67f0d0a (patch)
tree: a461743fb822306b82bf48196706f883d9d97353
parent: b009880307298abea11efad92da2cd955afafe99 (diff)
fix: resolve dep-chain deadlock; broadcast task_started for UI visibility

With maxPerAgent=1, tasks with DependsOn were entering waitForDependencies while holding the per-agent slot, preventing the dependency from ever running. Fix: check deps before taking the slot. If not ready, requeue without holding activePerAgent. Also accept StateReady (leaf tasks) as a satisfied dependency, not just StateCompleted.

Add startedCh to pool and broadcast task_started WebSocket event when a task transitions to RUNNING, so the UI immediately shows the running state during the clone phase instead of waiting for completion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

-rw-r--r-- internal/api/server.go | 13
-rw-r--r-- internal/executor/executor.go | 106
-rw-r--r-- web/app.js | 1
3 files changed, 89 insertions, 31 deletions
diff --git a/internal/api/server.go b/internal/api/server.go
index fc9bd63..38108de 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -161,8 +161,19 @@ func (s *Server) routes() {
s.mux.Handle("GET /", http.FileServerFS(webui.Files))
}
-// forwardResults listens on the executor pool's result channel and broadcasts via WebSocket.
+// forwardResults listens on the executor pool's result and started channels and broadcasts via WebSocket.
func (s *Server) forwardResults() {
+ go func() {
+ for taskID := range s.pool.Started() {
+ event := map[string]interface{}{
+ "type": "task_started",
+ "task_id": taskID,
+ "timestamp": time.Now().UTC(),
+ }
+ data, _ := json.Marshal(event)
+ s.hub.Broadcast(data)
+ }
+ }()
for result := range s.pool.Results() {
s.processResult(result)
}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index b8979a1..9c4aac1 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -77,6 +77,7 @@ type Pool struct {
consecutiveFailures map[string]int // agentType -> count
drained map[string]bool // agentType -> true if halted pending human ack
resultCh chan *Result
+ startedCh chan string // task IDs that just transitioned to RUNNING
workCh chan workItem // internal bounded queue; Submit enqueues here
doneCh chan struct{} // signals when a worker slot is freed
Questions *QuestionRegistry
@@ -108,6 +109,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
consecutiveFailures: make(map[string]int),
drained: make(map[string]bool),
resultCh: make(chan *Result, maxConcurrent*2),
+ startedCh: make(chan string, maxConcurrent*2),
workCh: make(chan workItem, maxConcurrent*10+100),
doneCh: make(chan struct{}, maxConcurrent),
Questions: NewQuestionRegistry(),
@@ -151,6 +153,11 @@ func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
}
}
+// Started returns a channel that emits task IDs when they transition to RUNNING.
+func (p *Pool) Started() <-chan string {
+ return p.startedCh
+}
+
// Results returns the channel for reading execution results.
func (p *Pool) Results() <-chan *Result {
return p.resultCh
@@ -257,6 +264,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
p.logger.Error("failed to update task state", "error", err)
}
+ select {
+ case p.startedCh <- t.ID:
+ default:
+ }
var cancel context.CancelFunc
if t.Timeout.Duration > 0 {
@@ -759,6 +770,40 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} })
return
}
+ // Check dependencies before taking the per-agent slot to avoid deadlock:
+ // if a dependent task holds the slot while waiting for its dependency to run,
+ // the dependency can never start (maxPerAgent=1).
+ p.mu.Unlock()
+ if len(t.DependsOn) > 0 {
+ ready, depErr := p.checkDepsReady(t)
+ if depErr != nil {
+ // A dependency hit a terminal failure — fail this task immediately.
+ now := time.Now().UTC()
+ exec := &storage.Execution{
+ ID: uuid.New().String(),
+ TaskID: t.ID,
+ StartTime: now,
+ EndTime: now,
+ Status: "FAILED",
+ ErrorMsg: depErr.Error(),
+ }
+ if createErr := p.store.CreateExecution(exec); createErr != nil {
+ p.logger.Error("failed to create execution record", "error", createErr)
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
+ }
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: depErr}
+ return
+ }
+ if !ready {
+ // Dependencies not yet done — requeue without holding the slot.
+ time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} })
+ return
+ }
+ }
+ p.mu.Lock()
+
if p.activePerAgent[agentType] >= p.maxPerAgent {
p.mu.Unlock()
time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} })
@@ -810,35 +855,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
return
}
- // Wait for all dependencies to complete before starting execution.
- if len(t.DependsOn) > 0 {
- if err := p.waitForDependencies(ctx, t); err != nil {
- now := time.Now().UTC()
- exec := &storage.Execution{
- ID: uuid.New().String(),
- TaskID: t.ID,
- StartTime: now,
- EndTime: now,
- Status: "FAILED",
- ErrorMsg: err.Error(),
- }
- if createErr := p.store.CreateExecution(exec); createErr != nil {
- p.logger.Error("failed to create execution record", "error", createErr)
- }
- if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
- }
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
- p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
- return
- }
- }
-
execID := uuid.New().String()
exec := &storage.Execution{
ID: execID,
@@ -864,6 +880,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
p.logger.Error("failed to update task state", "error", err)
}
+ select {
+ case p.startedCh <- t.ID:
+ default:
+ }
// Apply task timeout and register cancel so callers can stop this task.
var cancel context.CancelFunc
@@ -988,6 +1008,32 @@ var terminalFailureStates = map[task.State]bool{
task.StateBudgetExceeded: true,
}
+// depDoneStates are task states that satisfy a DependsOn dependency.
+var depDoneStates = map[task.State]bool{
+ task.StateCompleted: true,
+ task.StateReady: true, // leaf tasks finish at READY
+}
+
+// checkDepsReady does a single synchronous check of t.DependsOn.
+// Returns (true, nil) if all deps are done, (false, nil) if any are still pending,
+// or (false, err) if a dep entered a terminal failure state.
+func (p *Pool) checkDepsReady(t *task.Task) (bool, error) {
+ for _, depID := range t.DependsOn {
+ dep, err := p.store.GetTask(depID)
+ if err != nil {
+ return false, fmt.Errorf("dependency %q not found: %w", depID, err)
+ }
+ if depDoneStates[dep.State] {
+ continue
+ }
+ if terminalFailureStates[dep.State] {
+ return false, fmt.Errorf("dependency %q ended in state %s", depID, dep.State)
+ }
+ return false, nil // still pending
+ }
+ return true, nil
+}
+
// withFailureHistory returns a shallow copy of t with prior failed execution
// error messages prepended to SystemPromptAppend so the agent knows what went
// wrong in previous attempts.
@@ -1062,7 +1108,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
if err != nil {
return fmt.Errorf("dependency %q not found: %w", depID, err)
}
- if dep.State == task.StateCompleted {
+ if depDoneStates[dep.State] {
continue
}
if terminalFailureStates[dep.State] {
diff --git a/web/app.js b/web/app.js
index c87c609..882ddac 100644
--- a/web/app.js
+++ b/web/app.js
@@ -1405,6 +1405,7 @@ function connectWebSocket() {
function handleWsEvent(data) {
switch (data.type) {
+ case 'task_started':
case 'task_completed':
// Force a poll immediately regardless of interval
poll();