diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-05-13 04:02:20 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-05-13 04:02:20 +0000 |
| commit | 68399a598924775a3ec22a39c2336ae497fb07f3 (patch) | |
| tree | 29ade8224eb51eca47a1d9d03bb4d0d3653a72aa /internal/executor/executor.go | |
| parent | f01231cc45f41ce2dc37072e77428e467ef3fc15 (diff) | |
| parent | d970c0730ff0dc7d714d3261197d8ba52b5d21f4 (diff) | |
Merges 12 commits from github/main (formerly master) that were developed
independently. Key additions:
- LocalRunner: OpenAI-compatible local LLM execution (Ollama, LM Studio)
- Real GeminiRunner with full sandbox parity to ClaudeRunner
- llm.Client for enriching CI failures and elaboration via local model
- retry.ParseRetryAfter moved to shared package
- tokens_in/tokens_out columns in executions table
Conflict resolutions:
- Kept local main's VAPID/push, stories, projects, agent events schema
- Merged both sets of Config fields (local + LocalModel from github/main)
- Unified activePerAgent accounting (decActiveAgent helper)
- Removed duplicate helpers from claude.go (now in helpers.go)
- Fixed double-decrement bug in handleRunResult vs decActiveAgent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 101 |
1 files changed, 52 insertions, 49 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 376d62c..09169bd 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/retry" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" "github.com/google/uuid" @@ -75,17 +77,18 @@ type Pool struct { mu sync.Mutex active int activePerAgent map[string]int - rateLimited map[string]time.Time // agentType -> until + rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel - consecutiveFailures map[string]int // agentType -> count - closed bool // set to true when Shutdown has been called + consecutiveFailures map[string]int // agentType -> count + closed bool // set to true when Shutdown has been called resultCh chan *Result - startedCh chan string // task IDs that just transitioned to RUNNING - workCh chan workItem // internal bounded queue; Submit enqueues here - doneCh chan struct{} // signals when a worker slot is freed - workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines - dispatchDone chan struct{} // closed when the dispatch goroutine exits + startedCh chan string // task IDs that just transitioned to RUNNING + workCh chan workItem // internal bounded queue; Submit enqueues here + doneCh chan struct{} // signals when a worker slot is freed + workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines + dispatchDone chan struct{} // closed when the dispatch goroutine exits Classifier *Classifier + LLM *llm.Client } // Result is emitted when a task execution completes. @@ -258,6 +261,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) { return runner, nil } +// decActiveAgent decrements the active counters for a finished task. Safe to +// call multiple times — subsequent calls are no-ops via the cleaned flag. +// Always call this before sending on resultCh so consumers observing a result +// see the accounting already settled (no zero-count map entries lingering). +func (p *Pool) decActiveAgent(agentType string, cleaned *bool) { + if *cleaned { + return + } + *cleaned = true + p.mu.Lock() + p.active-- + p.activePerAgent[agentType]-- + if p.activePerAgent[agentType] == 0 { + delete(p.activePerAgent, agentType) + } + p.mu.Unlock() + select { + case p.doneCh <- struct{}{}: + default: + } +} + func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { agentType := t.Agent.Type if agentType == "" { @@ -268,25 +293,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.activePerAgent[agentType]++ p.mu.Unlock() - defer func() { - p.mu.Lock() - p.active-- - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -342,6 +355,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } @@ -351,9 +365,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex // resultCh. The caller must set exec.EndTime before calling. func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) { if err != nil { - if isRateLimitError(err) || isQuotaExhausted(err) { + if retry.IsRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() - retryAfter := parseRetryAfter(err.Error()) + retryAfter := retry.ParseRetryAfter(err.Error()) reason := "transient" if isQuotaExhausted(err) { reason = "quota" @@ -505,6 +519,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if summary == "" && exec.StdoutPath != "" { summary = extractSummary(exec.StdoutPath) } + if summary == "" && p.LLM != nil && exec.StdoutPath != "" { + summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath) + } if summary != "" { if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil { p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) @@ -528,12 +545,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update execution", "error", updateErr) } - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } @@ -884,8 +895,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } p.mu.Unlock() - // If a specific agent is already requested, skip selection and classification. - skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini" + // If a specific agent is already requested AND we have a runner registered + // for it, skip selection and classification. Unknown/empty types fall + // through to the load balancer. + _, runnerKnown := p.runners[t.Agent.Type] + skipClassification := t.Agent.Type != "" && runnerKnown if !skipClassification { // Deterministically pick the agent with fewest active tasks. @@ -915,16 +929,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { agentType = "claude" } - defer func() { - p.mu.Lock() - p.active-- - p.mu.Unlock() - select { - case p.doneCh <- struct{}{}: - default: - } - }() - // Check dependencies before taking the per-agent slot to avoid deadlock: // if a dependent task holds the slot while waiting for its dependency to run, // the dependency can never start (maxPerAgent=1). @@ -981,6 +985,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.activePerAgent[agentType]++ p.mu.Unlock() + var cleaned bool + defer p.decActiveAgent(agentType, &cleaned) + runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner", "error", err, "taskID", t.ID) @@ -999,12 +1006,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } - p.mu.Lock() - p.activePerAgent[agentType]-- - if p.activePerAgent[agentType] == 0 { - delete(p.activePerAgent, agentType) - } - p.mu.Unlock() + p.decActiveAgent(agentType, &cleaned) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } @@ -1074,6 +1076,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() + p.decActiveAgent(agentType, &cleaned) p.handleRunResult(ctx, t, exec, err, agentType) } |
