diff options
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 695 |
1 files changed, 614 insertions, 81 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 315030d..09169bd 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -2,9 +2,11 @@ package executor import ( "context" + "encoding/json" "errors" "fmt" "log/slog" + "os/exec" "path/filepath" "strings" "sync" @@ -25,6 +27,7 @@ type Store interface { ListSubtasks(parentID string) ([]*task.Task, error) ListExecutions(taskID string) ([]*storage.Execution, error) CreateExecution(e *storage.Execution) error + CreateExecutionAndSetRunning(e *storage.Execution) error UpdateExecution(e *storage.Execution) error UpdateTaskState(id string, newState task.State) error UpdateTaskQuestion(taskID, questionJSON string) error @@ -32,6 +35,14 @@ type Store interface { AppendTaskInteraction(taskID string, interaction task.Interaction) error UpdateTaskAgent(id string, agent task.AgentConfig) error UpdateExecutionChangestats(execID string, stats *task.Changestats) error + RecordAgentEvent(e storage.AgentEvent) error + GetProject(id string) (*task.Project, error) + GetStory(id string) (*task.Story, error) + ListTasksByStory(storyID string) ([]*task.Task, error) + UpdateStoryStatus(id string, status task.StoryState) error + CreateTask(t *task.Task) error + UpdateTaskCheckerReport(id, report string) error + GetCheckerTask(checkedTaskID string) (*task.Task, error) } // LogPather is an optional interface runners can implement to provide the log @@ -56,24 +67,28 @@ type workItem struct { // Pool manages a bounded set of concurrent task workers. type Pool struct { maxConcurrent int + maxPerAgent int runners map[string]Runner store Store logger *slog.Logger - depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s - - mu sync.Mutex - active int - activePerAgent map[string]int - rateLimited map[string]time.Time // agentType -> until - cancels map[string]context.CancelFunc // taskID → cancel - resultCh chan *Result - workCh chan workItem // internal bounded queue; Submit enqueues here - doneCh chan struct{} // signals when a worker slot is freed - Questions *QuestionRegistry - Classifier *Classifier - // LLM, when non-nil, enables LLM-synthesized summaries for executions - // whose stdout did not include a "## Summary" heading. - LLM *llm.Client + depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s + requeueDelay time.Duration // how long to wait before requeuing a blocked-per-agent task; defaults to 30s + + mu sync.Mutex + active int + activePerAgent map[string]int + rateLimited map[string]time.Time // agentType -> until + cancels map[string]context.CancelFunc // taskID → cancel + consecutiveFailures map[string]int // agentType -> count + closed bool // set to true when Shutdown has been called + resultCh chan *Result + startedCh chan string // task IDs that just transitioned to RUNNING + workCh chan workItem // internal bounded queue; Submit enqueues here + doneCh chan struct{} // signals when a worker slot is freed + workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines + dispatchDone chan struct{} // closed when the dispatch goroutine exits + Classifier *Classifier + LLM *llm.Client } // Result is emitted when a task execution completes. @@ -88,18 +103,22 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * maxConcurrent = 1 } p := &Pool{ - maxConcurrent: maxConcurrent, - runners: runners, - store: store, - logger: logger, - depPollInterval: 5 * time.Second, - activePerAgent: make(map[string]int), - rateLimited: make(map[string]time.Time), - cancels: make(map[string]context.CancelFunc), - resultCh: make(chan *Result, maxConcurrent*2), - workCh: make(chan workItem, maxConcurrent*10+100), - doneCh: make(chan struct{}, maxConcurrent), - Questions: NewQuestionRegistry(), + maxConcurrent: maxConcurrent, + maxPerAgent: 1, + runners: runners, + store: store, + logger: logger, + depPollInterval: 5 * time.Second, + requeueDelay: 30 * time.Second, + activePerAgent: make(map[string]int), + rateLimited: make(map[string]time.Time), + cancels: make(map[string]context.CancelFunc), + consecutiveFailures: make(map[string]int), + resultCh: make(chan *Result, maxConcurrent*2), + startedCh: make(chan string, maxConcurrent*2), + workCh: make(chan workItem, maxConcurrent*10+100), + doneCh: make(chan struct{}, maxConcurrent), + dispatchDone: make(chan struct{}), } go p.dispatch() return p @@ -109,6 +128,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * // and launches goroutines as soon as a pool slot is available. This prevents // tasks from being rejected when the pool is temporarily at capacity. func (p *Pool) dispatch() { + defer close(p.dispatchDone) for item := range p.workCh { for { p.mu.Lock() @@ -116,9 +136,9 @@ func (p *Pool) dispatch() { p.active++ p.mu.Unlock() if item.exec != nil { - go p.executeResume(item.ctx, item.task, item.exec) + p.workerWg.Add(1); go func(i workItem) { defer p.workerWg.Done(); p.executeResume(i.ctx, i.task, i.exec) }(item) } else { - go p.execute(item.ctx, item.task) + p.workerWg.Add(1); go func(i workItem) { defer p.workerWg.Done(); p.execute(i.ctx, i.task) }(item) } break } @@ -132,19 +152,64 @@ func (p *Pool) dispatch() { // work queue is full. When the pool is at capacity the task is buffered and // dispatched as soon as a slot becomes available. func (p *Pool) Submit(ctx context.Context, t *task.Task) error { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return fmt.Errorf("executor pool is shut down") + } + // Send while holding the lock so that Shutdown cannot close workCh between + // the closed-check above and the send below. The dispatch goroutine never + // holds p.mu while receiving from workCh, so this cannot deadlock. select { case p.workCh <- workItem{ctx: ctx, task: t}: + p.mu.Unlock() return nil default: + p.mu.Unlock() return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh)) } } +// Started returns a channel that emits task IDs when they transition to RUNNING. +func (p *Pool) Started() <-chan string { + return p.startedCh +} + // Results returns the channel for reading execution results. func (p *Pool) Results() <-chan *Result { return p.resultCh } +// Shutdown stops accepting new work and waits for all in-flight workers to +// finish. Returns ctx.Err() if the context deadline is exceeded before all +// workers complete. +func (p *Pool) Shutdown(ctx context.Context) error { + // Stop the dispatch goroutine. We must wait for it to exit before calling + // workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait(). + p.mu.Lock() + p.closed = true + p.mu.Unlock() + close(p.workCh) + select { + case <-p.dispatchDone: + case <-ctx.Done(): + return ctx.Err() + } + + done := make(chan struct{}) + go func() { + p.workerWg.Wait() + close(done) + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + // Cancel requests cancellation of a running task. Returns false if the task // is not currently running in this pool. func (p *Pool) Cancel(taskID string) bool { @@ -250,11 +315,12 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex exec.StartTime = time.Now().UTC() exec.Status = "RUNNING" - if err := p.store.CreateExecution(exec); err != nil { + if err := p.store.CreateExecutionAndSetRunning(exec); err != nil { p.logger.Error("failed to create resume execution record", "error", err) } - if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { - p.logger.Error("failed to update task state", "error", err) + select { + case p.startedCh <- t.ID: + default: } var cancel context.CancelFunc @@ -273,6 +339,19 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex p.mu.Unlock() }() + // Populate RepositoryURL from Project registry if missing (ADR-007). + if t.RepositoryURL == "" && t.Project != "" { + if proj, err := p.store.GetProject(t.Project); err == nil && proj.RemoteURL != "" { + t.RepositoryURL = proj.RemoteURL + } + } + // Populate BranchName from Story if missing (ADR-007). + if t.BranchName == "" && t.StoryID != "" { + if story, err := p.store.GetStory(t.StoryID); err == nil && story.BranchName != "" { + t.BranchName = story.BranchName + } + } + err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() @@ -289,16 +368,32 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if retry.IsRateLimitError(err) || isQuotaExhausted(err) { p.mu.Lock() retryAfter := retry.ParseRetryAfter(err.Error()) - if retryAfter == 0 { - if isQuotaExhausted(err) { + reason := "transient" + if isQuotaExhausted(err) { + reason = "quota" + if retryAfter == 0 { retryAfter = 5 * time.Hour - } else { - retryAfter = 1 * time.Minute } + } else if retryAfter == 0 { + retryAfter = 1 * time.Minute } - p.rateLimited[agentType] = time.Now().Add(retryAfter) + until := time.Now().Add(retryAfter) + p.rateLimited[agentType] = until p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err)) p.mu.Unlock() + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentType, + Event: "rate_limited", + Timestamp: time.Now(), + Until: &until, + Reason: reason, + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent event", "error", recErr) + } + }() } var blockedErr *BlockedError @@ -335,9 +430,51 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) } + p.mu.Lock() + p.consecutiveFailures[agentType]++ + p.mu.Unlock() + } + // If this is a checker task, attach the failure report for any terminal + // failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED). + if t.CheckerForTaskID != "" && exec.ErrorMsg != "" { + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, exec.ErrorMsg); reportErr != nil { + p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr) + } + } + if t.StoryID != "" && exec.Status == "FAILED" { + storyID := t.StoryID + errMsg := exec.ErrorMsg + go func() { + story, getErr := p.store.GetStory(storyID) + if getErr != nil { + return + } + if story.Status == task.StoryValidating { + p.checkValidationResult(ctx, storyID, task.StateFailed, errMsg) + } + }() } } else { - if t.ParentTaskID == "" { + p.mu.Lock() + p.consecutiveFailures[agentType] = 0 + p.mu.Unlock() + if t.CheckerForTaskID != "" { + // Checker task succeeded — auto-accept the checked task. + exec.Status = "COMPLETED" + if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil { + p.logger.Error("handleRunResult: failed to complete checker task", "taskID", t.ID, "error", err) + } + checkedTask, getErr := p.store.GetTask(t.CheckerForTaskID) + if getErr == nil { + if acceptErr := p.store.UpdateTaskState(t.CheckerForTaskID, task.StateCompleted); acceptErr != nil { + p.logger.Error("handleRunResult: failed to auto-accept checked task", "taskID", t.CheckerForTaskID, "error", acceptErr) + } else if checkedTask.StoryID != "" { + go p.checkStoryCompletion(context.Background(), checkedTask.StoryID) + } + } else { + p.logger.Error("handleRunResult: failed to get checked task", "taskID", t.CheckerForTaskID, "error", getErr) + } + } else if t.ParentTaskID == "" { subtasks, subErr := p.store.ListSubtasks(t.ID) if subErr != nil { p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr) @@ -352,6 +489,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil { p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err) } + go p.spawnCheckerTask(context.Background(), t) } } else { exec.Status = "COMPLETED" @@ -360,6 +498,21 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. } p.maybeUnblockParent(t.ParentTaskID) } + if t.StoryID != "" { + storyID := t.StoryID + go func() { + story, getErr := p.store.GetStory(storyID) + if getErr != nil { + p.logger.Error("handleRunResult: failed to get story", "storyID", storyID, "error", getErr) + return + } + if story.Status == task.StoryValidating { + p.checkValidationResult(ctx, storyID, task.StateCompleted, "") + } else { + p.checkStoryCompletion(ctx, storyID) + } + }() + } } summary := exec.Summary @@ -374,6 +527,13 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr) } } + terminalFailure := exec.Status == "FAILED" || exec.Status == "TIMED_OUT" || exec.Status == "CANCELLED" || exec.Status == "BUDGET_EXCEEDED" + if t.CheckerForTaskID != "" && terminalFailure && summary != "" { + // Overwrite the initial error-message report with the richer summary. + if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil { + p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr) + } + } if exec.StdoutPath != "" { if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil { exec.Changestats = cs @@ -388,6 +548,256 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage. p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } +// checkStoryCompletion checks whether all top-level tasks in a story have reached +// a terminal success state and transitions the story to SHIPPABLE if so. +// Subtasks are intentionally excluded — a parent task reaching READY/COMPLETED +// already accounts for its subtasks. +// CheckStoryCompletion is the exported entry point for story completion checks +// called from outside the package (e.g. the API accept handler). +func (p *Pool) CheckStoryCompletion(ctx context.Context, storyID string) { + p.checkStoryCompletion(ctx, storyID) +} + +func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) { + story, err := p.store.GetStory(storyID) + if err != nil { + p.logger.Error("checkStoryCompletion: failed to get story", "storyID", storyID, "error", err) + return + } + if story.Status != task.StoryInProgress { + return // already SHIPPABLE or beyond — nothing to do + } + tasks, err := p.store.ListTasksByStory(storyID) + if err != nil { + p.logger.Error("checkStoryCompletion: failed to list tasks", "storyID", storyID, "error", err) + return + } + if len(tasks) == 0 { + return + } + topLevelCount := 0 + for _, t := range tasks { + if t.ParentTaskID != "" { + continue // subtasks are covered by their parent + } + topLevelCount++ + if t.State != task.StateCompleted { + return // not all top-level tasks done; READY alone is not sufficient (checker may be pending) + } + } + if topLevelCount == 0 { + return // no top-level tasks — don't auto-complete + } + if err := p.store.UpdateStoryStatus(storyID, task.StoryShippable); err != nil { + p.logger.Error("checkStoryCompletion: failed to update story status", "storyID", storyID, "error", err) + return + } + p.logger.Info("story transitioned to SHIPPABLE", "storyID", storyID) + // Deploy is now triggered explicitly by the human via POST /api/stories/{id}/ship. +} + +// ShipStory merges the story branch and runs the deploy script. +// Returns an error if the story is not in SHIPPABLE state. +func (p *Pool) ShipStory(ctx context.Context, storyID string) error { + story, err := p.store.GetStory(storyID) + if err != nil { + return fmt.Errorf("story not found: %w", err) + } + if story.Status != task.StoryShippable { + return fmt.Errorf("story is not SHIPPABLE (current status: %s)", story.Status) + } + go p.triggerStoryDeploy(context.Background(), storyID) + return nil +} + +// spawnCheckerTask creates and submits a checker task for the given completed task. +// Guards: not called for subtasks, checker tasks, tasks without a repository URL, +// or tasks that already have a checker. +func (p *Pool) spawnCheckerTask(ctx context.Context, checked *task.Task) { + // Never spawn a checker for subtasks, checker tasks, or tasks without a repository. + if checked.ParentTaskID != "" || checked.CheckerForTaskID != "" || checked.RepositoryURL == "" { + return + } + // Idempotent: don't create a second checker if one already exists. + existing, err := p.store.GetCheckerTask(checked.ID) + if err != nil { + p.logger.Error("spawnCheckerTask: GetCheckerTask failed", "taskID", checked.ID, "error", err) + return + } + if existing != nil { + return + } + + criteria := checked.AcceptanceCriteria + if criteria == "" { + criteria = checked.Agent.Instructions + } + + instructions := fmt.Sprintf(`You are validating a completed task. Do not make any changes to the code or repository. + +Task: %s +Instructions given to the implementor: +%s + +Acceptance criteria: +%s + +Steps: +1. Clone the repository and review the changes made. +2. Verify each acceptance criterion is met. Run tests or make HTTP requests as needed. +3. If all criteria are satisfied, exit normally (success). +4. If any criterion is not met, use the Bash tool to exit with a non-zero code: + bash -c "exit 1" + Before exiting, write a brief summary of what failed.`, checked.Name, checked.Agent.Instructions, criteria) + + now := time.Now().UTC() + checker := &task.Task{ + ID: uuid.New().String(), + Name: "Check: " + checked.Name, + CheckerForTaskID: checked.ID, + RepositoryURL: checked.RepositoryURL, + Agent: task.AgentConfig{ + Type: "claude", + Instructions: instructions, + MaxBudgetUSD: 0.50, + AllowedTools: []string{"Bash", "Read", "Glob", "Grep"}, + }, + Timeout: task.Duration{Duration: 10 * time.Minute}, + Priority: task.PriorityNormal, + Tags: []string{}, + DependsOn: []string{}, + Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"}, + State: task.StatePending, + CreatedAt: now, + UpdatedAt: now, + } + + if err := p.store.CreateTask(checker); err != nil { + p.logger.Error("spawnCheckerTask: CreateTask failed", "error", err) + return + } + checker.State = task.StateQueued + if err := p.store.UpdateTaskState(checker.ID, task.StateQueued); err != nil { + p.logger.Error("spawnCheckerTask: UpdateTaskState failed", "error", err) + return + } + if err := p.Submit(ctx, checker); err != nil { + p.logger.Error("spawnCheckerTask: Submit failed", "error", err) + } +} + +// triggerStoryDeploy runs the project deploy script for a SHIPPABLE story +// and advances it to DEPLOYED on success. +func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) { + story, err := p.store.GetStory(storyID) + if err != nil { + p.logger.Error("triggerStoryDeploy: failed to get story", "storyID", storyID, "error", err) + return + } + if story.ProjectID == "" { + return + } + proj, err := p.store.GetProject(story.ProjectID) + if err != nil { + p.logger.Error("triggerStoryDeploy: failed to get project", "storyID", storyID, "projectID", story.ProjectID, "error", err) + return + } + if proj.DeployScript == "" { + return + } + // Merge story branch to main before deploying (ADR-007). + if story.BranchName != "" && proj.LocalPath != "" { + mergeSteps := [][]string{ + {"git", "-C", proj.LocalPath, "fetch", "origin"}, + {"git", "-C", proj.LocalPath, "checkout", "main"}, + {"git", "-C", proj.LocalPath, "merge", "--no-ff", story.BranchName, "-m", "Merge " + story.BranchName}, + {"git", "-C", proj.LocalPath, "push", "origin", "main"}, + } + for _, args := range mergeSteps { + if mergeOut, mergeErr := exec.CommandContext(ctx, args[0], args[1:]...).CombinedOutput(); mergeErr != nil { + p.logger.Error("triggerStoryDeploy: merge failed", "cmd", args, "output", string(mergeOut), "error", mergeErr) + return + } + } + p.logger.Info("story branch merged to main", "storyID", storyID, "branch", story.BranchName) + } + out, err := exec.CommandContext(ctx, proj.DeployScript).CombinedOutput() + if err != nil { + p.logger.Error("triggerStoryDeploy: deploy script failed", "storyID", storyID, "script", proj.DeployScript, "output", string(out), "error", err) + return + } + if err := p.store.UpdateStoryStatus(storyID, task.StoryDeployed); err != nil { + p.logger.Error("triggerStoryDeploy: failed to update story status", "storyID", storyID, "error", err) + return + } + p.logger.Info("story transitioned to DEPLOYED", "storyID", storyID) + go p.createValidationTask(ctx, storyID) +} + +// createValidationTask creates a validation subtask from the story's ValidationJSON +// and transitions the story to VALIDATING. +func (p *Pool) createValidationTask(ctx context.Context, storyID string) { + story, err := p.store.GetStory(storyID) + if err != nil { + p.logger.Error("createValidationTask: failed to get story", "storyID", storyID, "error", err) + return + } + if story.ValidationJSON == "" { + p.logger.Warn("createValidationTask: story has no ValidationJSON, skipping", "storyID", storyID) + return + } + + var spec map[string]interface{} + if err := json.Unmarshal([]byte(story.ValidationJSON), &spec); err != nil { + p.logger.Error("createValidationTask: failed to parse ValidationJSON", "storyID", storyID, "error", err) + return + } + + instructions := fmt.Sprintf("Validate the deployment for story %q.\n\nValidation spec:\n%s", story.Name, story.ValidationJSON) + + now := time.Now().UTC() + vtask := &task.Task{ + ID: uuid.New().String(), + Name: fmt.Sprintf("validation: %s", story.Name), + StoryID: storyID, + State: task.StateQueued, + Agent: task.AgentConfig{Type: "claude", Instructions: instructions}, + Tags: []string{}, + DependsOn: []string{}, + CreatedAt: now, + UpdatedAt: now, + } + + if err := p.store.CreateTask(vtask); err != nil { + p.logger.Error("createValidationTask: failed to create task", "storyID", storyID, "error", err) + return + } + if err := p.store.UpdateStoryStatus(storyID, task.StoryValidating); err != nil { + p.logger.Error("createValidationTask: failed to update story status", "storyID", storyID, "error", err) + return + } + p.logger.Info("validation task created and story transitioned to VALIDATING", "storyID", storyID, "taskID", vtask.ID) + p.Submit(ctx, vtask) //nolint:errcheck +} + +// checkValidationResult inspects a completed validation task and transitions +// the story to REVIEW_READY or NEEDS_FIX accordingly. +func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskState task.State, errorMsg string) { + if taskState == task.StateCompleted { + if err := p.store.UpdateStoryStatus(storyID, task.StoryReviewReady); err != nil { + p.logger.Error("checkValidationResult: failed to update story status", "storyID", storyID, "error", err) + return + } + p.logger.Info("story transitioned to REVIEW_READY", "storyID", storyID) + } else { + if err := p.store.UpdateStoryStatus(storyID, task.StoryNeedsFix); err != nil { + p.logger.Error("checkValidationResult: failed to update story status", "storyID", storyID, "error", err) + return + } + p.logger.Info("story transitioned to NEEDS_FIX", "storyID", storyID, "error", errorMsg) + } +} + // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() @@ -395,6 +805,34 @@ func (p *Pool) ActiveCount() int { return p.active } +// AgentStatusInfo holds the current state of a single agent. +type AgentStatusInfo struct { + Agent string `json:"agent"` + ActiveTasks int `json:"active_tasks"` + RateLimited bool `json:"rate_limited"` + Until *time.Time `json:"until,omitempty"` +} + +// AgentStatuses returns the current status of all registered agents. +func (p *Pool) AgentStatuses() []AgentStatusInfo { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now() + var out []AgentStatusInfo + for agent := range p.runners { + info := AgentStatusInfo{ + Agent: agent, + ActiveTasks: p.activePerAgent[agent], + } + if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) { + info.RateLimited = true + info.Until = &deadline + } + out = append(out, info) + } + return out +} + // pickAgent selects the best agent from the given SystemStatus using explicit // load balancing: prefer the available (non-rate-limited) agent with the fewest // active tasks. If all agents are rate-limited, fall back to fewest active. @@ -436,6 +874,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { activeTasks[agent] = p.activePerAgent[agent] if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) { delete(p.rateLimited, agent) + agentName := agent + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentName, + Event: "available", + Timestamp: time.Now(), + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent available event", "error", recErr) + } + }() } rateLimited[agent] = now.Before(p.rateLimited[agent]) } @@ -479,9 +929,58 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { agentType = "claude" } + // Check dependencies before taking the per-agent slot to avoid deadlock: + // if a dependent task holds the slot while waiting for its dependency to run, + // the dependency can never start (maxPerAgent=1). + if len(t.DependsOn) > 0 { + ready, depErr := p.checkDepsReady(t) + if depErr != nil { + // A dependency hit a terminal failure — cancel this task immediately. + now := time.Now().UTC() + exec := &storage.Execution{ + ID: uuid.New().String(), + TaskID: t.ID, + StartTime: now, + EndTime: now, + Status: "CANCELLED", + ErrorMsg: depErr.Error(), + } + if createErr := p.store.CreateExecution(exec); createErr != nil { + p.logger.Error("failed to create execution record", "error", createErr) + } + if err := p.store.UpdateTaskState(t.ID, task.StateCancelled); err != nil { + p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCancelled, "error", err) + } + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: depErr} + return + } + if !ready { + // Dependencies not yet done — requeue without holding the slot. + time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} }) + return + } + } p.mu.Lock() + + if p.activePerAgent[agentType] >= p.maxPerAgent { + p.mu.Unlock() + time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} }) + return + } if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) { delete(p.rateLimited, agentType) + agentName := agentType + go func() { + ev := storage.AgentEvent{ + ID: uuid.New().String(), + Agent: agentName, + Event: "available", + Timestamp: time.Now(), + } + if recErr := p.store.RecordAgentEvent(ev); recErr != nil { + p.logger.Warn("failed to record agent available event", "error", recErr) + } + }() } p.activePerAgent[agentType]++ p.mu.Unlock() @@ -512,30 +1011,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { return } - // Wait for all dependencies to complete before starting execution. - if len(t.DependsOn) > 0 { - if err := p.waitForDependencies(ctx, t); err != nil { - now := time.Now().UTC() - exec := &storage.Execution{ - ID: uuid.New().String(), - TaskID: t.ID, - StartTime: now, - EndTime: now, - Status: "FAILED", - ErrorMsg: err.Error(), - } - if createErr := p.store.CreateExecution(exec); createErr != nil { - p.logger.Error("failed to create execution record", "error", createErr) - } - if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { - p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err) - } - p.decActiveAgent(agentType, &cleaned) - p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} - return - } - } - execID := uuid.New().String() exec := &storage.Execution{ ID: execID, @@ -554,12 +1029,13 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } } - // Record execution start. - if err := p.store.CreateExecution(exec); err != nil { + // Record execution start atomically with the RUNNING state transition. + if err := p.store.CreateExecutionAndSetRunning(exec); err != nil { p.logger.Error("failed to create execution record", "error", err) } - if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { - p.logger.Error("failed to update task state", "error", err) + select { + case p.startedCh <- t.ID: + default: } // Apply task timeout and register cancel so callers can stop this task. @@ -583,6 +1059,19 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { priorExecs, priorErr := p.store.ListExecutions(t.ID) t = withFailureHistory(t, priorExecs, priorErr) + // Populate RepositoryURL from Project registry if missing (ADR-007). + if t.RepositoryURL == "" && t.Project != "" { + if proj, err := p.store.GetProject(t.Project); err == nil && proj.RemoteURL != "" { + t.RepositoryURL = proj.RemoteURL + } + } + // Populate BranchName from Story if missing (ADR-007). + if t.BranchName == "" && t.StoryID != "" { + if story, err := p.store.GetStory(t.StoryID); err == nil && story.BranchName != "" { + t.BranchName = story.BranchName + } + } + // Run the task. err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() @@ -650,18 +1139,31 @@ func (p *Pool) RecoverStaleQueued(ctx context.Context) { } } -// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its -// subtasks are already COMPLETED. This handles the case where the server was -// restarted after subtasks finished but before maybeUnblockParent could fire. +// RecoverStaleBlocked promotes any BLOCKED or QUEUED parent task to READY when +// all of its subtasks are already COMPLETED. This handles the case where the +// server was restarted after subtasks finished but before maybeUnblockParent +// could fire, and also the case where story approval pre-created subtasks +// without ever running the parent task. // Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued. func (p *Pool) RecoverStaleBlocked() { - tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked}) - if err != nil { - p.logger.Error("RecoverStaleBlocked: list tasks", "error", err) - return - } - for _, t := range tasks { - p.maybeUnblockParent(t.ID) + ctx := context.Background() + for _, state := range []task.State{task.StateBlocked, task.StateQueued} { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: state}) + if err != nil { + p.logger.Error("RecoverStaleBlocked: list tasks", "error", err, "state", state) + continue + } + for _, t := range tasks { + if t.ParentTaskID != "" { + continue // only promote actual parents + } + before := t.State + p.maybeUnblockParent(t.ID) + // If the parent was promoted, check story completion. + if after, err := p.store.GetTask(t.ID); err == nil && after.State != before && t.StoryID != "" { + p.checkStoryCompletion(ctx, t.StoryID) + } + } } } @@ -673,6 +1175,32 @@ var terminalFailureStates = map[task.State]bool{ task.StateBudgetExceeded: true, } +// depDoneStates are task states that satisfy a DependsOn dependency. +var depDoneStates = map[task.State]bool{ + task.StateCompleted: true, + task.StateReady: true, // leaf tasks finish at READY +} + +// checkDepsReady does a single synchronous check of t.DependsOn. +// Returns (true, nil) if all deps are done, (false, nil) if any are still pending, +// or (false, err) if a dep entered a terminal failure state. +func (p *Pool) checkDepsReady(t *task.Task) (bool, error) { + for _, depID := range t.DependsOn { + dep, err := p.store.GetTask(depID) + if err != nil { + return false, fmt.Errorf("dependency %q not found: %w", depID, err) + } + if depDoneStates[dep.State] { + continue + } + if terminalFailureStates[dep.State] { + return false, fmt.Errorf("dependency %q ended in state %s", depID, dep.State) + } + return false, nil // still pending + } + return true, nil +} + // withFailureHistory returns a shallow copy of t with prior failed execution // error messages prepended to SystemPromptAppend so the agent knows what went // wrong in previous attempts. @@ -710,16 +1238,16 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta return © } -// maybeUnblockParent transitions the parent task from BLOCKED to READY if all -// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED -// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED. +// maybeUnblockParent transitions the parent task to READY if all of its subtasks +// are in the COMPLETED state. Handles both BLOCKED parents (ran, created subtasks, +// paused) and QUEUED parents (story approval created subtasks without running parent). func (p *Pool) maybeUnblockParent(parentID string) { parent, err := p.store.GetTask(parentID) if err != nil { p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err) return } - if parent.State != task.StateBlocked { + if parent.State != task.StateBlocked && parent.State != task.StateQueued { return } subtasks, err := p.store.ListSubtasks(parentID) @@ -727,6 +1255,11 @@ func (p *Pool) maybeUnblockParent(parentID string) { p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err) return } + // A task with no subtasks was never blocked by subtask delegation — don't promote it. + // This prevents incorrectly promoting leaf tasks that are stuck in QUEUED to READY. + if len(subtasks) == 0 { + return + } for _, sub := range subtasks { if sub.State != task.StateCompleted { return @@ -747,7 +1280,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { if err != nil { return fmt.Errorf("dependency %q not found: %w", depID, err) } - if dep.State == task.StateCompleted { + if depDoneStates[dep.State] { continue } if terminalFailureStates[dep.State] { |
