diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-04-03 08:39:32 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-04-03 08:39:32 +0000 |
| commit | 5aa6a15ffdf68a8dbe12eb0fdfff93deafb9da10 (patch) | |
| tree | 31c6a3617b2bb10814dca4b9db5a5a46ce2ba3c5 /internal/executor/executor.go | |
| parent | 2461fcf03ed2a49c7ac4aaed77fdde1aa01177c4 (diff) | |
fix: remove drain-lock circuit breaker that halted all executions after 3 consecutive failures
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 34 |
1 files changed, 0 insertions, 34 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 48626ec..a1f29ed 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -75,7 +75,6 @@ type Pool struct { rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel consecutiveFailures map[string]int // agentType -> count - drained map[string]bool // agentType -> true if halted pending human ack resultCh chan *Result startedCh chan string // task IDs that just transitioned to RUNNING workCh chan workItem // internal bounded queue; Submit enqueues here @@ -109,7 +108,6 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * rateLimited: make(map[string]time.Time), cancels: make(map[string]context.CancelFunc), consecutiveFailures: make(map[string]int), - drained: make(map[string]bool), resultCh: make(chan *Result, maxConcurrent*2), startedCh: make(chan string, maxConcurrent*2), workCh: make(chan workItem, maxConcurrent*10+100), @@ -408,21 +406,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. } p.mu.Lock() p.consecutiveFailures[agentType]++ - failures := p.consecutiveFailures[agentType] p.mu.Unlock() - if failures >= 3 { - p.mu.Lock() - p.drained[agentType] = true - p.mu.Unlock() - p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures) - questionJSON, _ := json.Marshal(map[string]string{ - "question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg), - "options": "acknowledge", - }) - if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil { - p.logger.Error("failed to set drain question", "error", err) - } - } } if t.StoryID != "" && exec.Status == "FAILED" { storyID := t.StoryID @@ -440,7 +424,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. } else { p.mu.Lock() p.consecutiveFailures[agentType] = 0 - p.drained[agentType] = false p.mu.Unlock() if t.ParentTaskID == "" { subtasks, subErr := p.store.ListSubtasks(t.ID) @@ -657,14 +640,6 @@ func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskSt } } -// UndrainingAgent resets the drain state and failure counter for the given agent type. -func (p *Pool) UndrainingAgent(agentType string) { - p.mu.Lock() - defer p.mu.Unlock() - p.drained[agentType] = false - p.consecutiveFailures[agentType] = 0 -} - // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() @@ -678,7 +653,6 @@ type AgentStatusInfo struct { ActiveTasks int `json:"active_tasks"` RateLimited bool `json:"rate_limited"` Until *time.Time `json:"until,omitempty"` - Drained bool `json:"drained"` } // AgentStatuses returns the current status of all registered agents. @@ -691,7 +665,6 @@ func (p *Pool) AgentStatuses() []AgentStatusInfo { info := AgentStatusInfo{ Agent: agent, ActiveTasks: p.activePerAgent[agent], - Drained: p.drained[agent], } if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) { info.RateLimited = true @@ -805,16 +778,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } }() - p.mu.Lock() - if p.drained[agentType] { - p.mu.Unlock() - time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} }) - return - } // Check dependencies before taking the per-agent slot to avoid deadlock: // if a dependent task holds the slot while waiting for its dependency to run, // the dependency can never start (maxPerAgent=1). - p.mu.Unlock() if len(t.DependsOn) > 0 { ready, depErr := p.checkDepsReady(t) if depErr != nil { |
