diff options
| author | Claudomator Agent <agent@claudomator> | 2026-03-09 05:02:36 +0000 |
|---|---|---|
| committer | Claudomator Agent <agent@claudomator> | 2026-03-09 05:02:36 +0000 |
| commit | 7c7dd2bc352c91963ece06f3176a032ee7ab462f (patch) | |
| tree | f6d329d095a88c8138d9fb043cdf8106d7585715 | |
| parent | 67b8544b222392d8a01847e3d34559c23fd0cd12 (diff) | |
executor: fix map leaks in activePerAgent and rateLimited
activePerAgent: delete zero-count entries after decrement so the map
doesn't accumulate stale keys for agent types that are no longer active.
rateLimited: delete entries whose deadline has passed when reading them
(in both the classifier block and the execute() pre-flight), so stale
entries are cleaned up on the next check rather than accumulating forever.
Both fixes are covered by new regression tests.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| -rw-r--r-- | internal/executor/executor.go | 12 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 48 |
2 files changed, 60 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 9dd37ae..1c9e667 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -179,6 +179,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.mu.Lock() p.active-- p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } p.mu.Unlock() select { case p.doneCh <- struct{}{}: @@ -296,6 +299,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { now := time.Now() for agent := range p.runners { activeTasks[agent] = p.activePerAgent[agent] + if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { + delete(p.rateLimited, agent) + } activeUntil := p.rateLimited[agent] isLimited := now.Before(activeUntil) rateLimited[agent] = isLimited @@ -326,6 +332,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } p.mu.Lock() + if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) { + delete(p.rateLimited, agentType) + } p.activePerAgent[agentType]++ p.mu.Unlock() @@ -333,6 +342,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.mu.Lock() p.active-- p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } p.mu.Unlock() select { case p.doneCh <- struct{}{}: diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 2292aa5..c0f6d66 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -496,6 +496,54 @@ func TestPool_RecoverStaleRunning(t *testing.T) { } } +func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner, "gemini": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + tk := makeTask("apa-1") + store.CreateTask(tk) // Agent.Type = "claude" + pool.Submit(context.Background(), tk) + <-pool.Results() + + pool.mu.Lock() + _, exists := pool.activePerAgent["claude"] + pool.mu.Unlock() + + if exists { + t.Error("activePerAgent should not have a zero-count entry for claude after task completes") + } +} + +func TestPool_RateLimited_StaleEntryCleaned(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, runners, store, logger) + + // Inject a stale rate-limit entry (deadline already passed). + pool.mu.Lock() + pool.rateLimited["claude"] = time.Now().Add(-1 * time.Minute) + pool.mu.Unlock() + + // Submit a task — the execute() path reads rateLimited during classification. + tk := makeTask("rl-stale-1") + store.CreateTask(tk) + pool.Submit(context.Background(), tk) + <-pool.Results() + + pool.mu.Lock() + _, exists := pool.rateLimited["claude"] + pool.mu.Unlock() + + if exists { + t.Error("stale rate-limit entry should be deleted after deadline passes") + } +} + func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} |
