summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorClaudomator Agent <agent@claudomator>2026-03-16 19:57:54 +0000
committerClaudomator Agent <agent@claudomator>2026-03-16 19:57:54 +0000
commitc0519a1b117c50a9c3c6394bc2948398ce378a92 (patch)
tree80cf1cc22cf79118378e7c462387098b3d467808 /internal/executor/executor.go
parent20c66b4be2c0b922ae4927d3021c3f2a9e6d1e8c (diff)
fix: clean up activePerAgent before sending to resultCh
Move activePerAgent decrement/deletion out of execute() and executeResume() defers and into the code paths immediately before each resultCh send (handleRunResult and early-return paths). This guarantees that when a result consumer reads from the channel the map is already clean, eliminating a race between defer and result receipt. Remove the polling loop from TestPool_ActivePerAgent_DeletesZeroEntries and check the map state immediately after reading the result instead. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go32
1 files changed, 24 insertions, 8 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c07171b..219a40b 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -204,10 +204,6 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
defer func() {
p.mu.Lock()
p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -218,6 +214,12 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -364,6 +366,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
@@ -465,10 +473,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
defer func() {
p.mu.Lock()
p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -494,6 +498,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -516,6 +526,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}