// Package executor runs tasks on pluggable agent runners with bounded
// concurrency, per-agent throttling, rate-limit tracking, and story
// lifecycle progression (deploy → validate → review).
package executor

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"

	"github.com/google/uuid"
)

// Store is the subset of storage.DB methods used by the Pool.
// Defining it as an interface allows test doubles to be injected.
type Store interface {
	// Task reads.
	GetTask(id string) (*task.Task, error)
	ListTasks(filter storage.TaskFilter) ([]*task.Task, error)
	ListSubtasks(parentID string) ([]*task.Task, error)

	// Execution records.
	ListExecutions(taskID string) ([]*storage.Execution, error)
	CreateExecution(e *storage.Execution) error
	UpdateExecution(e *storage.Execution) error

	// Task mutations.
	UpdateTaskState(id string, newState task.State) error
	UpdateTaskQuestion(taskID, questionJSON string) error
	UpdateTaskSummary(taskID, summary string) error
	AppendTaskInteraction(taskID string, interaction task.Interaction) error
	UpdateTaskAgent(id string, agent task.AgentConfig) error
	UpdateExecutionChangestats(execID string, stats *task.Changestats) error

	// Agent availability events ("rate_limited" / "available").
	RecordAgentEvent(e storage.AgentEvent) error

	// Project / story reads and mutations used for the story lifecycle.
	GetProject(id string) (*task.Project, error)
	GetStory(id string) (*task.Story, error)
	ListTasksByStory(storyID string) ([]*task.Task, error)
	UpdateStoryStatus(id string, status task.StoryState) error
	CreateTask(t *task.Task) error
}

// LogPather is an optional interface runners can implement to provide the log
// directory for an execution before it starts. The pool uses this to persist
// log paths at CreateExecution time rather than waiting until execution ends.
type LogPather interface {
	ExecLogDir(execID string) string
}

// Runner executes a single task and returns the result.
type Runner interface {
	Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
}

// workItem is an entry in the pool's internal work queue.
type workItem struct {
	ctx  context.Context
	task *task.Task
	exec *storage.Execution // non-nil for resume submissions
}

// Pool manages a bounded set of concurrent task workers.
type Pool struct {
	maxConcurrent int
	maxPerAgent   int
	runners       map[string]Runner
	store         Store
	logger        *slog.Logger

	depPollInterval time.Duration // how often waitForDependencies polls; defaults to 5s
	requeueDelay    time.Duration // how long to wait before requeuing a blocked-per-agent task; defaults to 30s

	// mu guards all of the mutable counters and maps below.
	mu                  sync.Mutex
	active              int
	activePerAgent      map[string]int
	rateLimited         map[string]time.Time          // agentType -> until
	cancels             map[string]context.CancelFunc // taskID → cancel
	consecutiveFailures map[string]int                // agentType -> count
	drained             map[string]bool               // agentType -> true if halted pending human ack

	resultCh chan *Result
	workCh   chan workItem // internal bounded queue; Submit enqueues here
	doneCh   chan struct{} // signals when a worker slot is freed

	Questions  *QuestionRegistry
	Classifier *Classifier
}

// Result is emitted when a task execution completes.
type Result struct {
	TaskID    string
	Execution *storage.Execution
	Err       error
}

// NewPool constructs a Pool with the given concurrency cap, runner registry,
// and storage backend, and starts the background dispatch goroutine.
// A maxConcurrent below 1 is clamped to 1.
func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *slog.Logger) *Pool {
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	p := &Pool{
		maxConcurrent:       maxConcurrent,
		maxPerAgent:         1,
		runners:             runners,
		store:               store,
		logger:              logger,
		depPollInterval:     5 * time.Second,
		requeueDelay:        30 * time.Second,
		activePerAgent:      make(map[string]int),
		rateLimited:         make(map[string]time.Time),
		cancels:             make(map[string]context.CancelFunc),
		consecutiveFailures: make(map[string]int),
		drained:             make(map[string]bool),
		resultCh:            make(chan *Result, maxConcurrent*2),
		workCh:              make(chan workItem, maxConcurrent*10+100),
		doneCh:              make(chan struct{}, maxConcurrent),
		Questions:           NewQuestionRegistry(),
	}
	go p.dispatch()
	return p
}

// dispatch is a long-running goroutine that reads from the internal work queue
// and launches goroutines as soon as a pool slot is available. This prevents
// tasks from being rejected when the pool is temporarily at capacity.
func (p *Pool) dispatch() {
	for item := range p.workCh {
		// Spin until a slot frees up; workers signal doneCh on exit.
		for {
			p.mu.Lock()
			if p.active < p.maxConcurrent {
				p.active++ // claim the slot before releasing the lock
				p.mu.Unlock()
				if item.exec != nil {
					go p.executeResume(item.ctx, item.task, item.exec)
				} else {
					go p.execute(item.ctx, item.task)
				}
				break
			}
			p.mu.Unlock()
			<-p.doneCh // wait for a worker to finish
		}
	}
}

