diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-05-01 22:14:37 -1000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-01 22:14:37 -1000 |
| commit | 99115d8158137083239c45e5a860b718ff4cefa1 (patch) | |
| tree | 1bf3bd0505eea79375c67af83c7c5fe8c0f274ff /internal/executor/executor.go | |
| parent | c2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff) | |
| parent | 50f8fe8c1ff8b82e0bd399e5776e58bda3e57d1c (diff) | |
Merge pull request #1 from thepeterstone/claude/local-oss-model-agents-MEBqj
Local OSS models as a third runner (epic)
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c07171b..4501a3c 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thepeterstone/claudomator/internal/llm" + "github.com/thepeterstone/claudomator/internal/retry" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" "github.com/google/uuid" @@ -69,6 +71,9 @@ type Pool struct { doneCh chan struct{} // signals when a worker slot is freed Questions *QuestionRegistry Classifier *Classifier + // LLM, when non-nil, enables LLM-synthesized summaries for executions + // whose stdout did not include a "## Summary" heading. + LLM *llm.Client } // Result is emitted when a task execution completes. @@ -268,9 +273,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex // resultCh. The caller must set exec.EndTime before calling. func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) { if err != nil { - if isRateLimitError(err) || isQuotaExhausted(err) { + if retry.IsRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() - retryAfter := parseRetryAfter(err.Error()) + retryAfter := retry.ParseRetryAfter(err.Error()) if retryAfter == 0 { if isQuotaExhausted(err) { retryAfter = 5 * time.Hour @@ -348,6 +353,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if summary == "" && exec.StdoutPath != "" { summary = extractSummary(exec.StdoutPath) } + if summary == "" && p.LLM != nil && exec.StdoutPath != "" { + summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath) + } if summary != "" { if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil { p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) @@ -424,8 +432,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } p.mu.Unlock() - // If a specific agent is already requested, skip selection and classification. - skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini" + // If a specific agent is already requested AND we have a runner registered + // for it, skip selection and classification. Unknown/empty types fall + // through to the load balancer. + _, runnerKnown := p.runners[t.Agent.Type] + skipClassification := t.Agent.Type != "" && runnerKnown if !skipClassification { // Deterministically pick the agent with fewest active tasks. |
