summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/container.go6
-rw-r--r--internal/executor/executor.go67
-rw-r--r--internal/executor/executor_test.go1
-rw-r--r--internal/executor/helpers.go24
-rw-r--r--internal/executor/ratelimit.go6
-rw-r--r--internal/executor/ratelimit_test.go30
6 files changed, 128 insertions, 6 deletions
diff --git a/internal/executor/container.go b/internal/executor/container.go
index c43e201..ba0c03a 100644
--- a/internal/executor/container.go
+++ b/internal/executor/container.go
@@ -290,6 +290,12 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
}
if waitErr != nil {
+ // Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError)
+ // can inspect agent-specific messages (e.g. Gemini TerminalQuotaError).
+ stderrTail := readFileTail(e.StderrPath, 4096)
+ if stderrTail != "" {
+ return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail)
+ }
return fmt.Errorf("container execution failed: %w", waitErr)
}
if streamErr != nil {
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 219a40b..1f2c27d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -30,6 +30,7 @@ type Store interface {
AppendTaskInteraction(taskID string, interaction task.Interaction) error
UpdateTaskAgent(id string, agent task.AgentConfig) error
UpdateExecutionChangestats(execID string, stats *task.Changestats) error
+ RecordAgentEvent(e storage.AgentEvent) error
}
// LogPather is an optional interface runners can implement to provide the log
@@ -273,16 +274,32 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if isRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
retryAfter := parseRetryAfter(err.Error())
- if retryAfter == 0 {
- if isQuotaExhausted(err) {
+ reason := "transient"
+ if isQuotaExhausted(err) {
+ reason = "quota"
+ if retryAfter == 0 {
retryAfter = 5 * time.Hour
- } else {
- retryAfter = 1 * time.Minute
}
+ } else if retryAfter == 0 {
+ retryAfter = 1 * time.Minute
}
- p.rateLimited[agentType] = time.Now().Add(retryAfter)
+ until := time.Now().Add(retryAfter)
+ p.rateLimited[agentType] = until
p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err))
p.mu.Unlock()
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentType,
+ Event: "rate_limited",
+ Timestamp: time.Now(),
+ Until: &until,
+ Reason: reason,
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent event", "error", recErr)
+ }
+ }()
}
var blockedErr *BlockedError
@@ -382,6 +399,34 @@ func (p *Pool) ActiveCount() int {
return p.active
}
+// AgentStatusInfo holds the current state of a single agent.
+type AgentStatusInfo struct {
+ Agent string `json:"agent"`
+ ActiveTasks int `json:"active_tasks"`
+ RateLimited bool `json:"rate_limited"`
+ Until *time.Time `json:"until,omitempty"`
+}
+
+// AgentStatuses returns the current status of all registered agents.
+func (p *Pool) AgentStatuses() []AgentStatusInfo {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ now := time.Now()
+ var out []AgentStatusInfo
+ for agent := range p.runners {
+ info := AgentStatusInfo{
+ Agent: agent,
+ ActiveTasks: p.activePerAgent[agent],
+ }
+ if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) {
+ info.RateLimited = true
+ info.Until = &deadline
+ }
+ out = append(out, info)
+ }
+ return out
+}
+
// pickAgent selects the best agent from the given SystemStatus using explicit
// load balancing: prefer the available (non-rate-limited) agent with the fewest
// active tasks. If all agents are rate-limited, fall back to fewest active.
@@ -423,6 +468,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
activeTasks[agent] = p.activePerAgent[agent]
if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) {
delete(p.rateLimited, agent)
+ agentName := agent
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentName,
+ Event: "available",
+ Timestamp: time.Now(),
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent available event", "error", recErr)
+ }
+ }()
}
rateLimited[agent] = now.Before(p.rateLimited[agent])
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index e91d435..91d0137 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1056,6 +1056,7 @@ func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task
m.mu.Unlock()
return nil
}
+func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error { return nil }
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
index 9e4530b..aee7da0 100644
--- a/internal/executor/helpers.go
+++ b/internal/executor/helpers.go
@@ -143,6 +143,30 @@ func tailFile(path string, n int) string {
return strings.Join(lines, "\n")
}
+// readFileTail returns the last maxBytes bytes of the file at path as a string,
+// or empty string if the file cannot be read. Used to surface agent stderr on failure.
+func readFileTail(path string, maxBytes int64) string {
+ f, err := os.Open(path)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+ fi, err := f.Stat()
+ if err != nil {
+ return ""
+ }
+ offset := fi.Size() - maxBytes
+ if offset < 0 {
+ offset = 0
+ }
+ buf := make([]byte, fi.Size()-offset)
+ n, err := f.ReadAt(buf, offset)
+ if err != nil && n == 0 {
+ return ""
+ }
+ return strings.TrimSpace(string(buf[:n]))
+}
+
func gitSafe(args ...string) []string {
return append([]string{"-c", "safe.directory=*"}, args...)
}
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index 1f38a6d..c916291 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -37,7 +37,11 @@ func isQuotaExhausted(err error) bool {
strings.Contains(msg, "you've hit your limit") ||
strings.Contains(msg, "you have hit your limit") ||
strings.Contains(msg, "rate limit reached (rejected)") ||
- strings.Contains(msg, "status: rejected")
+ strings.Contains(msg, "status: rejected") ||
+ // Gemini CLI quota exhaustion
+ strings.Contains(msg, "terminalquotaerror") ||
+ strings.Contains(msg, "exhausted your daily quota") ||
+ strings.Contains(msg, "generate_content_free_tier_requests")
}
// parseRetryAfter extracts a Retry-After duration from an error message.
diff --git a/internal/executor/ratelimit_test.go b/internal/executor/ratelimit_test.go
index f45216f..1434810 100644
--- a/internal/executor/ratelimit_test.go
+++ b/internal/executor/ratelimit_test.go
@@ -77,6 +77,36 @@ func TestParseRetryAfter_NoRetryInfo(t *testing.T) {
}
}
+// --- isQuotaExhausted tests ---
+
+func TestIsQuotaExhausted_GeminiDailyQuota(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1\nTerminalQuotaError: You have exhausted your daily quota on this model.")
+ if !isQuotaExhausted(err) {
+ t.Error("want true for Gemini TerminalQuotaError, got false")
+ }
+}
+
+func TestIsQuotaExhausted_GeminiExhaustedMessage(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1\nyou have exhausted your daily quota")
+ if !isQuotaExhausted(err) {
+ t.Error("want true for 'exhausted your daily quota', got false")
+ }
+}
+
+func TestIsQuotaExhausted_GeminiQuotaExceeded(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1\nQuota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests")
+ if !isQuotaExhausted(err) {
+ t.Error("want true for Gemini free tier quota exceeded, got false")
+ }
+}
+
+func TestIsQuotaExhausted_NotQuota(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1")
+ if isQuotaExhausted(err) {
+ t.Error("want false for generic exit status 1, got true")
+ }
+}
+
// --- runWithBackoff tests ---
func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) {