summary | refs | log | tree | commit | diff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r-- internal/api/executions.go 8
-rw-r--r-- internal/api/server.go 1
-rw-r--r-- internal/cli/root.go 1
-rw-r--r-- internal/cli/version.go 18
-rw-r--r-- internal/executor/executor.go 34
-rw-r--r-- internal/executor/executor_test.go 99
6 files changed, 20 insertions, 141 deletions
diff --git a/internal/api/executions.go b/internal/api/executions.go
index d39de9f..4d8ba9c 100644
--- a/internal/api/executions.go
+++ b/internal/api/executions.go
@@ -128,14 +128,6 @@ func (s *Server) handleGetAgentStatus(w http.ResponseWriter, r *http.Request) {
})
}
-// handleUndrainAgent resets the drain state and failure counter for the given agent type.
-// POST /api/pool/agents/{agent}/undrain
-func (s *Server) handleUndrainAgent(w http.ResponseWriter, r *http.Request) {
- agent := r.PathValue("agent")
- s.pool.UndrainingAgent(agent)
- w.WriteHeader(http.StatusOK)
-}
-
// tailLogFile reads the last n lines from the file at path.
func tailLogFile(path string, n int) (string, error) {
data, err := os.ReadFile(path)
diff --git a/internal/api/server.go b/internal/api/server.go
index be944a3..aec1439 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -127,7 +127,6 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions)
s.mux.HandleFunc("GET /api/stats", s.handleGetDashboardStats)
s.mux.HandleFunc("GET /api/agents/status", s.handleGetAgentStatus)
- s.mux.HandleFunc("POST /api/pool/agents/{agent}/undrain", s.handleUndrainAgent)
s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution)
s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog)
s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs)
diff --git a/internal/cli/root.go b/internal/cli/root.go
index 5c6184e..e57a9d9 100644
--- a/internal/cli/root.go
+++ b/internal/cli/root.go
@@ -74,6 +74,7 @@ func NewRootCmd() *cobra.Command {
newStartCmd(),
newCreateCmd(),
newReportCmd(),
+ newVersionCmd(),
)
return cmd
diff --git a/internal/cli/version.go b/internal/cli/version.go
new file mode 100644
index 0000000..789416a
--- /dev/null
+++ b/internal/cli/version.go
@@ -0,0 +1,18 @@
+package cli
+
+import (
+ "fmt"
+
+ "github.com/thepeterstone/claudomator/internal/version"
+ "github.com/spf13/cobra"
+)
+
+func newVersionCmd() *cobra.Command {
+ return &cobra.Command{
+ Use: "version",
+ Short: "Show the version of claudomator",
+ Run: func(cmd *cobra.Command, args []string) {
+ fmt.Printf("claudomator version %s\n", version.Version())
+ },
+ }
+}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 48626ec..a1f29ed 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -75,7 +75,6 @@ type Pool struct {
rateLimited map[string]time.Time // agentType -> until
cancels map[string]context.CancelFunc // taskID → cancel
consecutiveFailures map[string]int // agentType -> count
- drained map[string]bool // agentType -> true if halted pending human ack
resultCh chan *Result
startedCh chan string // task IDs that just transitioned to RUNNING
workCh chan workItem // internal bounded queue; Submit enqueues here
@@ -109,7 +108,6 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
rateLimited: make(map[string]time.Time),
cancels: make(map[string]context.CancelFunc),
consecutiveFailures: make(map[string]int),
- drained: make(map[string]bool),
resultCh: make(chan *Result, maxConcurrent*2),
startedCh: make(chan string, maxConcurrent*2),
workCh: make(chan workItem, maxConcurrent*10+100),
@@ -408,21 +406,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
}
p.mu.Lock()
p.consecutiveFailures[agentType]++
- failures := p.consecutiveFailures[agentType]
p.mu.Unlock()
- if failures >= 3 {
- p.mu.Lock()
- p.drained[agentType] = true
- p.mu.Unlock()
- p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures)
- questionJSON, _ := json.Marshal(map[string]string{
- "question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg),
- "options": "acknowledge",
- })
- if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil {
- p.logger.Error("failed to set drain question", "error", err)
- }
- }
}
if t.StoryID != "" && exec.Status == "FAILED" {
storyID := t.StoryID
@@ -440,7 +424,6 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
} else {
p.mu.Lock()
p.consecutiveFailures[agentType] = 0
- p.drained[agentType] = false
p.mu.Unlock()
if t.ParentTaskID == "" {
subtasks, subErr := p.store.ListSubtasks(t.ID)
@@ -657,14 +640,6 @@ func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskSt
}
}
-// UndrainingAgent resets the drain state and failure counter for the given agent type.
-func (p *Pool) UndrainingAgent(agentType string) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.drained[agentType] = false
- p.consecutiveFailures[agentType] = 0
-}
-
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -678,7 +653,6 @@ type AgentStatusInfo struct {
ActiveTasks int `json:"active_tasks"`
RateLimited bool `json:"rate_limited"`
Until *time.Time `json:"until,omitempty"`
- Drained bool `json:"drained"`
}
// AgentStatuses returns the current status of all registered agents.
@@ -691,7 +665,6 @@ func (p *Pool) AgentStatuses() []AgentStatusInfo {
info := AgentStatusInfo{
Agent: agent,
ActiveTasks: p.activePerAgent[agent],
- Drained: p.drained[agent],
}
if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) {
info.RateLimited = true
@@ -805,16 +778,9 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}()
- p.mu.Lock()
- if p.drained[agentType] {
- p.mu.Unlock()
- time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} })
- return
- }
// Check dependencies before taking the per-agent slot to avoid deadlock:
// if a dependent task holds the slot while waiting for its dependency to run,
// the dependency can never start (maxPerAgent=1).
- p.mu.Unlock()
if len(t.DependsOn) > 0 {
ready, depErr := p.checkDepsReady(t)
if depErr != nil {
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 64e3ecb..267d9ca 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1132,7 +1132,6 @@ func newPoolWithMockStore(store Store) *Pool {
rateLimited: make(map[string]time.Time),
cancels: make(map[string]context.CancelFunc),
consecutiveFailures: make(map[string]int),
- drained: make(map[string]bool),
resultCh: make(chan *Result, 4),
workCh: make(chan workItem, 4),
doneCh: make(chan struct{}, 2),
@@ -1589,55 +1588,6 @@ func TestPool_MaxPerAgent_AllowsDifferentAgents(t *testing.T) {
}
}
-func TestPool_ConsecutiveFailures_DrainAtThree(t *testing.T) {
- store := testStore(t)
- runner := &mockRunner{err: fmt.Errorf("boom")}
- runners := map[string]Runner{"claude": runner}
- logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(3, runners, store, logger)
-
- // Two failures should NOT drain.
- for _, id := range []string{"cf-1", "cf-2"} {
- tk := makeTask(id)
- store.CreateTask(tk)
- pool.Submit(context.Background(), tk)
- <-pool.Results()
- }
- pool.mu.Lock()
- draineEarly := pool.drained["claude"]
- pool.mu.Unlock()
- if draineEarly {
- t.Error("expected claude NOT drained after only 2 failures")
- }
-
- // Third failure should drain.
- tk3 := makeTask("cf-3")
- store.CreateTask(tk3)
- pool.Submit(context.Background(), tk3)
- <-pool.Results()
-
- pool.mu.Lock()
- drained := pool.drained["claude"]
- failures := pool.consecutiveFailures["claude"]
- pool.mu.Unlock()
-
- if !drained {
- t.Error("expected claude to be drained after 3 consecutive failures")
- }
- if failures < 3 {
- t.Errorf("expected consecutiveFailures >= 3, got %d", failures)
- }
-
- // The third task should have a drain question set.
- tk3fetched, err := store.GetTask("cf-3")
- if err != nil {
- t.Fatalf("GetTask: %v", err)
- }
- if tk3fetched.QuestionJSON == "" {
- t.Error("expected drain question to be set on task after drain")
- }
-}
-
func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) {
store := testStore(t)
@@ -1668,7 +1618,7 @@ func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) {
t.Errorf("expected 1 failure after first task, got %d", failsBefore)
}
- // Second task succeeds
+ // Second task succeeds — counter resets.
tk2 := makeTask("rs-2")
store.CreateTask(tk2)
pool.Submit(context.Background(), tk2)
@@ -1676,15 +1626,11 @@ func TestPool_ConsecutiveFailures_ResetOnSuccess(t *testing.T) {
pool.mu.Lock()
failsAfter := pool.consecutiveFailures["claude"]
- isDrained := pool.drained["claude"]
pool.mu.Unlock()
if failsAfter != 0 {
t.Errorf("expected consecutiveFailures reset to 0 after success, got %d", failsAfter)
}
- if isDrained {
- t.Error("expected drained to be false after success")
- }
}
func TestPool_CheckStoryCompletion_AllComplete(t *testing.T) {
@@ -1772,49 +1718,6 @@ func TestPool_CheckStoryCompletion_PartialComplete(t *testing.T) {
}
}
-func TestPool_Undrain_ResumesExecution(t *testing.T) {
- store := testStore(t)
-
- // Force drain state
- runner := &mockRunner{}
- runners := map[string]Runner{"claude": runner}
- logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
- pool := NewPool(2, runners, store, logger)
-
- pool.mu.Lock()
- pool.drained["claude"] = true
- pool.consecutiveFailures["claude"] = 3
- pool.mu.Unlock()
-
- // Undrain
- pool.UndrainingAgent("claude")
-
- pool.mu.Lock()
- drained := pool.drained["claude"]
- failures := pool.consecutiveFailures["claude"]
- pool.mu.Unlock()
-
- if drained {
- t.Error("expected drained=false after UndrainingAgent")
- }
- if failures != 0 {
- t.Errorf("expected consecutiveFailures=0 after UndrainingAgent, got %d", failures)
- }
-
- // Verify a task can now run
- tk := makeTask("undrain-1")
- store.CreateTask(tk)
- pool.Submit(context.Background(), tk)
- select {
- case result := <-pool.Results():
- if result.Err != nil {
- t.Errorf("unexpected error after undrain: %v", result.Err)
- }
- case <-time.After(5 * time.Second):
- t.Fatal("timed out waiting for task after undrain")
- }
-}
-
func TestPool_StoryDeploy_RunsDeployScript(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}