// Submit enqueues a task for execution. Returns an error only if the internal
// work queue is full. When the pool is at capacity the task is buffered and
// dispatched as soon as a slot becomes available.
func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
	select {
	case p.workCh <- workItem{ctx: ctx, task: t}:
		return nil
	default:
		return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
	}
}

// Results returns the channel for reading execution results.
func (p *Pool) Results() <-chan *Result {
	return p.resultCh
}

// Cancel requests cancellation of a running task. Returns false if the task
// is not currently running in this pool.
func (p *Pool) Cancel(taskID string) bool {
	p.mu.Lock()
	cancel, ok := p.cancels[taskID]
	p.mu.Unlock()
	if !ok {
		return false
	}
	cancel()
	return true
}

// resumablePoolStates are the task states that may be submitted for session resume.
var resumablePoolStates = map[task.State]bool{
	task.StateBlocked:        true,
	task.StateTimedOut:       true,
	task.StateCancelled:      true,
	task.StateFailed:         true,
	task.StateBudgetExceeded: true,
}

// SubmitResume re-queues a blocked or interrupted task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
	if !resumablePoolStates[t.State] {
		return fmt.Errorf("task %s must be in a resumable state to resume (current: %s)", t.ID, t.State)
	}
	if exec.ResumeSessionID == "" {
		return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
	}
	select {
	case p.workCh <- workItem{ctx: ctx, task: t, exec: exec}:
		return nil
	default:
		return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
	}
}

// getRunner resolves the Runner registered for the task's agent type.
// An empty agent type falls back to "claude".
func (p *Pool) getRunner(t *task.Task) (Runner, error) {
	agentType := t.Agent.Type
	if agentType == "" {
		agentType = "claude" // Default for backward compatibility
	}
	runner, ok := p.runners[agentType]
	if !ok {
		return nil, fmt.Errorf("unsupported agent type: %q", agentType)
	}
	return runner, nil
}

// executeResume runs a resume execution for a previously interrupted task.
// The dispatcher has already claimed the pool slot; this method claims the
// per-agent slot, records the execution, runs the runner, and delegates
// classification/persistence to handleRunResult (which also releases the
// per-agent slot).
func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
	agentType := t.Agent.Type
	if agentType == "" {
		agentType = "claude"
	}
	p.mu.Lock()
	p.activePerAgent[agentType]++
	p.mu.Unlock()
	// Free the pool slot and wake the dispatcher when this worker exits.
	defer func() {
		p.mu.Lock()
		p.active--
		p.mu.Unlock()
		select {
		case p.doneCh <- struct{}{}:
		default:
		}
	}()
	runner, err := p.getRunner(t)
	if err != nil {
		p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID)
		// Release the per-agent slot claimed above before bailing out.
		p.mu.Lock()
		p.activePerAgent[agentType]--
		if p.activePerAgent[agentType] == 0 {
			delete(p.activePerAgent, agentType)
		}
		p.mu.Unlock()
		p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
		return
	}
	// Pre-populate log paths.
	if lp, ok := runner.(LogPather); ok {
		if logDir := lp.ExecLogDir(exec.ID); logDir != "" {
			exec.StdoutPath = filepath.Join(logDir, "stdout.log")
			exec.StderrPath = filepath.Join(logDir, "stderr.log")
			exec.ArtifactDir = logDir
		}
	}
	exec.StartTime = time.Now().UTC()
	exec.Status = "RUNNING"
	if err := p.store.CreateExecution(exec); err != nil {
		p.logger.Error("failed to create resume execution record", "error", err)
	}
	if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
		p.logger.Error("failed to update task state", "error", err)
	}
	// Apply the task timeout (if any) and register the cancel func so
	// Cancel(taskID) can stop this run.
	var cancel context.CancelFunc
	if t.Timeout.Duration > 0 {
		ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	p.mu.Lock()
	p.cancels[t.ID] = cancel
	p.mu.Unlock()
	defer func() {
		cancel()
		p.mu.Lock()
		delete(p.cancels, t.ID)
		p.mu.Unlock()
	}()
	// Populate RepositoryURL from Project registry if missing (ADR-007).
	if t.RepositoryURL == "" && t.Project != "" {
		if proj, err := p.store.GetProject(t.Project); err == nil && proj.RemoteURL != "" {
			t.RepositoryURL = proj.RemoteURL
		}
	}
	// Populate BranchName from Story if missing (ADR-007).
	if t.BranchName == "" && t.StoryID != "" {
		if story, err := p.store.GetStory(t.StoryID); err == nil && story.BranchName != "" {
			t.BranchName = story.BranchName
		}
	}
	err = runner.Run(ctx, t, exec)
	exec.EndTime = time.Now().UTC()
	p.handleRunResult(ctx, t, exec, err, agentType)
}

