package executor

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"path/filepath"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// LogPather is an optional interface runners can implement to provide the log
// directory for an execution before it starts. The pool uses this to persist
// log paths at CreateExecution time rather than waiting until execution ends.
type LogPather interface {
	ExecLogDir(execID string) string
}

// Runner executes a single task and returns the result.
type Runner interface {
	Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
}

// Pool manages a bounded set of concurrent task workers.
type Pool struct {
	maxConcurrent int
	runners       map[string]Runner
	store         *storage.DB
	logger        *slog.Logger

	mu      sync.Mutex
	active  int                           // number of in-flight workers; guarded by mu
	cancels map[string]context.CancelFunc // taskID → cancel; guarded by mu

	resultCh chan *Result

	Questions *QuestionRegistry
}

// Result is emitted when a task execution completes.
type Result struct {
	TaskID    string
	Execution *storage.Execution
	Err       error
}

// NewPool constructs a Pool that runs at most maxConcurrent tasks at once.
// A maxConcurrent below 1 is clamped to 1. The result channel is buffered at
// twice the concurrency limit so finishing workers rarely block on delivery.
func NewPool(maxConcurrent int, runners map[string]Runner, store *storage.DB, logger *slog.Logger) *Pool {
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	return &Pool{
		maxConcurrent: maxConcurrent,
		runners:       runners,
		store:         store,
		logger:        logger,
		cancels:       make(map[string]context.CancelFunc),
		resultCh:      make(chan *Result, maxConcurrent*2),
		Questions:     NewQuestionRegistry(),
	}
}

// Submit dispatches a task for execution. It does NOT block: if the pool is
// already at capacity it returns an error immediately and the caller is
// expected to retry later.
func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
	p.mu.Lock()
	if p.active >= p.maxConcurrent {
		// Capture under the lock so the error reports a consistent snapshot.
		active, limit := p.active, p.maxConcurrent
		p.mu.Unlock()
		return fmt.Errorf("executor pool at capacity (%d/%d)", active, limit)
	}
	p.active++
	p.mu.Unlock()
	go p.execute(ctx, t)
	return nil
}

