diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/api/server.go | 4 | ||||
| -rw-r--r-- | internal/api/websocket.go | 37 | ||||
| -rw-r--r-- | internal/api/websocket_test.go | 5 | ||||
| -rw-r--r-- | internal/executor/classifier.go | 32 | ||||
| -rw-r--r-- | internal/executor/classifier_test.go | 2 | ||||
| -rw-r--r-- | internal/executor/claude.go | 28 | ||||
| -rw-r--r-- | internal/executor/claude_test.go | 273 | ||||
| -rw-r--r-- | internal/executor/executor.go | 77 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 63 | ||||
| -rw-r--r-- | internal/storage/db.go | 26 | ||||
| -rw-r--r-- | internal/storage/db_test.go | 35 | ||||
| -rw-r--r-- | internal/task/task.go | 37 | ||||
| -rw-r--r-- | internal/task/task_test.go | 1 |
13 files changed, 517 insertions, 103 deletions
diff --git a/internal/api/server.go b/internal/api/server.go index 944e450..c545253 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -260,12 +260,14 @@ func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) { return } - // Submit a resume execution. + // Submit a resume execution. Carry the sandbox path so the runner uses + // the same working directory where Claude stored its session files. resumeExec := &storage.Execution{ ID: uuid.New().String(), TaskID: taskID, ResumeSessionID: latest.SessionID, ResumeAnswer: input.Answer, + SandboxDir: latest.SandboxDir, } if err := s.pool.SubmitResume(context.Background(), tk, resumeExec); err != nil { writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()}) diff --git a/internal/api/websocket.go b/internal/api/websocket.go index b5bf728..25522dc 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -16,33 +16,40 @@ import ( var ( wsPingInterval = 30 * time.Second wsPingDeadline = 10 * time.Second - - // maxWsClients caps the number of concurrent WebSocket connections. - // Exposed as a var so tests can override it. - maxWsClients = 1000 ) +const defaultMaxWsClients = 1000 + // Hub manages WebSocket connections and broadcasts messages. type Hub struct { - mu sync.RWMutex - clients map[*websocket.Conn]bool - logger *slog.Logger + mu sync.RWMutex + clients map[*websocket.Conn]bool + maxClients int + logger *slog.Logger } func NewHub() *Hub { return &Hub{ - clients: make(map[*websocket.Conn]bool), - logger: slog.Default(), + clients: make(map[*websocket.Conn]bool), + maxClients: defaultMaxWsClients, + logger: slog.Default(), } } // Run is a no-op loop kept for future cleanup/heartbeat logic. func (h *Hub) Run() {} +// SetMaxClients configures the maximum number of concurrent WebSocket clients. +func (h *Hub) SetMaxClients(n int) { + h.mu.Lock() + h.maxClients = n + h.mu.Unlock() +} + func (h *Hub) Register(ws *websocket.Conn) error { h.mu.Lock() defer h.mu.Unlock() - if len(h.clients) >= maxWsClients { + if len(h.clients) >= h.maxClients { return errors.New("max WebSocket clients reached") } h.clients[ws] = true @@ -74,7 +81,10 @@ func (h *Hub) ClientCount() int { } func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { - if s.hub.ClientCount() >= maxWsClients { + s.hub.mu.RLock() + atCap := len(s.hub.clients) >= s.hub.maxClients + s.hub.mu.RUnlock() + if atCap { http.Error(w, "too many connections", http.StatusServiceUnavailable) return } @@ -104,15 +114,16 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) { // causing the read loop below to exit and unregister the client. done := make(chan struct{}) defer close(done) + pingInterval, pingDeadline := wsPingInterval, wsPingDeadline // capture before goroutine starts go func() { - ticker := time.NewTicker(wsPingInterval) + ticker := time.NewTicker(pingInterval) defer ticker.Stop() for { select { case <-done: return case <-ticker.C: - ws.SetWriteDeadline(time.Now().Add(wsPingDeadline)) + ws.SetWriteDeadline(time.Now().Add(pingDeadline)) err := websocket.Message.Send(ws, "ping") ws.SetWriteDeadline(time.Time{}) if err != nil { diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 72b83f2..682a555 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -99,11 +99,8 @@ func TestWebSocket_NoTokenConfigured(t *testing.T) { // TestWebSocket_RejectsConnectionWhenAtMaxClients verifies that when the hub // is at capacity, new WebSocket upgrade requests are rejected with 503. func TestWebSocket_RejectsConnectionWhenAtMaxClients(t *testing.T) { - orig := maxWsClients - maxWsClients = 0 // immediately at capacity - t.Cleanup(func() { maxWsClients = orig }) - srv, _ := testServer(t) + srv.hub.SetMaxClients(0) // immediately at capacity srv.StartHub() req := httptest.NewRequest("GET", "/api/ws", nil) diff --git a/internal/executor/classifier.go b/internal/executor/classifier.go index efd2acb..7a474b6 100644 --- a/internal/executor/classifier.go +++ b/internal/executor/classifier.go @@ -24,12 +24,10 @@ type Classifier struct { } const classificationPrompt = ` -You are a task classifier for Claudomator. -Given a task description and system status, select the best agent (claude or gemini) and model to use. +You are a model selector for Claudomator. +The agent has already been chosen by the load balancer. Your ONLY job is to select the best model for that agent. -Agent Types: -- claude: Best for complex coding, reasoning, and tool use. -- gemini: Best for large context, fast reasoning, and multimodal tasks. +REQUIRED agent: %s Available Models: Claude: @@ -38,38 +36,30 @@ Claude: - claude-haiku-4-5-20251001 (fast, cheap, use for simple tasks) Gemini: -- gemini-2.5-flash-lite (fastest, most efficient, best for simple tasks) +- gemini-2.5-flash-lite (fastest, most efficient, best for simple/trivial tasks) - gemini-2.5-flash (fast, balanced) -- gemini-2.5-pro (most powerful Gemini, larger context) +- gemini-2.5-pro (most powerful, use for hardest tasks only) Selection Criteria: -- Agent: CRITICAL: You MUST select an agent where "Rate Limited: false". DO NOT select an agent where "Rate Limited: true" if any other agent is available and NOT rate limited. - Check the "System Status" section below. If it says "- Agent claude: ... Rate Limited: true", you MUST NOT select claude. Use gemini instead. -- Model: Select based on task complexity. Use powerful models (opus, pro, pro-preview) for complex reasoning/coding, flash-lite/flash/haiku for simple tasks. +- Use powerful models (opus, pro) only for the hardest reasoning/coding tasks. +- Use lite/haiku for simple, short, or low-stakes tasks. +- Default to the balanced model (sonnet, flash) for everything else. Task: Name: %s Instructions: %s -System Status: -%s - Respond with ONLY a JSON object: { - "agent_type": "claude" | "gemini", + "agent_type": "%s", "model": "model-name", "reason": "brief reason" } ` -func (c *Classifier) Classify(ctx context.Context, taskName, instructions string, status SystemStatus) (*Classification, error) { - statusStr := "" - for agent, active := range status.ActiveTasks { - statusStr += fmt.Sprintf("- Agent %s: %d active tasks, Rate Limited: %t\n", agent, active, status.RateLimited[agent]) - } - +func (c *Classifier) Classify(ctx context.Context, taskName, instructions string, _ SystemStatus, agentType string) (*Classification, error) { prompt := fmt.Sprintf(classificationPrompt, - taskName, instructions, statusStr, + agentType, taskName, instructions, agentType, ) binary := c.GeminiBinaryPath diff --git a/internal/executor/classifier_test.go b/internal/executor/classifier_test.go index 631952f..83a9743 100644 --- a/internal/executor/classifier_test.go +++ b/internal/executor/classifier_test.go @@ -23,7 +23,7 @@ echo '{"response": "{\"agent_type\": \"gemini\", \"model\": \"gemini-2.5-flash-l RateLimited: map[string]bool{"claude": false, "gemini": false}, } - cls, err := c.Classify(context.Background(), "Test Task", "Test Instructions", status) + cls, err := c.Classify(context.Background(), "Test Task", "Test Instructions", status, "gemini") if err != nil { t.Fatalf("Classify failed: %v", err) } diff --git a/internal/executor/claude.go b/internal/executor/claude.go index db4d0fa..0e29f7f 100644 --- a/internal/executor/claude.go +++ b/internal/executor/claude.go @@ -32,6 +32,7 @@ type ClaudeRunner struct { type BlockedError struct { QuestionJSON string // raw JSON from the question file SessionID string // claude session to resume once the user answers + SandboxDir string // preserved sandbox path; resume must run here so Claude finds its session files } func (e *BlockedError) Error() string { return fmt.Sprintf("task blocked: %s", e.QuestionJSON) } @@ -85,16 +86,29 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi } // Pre-assign session ID so we can resume after a BLOCKED state. - // If this is a resume execution the session ID is already set. + // For resume executions, the claude session continues under the original + // session ID (the one passed to --resume). Using the new exec's own UUID + // would cause a second block-and-resume cycle to pass the wrong --resume + // argument. if e.SessionID == "" { - e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) + if e.ResumeSessionID != "" { + e.SessionID = e.ResumeSessionID + } else { + e.SessionID = e.ID // reuse execution UUID as session UUID (both are UUIDs) + } } // For new (non-resume) executions with a project_dir, clone into a sandbox. - // Resume executions run directly in project_dir to pick up the previous session. + // Resume executions run in the preserved sandbox (e.SandboxDir) so Claude + // finds its session files under the same project slug. If no sandbox was + // preserved (e.g. task had no project_dir), fall back to project_dir. var sandboxDir string effectiveWorkingDir := projectDir - if projectDir != "" && e.ResumeSessionID == "" { + if e.ResumeSessionID != "" { + if e.SandboxDir != "" { + effectiveWorkingDir = e.SandboxDir + } + } else if projectDir != "" { var err error sandboxDir, err = setupSandbox(projectDir) if err != nil { @@ -130,8 +144,10 @@ func (r *ClaudeRunner) Run(ctx context.Context, t *task.Task, e *storage.Executi data, readErr := os.ReadFile(questionFile) if readErr == nil { os.Remove(questionFile) // consumed - // Preserve sandbox on BLOCKED — agent may have partial work. - return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID} + // Preserve sandbox on BLOCKED — agent may have partial work and its + // Claude session files are stored under the sandbox's project slug. + // The resume execution must run in the same directory. + return &BlockedError{QuestionJSON: strings.TrimSpace(string(data)), SessionID: e.SessionID, SandboxDir: sandboxDir} } // Merge sandbox back to project_dir and clean up. diff --git a/internal/executor/claude_test.go b/internal/executor/claude_test.go index 1f95b4a..b5f7962 100644 --- a/internal/executor/claude_test.go +++ b/internal/executor/claude_test.go @@ -2,8 +2,11 @@ package executor import ( "context" + "errors" "io" "log/slog" + "os" + "os/exec" "path/filepath" "runtime" "strings" @@ -227,6 +230,42 @@ func TestClaudeRunner_BuildArgs_PreambleBashNotDuplicated(t *testing.T) { } } +// TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession verifies that when a +// resume execution is itself blocked again, the stored SessionID is the original +// resumed session, not the new execution's own UUID. Without this, a second +// block-and-resume cycle passes the wrong --resume session ID and fails. +func TestClaudeRunner_Run_ResumeSetsSessionIDFromResumeSession(t *testing.T) { + logDir := t.TempDir() + r := &ClaudeRunner{ + BinaryPath: "true", // exits 0, no output + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "continue", + SkipPlanning: true, + }, + } + exec := &storage.Execution{ + ID: "resume-exec-uuid", + TaskID: "task-1", + ResumeSessionID: "original-session-uuid", + ResumeAnswer: "yes", + } + + // Run completes successfully (binary is "true"). + _ = r.Run(context.Background(), tk, exec) + + // SessionID must be the original session (ResumeSessionID), not the new + // exec's own ID. If it were exec.ID, a second blocked-then-resumed cycle + // would use the wrong --resume argument and fail. + if exec.SessionID != "original-session-uuid" { + t.Errorf("SessionID after resume Run: want %q, got %q", "original-session-uuid", exec.SessionID) + } +} + func TestClaudeRunner_Run_InaccessibleWorkingDir_ReturnsError(t *testing.T) { r := &ClaudeRunner{ BinaryPath: "true", // would succeed if it ran @@ -305,3 +344,237 @@ func TestExecOnce_NoGoroutineLeak_OnNaturalExit(t *testing.T) { baseline, after, after-baseline) } } + +// initGitRepo creates a git repo in dir with one commit so it is clonable. +func initGitRepo(t *testing.T, dir string) { + t.Helper() + cmds := [][]string{ + {"git", "-C", dir, "init"}, + {"git", "-C", dir, "config", "user.email", "test@test"}, + {"git", "-C", dir, "config", "user.name", "test"}, + {"git", "-C", dir, "commit", "--allow-empty", "-m", "init"}, + } + for _, args := range cmds { + if out, err := exec.Command(args[0], args[1:]...).CombinedOutput(); err != nil { + t.Fatalf("%v: %v\n%s", args, err, out) + } + } +} + +func TestSandboxCloneSource_PrefersLocalRemote(t *testing.T) { + dir := t.TempDir() + initGitRepo(t, dir) + // Add a "local" remote pointing to a bare repo. + bare := t.TempDir() + exec.Command("git", "init", "--bare", bare).Run() + exec.Command("git", "-C", dir, "remote", "add", "local", bare).Run() + exec.Command("git", "-C", dir, "remote", "add", "origin", "https://example.com/repo").Run() + + got := sandboxCloneSource(dir) + if got != bare { + t.Errorf("expected bare repo path %q, got %q", bare, got) + } +} + +func TestSandboxCloneSource_FallsBackToOrigin(t *testing.T) { + dir := t.TempDir() + initGitRepo(t, dir) + originURL := "https://example.com/origin-repo" + exec.Command("git", "-C", dir, "remote", "add", "origin", originURL).Run() + + got := sandboxCloneSource(dir) + if got != originURL { + t.Errorf("expected origin URL %q, got %q", originURL, got) + } +} + +func TestSandboxCloneSource_FallsBackToProjectDir(t *testing.T) { + dir := t.TempDir() + initGitRepo(t, dir) + // No remotes configured. + got := sandboxCloneSource(dir) + if got != dir { + t.Errorf("expected projectDir %q (no remotes), got %q", dir, got) + } +} + +func TestSetupSandbox_ClonesGitRepo(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + + sandbox, err := setupSandbox(src) + if err != nil { + t.Fatalf("setupSandbox: %v", err) + } + t.Cleanup(func() { os.RemoveAll(sandbox) }) + + // Verify sandbox is a git repo with at least one commit. + out, err := exec.Command("git", "-C", sandbox, "log", "--oneline").Output() + if err != nil { + t.Fatalf("git log in sandbox: %v", err) + } + if len(strings.TrimSpace(string(out))) == 0 { + t.Error("expected at least one commit in sandbox, got empty log") + } +} + +func TestSetupSandbox_InitialisesNonGitDir(t *testing.T) { + // A plain directory (not a git repo) should be initialised then cloned. + src := t.TempDir() + + sandbox, err := setupSandbox(src) + if err != nil { + t.Fatalf("setupSandbox on plain dir: %v", err) + } + t.Cleanup(func() { os.RemoveAll(sandbox) }) + + if _, err := os.Stat(filepath.Join(sandbox, ".git")); err != nil { + t.Errorf("sandbox should be a git repo: %v", err) + } +} + +func TestTeardownSandbox_UncommittedChanges_ReturnsError(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + sandbox, err := setupSandbox(src) + if err != nil { + t.Fatalf("setupSandbox: %v", err) + } + t.Cleanup(func() { os.RemoveAll(sandbox) }) + + // Leave an uncommitted file in the sandbox. + if err := os.WriteFile(filepath.Join(sandbox, "dirty.txt"), []byte("oops"), 0644); err != nil { + t.Fatal(err) + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + err = teardownSandbox(src, sandbox, logger) + if err == nil { + t.Fatal("expected error for uncommitted changes, got nil") + } + if !strings.Contains(err.Error(), "uncommitted changes") { + t.Errorf("expected 'uncommitted changes' in error, got: %v", err) + } + // Sandbox should be preserved (not removed) on error. + if _, statErr := os.Stat(sandbox); os.IsNotExist(statErr) { + t.Error("sandbox was removed despite error; should be preserved for debugging") + } +} + +func TestTeardownSandbox_CleanSandboxWithNoNewCommits_RemovesSandbox(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + sandbox, err := setupSandbox(src) + if err != nil { + t.Fatalf("setupSandbox: %v", err) + } + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + // Sandbox has no new commits beyond origin; teardown should succeed and remove it. + if err := teardownSandbox(src, sandbox, logger); err != nil { + t.Fatalf("teardownSandbox: %v", err) + } + if _, statErr := os.Stat(sandbox); !os.IsNotExist(statErr) { + t.Error("sandbox should have been removed after clean teardown") + os.RemoveAll(sandbox) + } +} + +// TestBlockedError_IncludesSandboxDir verifies that when a task is blocked in a +// sandbox, the BlockedError carries the sandbox path so the resume execution can +// run in the same directory (where Claude's session files are stored). +func TestBlockedError_IncludesSandboxDir(t *testing.T) { + src := t.TempDir() + initGitRepo(t, src) + + logDir := t.TempDir() + + // Use a script that writes question.json to the env-var path and exits 0 + // (simulating a blocked agent that asks a question before exiting). + scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") + if err := os.WriteFile(scriptPath, []byte(`#!/bin/sh +if [ -n "$CLAUDOMATOR_QUESTION_FILE" ]; then + printf '{"question":"continue?"}' > "$CLAUDOMATOR_QUESTION_FILE" +fi +`), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &ClaudeRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + Instructions: "do something", + ProjectDir: src, + SkipPlanning: true, + }, + } + exec := &storage.Execution{ID: "blocked-exec-uuid", TaskID: "task-1"} + + err := r.Run(context.Background(), tk, exec) + + var blocked *BlockedError + if !errors.As(err, &blocked) { + t.Fatalf("expected BlockedError, got: %v", err) + } + if blocked.SandboxDir == "" { + t.Error("BlockedError.SandboxDir should be set when task runs in a sandbox") + } + // Sandbox should still exist (preserved for resume). + if _, statErr := os.Stat(blocked.SandboxDir); os.IsNotExist(statErr) { + t.Error("sandbox directory should be preserved when blocked") + } else { + os.RemoveAll(blocked.SandboxDir) // cleanup + } +} + +// TestClaudeRunner_Run_ResumeUsesStoredSandboxDir verifies that when a resume +// execution has SandboxDir set, the runner uses that directory (not project_dir) +// as the working directory, so Claude finds its session files there. +func TestClaudeRunner_Run_ResumeUsesStoredSandboxDir(t *testing.T) { + logDir := t.TempDir() + sandboxDir := t.TempDir() + cwdFile := filepath.Join(logDir, "cwd.txt") + + // Use a script that writes its working directory to a file in logDir (stable path). + scriptPath := filepath.Join(t.TempDir(), "fake-claude.sh") + script := "#!/bin/sh\nprintf '%s' \"$PWD\" > " + cwdFile + "\n" + if err := os.WriteFile(scriptPath, []byte(script), 0755); err != nil { + t.Fatalf("write script: %v", err) + } + + r := &ClaudeRunner{ + BinaryPath: scriptPath, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + LogDir: logDir, + } + tk := &task.Task{ + Agent: task.AgentConfig{ + Type: "claude", + ProjectDir: sandboxDir, // must exist; resume overrides it with SandboxDir anyway + SkipPlanning: true, + }, + } + exec := &storage.Execution{ + ID: "resume-exec-uuid", + TaskID: "task-1", + ResumeSessionID: "original-session", + ResumeAnswer: "yes", + SandboxDir: sandboxDir, + } + + _ = r.Run(context.Background(), tk, exec) + + got, err := os.ReadFile(cwdFile) + if err != nil { + t.Fatalf("cwd file not written: %v", err) + } + // The runner should have executed claude in sandboxDir, not in project_dir. + if string(got) != sandboxDir { + t.Errorf("resume working dir: want %q, got %q", sandboxDir, string(got)) + } +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go index c04f68e..76c8ac7 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -273,6 +273,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" + exec.SandboxDir = blockedErr.SandboxDir // preserve so resume runs in same dir if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err) } @@ -343,30 +344,66 @@ func (p *Pool) ActiveCount() int { return p.active } -func (p *Pool) execute(ctx context.Context, t *task.Task) { - // 1. Classification - if p.Classifier != nil { - p.mu.Lock() - activeTasks := make(map[string]int) - rateLimited := make(map[string]bool) - now := time.Now() - for agent := range p.runners { - activeTasks[agent] = p.activePerAgent[agent] - if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { - delete(p.rateLimited, agent) - } - rateLimited[agent] = now.Before(p.rateLimited[agent]) +// pickAgent selects the best agent from the given SystemStatus using explicit +// load balancing: prefer the available (non-rate-limited) agent with the fewest +// active tasks. If all agents are rate-limited, fall back to fewest active. +func pickAgent(status SystemStatus) string { + best := "" + bestActive := -1 + + // First pass: only consider non-rate-limited agents. + for agent, active := range status.ActiveTasks { + if status.RateLimited[agent] { + continue } - status := SystemStatus{ - ActiveTasks: activeTasks, - RateLimited: rateLimited, + if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) { + best = agent + bestActive = active } - p.mu.Unlock() + } + if best != "" { + return best + } + + // Fallback: all rate-limited — pick least active anyway. + for agent, active := range status.ActiveTasks { + if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) { + best = agent + bestActive = active + } + } + return best +} - cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status) +func (p *Pool) execute(ctx context.Context, t *task.Task) { + // 1. Load-balanced agent selection + model classification. + p.mu.Lock() + activeTasks := make(map[string]int) + rateLimited := make(map[string]bool) + now := time.Now() + for agent := range p.runners { + activeTasks[agent] = p.activePerAgent[agent] + if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { + delete(p.rateLimited, agent) + } + rateLimited[agent] = now.Before(p.rateLimited[agent]) + } + status := SystemStatus{ + ActiveTasks: activeTasks, + RateLimited: rateLimited, + } + p.mu.Unlock() + + // Deterministically pick the agent with fewest active tasks. + selectedAgent := pickAgent(status) + if selectedAgent != "" { + t.Agent.Type = selectedAgent + } + + if p.Classifier != nil { + cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status, t.Agent.Type) if err == nil { - p.logger.Info("task classified", "taskID", t.ID, "agent", cls.AgentType, "model", cls.Model, "reason", cls.Reason) - t.Agent.Type = cls.AgentType + p.logger.Info("task classified", "taskID", t.ID, "agent", t.Agent.Type, "model", cls.Model, "reason", cls.Reason) t.Agent.Model = cls.Model } else { p.logger.Error("classification failed", "error", err, "taskID", t.ID) diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 0935545..9448816 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -116,6 +116,48 @@ func makeTask(id string) *task.Task { } } +func TestPickAgent_PrefersLessActiveAgent(t *testing.T) { + status := SystemStatus{ + ActiveTasks: map[string]int{"claude": 3, "gemini": 1}, + RateLimited: map[string]bool{"claude": false, "gemini": false}, + } + if got := pickAgent(status); got != "gemini" { + t.Errorf("expected gemini (fewer active tasks), got %s", got) + } +} + +func TestPickAgent_SkipsRateLimitedAgent(t *testing.T) { + status := SystemStatus{ + ActiveTasks: map[string]int{"claude": 0, "gemini": 5}, + RateLimited: map[string]bool{"claude": true, "gemini": false}, + } + if got := pickAgent(status); got != "gemini" { + t.Errorf("expected gemini (claude rate limited), got %s", got) + } +} + +func TestPickAgent_FallsBackWhenAllRateLimited(t *testing.T) { + status := SystemStatus{ + ActiveTasks: map[string]int{"claude": 2, "gemini": 5}, + RateLimited: map[string]bool{"claude": true, "gemini": true}, + } + // Falls back to least active regardless of rate limit. + if got := pickAgent(status); got != "claude" { + t.Errorf("expected claude (fewer active tasks among all), got %s", got) + } +} + +func TestPickAgent_TieBreakPrefersFirstAlpha(t *testing.T) { + status := SystemStatus{ + ActiveTasks: map[string]int{"claude": 2, "gemini": 2}, + RateLimited: map[string]bool{"claude": false, "gemini": false}, + } + got := pickAgent(status) + if got != "claude" && got != "gemini" { + t.Errorf("unexpected agent %q on tie", got) + } +} + func TestPool_Submit_TopLevel_GoesToReady(t *testing.T) { store := testStore(t) runner := &mockRunner{} @@ -995,13 +1037,17 @@ func TestHandleRunResult_SharedPath(t *testing.T) { }) } -func TestPool_UnsupportedAgent(t *testing.T) { +// TestPool_LoadBalancing_OverridesAgentType verifies that load balancing picks +// from registered runners, overriding any pre-set Agent.Type on the task. +func TestPool_LoadBalancing_OverridesAgentType(t *testing.T) { store := testStore(t) - runners := map[string]Runner{"claude": &mockRunner{}} + runner := &mockRunner{} + runners := map[string]Runner{"claude": runner} logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) pool := NewPool(2, runners, store, logger) - tk := makeTask("bad-agent") + // Task has a non-existent agent type; load balancing should route to "claude". + tk := makeTask("lb-override") tk.Agent.Type = "super-ai" store.CreateTask(tk) @@ -1010,13 +1056,10 @@ func TestPool_UnsupportedAgent(t *testing.T) { } result := <-pool.Results() - if result.Err == nil { - t.Fatal("expected error for unsupported agent") - } - if !strings.Contains(result.Err.Error(), "unsupported agent type") { - t.Errorf("expected 'unsupported agent type' in error, got: %v", result.Err) + if result.Err != nil { + t.Fatalf("expected success (load balancing overrides agent type), got: %v", result.Err) } - if result.Execution.Status != "FAILED" { - t.Errorf("status: want FAILED, got %q", result.Execution.Status) + if runner.callCount() != 1 { + t.Errorf("expected claude runner to be called once, got %d", runner.callCount()) } } diff --git a/internal/storage/db.go b/internal/storage/db.go index 01ce902..aaf1e09 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -80,6 +80,7 @@ func (s *DB) migrate() error { `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, `ALTER TABLE tasks ADD COLUMN question_json TEXT`, `ALTER TABLE executions ADD COLUMN session_id TEXT`, + `ALTER TABLE executions ADD COLUMN sandbox_dir TEXT`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { @@ -236,7 +237,7 @@ func (s *DB) ResetTaskForRetry(id string) (*task.Task, error) { configJSON, _ := json.Marshal(t.Agent) now := time.Now().UTC() - if _, err := tx.Exec(`UPDATE tasks SET state = ?, config_json = ?, updated_at = ? WHERE id = ?`, + if _, err := tx.Exec(`UPDATE tasks SET state = ?, config_json = ?, question_json = NULL, updated_at = ? WHERE id = ?`, string(task.StateQueued), string(configJSON), now, id); err != nil { return nil, err } @@ -348,7 +349,8 @@ type Execution struct { ArtifactDir string CostUSD float64 ErrorMsg string - SessionID string // claude --session-id; persisted for resume + SessionID string // claude --session-id; persisted for resume + SandboxDir string // preserved sandbox path when task is BLOCKED; resume must run here // In-memory only: set when creating a resume execution, not stored in DB. ResumeSessionID string @@ -358,23 +360,23 @@ type Execution struct { // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { _, err := s.db.Exec(` - INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, - e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { - row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { - rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) + rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } @@ -393,7 +395,7 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { // GetLatestExecution returns the most recent execution for a task. func (s *DB) GetLatestExecution(taskID string) (*Execution, error) { - row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) + row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) return scanExecution(row) } @@ -518,10 +520,10 @@ func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?, - stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ? + stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ?, sandbox_dir = ? WHERE id = ?`, e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, - e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.ID, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.SandboxDir, e.ID, ) return err } @@ -576,12 +578,14 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) { func scanExecution(row scanner) (*Execution, error) { var e Execution var sessionID sql.NullString + var sandboxDir sql.NullString err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, - &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID) + &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir) if err != nil { return nil, err } e.SessionID = sessionID.String + e.SandboxDir = sandboxDir.String return &e, nil } diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index d28a4a8..2956be0 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -628,6 +628,41 @@ func TestDeleteTask_DeepSubtaskCascadeAtomic(t *testing.T) { } } +// TestResetTaskForRetry_ClearsQuestionJSON verifies that restarting a BLOCKED +// or FAILED task via ResetTaskForRetry clears any stale question so the frontend +// does not show a stale "waiting for input" prompt. +func TestResetTaskForRetry_ClearsQuestionJSON(t *testing.T) { + db := testDB(t) + now := time.Now().UTC() + tk := makeTestTask("retry-task", now) + tk.State = task.StatePending + db.CreateTask(tk) + + // Transition to BLOCKED with a question. + db.UpdateTaskState("retry-task", task.StateQueued) + db.UpdateTaskState("retry-task", task.StateRunning) + db.UpdateTaskState("retry-task", task.StateBlocked) + db.UpdateTaskQuestion("retry-task", `{"question":"which branch?"}`) + + // Simulate the task failing and being restarted. + db.UpdateTaskState("retry-task", task.StateFailed) + + if _, err := db.ResetTaskForRetry("retry-task"); err != nil { + t.Fatalf("ResetTaskForRetry: %v", err) + } + + got, err := db.GetTask("retry-task") + if err != nil { + t.Fatalf("GetTask: %v", err) + } + if got.QuestionJSON != "" { + t.Errorf("question_json should be cleared after reset, got %q", got.QuestionJSON) + } + if got.State != task.StateQueued { + t.Errorf("state should be QUEUED, got %q", got.State) + } +} + func TestStorage_GetLatestExecution(t *testing.T) { db := testDB(t) now := time.Now().UTC() diff --git a/internal/task/task.go b/internal/task/task.go index c0aa036..9968b15 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -52,10 +52,10 @@ type Task struct { ID string `yaml:"id" json:"id"` ParentTaskID string `yaml:"parent_task_id" json:"parent_task_id"` Name string `yaml:"name" json:"name"` - Description string `yaml:"description" json:"description"` - Agent AgentConfig `yaml:"agent" json:"agent"` - Timeout Duration `yaml:"timeout" json:"timeout"` - Retry RetryConfig `yaml:"retry" json:"retry"` + Description string `yaml:"description" json:"description"` + Agent AgentConfig `yaml:"agent" json:"agent"` + Timeout Duration `yaml:"timeout" json:"timeout"` + Retry RetryConfig `yaml:"retry" json:"retry"` Priority Priority `yaml:"priority" json:"priority"` Tags []string `yaml:"tags" json:"tags"` DependsOn []string `yaml:"depends_on" json:"depends_on"` @@ -93,20 +93,25 @@ type BatchFile struct { Tasks []Task `yaml:"tasks"` } +// validTransitions maps each state to the set of states it may transition into. +// Terminal state COMPLETED has no outgoing edges. +// CANCELLED, FAILED, TIMED_OUT, and BUDGET_EXCEEDED all allow re-entry at QUEUED +// (restart or retry). +var validTransitions = map[State][]State{ + StatePending: {StateQueued, StateCancelled}, + StateQueued: {StateRunning, StateCancelled}, + StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded, StateBlocked}, + StateReady: {StateCompleted, StatePending}, + StateFailed: {StateQueued}, // retry + StateTimedOut: {StateQueued}, // retry or resume + StateCancelled: {StateQueued}, // restart + StateBudgetExceeded: {StateQueued}, // retry + StateBlocked: {StateQueued, StateReady}, +} + // ValidTransition returns true if moving from the current state to next is allowed. func ValidTransition(from, to State) bool { - transitions := map[State][]State{ - StatePending: {StateQueued, StateCancelled}, - StateQueued: {StateRunning, StateCancelled}, - StateRunning: {StateReady, StateCompleted, StateFailed, StateTimedOut, StateCancelled, StateBudgetExceeded, StateBlocked}, - StateReady: {StateCompleted, StatePending}, - StateFailed: {StateQueued}, // retry - StateTimedOut: {StateQueued}, // retry - StateCancelled: {StateQueued}, // restart - StateBudgetExceeded: {StateQueued}, // retry - StateBlocked: {StateQueued, StateReady}, // answer received → re-queue as resume execution - } - for _, allowed := range transitions[from] { + for _, allowed := range validTransitions[from] { if allowed == to { return true } diff --git a/internal/task/task_test.go b/internal/task/task_test.go index 637baf5..9873084 100644 --- a/internal/task/task_test.go +++ b/internal/task/task_test.go @@ -26,6 +26,7 @@ func TestValidTransition_AllowedTransitions(t *testing.T) { {"blocked to queued (answer resume)", StateBlocked, StateQueued}, {"blocked to ready (parent unblocked by subtasks)", StateBlocked, StateReady}, {"budget exceeded to queued (retry)", StateBudgetExceeded, StateQueued}, + {"cancelled to queued (restart)", StateCancelled, StateQueued}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { |
