diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/api/executions.go | 8 | ||||
| -rw-r--r-- | internal/api/server.go | 1 | ||||
| -rw-r--r-- | internal/cli/root.go | 1 | ||||
| -rw-r--r-- | internal/cli/version.go | 18 | ||||
| -rw-r--r-- | internal/executor/executor.go | 34 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 99 |
6 files changed, 20 insertions, 141 deletions
diff --git a/internal/api/executions.go b/internal/api/executions.go index d39de9f..4d8ba9c 100644 --- a/internal/api/executions.go +++ b/internal/api/executions.go @@ -128,14 +128,6 @@ func (s *Server) handleGetAgentStatus(w http.ResponseWriter, r *http.Request) { }) } -// handleUndrainAgent resets the drain state and failure counter for the given agent type. -// POST /api/pool/agents/{agent}/undrain -func (s *Server) handleUndrainAgent(w http.ResponseWriter, r *http.Request) { - agent := r.PathValue("agent") - s.pool.UndrainingAgent(agent) - w.WriteHeader(http.StatusOK) -} - // tailLogFile reads the last n lines from the file at path. func tailLogFile(path string, n int) (string, error) { data, err := os.ReadFile(path) diff --git a/internal/api/server.go b/internal/api/server.go index be944a3..aec1439 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -127,7 +127,6 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions) s.mux.HandleFunc("GET /api/stats", s.handleGetDashboardStats) s.mux.HandleFunc("GET /api/agents/status", s.handleGetAgentStatus) - s.mux.HandleFunc("POST /api/pool/agents/{agent}/undrain", s.handleUndrainAgent) s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution) s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog) s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs) diff --git a/internal/cli/root.go b/internal/cli/root.go index 5c6184e..e57a9d9 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -74,6 +74,7 @@ func NewRootCmd() *cobra.Command { newStartCmd(), newCreateCmd(), newReportCmd(), + newVersionCmd(), ) return cmd diff --git a/internal/cli/version.go b/internal/cli/version.go new file mode 100644 index 0000000..789416a --- /dev/null +++ b/internal/cli/version.go @@ -0,0 +1,18 @@ +package cli + +import ( + "fmt" + + "github.com/thepeterstone/claudomator/internal/version" + "github.com/spf13/cobra" +) + +func newVersionCmd() *cobra.Command { + return &cobra.Command{ + Use: "version", + Short: "Show the version of claudomator", + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("claudomator version %s\n", version.Version()) + }, + } +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 48626ec..a1f29ed 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -75,7 +75,6 @@ type Pool struct { rateLimited map[string]time.Time // agentType -> until cancels map[string]context.CancelFunc // taskID → cancel consecutiveFailures map[string]int // agentType -> count - drained map[string]bool // agentType -> true if halted pending human ack resultCh chan *Result startedCh chan string // task IDs that just transitioned to RUNNING workCh chan workItem // internal bounded queue; Submit enqueues here @@ -109,7 +108,6 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * rateLimited: make(map[string]time.Time), cancels: make(map[string]context.CancelFunc), consecutiveFailures: make(map[string]int), - drained: make(map[string]bool), resultCh: make(chan *Result, maxConcurrent*2), startedCh: make(chan string, maxConcurrent*2), workCh: make(chan workItem, maxConcurrent*10+100), @@ -408,21 +406,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. } p.mu.Lock() p.consecutiveFailures[agentType]++ - failures := p.consecutiveFailures[agentType] p.mu.Unlock() - if failures >= 3 { - p.mu.Lock() - p.drained[agentType] = true - p.mu.Unlock() - p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures) - questionJSON, _ := json.Marshal(map[string]string{ - "question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg), - "options": "acknowledge", - }) - if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil { - p.logger.Error("failed to set drain question", "error", err) - } - } } if t.StoryID != "" && exec.Status == "FAILED" { storyID := t.StoryID @@ -440,7 +424,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. } else { p.mu.Lock() p.consecutiveFailures[agentType] = 0 - p.drained[agentType] = false p.mu.Unlock() if t.ParentTaskID == "" { subtasks, subErr := p.store.ListSubtasks(t.ID) @@ -657,14 +640,6 @@ func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskSt } } -// UndrainingAgent resets the drain state and failure counter for the given agent type. -func (p *Pool) UndrainingAgent(agentType string) { - p.mu.Lock() - defer p.mu.Unlock() - p.drained[agentType] = false - p.consecutiveFailures[agentType] = 0 -} - // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() @@ -678,7 +653,6 @@ type AgentStatusInfo struct { ActiveTasks int `json:"active_tasks"` RateLimited bool `json:"rate_limited"` Until *time.Time `json:"until,omitempty"` - Drained bool `json:"drained"` } // AgentStatuses returns the current status of all registered agents. @@ -691,7 +665,6 @@ func (p *Pool) AgentStatuses() []AgentStatusInfo { info := AgentStatusInfo{ Agent: agent, ActiveTasks: p.activePerAgent[agent], - Drained: p.drained[agent], } if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) { info.RateLimited = true @@ -805,16 +778,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } }() - p.mu.Lock() - if p.drained[agentType] { - p.mu.Unlock() - time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} }) - return - } // Check dependencies before taking the per-agent slot to avoid deadlock: // if a dependent task holds the slot while waiting for its dependency to run, // the dependency can never start (maxPerAgent=1). - p.mu.Unlock() if len(t.DependsOn) > 0 { ready, depErr := p.checkDepsReady(t) if depErr != nil { diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 64e3ecb..267d9ca 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1132,7 +1132,6 @@ func newPoolWithMockStore(store Store) *Pool { rateLimited: make(map[string]time.Time), cancels: make(map[string]context.CancelFunc), consecutiveFailures: make(map[string]int), - drained: make(map[string]bool), resultCh: make(chan *Result, 4), workCh: make(chan workItem, 4), doneCh: make(chan struct{}, 2), @@ -1589,55 +1588,6 @@ func TestPool_MaxPerAgent_AllowsDifferentAgents(t *testing.T) { } } -func TestPool_ConsecutiveFailures_DrainAtThree(t *testing.T) { - store := testStore(t) - runner := &mockRunner{err: fmt.Errorf("boom")} - runners := map[string]Runner{"claude": runner} - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(3, runners, store, logger) - - // Two failures should NOT drain. - for _, id := range []string{"cf-1", "cf-2"} { - tk := makeTask(id) - store.CreateTask(tk) - pool.Submit(context.Background(), tk) - <-pool.Results() - } - pool.mu.Lock() - draineEarly := pool.drained["claude"] - pool.mu.Unlock() - if draineEarly { - t.Error("expected claude NOT drained after only 2 failures") - } - - // Third failure should drain. - tk3 := makeTask("cf-3") - store.CreateTask(tk3) - pool.Submit(context.Background(), tk3) - <-pool.Results() - - pool.mu.Lock() - drained := pool.drained["claude"] - failures := pool.consecutiveFailures["claude"] - pool.mu.Unlock() - - if !drained { - t.Error("expected claude to be drained after 3 consecutive failures") - } - if failures < 3 { - t.Errorf("expected consecutiveFailures >= 3, got %d", failures) - } - - // The third task should have a drain question set. - tk3fetched, err := store.GetTask("cf-3") - if err != nil { - t.Fatalf("GetTask: %v", err) - } - if tk3fetched.QuestionJSON == "" { - t.Error("expected drain question to be set on task after drain") - } -} - func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) { store := testStore(t) @@ -1668,7 +1618,7 @@ func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) { t.Errorf("expected 1 failure after first task, got %d", failsBefore) } - // Second task succeeds + // Second task succeeds — counter resets. tk2 := makeTask("rs-2") store.CreateTask(tk2) pool.Submit(context.Background(), tk2) @@ -1676,15 +1626,11 @@ func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) { pool.mu.Lock() failsAfter := pool.consecutiveFailures["claude"] - isDrained := pool.drained["claude"] pool.mu.Unlock() if failsAfter != 0 { t.Errorf("expected consecutiveFailures reset to 0 after success, got %d", failsAfter) } - if isDrained { - t.Error("expected drained to be false after success") - } } func TestPool_CheckStoryCompletion_AllComplete(t *testing.T) { @@ -1772,49 +1718,6 @@ func TestPool_CheckStoryCompletion_PartialComplete(t *testing.T) { } } -func TestPool_Undrain_ResumesExecution(t *testing.T) { - store := testStore(t) - - // Force drain state - runner := &mockRunner{} - runners := map[string]Runner{"claude": runner} - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - pool := NewPool(2, runners, store, logger) - - pool.mu.Lock() - pool.drained["claude"] = true - pool.consecutiveFailures["claude"] = 3 - pool.mu.Unlock() - - // Undrain - pool.UndrainingAgent("claude") - - pool.mu.Lock() - drained := pool.drained["claude"] - failures := pool.consecutiveFailures["claude"] - pool.mu.Unlock() - - if drained { - t.Error("expected drained=false after UndrainingAgent") - } - if failures != 0 { - t.Errorf("expected consecutiveFailures=0 after UndrainingAgent, got %d", failures) - } - - // Verify a task can now run - tk := makeTask("undrain-1") - store.CreateTask(tk) - pool.Submit(context.Background(), tk) - select { - case result := <-pool.Results(): - if result.Err != nil { - t.Errorf("unexpected error after undrain: %v", result.Err) - } - case <-time.After(5 * time.Second): - t.Fatal("timed out waiting for task after undrain") - } -} - func TestPool_StoryDeploy_RunsDeployScript(t *testing.T) { store := testStore(t) runner := &mockRunner{} |