// handleRunResult applies the shared post-run error-classification and
// state-update logic used by both execute() and executeResume(). It sets
// exec.Status and exec.ErrorMsg, updates storage, and emits the result to
// resultCh. The caller must set exec.EndTime before calling.
func (p *Pool) handleRunResult(ctx context.Context, t *task.Task, exec *storage.Execution, err error, agentType string) {
	if err != nil {
		// Rate-limit / quota errors mark the agent unavailable until a
		// deadline so future scheduling can avoid it. Quota exhaustion gets a
		// long default backoff (5h); transient rate limits a short one (1m).
		if isRateLimitError(err) || isQuotaExhausted(err) {
			p.mu.Lock()
			retryAfter := parseRetryAfter(err.Error())
			reason := "transient"
			if isQuotaExhausted(err) {
				reason = "quota"
				if retryAfter == 0 {
					retryAfter = 5 * time.Hour
				}
			} else if retryAfter == 0 {
				retryAfter = 1 * time.Minute
			}
			until := time.Now().Add(retryAfter)
			p.rateLimited[agentType] = until
			p.logger.Info("agent rate limited", "agent", agentType, "retryAfter", retryAfter, "quotaExhausted", isQuotaExhausted(err))
			p.mu.Unlock()
			// Record the event asynchronously so a slow store does not delay
			// result handling.
			go func() {
				ev := storage.AgentEvent{
					ID:        uuid.New().String(),
					Agent:     agentType,
					Event:     "rate_limited",
					Timestamp: time.Now(),
					Until:     &until,
					Reason:    reason,
				}
				if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
					p.logger.Warn("failed to record agent event", "error", recErr)
				}
			}()
		}
		// Classify the failure into a terminal execution status, most
		// specific cause first: blocked question > timeout > cancel > quota >
		// plain failure.
		var blockedErr *BlockedError
		if errors.As(err, &blockedErr) {
			exec.Status = "BLOCKED"
			exec.SandboxDir = blockedErr.SandboxDir // preserve so resume runs in same dir
			if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
			}
			if err := p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON); err != nil {
				p.logger.Error("failed to update task question", "taskID", t.ID, "error", err)
			}
		} else if ctx.Err() == context.DeadlineExceeded {
			exec.Status = "TIMED_OUT"
			exec.ErrorMsg = "execution timed out"
			if err := p.store.UpdateTaskState(t.ID, task.StateTimedOut); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateTimedOut, "error", err)
			}
		} else if ctx.Err() == context.Canceled {
			exec.Status = "CANCELLED"
			exec.ErrorMsg = "execution cancelled"
			if err := p.store.UpdateTaskState(t.ID, task.StateCancelled); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCancelled, "error", err)
			}
		} else if isQuotaExhausted(err) {
			exec.Status = "BUDGET_EXCEEDED"
			exec.ErrorMsg = err.Error()
			if err := p.store.UpdateTaskState(t.ID, task.StateBudgetExceeded); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBudgetExceeded, "error", err)
			}
		} else {
			exec.Status = "FAILED"
			exec.ErrorMsg = err.Error()
			if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
			}
			// Two consecutive plain failures drain the agent: it stops taking
			// new work until a human acknowledges (see UndrainingAgent).
			p.mu.Lock()
			p.consecutiveFailures[agentType]++
			failures := p.consecutiveFailures[agentType]
			p.mu.Unlock()
			if failures >= 2 {
				p.mu.Lock()
				p.drained[agentType] = true
				p.mu.Unlock()
				p.logger.Warn("agent drained after consecutive failures", "agent", agentType, "failures", failures)
				// Marshal of map[string]string cannot fail; error deliberately ignored.
				questionJSON, _ := json.Marshal(map[string]string{
					"question": fmt.Sprintf("Agent %q has failed %d times in a row (last error: %s). Acknowledge to resume.", agentType, failures, exec.ErrorMsg),
					"options":  "acknowledge",
				})
				if err := p.store.UpdateTaskQuestion(t.ID, string(questionJSON)); err != nil {
					p.logger.Error("failed to set drain question", "error", err)
				}
			}
		}
		// A failed task in a VALIDATING story marks the validation as failed.
		if t.StoryID != "" && exec.Status == "FAILED" {
			storyID := t.StoryID
			errMsg := exec.ErrorMsg
			go func() {
				story, getErr := p.store.GetStory(storyID)
				if getErr != nil {
					return
				}
				if story.Status == task.StoryValidating {
					p.checkValidationResult(ctx, storyID, task.StateFailed, errMsg)
				}
			}()
		}
	} else {
		// Success: reset the failure streak and undrain the agent.
		p.mu.Lock()
		p.consecutiveFailures[agentType] = 0
		p.drained[agentType] = false
		p.mu.Unlock()
		if t.ParentTaskID == "" {
			// Top-level task: if it spawned subtasks it goes BLOCKED until
			// they all complete; otherwise it is READY.
			subtasks, subErr := p.store.ListSubtasks(t.ID)
			if subErr != nil {
				p.logger.Error("failed to list subtasks", "taskID", t.ID, "error", subErr)
			}
			if subErr == nil && len(subtasks) > 0 {
				exec.Status = "BLOCKED"
				if err := p.store.UpdateTaskState(t.ID, task.StateBlocked); err != nil {
					p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateBlocked, "error", err)
				}
			} else {
				exec.Status = "READY"
				if err := p.store.UpdateTaskState(t.ID, task.StateReady); err != nil {
					p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateReady, "error", err)
				}
			}
		} else {
			// Subtask: mark COMPLETED and possibly unblock its parent.
			exec.Status = "COMPLETED"
			if err := p.store.UpdateTaskState(t.ID, task.StateCompleted); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateCompleted, "error", err)
			}
			p.maybeUnblockParent(t.ParentTaskID)
		}
		// Advance the story lifecycle asynchronously.
		if t.StoryID != "" {
			storyID := t.StoryID
			go func() {
				story, getErr := p.store.GetStory(storyID)
				if getErr != nil {
					p.logger.Error("handleRunResult: failed to get story", "storyID", storyID, "error", getErr)
					return
				}
				if story.Status == task.StoryValidating {
					p.checkValidationResult(ctx, storyID, task.StateCompleted, "")
				} else {
					p.checkStoryCompletion(ctx, storyID)
				}
			}()
		}
	}
	// Persist a summary: prefer one supplied by the runner, else extract from stdout.
	summary := exec.Summary
	if summary == "" && exec.StdoutPath != "" {
		summary = extractSummary(exec.StdoutPath)
	}
	if summary != "" {
		if summaryErr := p.store.UpdateTaskSummary(t.ID, summary); summaryErr != nil {
			p.logger.Error("failed to update task summary", "taskID", t.ID, "error", summaryErr)
		}
	}
	// Persist diff statistics parsed from stdout, if present.
	if exec.StdoutPath != "" {
		if cs := task.ParseChangestatFromFile(exec.StdoutPath); cs != nil {
			exec.Changestats = cs
			if csErr := p.store.UpdateExecutionChangestats(exec.ID, cs); csErr != nil {
				p.logger.Error("failed to store changestats", "execID", exec.ID, "error", csErr)
			}
		}
	}
	if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
		p.logger.Error("failed to update execution", "error", updateErr)
	}
	// Release the per-agent slot and emit the result.
	p.mu.Lock()
	p.activePerAgent[agentType]--
	if p.activePerAgent[agentType] == 0 {
		delete(p.activePerAgent, agentType)
	}
	p.mu.Unlock()
	p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}

