summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go34
1 files changed, 0 insertions, 34 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 48626ec..a1f29ed 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -75,7 +75,6 @@ type Pool struct {
rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
consecutiveFailures map[string]int // agentType -> count
- drained map[string]bool // agentType -> true if halted pending human ack
resultCh chan *Result
startedCh chan string // task IDs that just transitioned to RUNNING
workCh chan workItem // internal bounded queue; Submit enqueues here
@@ -109,7 +108,6 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
rateLimited: make(map[string]time.Time),
cancels: make(map[string]context.CancelFunc),
consecutiveFailures: make(map[string]int),
- drained: make(map[string]bool),
resultCh: make(chan *Result, maxConcurrent*2),
startedCh: make(chan string, maxConcurrent*2),
workCh: make(chan workItem, maxConcurrent*10+100),
@@ -408,21 +406,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
}
p.mu.Lock()
p.consecutiveFailures[agentType]++
- failures := p.consecutiveFailures[agentType]
p.mu.Unlock()
- if failures >= 3 {
- p.mu.Lock()
- p.drained[agentType] = true
- p.mu.Unlock()
- p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures)
- questionJSON, _ := json.Marshal(map[string]string{
- "question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg),
- "options": "acknowledge",
- })
- if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil {
- p.logger.Error("failed to set drain question", "error", err)
- }
- }
}
if t.StoryID != "" && exec.Status == "FAILED" {
storyID := t.StoryID
@@ -440,7 +424,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
} else {
p.mu.Lock()
p.consecutiveFailures[agentType] = 0
- p.drained[agentType] = false
p.mu.Unlock()
if t.ParentTaskID == "" {
subtasks, subErr := p.store.ListSubtasks(t.ID)
@@ -657,14 +640,6 @@ func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskSt
}
}
-// UndrainingAgent resets the drain state and failure counter for the given agent type.
-func (p *Pool) UndrainingAgent(agentType string) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.drained[agentType] = false
- p.consecutiveFailures[agentType] = 0
-}
-
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -678,7 +653,6 @@ type AgentStatusInfo struct {
ActiveTasks int `json:"active_tasks"`
RateLimited bool `json:"rate_limited"`
Until *time.Time `json:"until,omitempty"`
- Drained bool `json:"drained"`
}
// AgentStatuses returns the current status of all registered agents.
@@ -691,7 +665,6 @@ func (p *Pool) AgentStatuses() []AgentStatusInfo {
info := AgentStatusInfo{
Agent: agent,
ActiveTasks: p.activePerAgent[agent],
- Drained: p.drained[agent],
}
if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) {
info.RateLimited = true
@@ -805,16 +778,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}()
- p.mu.Lock()
- if p.drained[agentType] {
- p.mu.Unlock()
- time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} })
- return
- }
// Check dependencies before taking the per-agent slot to avoid deadlock:
// if a dependent task holds the slot while waiting for its dependency to run,
// the dependency can never start (maxPerAgent=1).
- p.mu.Unlock()
if len(t.DependsOn) > 0 {
ready, depErr := p.checkDepsReady(t)
if depErr != nil {