package executor import ( "context" "fmt" "log/slog" "sync" "time" "github.com/claudomator/claudomator/internal/storage" "github.com/claudomator/claudomator/internal/task" "github.com/google/uuid" ) // Runner executes a single task and returns the result. type Runner interface { Run(ctx context.Context, t *task.Task, exec *storage.Execution) error } // Pool manages a bounded set of concurrent task workers. type Pool struct { maxConcurrent int runner Runner store *storage.DB logger *slog.Logger mu sync.Mutex active int resultCh chan *Result } // Result is emitted when a task execution completes. type Result struct { TaskID string Execution *storage.Execution Err error } func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.Logger) *Pool { if maxConcurrent < 1 { maxConcurrent = 1 } return &Pool{ maxConcurrent: maxConcurrent, runner: runner, store: store, logger: logger, resultCh: make(chan *Result, maxConcurrent*2), } } // Submit dispatches a task for execution. Blocks if pool is at capacity. func (p *Pool) Submit(ctx context.Context, t *task.Task) error { p.mu.Lock() if p.active >= p.maxConcurrent { active := p.active max := p.maxConcurrent p.mu.Unlock() return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) } p.active++ p.mu.Unlock() go p.execute(ctx, t) return nil } // Results returns the channel for reading execution results. func (p *Pool) Results() <-chan *Result { return p.resultCh } // ActiveCount returns the number of currently running tasks. func (p *Pool) ActiveCount() int { p.mu.Lock() defer p.mu.Unlock() return p.active } func (p *Pool) execute(ctx context.Context, t *task.Task) { execID := uuid.New().String() exec := &storage.Execution{ ID: execID, TaskID: t.ID, StartTime: time.Now().UTC(), Status: "RUNNING", } // Record execution start. if err := p.store.CreateExecution(exec); err != nil { p.logger.Error("failed to create execution record", "error", err) } if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { p.logger.Error("failed to update task state", "error", err) } // Apply task timeout. var cancel context.CancelFunc if t.Timeout.Duration > 0 { ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) } else { ctx, cancel = context.WithCancel(ctx) } defer cancel() // Run the task. err := p.runner.Run(ctx, t, exec) exec.EndTime = time.Now().UTC() if err != nil { if ctx.Err() == context.DeadlineExceeded { exec.Status = "TIMED_OUT" exec.ErrorMsg = "execution timed out" p.store.UpdateTaskState(t.ID, task.StateTimedOut) } else if ctx.Err() == context.Canceled { exec.Status = "CANCELLED" exec.ErrorMsg = "execution cancelled" p.store.UpdateTaskState(t.ID, task.StateCancelled) } else { exec.Status = "FAILED" exec.ErrorMsg = err.Error() p.store.UpdateTaskState(t.ID, task.StateFailed) } } else { exec.Status = "COMPLETED" p.store.UpdateTaskState(t.ID, task.StateCompleted) } if updateErr := p.store.UpdateExecution(exec); updateErr != nil { p.logger.Error("failed to update execution", "error", updateErr) } p.mu.Lock() p.active-- p.mu.Unlock() p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} }