diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-19 23:03:56 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-19 23:03:56 +0000 |
| commit | 7e8967decbc8221694953abf1435fda8aaf18824 (patch) | |
| tree | 3cee147c32da1565ec1e5ea72b0ddf131077dd66 /internal | |
| parent | e2f5379e00747f17d91ee1c90828d4494c2eb4d8 (diff) | |
feat: agent status dashboard with availability timeline and Gemini quota detection
- Detect Gemini TerminalQuotaError (daily quota) as BUDGET_EXCEEDED, not generic FAILED
- Surface container stderr tail in error so quota/rate-limit classifiers can match it
- Add agent_events table to persist rate-limit start/recovery events across restarts
- Add GET /api/agents/status endpoint returning live agent state + 24h event history
- Stats dashboard: agent status cards, 24h availability timeline, per-run execution table
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/api/executions.go | 25 | ||||
| -rw-r--r-- | internal/api/server.go | 1 | ||||
| -rw-r--r-- | internal/executor/container.go | 6 | ||||
| -rw-r--r-- | internal/executor/executor.go | 67 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 1 | ||||
| -rw-r--r-- | internal/executor/helpers.go | 24 | ||||
| -rw-r--r-- | internal/executor/ratelimit.go | 6 | ||||
| -rw-r--r-- | internal/executor/ratelimit_test.go | 30 | ||||
| -rw-r--r-- | internal/storage/db.go | 63 |
9 files changed, 217 insertions, 6 deletions
diff --git a/internal/api/executions.go b/internal/api/executions.go index 114425e..29af139 100644 --- a/internal/api/executions.go +++ b/internal/api/executions.go @@ -86,6 +86,31 @@ func (s *Server) handleGetExecutionLog(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, content) } +// handleGetAgentStatus returns the current status of all agents and recent rate-limit events. +// GET /api/agents/status?since=<RFC3339> +func (s *Server) handleGetAgentStatus(w http.ResponseWriter, r *http.Request) { + since := time.Now().Add(-24 * time.Hour) + if v := r.URL.Query().Get("since"); v != "" { + if t, err := time.Parse(time.RFC3339, v); err == nil { + since = t + } + } + + events, err := s.store.ListAgentEvents(since) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + if events == nil { + events = []storage.AgentEvent{} + } + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "agents": s.pool.AgentStatuses(), + "events": events, + }) +} + // tailLogFile reads the last n lines from the file at path. func tailLogFile(path string, n int) (string, error) { data, err := os.ReadFile(path) diff --git a/internal/api/server.go b/internal/api/server.go index e5d0ba6..2d5c308 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -121,6 +121,7 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /api/tasks/{id}/subtasks", s.handleListSubtasks) s.mux.HandleFunc("GET /api/tasks/{id}/executions", s.handleListExecutions) s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions) + s.mux.HandleFunc("GET /api/agents/status", s.handleGetAgentStatus) s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution) s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog) s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs) diff --git a/internal/executor/container.go b/internal/executor/container.go index c43e201..ba0c03a 100644 --- a/internal/executor/container.go +++ b/internal/executor/container.go @@ -290,6 +290,12 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec } if waitErr != nil { + // Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError) + // can inspect agent-specific messages (e.g. Gemini TerminalQuotaError). + stderrTail := readFileTail(e.StderrPath, 4096) + if stderrTail != "" { + return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail) + } return fmt.Errorf("container execution failed: %w", waitErr) } if streamErr != nil { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 219a40b..1f2c27d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -30,6 +30,7 @@ type Store interface { AppendTaskInteraction(taskID string, interaction task.Interaction) error UpdateTaskAgent(id string, agent task.AgentConfig) error UpdateExecutionChangestats(execID string, stats *task.Changestats) error + RecordAgentEvent(e storage.AgentEvent) error } // LogPather is an optional interface runners can implement to provide the log @@ -273,16 +274,32 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if isRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() retryAfter := parseRetryAfter(err.Error()) - if retryAfter == 0 { - if isQuotaExhausted(err) { + reason := "transient" + if isQuotaExhausted(err) { + reason = "quota" + if retryAfter == 0 { retryAfter = 5 * time.Hour - } else { - retryAfter = 1 * time.Minute } + } else if retryAfter == 0 { + retryAfter = 1 * time.Minute } - p.rateLimited[agentType] = time.Now().Add(retryAfter) + until := time.Now().Add(retryAfter) + p.rateLimited[agentType] = until p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err)) p.mu.Unlock() + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentType, + Event: "rate_limited", + Timestamp: time.Now(), + Until: &until, + Reason: reason, + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent event", "error", recErr) + } + }() } var blockedErr *BlockedError @@ -382,6 +399,34 @@ func (p *Pool) ActiveCount() int { return p.active } +// AgentStatusInfo holds the current state of a single agent. +type AgentStatusInfo struct { + Agent string `json:"agent"` + ActiveTasks int `json:"active_tasks"` + RateLimited bool `json:"rate_limited"` + Until *time.Time `json:"until,omitempty"` +} + +// AgentStatuses returns the current status of all registered agents. +func (p *Pool) AgentStatuses() []AgentStatusInfo { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now() + var out []AgentStatusInfo + for agent := range p.runners { + info := AgentStatusInfo{ + Agent: agent, + ActiveTasks: p.activePerAgent[agent], + } + if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) { + info.RateLimited = true + info.Until = &deadline + } + out = append(out, info) + } + return out +} + // pickAgent selects the best agent from the given SystemStatus using explicit // load balancing: prefer the available (non-rate-limited) agent with the fewest // active tasks. If all agents are rate-limited, fall back to fewest active. @@ -423,6 +468,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { activeTasks[agent] = p.activePerAgent[agent] if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { delete(p.rateLimited, agent) + agentName := agent + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentName, + Event: "available", + Timestamp: time.Now(), + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent available event", "error", recErr) + } + }() } rateLimited[agent] = now.Before(p.rateLimited[agent]) } diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index e91d435..91d0137 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1056,6 +1056,7 @@ func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task m.mu.Unlock() return nil } +func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error { return nil } func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go index 9e4530b..aee7da0 100644 --- a/internal/executor/helpers.go +++ b/internal/executor/helpers.go @@ -143,6 +143,30 @@ func tailFile(path string, n int) string { return strings.Join(lines, "\n") } +// readFileTail returns the last maxBytes bytes of the file at path as a string, +// or empty string if the file cannot be read. Used to surface agent stderr on failure. +func readFileTail(path string, maxBytes int64) string { + f, err := os.Open(path) + if err != nil { + return "" + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return "" + } + offset := fi.Size() - maxBytes + if offset < 0 { + offset = 0 + } + buf := make([]byte, fi.Size()-offset) + n, err := f.ReadAt(buf, offset) + if err != nil && n == 0 { + return "" + } + return strings.TrimSpace(string(buf[:n])) +} + func gitSafe(args ...string) []string { return append([]string{"-c", "safe.directory=*"}, args...) } diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go index 1f38a6d..c916291 100644 --- a/internal/executor/ratelimit.go +++ b/internal/executor/ratelimit.go @@ -37,7 +37,11 @@ func isQuotaExhausted(err error) bool { strings.Contains(msg, "you've hit your limit") || strings.Contains(msg, "you have hit your limit") || strings.Contains(msg, "rate limit reached (rejected)") || - strings.Contains(msg, "status: rejected") + strings.Contains(msg, "status: rejected") || + // Gemini CLI quota exhaustion + strings.Contains(msg, "terminalquotaerror") || + strings.Contains(msg, "exhausted your daily quota") || + strings.Contains(msg, "generate_content_free_tier_requests") } // parseRetryAfter extracts a Retry-After duration from an error message. diff --git a/internal/executor/ratelimit_test.go b/internal/executor/ratelimit_test.go index f45216f..1434810 100644 --- a/internal/executor/ratelimit_test.go +++ b/internal/executor/ratelimit_test.go @@ -77,6 +77,36 @@ func TestParseRetryAfter_NoRetryInfo(t *testing.T) { } } +// --- isQuotaExhausted tests --- + +func TestIsQuotaExhausted_GeminiDailyQuota(t *testing.T) { + err := errors.New("container execution failed: exit status 1\nTerminalQuotaError: You have exhausted your daily quota on this model.") + if !isQuotaExhausted(err) { + t.Error("want true for Gemini TerminalQuotaError, got false") + } +} + +func TestIsQuotaExhausted_GeminiExhaustedMessage(t *testing.T) { + err := errors.New("container execution failed: exit status 1\nyou have exhausted your daily quota") + if !isQuotaExhausted(err) { + t.Error("want true for 'exhausted your daily quota', got false") + } +} + +func TestIsQuotaExhausted_GeminiQuotaExceeded(t *testing.T) { + err := errors.New("container execution failed: exit status 1\nQuota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests") + if !isQuotaExhausted(err) { + t.Error("want true for Gemini free tier quota exceeded, got false") + } +} + +func TestIsQuotaExhausted_NotQuota(t *testing.T) { + err := errors.New("container execution failed: exit status 1") + if isQuotaExhausted(err) { + t.Error("want false for generic exit status 1, got true") + } +} + // --- runWithBackoff tests --- func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) { diff --git a/internal/storage/db.go b/internal/storage/db.go index 8bc9864..0d11b4e 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -99,6 +99,16 @@ func (s *DB) migrate() error { key TEXT PRIMARY KEY, value TEXT NOT NULL )`, + `CREATE TABLE IF NOT EXISTS agent_events ( + id TEXT PRIMARY KEY, + agent TEXT NOT NULL, + event TEXT NOT NULL, + timestamp DATETIME NOT NULL, + until DATETIME, + reason TEXT + )`, + `CREATE INDEX IF NOT EXISTS idx_agent_events_agent ON agent_events(agent)`, + `CREATE INDEX IF NOT EXISTS idx_agent_events_timestamp ON agent_events(timestamp)`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -858,3 +868,56 @@ func (s *DB) SetSetting(key, value string) error { _, err := s.db.Exec(`INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value`, key, value) return err } + +// AgentEvent records a rate-limit state change for an agent. +type AgentEvent struct { + ID string + Agent string + Event string // "rate_limited" | "available" + Timestamp time.Time + Until *time.Time // non-nil for "rate_limited" events + Reason string // "transient" | "quota" +} + +// RecordAgentEvent inserts an agent rate-limit event. +func (s *DB) RecordAgentEvent(e AgentEvent) error { + _, err := s.db.Exec( + `INSERT INTO agent_events (id, agent, event, timestamp, until, reason) VALUES (?, ?, ?, ?, ?, ?)`, + e.ID, e.Agent, e.Event, e.Timestamp.UTC(), timeOrNull(e.Until), e.Reason, + ) + return err +} + +// ListAgentEvents returns agent events since the given time, newest first. +func (s *DB) ListAgentEvents(since time.Time) ([]AgentEvent, error) { + rows, err := s.db.Query( + `SELECT id, agent, event, timestamp, until, reason FROM agent_events WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 500`, + since.UTC(), + ) + if err != nil { + return nil, err + } + defer rows.Close() + var events []AgentEvent + for rows.Next() { + var e AgentEvent + var until sql.NullTime + var reason sql.NullString + if err := rows.Scan(&e.ID, &e.Agent, &e.Event, &e.Timestamp, &until, &reason); err != nil { + return nil, err + } + if until.Valid { + e.Until = &until.Time + } + e.Reason = reason.String + events = append(events, e) + } + return events, rows.Err() +} + +func timeOrNull(t *time.Time) interface{} { + if t == nil { + return nil + } + return t.UTC() +} |