// checkStoryCompletion checks whether all tasks in a story have reached a terminal
// success state and transitions the story to SHIPPABLE if so.
func (p *Pool) checkStoryCompletion(ctx context.Context, storyID string) {
	tasks, err := p.store.ListTasksByStory(storyID)
	if err != nil {
		p.logger.Error("checkStoryCompletion: failed to list tasks", "storyID", storyID, "error", err)
		return
	}
	// A story with no tasks is never considered complete.
	if len(tasks) == 0 {
		return
	}
	for _, t := range tasks {
		if t.State != task.StateCompleted && t.State != task.StateReady {
			return // not all tasks done
		}
	}
	if err := p.store.UpdateStoryStatus(storyID, task.StoryShippable); err != nil {
		p.logger.Error("checkStoryCompletion: failed to update story status", "storyID", storyID, "error", err)
		return
	}
	p.logger.Info("story transitioned to SHIPPABLE", "storyID", storyID)
	go p.triggerStoryDeploy(ctx, storyID)
}

// triggerStoryDeploy runs the project deploy script for a SHIPPABLE story
// and advances it to DEPLOYED on success.
func (p *Pool) triggerStoryDeploy(ctx context.Context, storyID string) {
	story, err := p.store.GetStory(storyID)
	if err != nil {
		p.logger.Error("triggerStoryDeploy: failed to get story", "storyID", storyID, "error", err)
		return
	}
	// No project or no configured deploy script means nothing to do.
	if story.ProjectID == "" {
		return
	}
	proj, err := p.store.GetProject(story.ProjectID)
	if err != nil {
		p.logger.Error("triggerStoryDeploy: failed to get project", "storyID", storyID, "projectID", story.ProjectID, "error", err)
		return
	}
	if proj.DeployScript == "" {
		return
	}
	// Merge story branch to main before deploying (ADR-007).
	if story.BranchName != "" && proj.LocalPath != "" {
		mergeSteps := [][]string{
			{"git", "-C", proj.LocalPath, "fetch", "origin"},
			{"git", "-C", proj.LocalPath, "checkout", "main"},
			{"git", "-C", proj.LocalPath, "merge", "--no-ff", story.BranchName, "-m", "Merge " + story.BranchName},
			{"git", "-C", proj.LocalPath, "push", "origin", "main"},
		}
		for _, args := range mergeSteps {
			// Abort the whole deploy on the first failing git step.
			if mergeOut, mergeErr := exec.CommandContext(ctx, args[0], args[1:]...).CombinedOutput(); mergeErr != nil {
				p.logger.Error("triggerStoryDeploy: merge failed", "cmd", args, "output", string(mergeOut), "error", mergeErr)
				return
			}
		}
		p.logger.Info("story branch merged to main", "storyID", storyID, "branch", story.BranchName)
	}
	out, err := exec.CommandContext(ctx, proj.DeployScript).CombinedOutput()
	if err != nil {
		p.logger.Error("triggerStoryDeploy: deploy script failed", "storyID", storyID, "script", proj.DeployScript, "output", string(out), "error", err)
		return
	}
	if err := p.store.UpdateStoryStatus(storyID, task.StoryDeployed); err != nil {
		p.logger.Error("triggerStoryDeploy: failed to update story status", "storyID", storyID, "error", err)
		return
	}
	p.logger.Info("story transitioned to DEPLOYED", "storyID", storyID)
	go p.createValidationTask(ctx, storyID)
}

