summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go57
1 files changed, 31 insertions, 26 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 4501a3c..315030d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -196,6 +196,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) {
return runner, nil
}
+// decActiveAgent decrements the active counters for a finished task. Safe to
+// call multiple times — subsequent calls are no-ops via the cleaned flag.
+// Always call this before sending on resultCh so consumers observing a result
+// see the accounting already settled (no zero-count map entries lingering).
+func (p *Pool) decActiveAgent(agentType string, cleaned *bool) {
+ if *cleaned {
+ return
+ }
+ *cleaned = true
+ p.mu.Lock()
+ p.active--
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
+}
+
func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
agentType := t.Agent.Type
if agentType == "" {
@@ -206,23 +228,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
p.activePerAgent[agentType]++
p.mu.Unlock()
- defer func() {
- p.mu.Lock()
- p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
- select {
- case p.doneCh <- struct{}{}:
- default:
- }
- }()
+ var cleaned bool
+ defer p.decActiveAgent(agentType, &cleaned)
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -264,6 +276,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.decActiveAgent(agentType, &cleaned)
p.handleRunResult(ctx, t, exec, err, agentType)
}
@@ -473,19 +486,8 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.activePerAgent[agentType]++
p.mu.Unlock()
- defer func() {
- p.mu.Lock()
- p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
- select {
- case p.doneCh <- struct{}{}:
- default:
- }
- }()
+ var cleaned bool
+ defer p.decActiveAgent(agentType, &cleaned)
runner, err := p.getRunner(t)
if err != nil {
@@ -505,6 +507,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -527,6 +530,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -583,6 +587,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.decActiveAgent(agentType, &cleaned)
p.handleRunResult(ctx, t, exec, err, agentType)
}