summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go19
1 files changed, 15 insertions, 4 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index c07171b..4501a3c 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -10,6 +10,8 @@ import (
"sync"
"time"
+ "github.com/thepeterstone/claudomator/internal/llm"
+ "github.com/thepeterstone/claudomator/internal/retry"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
"github.com/google/uuid"
@@ -69,6 +71,9 @@ type Pool struct {
doneCh chan struct{} // signals when a worker slot is freed
Questions *QuestionRegistry
Classifier *Classifier
+ // LLM, when non-nil, enables LLM-synthesized summaries for executions
+ // whose stdout did not include a "## Summary" heading.
+ LLM *llm.Client
}
// Result is emitted when a task execution completes.
@@ -268,9 +273,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
// resultCh. The caller must set exec.EndTime before calling.
func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) {
if err != nil {
- if isRateLimitError(err) || isQuotaExhausted(err) {
+ if retry.IsRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
- retryAfter := parseRetryAfter(err.Error())
+ retryAfter := retry.ParseRetryAfter(err.Error())
if retryAfter == 0 {
if isQuotaExhausted(err) {
retryAfter = 5 * time.Hour
@@ -348,6 +353,9 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if summary == "" && exec.StdoutPath != "" {
summary = extractSummary(exec.StdoutPath)
}
+ if summary == "" && p.LLM != nil && exec.StdoutPath != "" {
+ summary = synthesizeSummary(ctx, p.LLM, exec.StdoutPath)
+ }
if summary != "" {
if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil {
p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
@@ -424,8 +432,11 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
p.mu.Unlock()
- // If a specific agent is already requested, skip selection and classification.
- skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini"
+ // If a specific agent is already requested AND we have a runner registered
+ // for it, skip selection and classification. Unknown/empty types fall
+ // through to the load balancer.
+ _, runnerKnown := p.runners[t.Agent.Type]
+ skipClassification := t.Agent.Type != "" && runnerKnown
if !skipClassification {
// Deterministically pick the agent with fewest active tasks.