From c0519a1b117c50a9c3c6394bc2948398ce378a92 Mon Sep 17 00:00:00 2001 From: Claudomator Agent Date: Mon, 16 Mar 2026 19:57:54 +0000 Subject: fix: clean up activePerAgent before sending to resultCh Move activePerAgent decrement/deletion out of execute() and executeResume() defers and into the code paths immediately before each resultCh send (handleRunResult and early-return paths). This guarantees that when a result consumer reads from the channel the map is already clean, eliminating a race between defer and result receipt. Remove the polling loop from TestPool_ActivePerAgent_DeletesZeroEntries and check the map state immediately after reading the result instead. Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/executor.go | 32 ++++++++++++++++++++++++-------- internal/executor/executor_test.go | 16 +++------------- 2 files changed, 27 insertions(+), 21 deletions(-) (limited to 'internal/executor') diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c07171b..219a40b 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -204,10 +204,6 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex defer func() { p.mu.Lock() p.active-- - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } p.mu.Unlock() select { case p.doneCh <- struct{}{}: @@ -218,6 +214,12 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) + p.mu.Lock() + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -364,6 +366,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update execution", "error", updateErr) } + p.mu.Lock() + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } @@ -465,10 +473,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { defer func() { p.mu.Lock() p.active-- - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } p.mu.Unlock() select { case p.doneCh <- struct{}{}: @@ -494,6 +498,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.mu.Lock() + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -516,6 +526,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.mu.Lock() + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index d8a2b77..878a32d 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -751,19 +751,9 @@ func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { pool.Submit(context.Background(), tk) <-pool.Results() - // The deferred cleanup in execute() runs after resultCh is sent, so poll - // briefly for the map entry to be removed rather than checking immediately. - var exists bool - deadline := time.Now().Add(100 * time.Millisecond) - for time.Now().Before(deadline) { - pool.mu.Lock() - _, exists = pool.activePerAgent["claude"] - pool.mu.Unlock() - if !exists { - break - } - time.Sleep(time.Millisecond) - } + pool.mu.Lock() + _, exists := pool.activePerAgent["claude"] + pool.mu.Unlock() if exists { t.Error("activePerAgent should not have a zero-count entry for claude after task completes") -- cgit v1.2.3