From 7e8967decbc8221694953abf1435fda8aaf18824 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Thu, 19 Mar 2026 23:03:56 +0000 Subject: feat: agent status dashboard with availability timeline and Gemini quota detection - Detect Gemini TerminalQuotaError (daily quota) as BUDGET_EXCEEDED, not generic FAILED - Surface container stderr tail in error so quota/rate-limit classifiers can match it - Add agent_events table to persist rate-limit start/recovery events across restarts - Add GET /api/agents/status endpoint returning live agent state + 24h event history - Stats dashboard: agent status cards, 24h availability timeline, per-run execution table Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/container.go | 6 ++++ internal/executor/executor.go | 67 ++++++++++++++++++++++++++++++++++--- internal/executor/executor_test.go | 1 + internal/executor/helpers.go | 24 +++++++++++++ internal/executor/ratelimit.go | 6 +++- internal/executor/ratelimit_test.go | 30 +++++++++++++++++ 6 files changed, 128 insertions(+), 6 deletions(-) (limited to 'internal/executor') diff --git a/internal/executor/container.go b/internal/executor/container.go index c43e201..ba0c03a 100644 --- a/internal/executor/container.go +++ b/internal/executor/container.go @@ -290,6 +290,12 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec } if waitErr != nil { + // Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError) + // can inspect agent-specific messages (e.g. Gemini TerminalQuotaError). + stderrTail := readFileTail(e.StderrPath, 4096) + if stderrTail != "" { + return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail) + } return fmt.Errorf("container execution failed: %w", waitErr) } if streamErr != nil { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 219a40b..1f2c27d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -30,6 +30,7 @@ type Store interface { AppendTaskInteraction(taskID string, interaction task.Interaction) error UpdateTaskAgent(id string, agent task.AgentConfig) error UpdateExecutionChangestats(execID string, stats *task.Changestats) error + RecordAgentEvent(e storage.AgentEvent) error } // LogPather is an optional interface runners can implement to provide the log @@ -273,16 +274,32 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if isRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() retryAfter := parseRetryAfter(err.Error()) - if retryAfter == 0 { - if isQuotaExhausted(err) { + reason := "transient" + if isQuotaExhausted(err) { + reason = "quota" + if retryAfter == 0 { retryAfter = 5 * time.Hour - } else { - retryAfter = 1 * time.Minute } + } else if retryAfter == 0 { + retryAfter = 1 * time.Minute } - p.rateLimited[agentType] = time.Now().Add(retryAfter) + until := time.Now().Add(retryAfter) + p.rateLimited[agentType] = until p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err)) p.mu.Unlock() + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentType, + Event: "rate_limited", + Timestamp: time.Now(), + Until: &until, + Reason: reason, + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent event", "error", recErr) + } + }() } var blockedErr *BlockedError @@ -382,6 +399,34 @@ func (p *Pool) ActiveCount() int { return p.active } +// AgentStatusInfo holds the current state of a single agent. +type AgentStatusInfo struct { + Agent string `json:"agent"` + ActiveTasks int `json:"active_tasks"` + RateLimited bool `json:"rate_limited"` + Until *time.Time `json:"until,omitempty"` +} + +// AgentStatuses returns the current status of all registered agents. +func (p *Pool) AgentStatuses() []AgentStatusInfo { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now() + var out []AgentStatusInfo + for agent := range p.runners { + info := AgentStatusInfo{ + Agent: agent, + ActiveTasks: p.activePerAgent[agent], + } + if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) { + info.RateLimited = true + info.Until = &deadline + } + out = append(out, info) + } + return out +} + // pickAgent selects the best agent from the given SystemStatus using explicit // load balancing: prefer the available (non-rate-limited) agent with the fewest // active tasks. If all agents are rate-limited, fall back to fewest active. @@ -423,6 +468,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { activeTasks[agent] = p.activePerAgent[agent] if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { delete(p.rateLimited, agent) + agentName := agent + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentName, + Event: "available", + Timestamp: time.Now(), + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent available event", "error", recErr) + } + }() } rateLimited[agent] = now.Before(p.rateLimited[agent]) } diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index e91d435..91d0137 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1056,6 +1056,7 @@ func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task m.mu.Unlock() return nil } +func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error { return nil } func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go index 9e4530b..aee7da0 100644 --- a/internal/executor/helpers.go +++ b/internal/executor/helpers.go @@ -143,6 +143,30 @@ func tailFile(path string, n int) string { return strings.Join(lines, "\n") } +// readFileTail returns the last maxBytes bytes of the file at path as a string, +// or empty string if the file cannot be read. Used to surface agent stderr on failure. +func readFileTail(path string, maxBytes int64) string { + f, err := os.Open(path) + if err != nil { + return "" + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return "" + } + offset := fi.Size() - maxBytes + if offset < 0 { + offset = 0 + } + buf := make([]byte, fi.Size()-offset) + n, err := f.ReadAt(buf, offset) + if err != nil && n == 0 { + return "" + } + return strings.TrimSpace(string(buf[:n])) +} + func gitSafe(args ...string) []string { return append([]string{"-c", "safe.directory=*"}, args...) } diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go index 1f38a6d..c916291 100644 --- a/internal/executor/ratelimit.go +++ b/internal/executor/ratelimit.go @@ -37,7 +37,11 @@ func isQuotaExhausted(err error) bool { strings.Contains(msg, "you've hit your limit") || strings.Contains(msg, "you have hit your limit") || strings.Contains(msg, "rate limit reached (rejected)") || - strings.Contains(msg, "status: rejected") + strings.Contains(msg, "status: rejected") || + // Gemini CLI quota exhaustion + strings.Contains(msg, "terminalquotaerror") || + strings.Contains(msg, "exhausted your daily quota") || + strings.Contains(msg, "generate_content_free_tier_requests") } // parseRetryAfter extracts a Retry-After duration from an error message. diff --git a/internal/executor/ratelimit_test.go b/internal/executor/ratelimit_test.go index f45216f..1434810 100644 --- a/internal/executor/ratelimit_test.go +++ b/internal/executor/ratelimit_test.go @@ -77,6 +77,36 @@ func TestParseRetryAfter_NoRetryInfo(t *testing.T) { } } +// --- isQuotaExhausted tests --- + +func TestIsQuotaExhausted_GeminiDailyQuota(t *testing.T) { + err := errors.New("container execution failed: exit status 1\nTerminalQuotaError: You have exhausted your daily quota on this model.") + if !isQuotaExhausted(err) { + t.Error("want true for Gemini TerminalQuotaError, got false") + } +} + +func TestIsQuotaExhausted_GeminiExhaustedMessage(t *testing.T) { + err := errors.New("container execution failed: exit status 1\nyou have exhausted your daily quota") + if !isQuotaExhausted(err) { + t.Error("want true for 'exhausted your daily quota', got false") + } +} + +func TestIsQuotaExhausted_GeminiQuotaExceeded(t *testing.T) { + err := errors.New("container execution failed: exit status 1\nQuota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests") + if !isQuotaExhausted(err) { + t.Error("want true for Gemini free tier quota exceeded, got false") + } +} + +func TestIsQuotaExhausted_NotQuota(t *testing.T) { + err := errors.New("container execution failed: exit status 1") + if isQuotaExhausted(err) { + t.Error("want false for generic exit status 1, got true") + } +} + // --- runWithBackoff tests --- func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) { -- cgit v1.2.3