// createValidationTask creates a validation subtask from the story's ValidationJSON
// and transitions the story to VALIDATING.
func (p *Pool) createValidationTask(ctx context.Context, storyID string) { story, err := p.store.GetStory(storyID) if err != nil { p.logger.Error("createValidationTask: failed to get story", "storyID", storyID, "error", err) return } if story.ValidationJSON == "" { p.logger.Warn("createValidationTask: story has no ValidationJSON, skipping", "storyID", storyID) return } var spec map[string]interface{} if err := json.Unmarshal([]byte(story.ValidationJSON), &spec); err != nil { p.logger.Error("createValidationTask: failed to parse ValidationJSON", "storyID", storyID, "error", err) return } instructions := fmt.Sprintf("Validate the deployment for story %q.\n\nValidation spec:\n%s", story.Name, story.ValidationJSON) now := time.Now().UTC() vtask := &task.Task{ ID: uuid.New().String(), Name: fmt.Sprintf("validation: %s", story.Name), StoryID: storyID, State: task.StateQueued, Agent: task.AgentConfig{Type: "claude", Instructions: instructions}, Tags: []string{}, DependsOn: []string{}, CreatedAt: now, UpdatedAt: now, } if err := p.store.CreateTask(vtask); err != nil { p.logger.Error("createValidationTask: failed to create task", "storyID", storyID, "error", err) return } if err := p.store.UpdateStoryStatus(storyID, task.StoryValidating); err != nil { p.logger.Error("createValidationTask: failed to update story status", "storyID", storyID, "error", err) return } p.logger.Info("validation task created and story transitioned to VALIDATING", "storyID", storyID, "taskID", vtask.ID) p.Submit(ctx, vtask) //nolint:errcheck } // checkValidationResult inspects a completed validation task and transitions // the story to REVIEW_READY or NEEDS_FIX accordingly. 
func (p *Pool) checkValidationResult(ctx context.Context, storyID string, taskState task.State, errorMsg string) { if taskState == task.StateCompleted { if err := p.store.UpdateStoryStatus(storyID, task.StoryReviewReady); err != nil { p.logger.Error("checkValidationResult: failed to update story status", "storyID", storyID, "error", err) return } p.logger.Info("story transitioned to REVIEW_READY", "storyID", storyID) } else { if err := p.store.UpdateStoryStatus(storyID, task.StoryNeedsFix); err != nil { p.logger.Error("checkValidationResult: failed to update story status", "storyID", storyID, "error", err) return } p.logger.Info("story transitioned to NEEDS_FIX", "storyID", storyID, "error", errorMsg) } } // UndrainingAgent resets the drain state and failure counter for the given agent type. func (p *Pool) UndrainingAgent(agentType string) { p.mu.Lock() defer p.mu.Unlock() p.drained[agentType] = false p.consecutiveFailures[agentType] = 0 } // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() defer p.mu.Unlock() return p.active } // AgentStatusInfo holds the current state of a single agent. type AgentStatusInfo struct { Agent string `json:"agent"` ActiveTasks int `json:"active_tasks"` RateLimited bool `json:"rate_limited"` Until *time.Time `json:"until,omitempty"` Drained bool `json:"drained"` } // AgentStatuses returns the current status of all registered agents. 
func (p *Pool) AgentStatuses() []AgentStatusInfo {
	p.mu.Lock()
	defer p.mu.Unlock()
	now := time.Now()
	var out []AgentStatusInfo
	for agent := range p.runners {
		info := AgentStatusInfo{
			Agent:       agent,
			ActiveTasks: p.activePerAgent[agent],
			Drained:     p.drained[agent],
		}
		// Only report a rate limit that has not yet expired.
		if deadline, ok := p.rateLimited[agent]; ok && now.Before(deadline) {
			info.RateLimited = true
			info.Until = &deadline
		}
		out = append(out, info)
	}
	return out
}

// pickAgent selects the best agent from the given SystemStatus using explicit
// load balancing: prefer the available (non-rate-limited) agent with the fewest
// active tasks. If all agents are rate-limited, fall back to fewest active.
func pickAgent(status SystemStatus) string {
	best := ""
	bestActive := -1
	// First pass: only consider non-rate-limited agents. Ties break on the
	// lexicographically smaller agent name so map iteration order cannot make
	// the choice nondeterministic.
	for agent, active := range status.ActiveTasks {
		if status.RateLimited[agent] {
			continue
		}
		if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) {
			best = agent
			bestActive = active
		}
	}
	if best != "" {
		return best
	}
	// Fallback: all rate-limited — pick least active anyway.
	for agent, active := range status.ActiveTasks {
		if bestActive == -1 || active < bestActive || (active == bestActive && agent < best) {
			best = agent
			bestActive = active
		}
	}
	return best
}

