summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
authorClaudomator Agent <agent@claudomator>2026-03-16 19:57:54 +0000
committerClaudomator Agent <agent@claudomator>2026-03-16 19:57:54 +0000
commitc0519a1b117c50a9c3c6394bc2948398ce378a92 (patch)
tree80cf1cc22cf79118378e7c462387098b3d467808 /internal/executor
parent20c66b4be2c0b922ae4927d3021c3f2a9e6d1e8c (diff)
fix: clean up activePerAgent before sending to resultCh
Move activePerAgent decrement/deletion out of execute() and executeResume() defers and into the code paths immediately before each resultCh send (handleRunResult and early-return paths). This guarantees that when a result consumer reads from the channel the map is already clean, eliminating a race between defer and result receipt. Remove the polling loop from TestPool_ActivePerAgent_DeletesZeroEntries and check the map state immediately after reading the result instead. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go32
-rw-r--r--internal/executor/executor_test.go16
2 files changed, 27 insertions, 21 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c07171b..219a40b 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -204,10 +204,6 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
defer func() {
p.mu.Lock()
p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -218,6 +214,12 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -364,6 +366,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
@@ -465,10 +473,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
defer func() {
p.mu.Lock()
p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -494,6 +498,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -516,6 +526,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index d8a2b77..878a32d 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -751,19 +751,9 @@ func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
pool.Submit(context.Background(), tk)
<-pool.Results()
- // The deferred cleanup in execute() runs after resultCh is sent, so poll
- // briefly for the map entry to be removed rather than checking immediately.
- var exists bool
- deadline := time.Now().Add(100 * time.Millisecond)
- for time.Now().Before(deadline) {
- pool.mu.Lock()
- _, exists = pool.activePerAgent["claude"]
- pool.mu.Unlock()
- if !exists {
- break
- }
- time.Sleep(time.Millisecond)
- }
+ pool.mu.Lock()
+ _, exists := pool.activePerAgent["claude"]
+ pool.mu.Unlock()
if exists {
t.Error("activePerAgent should not have a zero-count entry for claude after task completes")