summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
author: Claudomator Agent <agent@claudomator.local> 2026-03-21 23:18:50 +0000
committer: Claudomator Agent <agent@claudomator.local> 2026-03-21 23:18:50 +0000
commit: 8dca9bbb0baee59ffe0d3127180ef0958dda8b91 (patch)
tree: e887036f4cce0f10694c5b9a29f4b4dc251769ba /internal/executor/executor.go
parent: 9e35f7e4087cfa6017cb65ec6a7036f394f5eb22 (diff)
feat: executor reliability — per-agent limit, drain gate, pre-flight creds, auth recovery
- maxPerAgent=1: only 1 in-flight execution per agent type at a time; excess tasks are requeued after 30s
- Drain gate: after 2 consecutive failures the agent is drained and a question is set on the task; reset on first success; POST /api/pool/agents/{agent}/undrain to acknowledge
- Pre-flight credential check: verify .credentials.json and .claude.json exist in agentHome before spinning up a container
- Auth error auto-recovery: detect auth errors (Not logged in, OAuth token has expired, etc.) and retry once after running sync-credentials and re-copying fresh credentials
- Extracted runContainer() helper from ContainerRunner.Run() to support the retry flow
- Wire CredentialSyncCmd in serve.go for all three ContainerRunner instances
- Tests: TestPool_MaxPerAgent_*, TestPool_ConsecutiveFailures_*, TestPool_Undrain_*, TestContainerRunner_Missing{Credentials,Settings}_FailsFast, TestIsAuthError_*, TestContainerRunner_AuthError_SyncsAndRetries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r-- internal/executor/executor.go 116
1 file changed, 87 insertions, 29 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 1f2c27d..7513916 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -2,6 +2,7 @@ package executor
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"log/slog"
@@ -55,21 +56,24 @@ type workItem struct {
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
maxConcurrent int
+ maxPerAgent int
runners map[string]Runner
store Store
logger *slog.Logger
depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
- mu sync.Mutex
- active int
- activePerAgent map[string]int
- rateLimited map[string]time.Time // agentType -> until
- cancels map[string]context.CancelFunc // taskID → cancel
- resultCh chan *Result
- workCh chan workItem // internal bounded queue; Submit enqueues here
- doneCh chan struct{} // signals when a worker slot is freed
- Questions *QuestionRegistry
- Classifier *Classifier
+ mu sync.Mutex
+ active int
+ activePerAgent map[string]int
+ rateLimited map[string]time.Time // agentType -> until
+ cancels map[string]context.CancelFunc // taskID → cancel
+ consecutiveFailures map[string]int // agentType -> count
+ drained map[string]bool // agentType -> true if halted pending human ack
+ resultCh chan *Result
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
+ Questions *QuestionRegistry
+ Classifier *Classifier
}
// Result is emitted when a task execution completes.
@@ -84,18 +88,21 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
maxConcurrent = 1
}
p := &Pool{
- maxConcurrent: maxConcurrent,
- runners: runners,
- store: store,
- logger: logger,
- depPollInterval: 5 * time.Second,
- activePerAgent: make(map[string]int),
- rateLimited: make(map[string]time.Time),
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, maxConcurrent*2),
- workCh: make(chan workItem, maxConcurrent*10+100),
- doneCh: make(chan struct{}, maxConcurrent),
- Questions: NewQuestionRegistry(),
+ maxConcurrent: maxConcurrent,
+ maxPerAgent: 1,
+ runners: runners,
+ store: store,
+ logger: logger,
+ depPollInterval: 5 * time.Second,
+ activePerAgent: make(map[string]int),
+ rateLimited: make(map[string]time.Time),
+ cancels: make(map[string]context.CancelFunc),
+ consecutiveFailures: make(map[string]int),
+ drained: make(map[string]bool),
+ resultCh: make(chan *Result, maxConcurrent*2),
+ workCh: make(chan workItem, maxConcurrent*10+100),
+ doneCh: make(chan struct{}, maxConcurrent),
+ Questions: NewQuestionRegistry(),
}
go p.dispatch()
return p
@@ -336,8 +343,29 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.consecutiveFailures[agentType]++
+ failures := p.consecutiveFailures[agentType]
+ p.mu.Unlock()
+ if failures >= 2 {
+ p.mu.Lock()
+ p.drained[agentType] = true
+ p.mu.Unlock()
+ p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures)
+ questionJSON, _ := json.Marshal(map[string]string{
+ "question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg),
+ "options": "acknowledge",
+ })
+ if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil {
+ p.logger.Error("failed to set drain question", "error", err)
+ }
+ }
}
} else {
+ p.mu.Lock()
+ p.consecutiveFailures[agentType] = 0
+ p.drained[agentType] = false
+ p.mu.Unlock()
if t.ParentTaskID == "" {
subtasks, subErr := p.store.ListSubtasks(t.ID)
if subErr != nil {
@@ -392,6 +420,14 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
+// UndrainingAgent resets the drain state and failure counter for the given agent type.
+func (p *Pool) UndrainingAgent(agentType string) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.drained[agentType] = false
+ p.consecutiveFailures[agentType] = 0
+}
+
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -520,13 +556,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
agentType = "claude"
}
- p.mu.Lock()
- if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) {
- delete(p.rateLimited, agentType)
- }
- p.activePerAgent[agentType]++
- p.mu.Unlock()
-
defer func() {
p.mu.Lock()
p.active--
@@ -537,6 +566,35 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}()
+ p.mu.Lock()
+ if p.drained[agentType] {
+ p.mu.Unlock()
+ time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} })
+ return
+ }
+ if p.activePerAgent[agentType] >= p.maxPerAgent {
+ p.mu.Unlock()
+ time.AfterFunc(30*time.Second, func() { p.workCh <- workItem{ctx: ctx, task: t} })
+ return
+ }
+ if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) {
+ delete(p.rateLimited, agentType)
+ agentName := agentType
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentName,
+ Event: "available",
+ Timestamp: time.Now(),
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent available event", "error", recErr)
+ }
+ }()
+ }
+ p.activePerAgent[agentType]++
+ p.mu.Unlock()
+
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner", "error", err, "taskID", t.ID)