// Results returns the channel for reading execution results.
func (p *Pool) Results() <-chan *Result { return p.resultCh } // Cancel requests cancellation of a running task. Returns false if the task // is not currently running in this pool. func (p *Pool) Cancel(taskID string) bool { p.mu.Lock() cancel, ok := p.cancels[taskID] p.mu.Unlock() if !ok { return false } cancel() return true } // SubmitResume re-queues a blocked task using the provided resume execution. // The execution must have ResumeSessionID and ResumeAnswer set. func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { p.mu.Lock() if p.active >= p.maxConcurrent { active := p.active max := p.maxConcurrent p.mu.Unlock() return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) } p.active++ p.mu.Unlock() go p.executeResume(ctx, t, exec) return nil } func (p *Pool) getRunner(t *task.Task) (Runner, error) { agentType := t.Agent.Type if agentType == "" { agentType = "claude" // Default for backward compatibility } runner, ok := p.runners[agentType] if !ok { return nil, fmt.Errorf("unsupported agent type: %q", agentType) } return runner, nil } func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) { defer func() { p.mu.Lock() p.active-- p.mu.Unlock() }() runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner for resume", "error", err, "taskID", t.ID) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } // Pre-populate log paths. 
if lp, ok := runner.(LogPather); ok { if logDir := lp.ExecLogDir(exec.ID); logDir != "" { exec.StdoutPath = filepath.Join(logDir, "stdout.log") exec.StderrPath = filepath.Join(logDir, "stderr.log") exec.ArtifactDir = logDir } } exec.StartTime = time.Now().UTC() exec.Status = "RUNNING" if err := p.store.CreateExecution(exec); err != nil { p.logger.Error("failed to create resume execution record", "error", err) } if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { p.logger.Error("failed to update task state", "error", err) } var cancel context.CancelFunc if t.Timeout.Duration > 0 { ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) } else { ctx, cancel = context.WithCancel(ctx) } defer cancel() err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() if err != nil { var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" p.store.UpdateTaskState(t.ID, task.StateBlocked) p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) } else if ctx.Err() == context.DeadlineExceeded { exec.Status = "TIMED_OUT" exec.ErrorMsg = "execution timed out" p.store.UpdateTaskState(t.ID, task.StateTimedOut) } else if ctx.Err() == context.Canceled { exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() p.store.UpdateTaskState(t.ID, task.StateFailed) } } else { if t.ParentTaskID == "" { exec.Status = "READY" p.store.UpdateTaskState(t.ID, task.StateReady) } else { exec.Status = "COMPLETED" p.store.UpdateTaskState(t.ID, task.StateCompleted) } } if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update resume execution", "error", updateErr) } p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } // ActiveCount returns the number of currently running tasks. 
func (p *Pool) ActiveCount() int { p.mu.Lock() defer p.mu.Unlock() return p.active } func (p *Pool) execute(ctx context.Context, t *task.Task) { defer func() { p.mu.Lock() p.active-- p.mu.Unlock() }() runner, err := p.getRunner(t) if err != nil { p.logger.Error("failed to get runner", "error", err, "taskID", t.ID) now := time.Now().UTC() exec := &storage.Execution{ ID: uuid.New().String(), TaskID: t.ID, StartTime: now, EndTime: now, Status: "FAILED", ErrorMsg: err.Error(), } if createErr := p.store.CreateExecution(exec); createErr != nil { p.logger.Error("failed to create execution record", "error", createErr) } p.store.UpdateTaskState(t.ID, task.StateFailed) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } // Wait for all dependencies to complete before starting execution. if len(t.DependsOn) > 0 { if err := p.waitForDependencies(ctx, t); err != nil { now := time.Now().UTC() exec := &storage.Execution{ ID: uuid.New().String(), TaskID: t.ID, StartTime: now, EndTime: now, Status: "FAILED", ErrorMsg: err.Error(), } if createErr := p.store.CreateExecution(exec); createErr != nil { p.logger.Error("failed to create execution record", "error", createErr) } p.store.UpdateTaskState(t.ID, task.StateFailed) p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} return } } execID := uuid.New().String() exec := &storage.Execution{ ID: execID, TaskID: t.ID, StartTime: time.Now().UTC(), Status: "RUNNING", } // Pre-populate log paths so they're available in the DB immediately — // before the subprocess starts — enabling live tailing and debugging. if lp, ok := runner.(LogPather); ok { if logDir := lp.ExecLogDir(execID); logDir != "" { exec.StdoutPath = filepath.Join(logDir, "stdout.log") exec.StderrPath = filepath.Join(logDir, "stderr.log") exec.ArtifactDir = logDir } } // Record execution start. 
if err := p.store.CreateExecution(exec); err != nil { p.logger.Error("failed to create execution record", "error", err) } if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { p.logger.Error("failed to update task state", "error", err) } // Apply task timeout and register cancel so callers can stop this task. var cancel context.CancelFunc if t.Timeout.Duration > 0 { ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) } else { ctx, cancel = context.WithCancel(ctx) } p.mu.Lock() p.cancels[t.ID] = cancel p.mu.Unlock() defer func() { cancel() p.mu.Lock() delete(p.cancels, t.ID) p.mu.Unlock() }() // Run the task. err = runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() if err != nil { var blockedErr *BlockedError if errors.As(err, &blockedErr) { exec.Status = "BLOCKED" p.store.UpdateTaskState(t.ID, task.StateBlocked) p.store.UpdateTaskQuestion(t.ID, blockedErr.QuestionJSON) } else if ctx.Err() == context.DeadlineExceeded { exec.Status = "TIMED_OUT" exec.ErrorMsg = "execution timed out" p.store.UpdateTaskState(t.ID, task.StateTimedOut) } else if ctx.Err() == context.Canceled { exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() p.store.UpdateTaskState(t.ID, task.StateFailed) } } else { if t.ParentTaskID == "" { exec.Status = "READY" p.store.UpdateTaskState(t.ID, task.StateReady) } else { exec.Status = "COMPLETED" p.store.UpdateTaskState(t.ID, task.StateCompleted) } } if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update execution", "error", updateErr) } p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } // terminalFailureStates are dependency states that cause the waiting task to fail immediately. 
var terminalFailureStates = map[task.State]bool{
	task.StateFailed:         true,
	task.StateTimedOut:       true,
	task.StateCancelled:      true,
	task.StateBudgetExceeded: true,
}

// waitForDependencies polls storage until every task in t.DependsOn reaches
// COMPLETED. It returns an error as soon as any dependency is missing or has
// entered a terminal failure state, and ctx.Err() if the context is cancelled
// while waiting.
func (p *Pool) waitForDependencies(ctx context.Context, t *task.Task) error {
	for {
		remaining, err := p.pendingDependencies(t)
		if err != nil {
			return err
		}
		if remaining == 0 {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
			// Poll again.
		}
	}
}

// pendingDependencies counts the dependencies of t that have not yet reached
// COMPLETED. It errors when a dependency cannot be loaded or has ended in a
// terminal failure state.
func (p *Pool) pendingDependencies(t *task.Task) (int, error) {
	remaining := 0
	for _, depID := range t.DependsOn {
		dep, err := p.store.GetTask(depID)
		if err != nil {
			return 0, fmt.Errorf("dependency %q not found: %w", depID, err)
		}
		switch {
		case dep.State == task.StateCompleted:
			// Satisfied; nothing to count.
		case terminalFailureStates[dep.State]:
			return 0, fmt.Errorf("dependency %q ended in state %s", depID, dep.State)
		default:
			remaining++
		}
	}
	return remaining, nil
}