summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-05-13 04:02:20 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-05-13 04:02:20 +0000
commit68399a598924775a3ec22a39c2336ae497fb07f3 (patch)
tree29ade8224eb51eca47a1d9d03bb4d0d3653a72aa /internal/executor/executor.go
parentf01231cc45f41ce2dc37072e77428e467ef3fc15 (diff)
parentd970c0730ff0dc7d714d3261197d8ba52b5d21f4 (diff)
merge: integrate github/main — LocalRunner, real GeminiRunner, llm clientHEADmain
Merges 12 commits from github/main (formerly master) that were developed independently. Key additions: - LocalRunner: OpenAI-compatible local LLM execution (Ollama, LM Studio) - Real GeminiRunner with full sandbox parity to ClaudeRunner - llm.Client for enriching CI failures and elaboration via local model - retry.ParseRetryAfter moved to shared package - tokens_in/tokens_out columns in executions table Conflict resolutions: - Kept local main's VAPID/push, stories, projects, agent events schema - Merged both sets of Config fields (local + LocalModel from github/main) - Unified activePerAgent accounting (decActiveAgent helper) - Removed duplicate helpers from claude.go (now in helpers.go) - Fixed double-decrement bug in handleRunResult vs decActiveAgent Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go101
1 files changed, 52 insertions, 49 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 376d62c..09169bd 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -12,6 +12,8 @@ import (
"sync"
"time"
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/retry"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
"github.com/google/uuid"
@@ -75,17 +77,18 @@ type Pool struct {
mu sync.Mutex
active int
activePerAgent map[string]int
- rateLimited map[string]time.Time // agentType -> until
+ rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
- consecutiveFailures map[string]int // agentType -> count
- closed bool // set to true when Shutdown has been called
+ consecutiveFailures map[string]int // agentType -> count
+ closed bool // set to true when Shutdown has been called
resultCh chan *Result
- startedCh chan string // task IDs that just transitioned to RUNNING
- workCh chan workItem // internal bounded queue; Submit enqueues here
- doneCh chan struct{} // signals when a worker slot is freed
- workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
- dispatchDone chan struct{} // closed when the dispatch goroutine exits
+ startedCh chan string // task IDs that just transitioned to RUNNING
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
+ workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
+ dispatchDone chan struct{} // closed when the dispatch goroutine exits
Classifier *Classifier
+ LLM *llm.Client
}
// Result is emitted when a task execution completes.
@@ -258,6 +261,28 @@ func (p *Pool) getRunner(t *task.Task) (Runner, error) {
return runner, nil
}
+// decActiveAgent decrements the active counters for a finished task. Safe to
+// call multiple times — subsequent calls are no-ops via the cleaned flag.
+// Always call this before sending on resultCh so consumers observing a result
+// see the accounting already settled (no zero-count map entries lingering).
+func (p *Pool) decActiveAgent(agentType string, cleaned *bool) {
+ if *cleaned {
+ return
+ }
+ *cleaned = true
+ p.mu.Lock()
+ p.active--
+ p.activePerAgent[agentType]--
+ if p.activePerAgent[agentType] == 0 {
+ delete(p.activePerAgent, agentType)
+ }
+ p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
+}
+
func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
agentType := t.Agent.Type
if agentType == "" {
@@ -268,25 +293,13 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
p.activePerAgent[agentType]++
p.mu.Unlock()
- defer func() {
- p.mu.Lock()
- p.active--
- p.mu.Unlock()
- select {
- case p.doneCh <- struct{}{}:
- default:
- }
- }()
+ var cleaned bool
+ defer p.decActiveAgent(agentType, &cleaned)
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -342,6 +355,7 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.decActiveAgent(agentType, &cleaned)
p.handleRunResult(ctx, t, exec, err, agentType)
}
@@ -351,9 +365,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
// resultCh. The caller must set exec.EndTime before calling.
func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) {
if err != nil {
- if isRateLimitError(err) || isQuotaExhausted(err) {
+ if retry.IsRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
- retryAfter := parseRetryAfter(err.Error())
+ retryAfter := retry.ParseRetryAfter(err.Error())
reason := "transient"
if isQuotaExhausted(err) {
reason = "quota"
@@ -505,6 +519,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if summary == "" && exec.StdoutPath != "" {
summary = extractSummary(exec.StdoutPath)
}
+ if summary == "" && p.LLM != nil && exec.StdoutPath != "" {
+ summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath)
+ }
if summary != "" {
if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil {
p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
@@ -528,12 +545,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
p.logger.Error("failed to update execution", "error", updateErr)
}
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
@@ -884,8 +895,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
p.mu.Unlock()
- // If a specific agent is already requested, skip selection and classification.
- skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini"
+ // If a specific agent is already requested AND we have a runner registered
+ // for it, skip selection and classification. Unknown/empty types fall
+ // through to the load balancer.
+ _, runnerKnown := p.runners[t.Agent.Type]
+ skipClassification := t.Agent.Type != "" && runnerKnown
if !skipClassification {
// Deterministically pick the agent with fewest active tasks.
@@ -915,16 +929,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
agentType = "claude"
}
- defer func() {
- p.mu.Lock()
- p.active--
- p.mu.Unlock()
- select {
- case p.doneCh <- struct{}{}:
- default:
- }
- }()
-
// Check dependencies before taking the per-agent slot to avoid deadlock:
// if a dependent task holds the slot while waiting for its dependency to run,
// the dependency can never start (maxPerAgent=1).
@@ -981,6 +985,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.activePerAgent[agentType]++
p.mu.Unlock()
+ var cleaned bool
+ defer p.decActiveAgent(agentType, &cleaned)
+
runner, err := p.getRunner(t)
if err != nil {
p.logger.Error("failed to get runner", "error", err, "taskID", t.ID)
@@ -999,12 +1006,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
- p.mu.Lock()
- p.activePerAgent[agentType]--
- if p.activePerAgent[agentType] == 0 {
- delete(p.activePerAgent, agentType)
- }
- p.mu.Unlock()
+ p.decActiveAgent(agentType, &cleaned)
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
return
}
@@ -1074,6 +1076,7 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
+ p.decActiveAgent(agentType, &cleaned)
p.handleRunResult(ctx, t, exec, err, agentType)
}