diff options
| author | Claudomator Agent <agent@claudomator.local> | 2026-03-21 23:18:50 +0000 |
|---|---|---|
| committer | Claudomator Agent <agent@claudomator.local> | 2026-03-21 23:18:50 +0000 |
| commit | 8dca9bbb0baee59ffe0d3127180ef0958dda8b91 (patch) | |
| tree | e887036f4cce0f10694c5b9a29f4b4dc251769ba /internal/executor/executor.go | |
| parent | 9e35f7e4087cfa6017cb65ec6a7036f394f5eb22 (diff) | |
feat: executor reliability — per-agent limit, drain gate, pre-flight creds, auth recovery
- maxPerAgent=1: only 1 in-flight execution per agent type at a time; excess tasks are requeued after 30s
- Drain gate: after 2 consecutive failures the agent is drained and a question is set on the task; reset on first success; POST /api/pool/agents/{agent}/undrain to acknowledge
- Pre-flight credential check: verify .credentials.json and .claude.json exist in agentHome before spinning up a container
- Auth error auto-recovery: detect auth errors (Not logged in, OAuth token has expired, etc.) and retry once after running sync-credentials and re-copying fresh credentials
- Extracted runContainer() helper from ContainerRunner.Run() to support the retry flow
- Wire CredentialSyncCmd in serve.go for all three ContainerRunner instances
- Tests: TestPool_MaxPerAgent_*, TestPool_ConsecutiveFailures_*, TestPool_Undrain_*, TestContainerRunner_Missing{Credentials,Settings}_FailsFast, TestIsAuthError_*, TestContainerRunner_AuthError_SyncsAndRetries
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 116 |
1 file changed, 87 insertions, 29 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 1f2c27d..7513916 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -55,21 +56,24 @@ type workItem struct { // Pool manages a bounded set of concurrent task workers. type Pool struct { maxConcurrent int + maxPerAgent int runners map[string]Runner store Store logger *slog.Logger depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s - mu sync.Mutex - active int - activePerAgent map[string]int - rateLimited map[string]time.Time // agentType -> until - cancels map[string]context.CancelFunc // taskID → cancel - resultCh chan *Result - workCh chan workItem // internal bounded queue; Submit enqueues here - doneCh chan struct{} // signals when a worker slot is freed - Questions *QuestionRegistry - Classifier *Classifier + mu sync.Mutex + active int + activePerAgent map[string]int + rateLimited map[string]time.Time // agentType -> until + cancels map[string]context.CancelFunc // taskID → cancel + consecutiveFailures map[string]int // agentType -> count + drained map[string]bool // agentType -> true if halted pending human ack + resultCh chan *Result + workCh chan workItem // internal bounded queue; Submit enqueues here + doneCh chan struct{} // signals when a worker slot is freed + Questions *QuestionRegistry + Classifier *Classifier } // Result is emitted when a task execution completes. 
@@ -84,18 +88,21 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * maxConcurrent = 1 } p := &Pool{ - maxConcurrent: maxConcurrent, - runners: runners, - store: store, - logger: logger, - depPollInterval: 5 * time.Second, - activePerAgent: make(map[string]int), - rateLimited: make(map[string]time.Time), - cancels: make(map[string]context.CancelFunc), - resultCh: make(chan *Result, maxConcurrent*2), - workCh: make(chan workItem, maxConcurrent*10+100), - doneCh: make(chan struct{}, maxConcurrent), - Questions: NewQuestionRegistry(), + maxConcurrent: maxConcurrent, + maxPerAgent: 1, + runners: runners, + store: store, + logger: logger, + depPollInterval: 5 * time.Second, + activePerAgent: make(map[string]int), + rateLimited: make(map[string]time.Time), + cancels: make(map[string]context.CancelFunc), + consecutiveFailures: make(map[string]int), + drained: make(map[string]bool), + resultCh: make(chan *Result, maxConcurrent*2), + workCh: make(chan workItem, maxConcurrent*10+100), + doneCh: make(chan struct{}, maxConcurrent), + Questions: NewQuestionRegistry(), } go p.dispatch() return p @@ -336,8 +343,29 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.mu.Lock() + p.consecutiveFailures[agentType]++ + failures := p.consecutiveFailures[agentType] + p.mu.Unlock() + if failures >= 2 { + p.mu.Lock() + p.drained[agentType] = true + p.mu.Unlock() + p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures) + questionJSON, _ := json.Marshal(map[string]string{ + "question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg), + "options": "acknowledge", + }) + if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil { + p.logger.Error("failed to set drain question", "error", err) + } + } } } else { + p.mu.Lock() + p.consecutiveFailures[agentType] = 0 + p.drained[agentType] = false + p.mu.Unlock() if t.ParentTaskID == "" { subtasks, subErr := p.store.ListSubtasks(t.ID) if subErr != nil { @@ -392,6 +420,14 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } +// UndrainingAgent resets the drain state and failure counter for the given agent type. +func (p *Pool) UndrainingAgent(agentType string) { + p.mu.Lock() + defer p.mu.Unlock() + p.drained[agentType] = false + p.consecutiveFailures[agentType] = 0 +} + // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() @@ -520,13 +556,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { agentType = "claude" } - p.mu.Lock() - if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) { - delete(p.rateLimited, agentType) - } - p.activePerAgent[agentType]++ - p.mu.Unlock() - defer func() { p.mu.Lock() p.active-- @@ -537,6 +566,35 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } }() + p.mu.Lock() + if p.drained[agentType] { + p.mu.Unlock() + time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} }) + return + } + if p.activePerAgent[agentType] >= p.maxPerAgent { + p.mu.Unlock() + time.AfterFunc(30*time.Second, func() { p.workCh <- workItem{ctx: ctx, task: t} }) + return + } + if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) { + delete(p.rateLimited, agentType) + agentName := agentType + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentName, + Event: "available", + Timestamp: time.Now(), + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent available event", "error", recErr) + } + }() + } + p.activePerAgent[agentType]++ + p.mu.Unlock() + runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner", "error", err, "taskID", t.ID) |
