summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go695
1 files changed, 614 insertions, 81 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 315030d..09169bd 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -2,9 +2,11 @@ package executor
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"log/slog"
+ "os/exec"
"path/filepath"
"strings"
"sync"
@@ -25,6 +27,7 @@ type Store interface {
ListSubtasks(parentID string) ([]*task.Task, error)
ListExecutions(taskID string) ([]*storage.Execution, error)
CreateExecution(e *storage.Execution) error
+ CreateExecutionAndSetRunning(e *storage.Execution) error
UpdateExecution(e *storage.Execution) error
UpdateTaskState(id string, newState task.State) error
UpdateTaskQuestion(taskID, questionJSON string) error
@@ -32,6 +35,14 @@ type Store interface {
AppendTaskInteraction(taskID string, interaction task.Interaction) error
UpdateTaskAgent(id string, agent task.AgentConfig) error
UpdateExecutionChangestats(execID string, stats *task.Changestats) error
+ RecordAgentEvent(e storage.AgentEvent) error
+ GetProject(id string) (*task.Project, error)
+ GetStory(id string) (*task.Story, error)
+ ListTasksByStory(storyID string) ([]*task.Task, error)
+ UpdateStoryStatus(id string, status task.StoryState) error
+ CreateTask(t *task.Task) error
+ UpdateTaskCheckerReport(id, report string) error
+ GetCheckerTask(checkedTaskID string) (*task.Task, error)
}
// LogPather is an optional interface runners can implement to provide the log
@@ -56,24 +67,28 @@ type workItem struct {
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
maxConcurrent int
+ maxPerAgent int
runners map[string]Runner
store Store
logger *slog.Logger
- depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
-
- mu sync.Mutex
- active int
- activePerAgent map[string]int
- rateLimited map[string]time.Time // agentType -> until
- cancels map[string]context.CancelFunc // taskID → cancel
- resultCh chan *Result
- workCh chan workItem // internal bounded queue; Submit enqueues here
- doneCh chan struct{} // signals when a worker slot is freed
- Questions *QuestionRegistry
- Classifier *Classifier
- // LLM, when non-nil, enables LLM-synthesized summaries for executions
- // whose stdout did not include a "## Summary" heading.
- LLM *llm.Client
+ depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
+ requeueDelay time.Duration // how long to wait before requeuing a blocked-per-agent task; defaults to 30s
+
+ mu sync.Mutex
+ active int
+ activePerAgent map[string]int
+ rateLimited map[string]time.Time // agentType -> until
+ cancels map[string]context.CancelFunc // taskID → cancel
+ consecutiveFailures map[string]int // agentType -> count
+ closed bool // set to true when Shutdown has been called
+ resultCh chan *Result
+ startedCh chan string // task IDs that just transitioned to RUNNING
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
+ workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
+ dispatchDone chan struct{} // closed when the dispatch goroutine exits
+ Classifier *Classifier
+ LLM *llm.Client
}
// Result is emitted when a task execution completes.
@@ -88,18 +103,22 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
maxConcurrent = 1
}
p := &Pool{
- maxConcurrent: maxConcurrent,
- runners: runners,
- store: store,
- logger: logger,
- depPollInterval: 5 * time.Second,
- activePerAgent: make(map[string]int),
- rateLimited: make(map[string]time.Time),
- cancels: make(map[string]context.CancelFunc),
- resultCh: make(chan *Result, maxConcurrent*2),
- workCh: make(chan workItem, maxConcurrent*10+100),
- doneCh: make(chan struct{}, maxConcurrent),
- Questions: NewQuestionRegistry(),
+ maxConcurrent: maxConcurrent,
+ maxPerAgent: 1,
+ runners: runners,
+ store: store,
+ logger: logger,
+ depPollInterval: 5 * time.Second,
+ requeueDelay: 30 * time.Second,
+ activePerAgent: make(map[string]int),
+ rateLimited: make(map[string]time.Time),
+ cancels: make(map[string]context.CancelFunc),
+ consecutiveFailures: make(map[string]int),
+ resultCh: make(chan *Result, maxConcurrent*2),
+ startedCh: make(chan string, maxConcurrent*2),
+ workCh: make(chan workItem, maxConcurrent*10+100),
+ doneCh: make(chan struct{}, maxConcurrent),
+ dispatchDone: make(chan struct{}),
}
go p.dispatch()
return p
@@ -109,6 +128,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
// and launches goroutines as soon as a pool slot is available. This prevents
// tasks from being rejected when the pool is temporarily at capacity.
func (p *Pool) dispatch() {
+ defer close(p.dispatchDone)
for item := range p.workCh {
for {
p.mu.Lock()
@@ -116,9 +136,9 @@ func (p *Pool) dispatch() {
p.active++
p.mu.Unlock()
if item.exec != nil {
- go p.executeResume(item.ctx, item.task, item.exec)
+ p.workerWg.Add(1); go func(i workItem) { defer p.workerWg.Done(); p.executeResume(i.ctx, i.task, i.exec) }(item)
} else {
- go p.execute(item.ctx, item.task)
+ p.workerWg.Add(1); go func(i workItem) { defer p.workerWg.Done(); p.execute(i.ctx, i.task) }(item)
}
break
}
@@ -132,19 +152,64 @@ func (p *Pool) dispatch() {
// work queue is full. When the pool is at capacity the task is buffered and
// dispatched as soon as a slot becomes available.
func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+ p.mu.Lock()
+ if p.closed {
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool is shut down")
+ }
+ // Send while holding the lock so that Shutdown cannot close workCh between
+ // the closed-check above and the send below. The dispatch goroutine never
+ // holds p.mu while receiving from workCh, so this cannot deadlock.
select {
case p.workCh <- workItem{ctx: ctx, task: t}:
+ p.mu.Unlock()
return nil
default:
+ p.mu.Unlock()
return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
}
}
+// Started returns a channel that emits task IDs when they transition to RUNNING.
+func (p *Pool) Started() <-chan string {
+ return p.startedCh
+}
+
// Results returns the channel for reading execution results.
func (p *Pool) Results() <-chan *Result {
return p.resultCh
}
+// Shutdown stops accepting new work and waits for all in-flight workers to
+// finish. Returns ctx.Err() if the context deadline is exceeded before all
+// workers complete.
+func (p *Pool) Shutdown(ctx context.Context) error {
+ // Stop the dispatch goroutine. We must wait for it to exit before calling
+ // workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait().
+ p.mu.Lock()
+ p.closed = true
+ p.mu.Unlock()
+ close(p.workCh)
+ select {
+ case <-p.dispatchDone:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ done := make(chan struct{})
+ go func() {
+ p.workerWg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
// Cancel requests cancellation of a running task. Returns false if the task
// is not currently running in this pool.
func (p *Pool) Cancel(taskID string) bool {
@@ -250,11 +315,12 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
exec.StartTime = time.Now().UTC()
exec.Status = "RUNNING"
- if err := p.store.CreateExecution(exec); err != nil {
+ if err := p.store.CreateExecutionAndSetRunning(exec); err != nil {
p.logger.Error("failed to create resume execution record", "error", err)
}
- if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
- p.logger.Error("failed to update task state", "error", err)
+ select {
+ case p.startedCh <- t.ID:
+ default:
}
var cancel context.CancelFunc
@@ -273,6 +339,19 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
p.mu.Unlock()
}()
+ // Populate RepositoryURL from Project registry if missing (ADR-007).
+ if t.RepositoryURL == "" && t.Project != "" {
+ if proj, err := p.store.GetProject(t.Project); err == nil && proj.RemoteURL != "" {
+ t.RepositoryURL = proj.RemoteURL
+ }
+ }
+ // Populate BranchName from Story if missing (ADR-007).
+ if t.BranchName == "" && t.StoryID != "" {
+ if story, err := p.store.GetStory(t.StoryID); err == nil && story.BranchName != "" {
+ t.BranchName = story.BranchName
+ }
+ }
+
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
@@ -289,16 +368,32 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if retry.IsRateLimitError(err) || isQuotaExhausted(err) {
p.mu.Lock()
retryAfter := retry.ParseRetryAfter(err.Error())
- if retryAfter == 0 {
- if isQuotaExhausted(err) {
+ reason := "transient"
+ if isQuotaExhausted(err) {
+ reason = "quota"
+ if retryAfter == 0 {
retryAfter = 5 * time.Hour
- } else {
- retryAfter = 1 * time.Minute
}
+ } else if retryAfter == 0 {
+ retryAfter = 1 * time.Minute
}
- p.rateLimited[agentType] = time.Now().Add(retryAfter)
+ until := time.Now().Add(retryAfter)
+ p.rateLimited[agentType] = until
p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err))
p.mu.Unlock()
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentType,
+ Event: "rate_limited",
+ Timestamp: time.Now(),
+ Until: &until,
+ Reason: reason,
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent event", "error", recErr)
+ }
+ }()
}
var blockedErr *BlockedError
@@ -335,9 +430,51 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
}
+ p.mu.Lock()
+ p.consecutiveFailures[agentType]++
+ p.mu.Unlock()
+ }
+ // If this is a checker task, attach the failure report for any terminal
+ // failure state (FAILED, TIMED_OUT, CANCELLED, BUDGET_EXCEEDED).
+ if t.CheckerForTaskID != "" && exec.ErrorMsg != "" {
+ if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, exec.ErrorMsg); reportErr != nil {
+ p.logger.Error("handleRunResult: failed to set checker report", "taskID", t.CheckerForTaskID, "error", reportErr)
+ }
+ }
+ if t.StoryID != "" && exec.Status == "FAILED" {
+ storyID := t.StoryID
+ errMsg := exec.ErrorMsg
+ go func() {
+ story, getErr := p.store.GetStory(storyID)
+ if getErr != nil {
+ return
+ }
+ if story.Status == task.StoryValidating {
+ p.checkValidationResult(ctx, storyID, task.StateFailed, errMsg)
+ }
+ }()
}
} else {
- if t.ParentTaskID == "" {
+ p.mu.Lock()
+ p.consecutiveFailures[agentType] = 0
+ p.mu.Unlock()
+ if t.CheckerForTaskID != "" {
+ // Checker task succeeded — auto-accept the checked task.
+ exec.Status = "COMPLETED"
+ if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
+ p.logger.Error("handleRunResult: failed to complete checker task", "taskID", t.ID, "error", err)
+ }
+ checkedTask, getErr := p.store.GetTask(t.CheckerForTaskID)
+ if getErr == nil {
+ if acceptErr := p.store.UpdateTaskState(t.CheckerForTaskID, task.StateCompleted); acceptErr != nil {
+ p.logger.Error("handleRunResult: failed to auto-accept checked task", "taskID", t.CheckerForTaskID, "error", acceptErr)
+ } else if checkedTask.StoryID != "" {
+ go p.checkStoryCompletion(context.Background(), checkedTask.StoryID)
+ }
+ } else {
+ p.logger.Error("handleRunResult: failed to get checked task", "taskID", t.CheckerForTaskID, "error", getErr)
+ }
+ } else if t.ParentTaskID == "" {
subtasks, subErr := p.store.ListSubtasks(t.ID)
if subErr != nil {
p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
@@ -352,6 +489,7 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
}
+ go p.spawnCheckerTask(context.Background(), t)
}
} else {
exec.Status = "COMPLETED"
@@ -360,6 +498,21 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
}
p.maybeUnblockParent(t.ParentTaskID)
}
+ if t.StoryID != "" {
+ storyID := t.StoryID
+ go func() {
+ story, getErr := p.store.GetStory(storyID)
+ if getErr != nil {
+ p.logger.Error("handleRunResult: failed to get story", "storyID", storyID, "error", getErr)
+ return
+ }
+ if story.Status == task.StoryValidating {
+ p.checkValidationResult(ctx, storyID, task.StateCompleted, "")
+ } else {
+ p.checkStoryCompletion(ctx, storyID)
+ }
+ }()
+ }
}
summary := exec.Summary
@@ -374,6 +527,13 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
}
}
+ terminalFailure := exec.Status == "FAILED" || exec.Status == "TIMED_OUT" || exec.Status == "CANCELLED" || exec.Status == "BUDGET_EXCEEDED"
+ if t.CheckerForTaskID != "" && terminalFailure && summary != "" {
+ // Overwrite the initial error-message report with the richer summary.
+ if reportErr := p.store.UpdateTaskCheckerReport(t.CheckerForTaskID, summary); reportErr != nil {
+ p.logger.Error("handleRunResult: failed to update checker report with summary", "taskID", t.CheckerForTaskID, "error", reportErr)
+ }
+ }
if exec.StdoutPath != "" {
if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil {
exec.Changestats = cs
@@ -388,6 +548,256 @@ func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
+// checkStoryCompletion checks whether all top-level tasks in a story have reached
+// a terminal success state and transitions the story to SHIPPABLE if so.
+// Subtasks are intentionally excluded — a parent task reaching READY/COMPLETED
+// already accounts for its subtasks.
+// CheckStoryCompletion is the exported entry point for story completion checks
+// called from outside the package (e.g. the API accept handler).
+func (p *Pool) CheckStoryCompletion(ctx context.Context, storyID string) {
+ p.checkStoryCompletion(ctx, storyID)
+}
+
+func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) {
+ story, err := p.store.GetStory(storyID)
+ if err != nil {
+ p.logger.Error("checkStoryCompletion: failed to get story", "storyID", storyID, "error", err)
+ return
+ }
+ if story.Status != task.StoryInProgress {
+ return // already SHIPPABLE or beyond — nothing to do
+ }
+ tasks, err := p.store.ListTasksByStory(storyID)
+ if err != nil {
+ p.logger.Error("checkStoryCompletion: failed to list tasks", "storyID", storyID, "error", err)
+ return
+ }
+ if len(tasks) == 0 {
+ return
+ }
+ topLevelCount := 0
+ for _, t := range tasks {
+ if t.ParentTaskID != "" {
+ continue // subtasks are covered by their parent
+ }
+ topLevelCount++
+ if t.State != task.StateCompleted {
+ return // not all top-level tasks done; READY alone is not sufficient (checker may be pending)
+ }
+ }
+ if topLevelCount == 0 {
+ return // no top-level tasks — don't auto-complete
+ }
+ if err := p.store.UpdateStoryStatus(storyID, task.StoryShippable); err != nil {
+ p.logger.Error("checkStoryCompletion: failed to update story status", "storyID", storyID, "error", err)
+ return
+ }
+ p.logger.Info("story transitioned to SHIPPABLE", "storyID", storyID)
+ // Deploy is now triggered explicitly by the human via POST /api/stories/{id}/ship.
+}
+
+// ShipStory merges the story branch and runs the deploy script.
+// Returns an error if the story is not in SHIPPABLE state.
+func (p *Pool) ShipStory(ctx context.Context, storyID string) error {
+ story, err := p.store.GetStory(storyID)
+ if err != nil {
+ return fmt.Errorf("story not found: %w", err)
+ }
+ if story.Status != task.StoryShippable {
+ return fmt.Errorf("story is not SHIPPABLE (current status: %s)", story.Status)
+ }
+ go p.triggerStoryDeploy(context.Background(), storyID)
+ return nil
+}
+
+// spawnCheckerTask creates and submits a checker task for the given completed task.
+// Guards: not called for subtasks, checker tasks, tasks without a repository URL,
+// or tasks that already have a checker.
+func (p *Pool) spawnCheckerTask(ctx context.Context, checked *task.Task) {
+ // Never spawn a checker for subtasks, checker tasks, or tasks without a repository.
+ if checked.ParentTaskID != "" || checked.CheckerForTaskID != "" || checked.RepositoryURL == "" {
+ return
+ }
+ // Idempotent: don't create a second checker if one already exists.
+ existing, err := p.store.GetCheckerTask(checked.ID)
+ if err != nil {
+ p.logger.Error("spawnCheckerTask: GetCheckerTask failed", "taskID", checked.ID, "error", err)
+ return
+ }
+ if existing != nil {
+ return
+ }
+
+ criteria := checked.AcceptanceCriteria
+ if criteria == "" {
+ criteria = checked.Agent.Instructions
+ }
+
+ instructions := fmt.Sprintf(`You are validating a completed task. Do not make any changes to the code or repository.
+
+Task: %s
+Instructions given to the implementor:
+%s
+
+Acceptance criteria:
+%s
+
+Steps:
+1. Clone the repository and review the changes made.
+2. Verify each acceptance criterion is met. Run tests or make HTTP requests as needed.
+3. If all criteria are satisfied, exit normally (success).
+4. If any criterion is not met, use the Bash tool to exit with a non-zero code:
+ bash -c "exit 1"
+ Before exiting, write a brief summary of what failed.`, checked.Name, checked.Agent.Instructions, criteria)
+
+ now := time.Now().UTC()
+ checker := &task.Task{
+ ID: uuid.New().String(),
+ Name: "Check: " + checked.Name,
+ CheckerForTaskID: checked.ID,
+ RepositoryURL: checked.RepositoryURL,
+ Agent: task.AgentConfig{
+ Type: "claude",
+ Instructions: instructions,
+ MaxBudgetUSD: 0.50,
+ AllowedTools: []string{"Bash", "Read", "Glob", "Grep"},
+ },
+ Timeout: task.Duration{Duration: 10 * time.Minute},
+ Priority: task.PriorityNormal,
+ Tags: []string{},
+ DependsOn: []string{},
+ Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
+ State: task.StatePending,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+
+ if err := p.store.CreateTask(checker); err != nil {
+ p.logger.Error("spawnCheckerTask: CreateTask failed", "error", err)
+ return
+ }
+ checker.State = task.StateQueued
+ if err := p.store.UpdateTaskState(checker.ID, task.StateQueued); err != nil {
+ p.logger.Error("spawnCheckerTask: UpdateTaskState failed", "error", err)
+ return
+ }
+ if err := p.Submit(ctx, checker); err != nil {
+ p.logger.Error("spawnCheckerTask: Submit failed", "error", err)
+ }
+}
+
+// triggerStoryDeploy runs the project deploy script for a SHIPPABLE story
+// and advances it to DEPLOYED on success.
+func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) {
+ story, err := p.store.GetStory(storyID)
+ if err != nil {
+ p.logger.Error("triggerStoryDeploy: failed to get story", "storyID", storyID, "error", err)
+ return
+ }
+ if story.ProjectID == "" {
+ return
+ }
+ proj, err := p.store.GetProject(story.ProjectID)
+ if err != nil {
+ p.logger.Error("triggerStoryDeploy: failed to get project", "storyID", storyID, "projectID", story.ProjectID, "error", err)
+ return
+ }
+ if proj.DeployScript == "" {
+ return
+ }
+ // Merge story branch to main before deploying (ADR-007).
+ if story.BranchName != "" && proj.LocalPath != "" {
+ mergeSteps := [][]string{
+ {"git", "-C", proj.LocalPath, "fetch", "origin"},
+ {"git", "-C", proj.LocalPath, "checkout", "main"},
+ {"git", "-C", proj.LocalPath, "merge", "--no-ff", story.BranchName, "-m", "Merge " + story.BranchName},
+ {"git", "-C", proj.LocalPath, "push", "origin", "main"},
+ }
+ for _, args := range mergeSteps {
+ if mergeOut, mergeErr := exec.CommandContext(ctx, args[0], args[1:]...).CombinedOutput(); mergeErr != nil {
+ p.logger.Error("triggerStoryDeploy: merge failed", "cmd", args, "output", string(mergeOut), "error", mergeErr)
+ return
+ }
+ }
+ p.logger.Info("story branch merged to main", "storyID", storyID, "branch", story.BranchName)
+ }
+ out, err := exec.CommandContext(ctx, proj.DeployScript).CombinedOutput()
+ if err != nil {
+ p.logger.Error("triggerStoryDeploy: deploy script failed", "storyID", storyID, "script", proj.DeployScript, "output", string(out), "error", err)
+ return
+ }
+ if err := p.store.UpdateStoryStatus(storyID, task.StoryDeployed); err != nil {
+ p.logger.Error("triggerStoryDeploy: failed to update story status", "storyID", storyID, "error", err)
+ return
+ }
+ p.logger.Info("story transitioned to DEPLOYED", "storyID", storyID)
+ go p.createValidationTask(ctx, storyID)
+}
+
+// createValidationTask creates a validation subtask from the story's ValidationJSON
+// and transitions the story to VALIDATING.
+func (p *Pool) createValidationTask(ctx context.Context, storyID string) {
+ story, err := p.store.GetStory(storyID)
+ if err != nil {
+ p.logger.Error("createValidationTask: failed to get story", "storyID", storyID, "error", err)
+ return
+ }
+ if story.ValidationJSON == "" {
+ p.logger.Warn("createValidationTask: story has no ValidationJSON, skipping", "storyID", storyID)
+ return
+ }
+
+ var spec map[string]interface{}
+ if err := json.Unmarshal([]byte(story.ValidationJSON), &spec); err != nil {
+ p.logger.Error("createValidationTask: failed to parse ValidationJSON", "storyID", storyID, "error", err)
+ return
+ }
+
+ instructions := fmt.Sprintf("Validate the deployment for story %q.\n\nValidation spec:\n%s", story.Name, story.ValidationJSON)
+
+ now := time.Now().UTC()
+ vtask := &task.Task{
+ ID: uuid.New().String(),
+ Name: fmt.Sprintf("validation: %s", story.Name),
+ StoryID: storyID,
+ State: task.StateQueued,
+ Agent: task.AgentConfig{Type: "claude", Instructions: instructions},
+ Tags: []string{},
+ DependsOn: []string{},
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+
+ if err := p.store.CreateTask(vtask); err != nil {
+ p.logger.Error("createValidationTask: failed to create task", "storyID", storyID, "error", err)
+ return
+ }
+ if err := p.store.UpdateStoryStatus(storyID, task.StoryValidating); err != nil {
+ p.logger.Error("createValidationTask: failed to update story status", "storyID", storyID, "error", err)
+ return
+ }
+ p.logger.Info("validation task created and story transitioned to VALIDATING", "storyID", storyID, "taskID", vtask.ID)
+ p.Submit(ctx, vtask) //nolint:errcheck
+}
+
+// checkValidationResult inspects a completed validation task and transitions
+// the story to REVIEW_READY or NEEDS_FIX accordingly.
+func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskState task.State, errorMsg string) {
+ if taskState == task.StateCompleted {
+ if err := p.store.UpdateStoryStatus(storyID, task.StoryReviewReady); err != nil {
+ p.logger.Error("checkValidationResult: failed to update story status", "storyID", storyID, "error", err)
+ return
+ }
+ p.logger.Info("story transitioned to REVIEW_READY", "storyID", storyID)
+ } else {
+ if err := p.store.UpdateStoryStatus(storyID, task.StoryNeedsFix); err != nil {
+ p.logger.Error("checkValidationResult: failed to update story status", "storyID", storyID, "error", err)
+ return
+ }
+ p.logger.Info("story transitioned to NEEDS_FIX", "storyID", storyID, "error", errorMsg)
+ }
+}
+
// ActiveCount returns the number of currently running tasks.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
@@ -395,6 +805,34 @@ func (p *Pool) ActiveCount() int {
return p.active
}
+// AgentStatusInfo holds the current state of a single agent.
+type AgentStatusInfo struct {
+ Agent string `json:"agent"`
+ ActiveTasks int `json:"active_tasks"`
+ RateLimited bool `json:"rate_limited"`
+ Until *time.Time `json:"until,omitempty"`
+}
+
+// AgentStatuses returns the current status of all registered agents.
+func (p *Pool) AgentStatuses() []AgentStatusInfo {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ now := time.Now()
+ var out []AgentStatusInfo
+ for agent := range p.runners {
+ info := AgentStatusInfo{
+ Agent: agent,
+ ActiveTasks: p.activePerAgent[agent],
+ }
+ if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) {
+ info.RateLimited = true
+ info.Until = &deadline
+ }
+ out = append(out, info)
+ }
+ return out
+}
+
// pickAgent selects the best agent from the given SystemStatus using explicit
// load balancing: prefer the available (non-rate-limited) agent with the fewest
// active tasks. If all agents are rate-limited, fall back to fewest active.
@@ -436,6 +874,18 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
activeTasks[agent] = p.activePerAgent[agent]
if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) {
delete(p.rateLimited, agent)
+ agentName := agent
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentName,
+ Event: "available",
+ Timestamp: time.Now(),
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent available event", "error", recErr)
+ }
+ }()
}
rateLimited[agent] = now.Before(p.rateLimited[agent])
}
@@ -479,9 +929,58 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
agentType = "claude"
}
+ // Check dependencies before taking the per-agent slot to avoid deadlock:
+ // if a dependent task holds the slot while waiting for its dependency to run,
+ // the dependency can never start (maxPerAgent=1).
+ if len(t.DependsOn) > 0 {
+ ready, depErr := p.checkDepsReady(t)
+ if depErr != nil {
+ // A dependency hit a terminal failure — cancel this task immediately.
+ now := time.Now().UTC()
+ exec := &storage.Execution{
+ ID: uuid.New().String(),
+ TaskID: t.ID,
+ StartTime: now,
+ EndTime: now,
+ Status: "CANCELLED",
+ ErrorMsg: depErr.Error(),
+ }
+ if createErr := p.store.CreateExecution(exec); createErr != nil {
+ p.logger.Error("failed to create execution record", "error", createErr)
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateCancelled); err != nil {
+ p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCancelled, "error", err)
+ }
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: depErr}
+ return
+ }
+ if !ready {
+ // Dependencies not yet done — requeue without holding the slot.
+ time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} })
+ return
+ }
+ }
p.mu.Lock()
+
+ if p.activePerAgent[agentType] >= p.maxPerAgent {
+ p.mu.Unlock()
+ time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} })
+ return
+ }
if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) {
delete(p.rateLimited, agentType)
+ agentName := agentType
+ go func() {
+ ev := storage.AgentEvent{
+ ID: uuid.New().String(),
+ Agent: agentName,
+ Event: "available",
+ Timestamp: time.Now(),
+ }
+ if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
+ p.logger.Warn("failed to record agent available event", "error", recErr)
+ }
+ }()
}
p.activePerAgent[agentType]++
p.mu.Unlock()
@@ -512,30 +1011,6 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
return
}
- // Wait for all dependencies to complete before starting execution.
- if len(t.DependsOn) > 0 {
- if err := p.waitForDependencies(ctx, t); err != nil {
- now := time.Now().UTC()
- exec := &storage.Execution{
- ID: uuid.New().String(),
- TaskID: t.ID,
- StartTime: now,
- EndTime: now,
- Status: "FAILED",
- ErrorMsg: err.Error(),
- }
- if createErr := p.store.CreateExecution(exec); createErr != nil {
- p.logger.Error("failed to create execution record", "error", createErr)
- }
- if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
- p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
- }
- p.decActiveAgent(agentType, &cleaned)
- p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
- return
- }
- }
-
execID := uuid.New().String()
exec := &storage.Execution{
ID: execID,
@@ -554,12 +1029,13 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}
- // Record execution start.
- if err := p.store.CreateExecution(exec); err != nil {
+ // Record execution start atomically with the RUNNING state transition.
+ if err := p.store.CreateExecutionAndSetRunning(exec); err != nil {
p.logger.Error("failed to create execution record", "error", err)
}
- if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
- p.logger.Error("failed to update task state", "error", err)
+ select {
+ case p.startedCh <- t.ID:
+ default:
}
// Apply task timeout and register cancel so callers can stop this task.
@@ -583,6 +1059,19 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
priorExecs, priorErr := p.store.ListExecutions(t.ID)
t = withFailureHistory(t, priorExecs, priorErr)
+ // Populate RepositoryURL from Project registry if missing (ADR-007).
+ if t.RepositoryURL == "" && t.Project != "" {
+ if proj, err := p.store.GetProject(t.Project); err == nil && proj.RemoteURL != "" {
+ t.RepositoryURL = proj.RemoteURL
+ }
+ }
+ // Populate BranchName from Story if missing (ADR-007).
+ if t.BranchName == "" && t.StoryID != "" {
+ if story, err := p.store.GetStory(t.StoryID); err == nil && story.BranchName != "" {
+ t.BranchName = story.BranchName
+ }
+ }
+
// Run the task.
err = runner.Run(ctx, t, exec)
exec.EndTime = time.Now().UTC()
@@ -650,18 +1139,31 @@ func (p *Pool) RecoverStaleQueued(ctx context.Context) {
}
}
-// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its
-// subtasks are already COMPLETED. This handles the case where the server was
-// restarted after subtasks finished but before maybeUnblockParent could fire.
+// RecoverStaleBlocked promotes any BLOCKED or QUEUED parent task to READY when
+// all of its subtasks are already COMPLETED. This handles the case where the
+// server was restarted after subtasks finished but before maybeUnblockParent
+// could fire, and also the case where story approval pre-created subtasks
+// without ever running the parent task.
// Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued.
func (p *Pool) RecoverStaleBlocked() {
- tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked})
- if err != nil {
- p.logger.Error("RecoverStaleBlocked: list tasks", "error", err)
- return
- }
- for _, t := range tasks {
- p.maybeUnblockParent(t.ID)
+ ctx := context.Background()
+ for _, state := range []task.State{task.StateBlocked, task.StateQueued} {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: state})
+ if err != nil {
+ p.logger.Error("RecoverStaleBlocked: list tasks", "error", err, "state", state)
+ continue
+ }
+ for _, t := range tasks {
+ if t.ParentTaskID != "" {
+ continue // only promote actual parents
+ }
+ before := t.State
+ p.maybeUnblockParent(t.ID)
+ // If the parent was promoted, check story completion.
+ if after, err := p.store.GetTask(t.ID); err == nil && after.State != before && t.StoryID != "" {
+ p.checkStoryCompletion(ctx, t.StoryID)
+ }
+ }
}
}
@@ -673,6 +1175,32 @@ var terminalFailureStates = map[task.State]bool{
task.StateBudgetExceeded: true,
}
+// depDoneStates are task states that satisfy a DependsOn dependency.
+var depDoneStates = map[task.State]bool{
+ task.StateCompleted: true,
+ task.StateReady: true, // leaf tasks finish at READY
+}
+
+// checkDepsReady does a single synchronous check of t.DependsOn.
+// Returns (true, nil) if all deps are done, (false, nil) if any are still pending,
+// or (false, err) if a dep entered a terminal failure state.
+func (p *Pool) checkDepsReady(t *task.Task) (bool, error) {
+ for _, depID := range t.DependsOn {
+ dep, err := p.store.GetTask(depID)
+ if err != nil {
+ return false, fmt.Errorf("dependency %q not found: %w", depID, err)
+ }
+ if depDoneStates[dep.State] {
+ continue
+ }
+ if terminalFailureStates[dep.State] {
+ return false, fmt.Errorf("dependency %q ended in state %s", depID, dep.State)
+ }
+ return false, nil // still pending
+ }
+ return true, nil
+}
+
// withFailureHistory returns a shallow copy of t with prior failed execution
// error messages prepended to SystemPromptAppend so the agent knows what went
// wrong in previous attempts.
@@ -710,16 +1238,16 @@ func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *ta
return &copy
}
-// maybeUnblockParent transitions the parent task from BLOCKED to READY if all
-// of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED
-// (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED.
+// maybeUnblockParent transitions the parent task to READY if all of its subtasks
+// are in the COMPLETED state. Handles both BLOCKED parents (ran, created subtasks,
+// paused) and QUEUED parents (story approval created subtasks without running parent).
func (p *Pool) maybeUnblockParent(parentID string) {
parent, err := p.store.GetTask(parentID)
if err != nil {
p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err)
return
}
- if parent.State != task.StateBlocked {
+ if parent.State != task.StateBlocked && parent.State != task.StateQueued {
return
}
subtasks, err := p.store.ListSubtasks(parentID)
@@ -727,6 +1255,11 @@ func (p *Pool) maybeUnblockParent(parentID string) {
p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err)
return
}
+ // A task with no subtasks was never blocked by subtask delegation — don't promote it.
+ // This prevents incorrectly promoting leaf tasks that are stuck in QUEUED to READY.
+ if len(subtasks) == 0 {
+ return
+ }
for _, sub := range subtasks {
if sub.State != task.StateCompleted {
return
@@ -747,7 +1280,7 @@ func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
if err != nil {
return fmt.Errorf("dependency %q not found: %w", depID, err)
}
- if dep.State == task.StateCompleted {
+ if depDoneStates[dep.State] {
continue
}
if terminalFailureStates[dep.State] {