// execute is the main worker path for a freshly submitted task: it selects an
// agent, optionally classifies the task to pick a model, enforces per-agent
// limits and drain state, waits for dependencies, runs the runner, and hands
// the outcome to handleRunResult.
func (p *Pool) execute(ctx context.Context, t *task.Task) {
	// 1. Load-balanced agent selection + model classification.
	p.mu.Lock()
	activeTasks := make(map[string]int)
	rateLimited := make(map[string]bool)
	now := time.Now()
	for agent := range p.runners {
		activeTasks[agent] = p.activePerAgent[agent]
		// Expired rate limits are cleared here, and an "available" event is
		// recorded asynchronously.
		if deadline, ok := p.rateLimited[agent]; ok && now.After(deadline) {
			delete(p.rateLimited, agent)
			agentName := agent
			go func() {
				ev := storage.AgentEvent{
					ID:        uuid.New().String(),
					Agent:     agentName,
					Event:     "available",
					Timestamp: time.Now(),
				}
				if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
					p.logger.Warn("failed to record agent available event", "error", recErr)
				}
			}()
		}
		// A missing map entry yields the zero time, and now.Before(zero) is
		// false, so absent agents are reported as not rate-limited.
		rateLimited[agent] = now.Before(p.rateLimited[agent])
	}
	status := SystemStatus{
		ActiveTasks: activeTasks,
		RateLimited: rateLimited,
	}
	p.mu.Unlock()
	// If a specific agent is already requested, skip selection and classification.
	skipClassification := t.Agent.Type == "claude" || t.Agent.Type == "gemini"
	if !skipClassification {
		// Deterministically pick the agent with fewest active tasks.
		selectedAgent := pickAgent(status)
		if selectedAgent != "" {
			t.Agent.Type = selectedAgent
		}
		if p.Classifier != nil {
			cls, err := p.Classifier.Classify(ctx, t.Name, t.Agent.Instructions, status, t.Agent.Type)
			if err == nil {
				p.logger.Info("task classified", "taskID", t.ID, "agent", t.Agent.Type, "model", cls.Model, "reason", cls.Reason)
				t.Agent.Model = cls.Model
			} else {
				// Classification is best-effort: on failure the task runs with
				// its current agent/model.
				p.logger.Error("classification failed", "error", err, "taskID", t.ID)
			}
		}
	}
	// Persist the assigned agent (and model) to the database before running.
	if err := p.store.UpdateTaskAgent(t.ID, t.Agent); err != nil {
		p.logger.Error("failed to persist agent config", "error", err, "taskID", t.ID)
	}
	agentType := t.Agent.Type
	if agentType == "" {
		agentType = "claude"
	}
	// Free the pool slot and wake the dispatcher when this worker exits —
	// including the requeue paths below, which give up the slot and try again later.
	defer func() {
		p.mu.Lock()
		p.active--
		p.mu.Unlock()
		select {
		case p.doneCh <- struct{}{}:
		default:
		}
	}()
	p.mu.Lock()
	// A drained agent takes no new work; retry the task in 2 minutes.
	// NOTE(review): the AfterFunc send blocks its timer goroutine if workCh is
	// full — confirm this is acceptable.
	if p.drained[agentType] {
		p.mu.Unlock()
		time.AfterFunc(2*time.Minute, func() { p.workCh <- workItem{ctx: ctx, task: t} })
		return
	}
	// Per-agent concurrency cap reached; requeue after requeueDelay.
	if p.activePerAgent[agentType] >= p.maxPerAgent {
		p.mu.Unlock()
		time.AfterFunc(p.requeueDelay, func() { p.workCh <- workItem{ctx: ctx, task: t} })
		return
	}
	// NOTE(review): only an *expired* rate limit is handled here; an agent
	// still inside its rate-limit window is not skipped — presumably
	// intentional for explicitly requested agents, but confirm.
	if deadline, ok := p.rateLimited[agentType]; ok && time.Now().After(deadline) {
		delete(p.rateLimited, agentType)
		agentName := agentType
		go func() {
			ev := storage.AgentEvent{
				ID:        uuid.New().String(),
				Agent:     agentName,
				Event:     "available",
				Timestamp: time.Now(),
			}
			if recErr := p.store.RecordAgentEvent(ev); recErr != nil {
				p.logger.Warn("failed to record agent available event", "error", recErr)
			}
		}()
	}
	p.activePerAgent[agentType]++
	p.mu.Unlock()
	runner, err := p.getRunner(t)
	if err != nil {
		p.logger.Error("failed to get runner", "error", err, "taskID", t.ID)
		// Record a zero-duration FAILED execution so the attempt is visible.
		now := time.Now().UTC()
		exec := &storage.Execution{
			ID:        uuid.New().String(),
			TaskID:    t.ID,
			StartTime: now,
			EndTime:   now,
			Status:    "FAILED",
			ErrorMsg:  err.Error(),
		}
		if createErr := p.store.CreateExecution(exec); createErr != nil {
			p.logger.Error("failed to create execution record", "error", createErr)
		}
		if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
			p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
		}
		p.mu.Lock()
		p.activePerAgent[agentType]--
		if p.activePerAgent[agentType] == 0 {
			delete(p.activePerAgent, agentType)
		}
		p.mu.Unlock()
		p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
		return
	}
	// Wait for all dependencies to complete before starting execution.
	if len(t.DependsOn) > 0 {
		if err := p.waitForDependencies(ctx, t); err != nil {
			// Dependency failure/cancellation: record a zero-duration FAILED execution.
			now := time.Now().UTC()
			exec := &storage.Execution{
				ID:        uuid.New().String(),
				TaskID:    t.ID,
				StartTime: now,
				EndTime:   now,
				Status:    "FAILED",
				ErrorMsg:  err.Error(),
			}
			if createErr := p.store.CreateExecution(exec); createErr != nil {
				p.logger.Error("failed to create execution record", "error", createErr)
			}
			if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
				p.logger.Error("failed to update task state", "taskID", t.ID, "state", task.StateFailed, "error", err)
			}
			p.mu.Lock()
			p.activePerAgent[agentType]--
			if p.activePerAgent[agentType] == 0 {
				delete(p.activePerAgent, agentType)
			}
			p.mu.Unlock()
			p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
			return
		}
	}
	execID := uuid.New().String()
	exec := &storage.Execution{
		ID:        execID,
		TaskID:    t.ID,
		StartTime: time.Now().UTC(),
		Status:    "RUNNING",
	}
	// Pre-populate log paths so they're available in the DB immediately —
	// before the subprocess starts — enabling live tailing and debugging.
	if lp, ok := runner.(LogPather); ok {
		if logDir := lp.ExecLogDir(execID); logDir != "" {
			exec.StdoutPath = filepath.Join(logDir, "stdout.log")
			exec.StderrPath = filepath.Join(logDir, "stderr.log")
			exec.ArtifactDir = logDir
		}
	}
	// Record execution start.
	if err := p.store.CreateExecution(exec); err != nil {
		p.logger.Error("failed to create execution record", "error", err)
	}
	if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
		p.logger.Error("failed to update task state", "error", err)
	}
	// Apply task timeout and register cancel so callers can stop this task.
	var cancel context.CancelFunc
	if t.Timeout.Duration > 0 {
		ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	p.mu.Lock()
	p.cancels[t.ID] = cancel
	p.mu.Unlock()
	defer func() {
		cancel()
		p.mu.Lock()
		delete(p.cancels, t.ID)
		p.mu.Unlock()
	}()
	// Inject prior failure history so the agent knows what went wrong before.
	priorExecs, priorErr := p.store.ListExecutions(t.ID)
	t = withFailureHistory(t, priorExecs, priorErr)
	// Populate RepositoryURL from Project registry if missing (ADR-007).
	if t.RepositoryURL == "" && t.Project != "" {
		if proj, err := p.store.GetProject(t.Project); err == nil && proj.RemoteURL != "" {
			t.RepositoryURL = proj.RemoteURL
		}
	}
	// Populate BranchName from Story if missing (ADR-007).
	if t.BranchName == "" && t.StoryID != "" {
		if story, err := p.store.GetStory(t.StoryID); err == nil && story.BranchName != "" {
			t.BranchName = story.BranchName
		}
	}
	// Run the task.
	err = runner.Run(ctx, t, exec)
	exec.EndTime = time.Now().UTC()
	p.handleRunResult(ctx, t, exec, err, agentType)
}

// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous
// server crash or restart) as FAILED, then immediately re-queues them for
// retry. It also closes any open RUNNING execution records for those tasks.
// Call this once on server startup.
func (p *Pool) RecoverStaleRunning(ctx context.Context) {
	tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning})
	if err != nil {
		p.logger.Error("RecoverStaleRunning: list tasks", "error", err)
		return
	}
	for _, t := range tasks {
		p.logger.Warn("recovering stale RUNNING task", "taskID", t.ID, "name", t.Name)
		// Close any open execution records.
		execs, err := p.store.ListExecutions(t.ID)
		if err == nil {
			for _, e := range execs {
				if e.Status == "RUNNING" {
					e.Status = "FAILED"
					e.ErrorMsg = "server restarted while task was running"
					e.EndTime = time.Now().UTC()
					if updateErr := p.store.UpdateExecution(e); updateErr != nil {
						p.logger.Error("RecoverStaleRunning: update execution", "error", updateErr, "execID", e.ID)
					}
				}
			}
		}
		if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
			p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID)
			continue
		}
		// Re-queue so the task retries automatically. Submit expects QUEUED state.
		if err := p.store.UpdateTaskState(t.ID, task.StateQueued); err != nil {
			p.logger.Error("RecoverStaleRunning: set queued", "error", err, "taskID", t.ID)
			continue
		}
		t.State = task.StateQueued
		if err := p.Submit(ctx, t); err != nil {
			p.logger.Error("RecoverStaleRunning: re-queue", "error", err, "taskID", t.ID)
		}
	}
}

// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from
// a previous server instance. Call this once on server startup, after
// RecoverStaleRunning.
func (p *Pool) RecoverStaleQueued(ctx context.Context) {
	tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued})
	if err != nil {
		p.logger.Error("RecoverStaleQueued: list tasks", "error", err)
		return
	}
	for _, t := range tasks {
		p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name)
		if err := p.Submit(ctx, t); err != nil {
			p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID)
		}
	}
}

// RecoverStaleBlocked promotes any BLOCKED parent task to READY when all of its
// subtasks are already COMPLETED. This handles the case where the server was
// restarted after subtasks finished but before maybeUnblockParent could fire.
// Call this once on server startup, after RecoverStaleRunning and RecoverStaleQueued.
func (p *Pool) RecoverStaleBlocked() {
	tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateBlocked})
	if err != nil {
		p.logger.Error("RecoverStaleBlocked: list tasks", "error", err)
		return
	}
	for _, t := range tasks {
		p.maybeUnblockParent(t.ID)
	}
}

// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
	task.StateFailed:         true,
	task.StateTimedOut:       true,
	task.StateCancelled:      true,
	task.StateBudgetExceeded: true,
}

// withFailureHistory returns a shallow copy of t with prior failed execution
// error messages prepended to SystemPromptAppend so the agent knows what went
// wrong in previous attempts.
func withFailureHistory(t *task.Task, execs []*storage.Execution, err error) *task.Task { if err != nil || len(execs) == 0 { return t } var failures []storage.Execution for _, e := range execs { if (e.Status == "FAILED" || e.Status == "TIMED_OUT") && e.ErrorMsg != "" { failures = append(failures, *e) } } if len(failures) == 0 { return t } var sb strings.Builder sb.WriteString("## Prior Attempt History\n\n") sb.WriteString("This task has failed before. Do not repeat the same mistakes.\n\n") for i, f := range failures { fmt.Fprintf(&sb, "**Attempt %d** (%s) — %s:\n%s\n\n", i+1, f.StartTime.Format("2006-01-02 15:04 UTC"), f.Status, f.ErrorMsg) } sb.WriteString("---\n\n") copy := *t copy.Agent = t.Agent if copy.Agent.SystemPromptAppend != "" { copy.Agent.SystemPromptAppend = sb.String() + copy.Agent.SystemPromptAppend } else { copy.Agent.SystemPromptAppend = sb.String() } return © } // maybeUnblockParent transitions the parent task from BLOCKED to READY if all // of its subtasks are in the COMPLETED state. If any subtask is not COMPLETED // (including FAILED, CANCELLED, RUNNING, etc.) the parent stays BLOCKED. func (p *Pool) maybeUnblockParent(parentID string) { parent, err := p.store.GetTask(parentID) if err != nil { p.logger.Error("maybeUnblockParent: get parent", "parentID", parentID, "error", err) return } if parent.State != task.StateBlocked { return } subtasks, err := p.store.ListSubtasks(parentID) if err != nil { p.logger.Error("maybeUnblockParent: list subtasks", "parentID", parentID, "error", err) return } for _, sub := range subtasks { if sub.State != task.StateCompleted { return } } if err := p.store.UpdateTaskState(parentID, task.StateReady); err != nil { p.logger.Error("maybeUnblockParent: update parent state", "parentID", parentID, "error", err) } } // waitForDependencies polls storage until all tasks in t.DependsOn reach COMPLETED, // or until a dependency enters a terminal failure state or the context is cancelled. 
func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error { for { allDone := true for _, depID := range t.DependsOn { dep, err := p.store.GetTask(depID) if err != nil { return fmt.Errorf("dependency %q not found: %w", depID, err) } if dep.State == task.StateCompleted { continue } if terminalFailureStates[dep.State] { return fmt.Errorf("dependency %q ended in state %s", depID, dep.State) } allDone = false } if allDone { return nil } select { case <-ctx.Done(): return ctx.Err() case <-time.After(p.depPollInterval): } } }