summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/api/executions.go25
-rw-r--r--internal/api/server.go1
-rw-r--r--internal/executor/container.go6
-rw-r--r--internal/executor/executor.go67
-rw-r--r--internal/executor/executor_test.go1
-rw-r--r--internal/executor/helpers.go24
-rw-r--r--internal/executor/ratelimit.go6
-rw-r--r--internal/executor/ratelimit_test.go30
-rw-r--r--internal/storage/db.go63
9 files changed, 217 insertions, 6 deletions
diff --git a/internal/api/executions.go b/internal/api/executions.go
index 114425e..29af139 100644
--- a/internal/api/executions.go
+++ b/internal/api/executions.go
@@ -86,6 +86,31 @@ func (s *Server) handleGetExecutionLog(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, content)
}
+// handleGetAgentStatus returns the current status of all agents and recent rate-limit events.
+// GET /api/agents/status?since=<RFC3339>
+func (s *Server) handleGetAgentStatus(w http.ResponseWriter, r *http.Request) {
+ since := time.Now().Add(-24 * time.Hour)
+ if v := r.URL.Query().Get("since"); v != "" {
+ if t, err := time.Parse(time.RFC3339, v); err == nil {
+ since = t
+ }
+ }
+
+ events, err := s.store.ListAgentEvents(since)
+ if err != nil {
+ writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
+ return
+ }
+ if events == nil {
+ events = []storage.AgentEvent{}
+ }
+
+ writeJSON(w, http.StatusOK, map[string]interface{}{
+ "agents": s.pool.AgentStatuses(),
+ "events": events,
+ })
+}
+
// tailLogFile reads the last n lines from the file at path.
func tailLogFile(path string, n int) (string, error) {
data, err := os.ReadFile(path)
diff --git a/internal/api/server.go b/internal/api/server.go
index e5d0ba6..2d5c308 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -121,6 +121,7 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /api/tasks/{id}/subtasks", s.handleListSubtasks)
s.mux.HandleFunc("GET /api/tasks/{id}/executions", s.handleListExecutions)
s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions)
+ s.mux.HandleFunc("GET /api/agents/status", s.handleGetAgentStatus)
s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution)
s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog)
s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs)
diff --git a/internal/executor/container.go b/internal/executor/container.go
index c43e201..ba0c03a 100644
--- a/internal/executor/container.go
+++ b/internal/executor/container.go
@@ -290,6 +290,12 @@ func (r *ContainerRunner) Run(ctx context.Context, t *task.Task, e *storage.Exec
}
if waitErr != nil {
+ // Append the tail of stderr so error classifiers (isQuotaExhausted, isRateLimitError)
+ // can inspect agent-specific messages (e.g. Gemini TerminalQuotaError).
+ stderrTail := readFileTail(e.StderrPath, 4096)
+ if stderrTail != "" {
+ return fmt.Errorf("container execution failed: %w\n%s", waitErr, stderrTail)
+ }
return fmt.Errorf("container execution failed: %w", waitErr)
}
if streamErr != nil {
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 219a40b..1f2c27d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -30,6 +30,7 @@ type Store interface {
AppendTaskInteraction(taskID string, interaction task.Interaction) error
UpdateTaskAgent(id string, agent task.AgentConfig) error
UpdateExecutionChangestats(execID string, stats *task.Changestats) error
+ RecordAgentEvent(e storage.AgentEvent) error
}
// LogPather is an optional interface runners can implement to provide the log
@@ -273,16 +274,32 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if isRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
retryAfter := parseRetryAfter(err.Error())
- if retryAfter == 0 {
- if isQuotaExhausted(err) {
+ reason := "transient"
+ if isQuotaExhausted(err) {
+ reason = "quota"
+ if retryAfter == 0 {
retryAfter = 5 * time.Hour
- } else {
- retryAfter = 1 * time.Minute
}
+ } else if retryAfter == 0 {
+ retryAfter = 1 * time.Minute
}
- p.rateLimited[agentType] = time.Now().Add(retryAfter)
+ until := time.Now().Add(retryAfter)
+ p.rateLimited[agentType] = until
p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err))
p.mu.Unlock()
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentType,
+ Event: "rate_limited",
+ Timestamp: time.Now(),
+ Until: &until,
+ Reason: reason,
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent event", "error", recErr)
+ }
+ }()
}
var blockedErr *BlockedError
@@ -382,6 +399,34 @@ func (p *Pool) ActiveCount() int {
return p.active
}
+// AgentStatusInfo holds the current state of a single agent.
+type AgentStatusInfo struct {
+ Agent string `json:"agent"`
+ ActiveTasks int `json:"active_tasks"`
+ RateLimited bool `json:"rate_limited"`
+ Until *time.Time `json:"until,omitempty"`
+}
+
+// AgentStatuses returns the current status of all registered agents.
+func (p *Pool) AgentStatuses() []AgentStatusInfo {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ now := time.Now()
+ var out []AgentStatusInfo
+ for agent := range p.runners {
+ info := AgentStatusInfo{
+ Agent: agent,
+ ActiveTasks: p.activePerAgent[agent],
+ }
+ if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) {
+ info.RateLimited = true
+ info.Until = &deadline
+ }
+ out = append(out, info)
+ }
+ return out
+}
+
// pickAgent selects the best agent from the given SystemStatus using explicit
// load balancing: prefer the available (non-rate-limited) agent with the fewest
// active tasks. If all agents are rate-limited, fall back to fewest active.
@@ -423,6 +468,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
activeTasks[agent] = p.activePerAgent[agent]
if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) {
delete(p.rateLimited, agent)
+ agentName := agent
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentName,
+ Event: "available",
+ Timestamp: time.Now(),
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent available event", "error", recErr)
+ }
+ }()
}
rateLimited[agent] = now.Before(p.rateLimited[agent])
}
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index e91d435..91d0137 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1056,6 +1056,7 @@ func (m *minimalMockStore) UpdateExecutionChangestats(execID string, stats *task
m.mu.Unlock()
return nil
}
+func (m *minimalMockStore) RecordAgentEvent(_ storage.AgentEvent) error { return nil }
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
index 9e4530b..aee7da0 100644
--- a/internal/executor/helpers.go
+++ b/internal/executor/helpers.go
@@ -143,6 +143,30 @@ func tailFile(path string, n int) string {
return strings.Join(lines, "\n")
}
+// readFileTail returns the last maxBytes bytes of the file at path as a string,
+// or empty string if the file cannot be read. Used to surface agent stderr on failure.
+func readFileTail(path string, maxBytes int64) string {
+ f, err := os.Open(path)
+ if err != nil {
+ return ""
+ }
+ defer f.Close()
+ fi, err := f.Stat()
+ if err != nil {
+ return ""
+ }
+ offset := fi.Size() - maxBytes
+ if offset < 0 {
+ offset = 0
+ }
+ buf := make([]byte, fi.Size()-offset)
+ n, err := f.ReadAt(buf, offset)
+ if err != nil && n == 0 {
+ return ""
+ }
+ return strings.TrimSpace(string(buf[:n]))
+}
+
func gitSafe(args ...string) []string {
return append([]string{"-c", "safe.directory=*"}, args...)
}
diff --git a/internal/executor/ratelimit.go b/internal/executor/ratelimit.go
index 1f38a6d..c916291 100644
--- a/internal/executor/ratelimit.go
+++ b/internal/executor/ratelimit.go
@@ -37,7 +37,11 @@ func isQuotaExhausted(err error) bool {
strings.Contains(msg, "you've hit your limit") ||
strings.Contains(msg, "you have hit your limit") ||
strings.Contains(msg, "rate limit reached (rejected)") ||
- strings.Contains(msg, "status: rejected")
+ strings.Contains(msg, "status: rejected") ||
+ // Gemini CLI quota exhaustion
+ strings.Contains(msg, "terminalquotaerror") ||
+ strings.Contains(msg, "exhausted your daily quota") ||
+ strings.Contains(msg, "generate_content_free_tier_requests")
}
// parseRetryAfter extracts a Retry-After duration from an error message.
diff --git a/internal/executor/ratelimit_test.go b/internal/executor/ratelimit_test.go
index f45216f..1434810 100644
--- a/internal/executor/ratelimit_test.go
+++ b/internal/executor/ratelimit_test.go
@@ -77,6 +77,36 @@ func TestParseRetryAfter_NoRetryInfo(t *testing.T) {
}
}
+// --- isQuotaExhausted tests ---
+
+func TestIsQuotaExhausted_GeminiDailyQuota(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1\nTerminalQuotaError: You have exhausted your daily quota on this model.")
+ if !isQuotaExhausted(err) {
+ t.Error("want true for Gemini TerminalQuotaError, got false")
+ }
+}
+
+func TestIsQuotaExhausted_GeminiExhaustedMessage(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1\nyou have exhausted your daily quota")
+ if !isQuotaExhausted(err) {
+ t.Error("want true for 'exhausted your daily quota', got false")
+ }
+}
+
+func TestIsQuotaExhausted_GeminiQuotaExceeded(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1\nQuota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests")
+ if !isQuotaExhausted(err) {
+ t.Error("want true for Gemini free tier quota exceeded, got false")
+ }
+}
+
+func TestIsQuotaExhausted_NotQuota(t *testing.T) {
+ err := errors.New("container execution failed: exit status 1")
+ if isQuotaExhausted(err) {
+ t.Error("want false for generic exit status 1, got true")
+ }
+}
+
// --- runWithBackoff tests ---
func TestRunWithBackoff_SuccessOnFirstTry(t *testing.T) {
diff --git a/internal/storage/db.go b/internal/storage/db.go
index 8bc9864..0d11b4e 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -99,6 +99,16 @@ func (s *DB) migrate() error {
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)`,
+ `CREATE TABLE IF NOT EXISTS agent_events (
+ id TEXT PRIMARY KEY,
+ agent TEXT NOT NULL,
+ event TEXT NOT NULL,
+ timestamp DATETIME NOT NULL,
+ until DATETIME,
+ reason TEXT
+ )`,
+ `CREATE INDEX IF NOT EXISTS idx_agent_events_agent ON agent_events(agent)`,
+ `CREATE INDEX IF NOT EXISTS idx_agent_events_timestamp ON agent_events(timestamp)`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -858,3 +868,56 @@ func (s *DB) SetSetting(key, value string) error {
_, err := s.db.Exec(`INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value`, key, value)
return err
}
+
+// AgentEvent records a rate-limit state change for an agent.
+type AgentEvent struct {
+ ID string
+ Agent string
+ Event string // "rate_limited" | "available"
+ Timestamp time.Time
+ Until *time.Time // non-nil for "rate_limited" events
+ Reason string // "transient" | "quota"
+}
+
+// RecordAgentEvent inserts an agent rate-limit event.
+func (s *DB) RecordAgentEvent(e AgentEvent) error {
+ _, err := s.db.Exec(
+ `INSERT INTO agent_events (id, agent, event, timestamp, until, reason) VALUES (?, ?, ?, ?, ?, ?)`,
+ e.ID, e.Agent, e.Event, e.Timestamp.UTC(), timeOrNull(e.Until), e.Reason,
+ )
+ return err
+}
+
+// ListAgentEvents returns agent events since the given time, newest first.
+func (s *DB) ListAgentEvents(since time.Time) ([]AgentEvent, error) {
+ rows, err := s.db.Query(
+ `SELECT id, agent, event, timestamp, until, reason FROM agent_events WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 500`,
+ since.UTC(),
+ )
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var events []AgentEvent
+ for rows.Next() {
+ var e AgentEvent
+ var until sql.NullTime
+ var reason sql.NullString
+ if err := rows.Scan(&e.ID, &e.Agent, &e.Event, &e.Timestamp, &until, &reason); err != nil {
+ return nil, err
+ }
+ if until.Valid {
+ e.Until = &until.Time
+ }
+ e.Reason = reason.String
+ events = append(events, e)
+ }
+ return events, rows.Err()
+}
+
+func timeOrNull(t *time.Time) interface{} {
+ if t == nil {
+ return nil
+ }
+ return t.UTC()
+}