diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-05-02 21:55:49 -1000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-02 21:55:49 -1000 |
| commit | 1ec3f87c392145580a62858110d9fd10638203db (patch) | |
| tree | 2ffbc4fcf585aef8091f02b3315a3234267c5618 /internal/executor/executor.go | |
| parent | 99115d8158137083239c45e5a860b718ff4cefa1 (diff) | |
| parent | 85c3bf4d28b0903a2005356339e6ea56855b8c80 (diff) | |
Merge pull request #2 from thepeterstone/claude/post-epic-cleanup
Post-epic cleanup — green test suite, no skips
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 57 |
1 files changed, 31 insertions, 26 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 4501a3c..315030d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -196,6 +196,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) { return runner, nil } +// decActiveAgent decrements the active counters for a finished task. Safe to +// call multiple times — subsequent calls are no-ops via the cleaned flag. +// Always call this before sending on resultCh so consumers observing a result +// see the accounting already settled (no zero-count map entries lingering). +func (p *Pool) decActiveAgent(agentType string, cleaned *bool) { + if *cleaned { + return + } + *cleaned = true + p.mu.Lock() + p.active-- + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() + select { + case p.doneCh <- struct{}{}: + default: + } +} + func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { agentType := t.Agent.Type if agentType == "" { @@ -206,23 +228,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.activePerAgent[agentType]++ p.mu.Unlock() - defer func() { - p.mu.Lock() - p.active-- - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -264,6 +276,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } @@ -473,19 +486,8 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.activePerAgent[agentType]++ p.mu.Unlock() - defer func() { - p.mu.Lock() - p.active-- - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) runner, err := p.getRunner(t) if err != nil { @@ -505,6 +507,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -527,6 +530,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -583,6 +587,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } |
