summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go32
-rw-r--r--internal/executor/executor_test.go16
2 files changed, 27 insertions, 21 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c07171b..219a40b 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -204,10 +204,6 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
defer func() {
p.mu.Lock()
p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -218,6 +214,12 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -364,6 +366,12 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
@@ -465,10 +473,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
defer func() {
p.mu.Lock()
p.active--
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
p.mu.Unlock()
select {
case p.doneCh <- struct{}{}:
@@ -494,6 +498,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -516,6 +526,12 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index d8a2b77..878a32d 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -751,19 +751,9 @@ func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
pool.Submit(context.Background(), tk)
<-pool.Results()
- // The deferred cleanup in execute() runs after resultCh is sent, so poll
- // briefly for the map entry to be removed rather than checking immediately.
- var exists bool
- deadline := time.Now().Add(100 * time.Millisecond)
- for time.Now().Before(deadline) {
- pool.mu.Lock()
- _, exists = pool.activePerAgent["claude"]
- pool.mu.Unlock()
- if !exists {
- break
- }
- time.Sleep(time.Millisecond)
- }
+ pool.mu.Lock()
+ _, exists := pool.activePerAgent["claude"]
+ pool.mu.Unlock()
if exists {
t.Error("activePerAgent should not have a zero-count entry for claude after task completes")