diff options
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/executor.go | 82 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 30 |
2 files changed, 77 insertions, 35 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 9c8e126..0245899 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -26,6 +26,13 @@ type Runner interface {
 	Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
 }
 
+// workItem is an entry in the pool's internal work queue.
+type workItem struct {
+	ctx context.Context
+	task *task.Task
+	exec *storage.Execution // non-nil for resume submissions
+}
+
 // Pool manages a bounded set of concurrent task workers.
 type Pool struct {
 	maxConcurrent int
@@ -38,6 +45,8 @@ type Pool struct {
 	active int
 	cancels map[string]context.CancelFunc // taskID → cancel
 	resultCh chan *Result
+	workCh chan workItem // internal bounded queue; Submit enqueues here
+	doneCh chan struct{} // signals when a worker slot is freed
 	Questions *QuestionRegistry
 }
 
@@ -52,7 +61,7 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
 	if maxConcurrent < 1 {
 		maxConcurrent = 1
 	}
-	return &Pool{
+	p := &Pool{
 		maxConcurrent: maxConcurrent,
 		runner: runner,
 		store: store,
@@ -60,24 +69,47 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
 		depPollInterval: 5 * time.Second,
 		cancels: make(map[string]context.CancelFunc),
 		resultCh: make(chan *Result, maxConcurrent*2),
+		workCh: make(chan workItem, maxConcurrent*10+100),
+		doneCh: make(chan struct{}, maxConcurrent),
 		Questions: NewQuestionRegistry(),
 	}
+	go p.dispatch()
+	return p
 }
 
-// Submit dispatches a task for execution. Blocks if pool is at capacity.
-func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
-	p.mu.Lock()
-	if p.active >= p.maxConcurrent {
-		active := p.active
-		max := p.maxConcurrent
-		p.mu.Unlock()
-		return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+// dispatch is a long-running goroutine that reads from the internal work queue
+// and launches goroutines as soon as a pool slot is available. This prevents
+// tasks from being rejected when the pool is temporarily at capacity.
+func (p *Pool) dispatch() {
+	for item := range p.workCh {
+		for {
+			p.mu.Lock()
+			if p.active < p.maxConcurrent {
+				p.active++
+				p.mu.Unlock()
+				if item.exec != nil {
+					go p.executeResume(item.ctx, item.task, item.exec)
+				} else {
+					go p.execute(item.ctx, item.task)
+				}
+				break
+			}
+			p.mu.Unlock()
+			<-p.doneCh // wait for a worker to finish
+		}
 	}
-	p.active++
-	p.mu.Unlock()
+}
 
-	go p.execute(ctx, t)
-	return nil
+// Submit enqueues a task for execution. Returns an error only if the internal
+// work queue is full. When the pool is at capacity the task is buffered and
+// dispatched as soon as a slot becomes available.
+func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+	select {
+	case p.workCh <- workItem{ctx: ctx, task: t}:
+		return nil
+	default:
+		return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
+	}
 }
 
 // Results returns the channel for reading execution results.
@@ -107,18 +139,12 @@ func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Exe
 	if exec.ResumeSessionID == "" {
 		return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
 	}
-	p.mu.Lock()
-	if p.active >= p.maxConcurrent {
-		active := p.active
-		max := p.maxConcurrent
-		p.mu.Unlock()
-		return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+	select {
+	case p.workCh <- workItem{ctx: ctx, task: t, exec: exec}:
+		return nil
+	default:
+		return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
 	}
-	p.active++
-	p.mu.Unlock()
-
-	go p.executeResume(ctx, t, exec)
-	return nil
 }
 
 func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
@@ -126,6 +152,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
 		p.mu.Lock()
 		p.active--
 		p.mu.Unlock()
+		select {
+		case p.doneCh <- struct{}{}:
+		default:
+		}
 	}()
 
 	// Pre-populate log paths.
@@ -208,6 +238,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
 		p.mu.Lock()
 		p.active--
 		p.mu.Unlock()
+		select {
+		case p.doneCh <- struct{}{}:
+		default:
+		}
 	}()
 
 	// Wait for all dependencies to complete before starting execution.
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 6d13873..414f852 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -217,26 +217,34 @@ func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
 	}
 }
 
-func TestPool_AtCapacity(t *testing.T) {
+// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than
+// returning an error when the pool is at capacity. Both tasks should eventually complete.
+func TestPool_QueuedWhenAtCapacity(t *testing.T) {
 	store := testStore(t)
-	runner := &mockRunner{delay: time.Second}
+	runner := &mockRunner{delay: 100 * time.Millisecond}
 	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
 	pool := NewPool(1, runner, store, logger)
 
-	tk1 := makeTask("cap-1")
+	tk1 := makeTask("queue-1")
 	store.CreateTask(tk1)
-	pool.Submit(context.Background(), tk1)
+	if err := pool.Submit(context.Background(), tk1); err != nil {
+		t.Fatalf("first submit: %v", err)
+	}
 
-	// Pool is at capacity, second submit should fail.
-	time.Sleep(10 * time.Millisecond) // let goroutine start
-	tk2 := makeTask("cap-2")
+	// Second submit must succeed (queued) even though pool slot is taken.
+	tk2 := makeTask("queue-2")
 	store.CreateTask(tk2)
-	err := pool.Submit(context.Background(), tk2)
-	if err == nil {
-		t.Fatal("expected capacity error")
+	if err := pool.Submit(context.Background(), tk2); err != nil {
+		t.Fatalf("second submit: %v — expected task to be queued, not rejected", err)
 	}
 
-	<-pool.Results() // drain
+	// Both tasks must complete.
+	for i := 0; i < 2; i++ {
+		r := <-pool.Results()
+		if r.Err != nil {
+			t.Errorf("task %s error: %v", r.TaskID, r.Err)
+		}
+	}
 }
 
 // logPatherMockRunner is a mockRunner that also implements LogPather,
