summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/executor/executor.go82
-rw-r--r--internal/executor/executor_test.go30
2 files changed, 77 insertions, 35 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 9c8e126..0245899 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -26,6 +26,13 @@ type Runner interface {
Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
}
+// workItem is an entry in the pool's internal work queue.
+type workItem struct {
+ ctx context.Context
+ task *task.Task
+ exec *storage.Execution // non-nil for resume submissions
+}
+
// Pool manages a bounded set of concurrent task workers.
type Pool struct {
maxConcurrent int
@@ -38,6 +45,8 @@ type Pool struct {
active int
cancels map[string]context.CancelFunc // taskID → cancel
resultCh chan *Result
+ workCh chan workItem // internal bounded queue; Submit enqueues here
+ doneCh chan struct{} // signals when a worker slot is freed
Questions *QuestionRegistry
}
@@ -52,7 +61,7 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
if maxConcurrent < 1 {
maxConcurrent = 1
}
- return &Pool{
+ p := &Pool{
maxConcurrent: maxConcurrent,
runner: runner,
store: store,
@@ -60,24 +69,47 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
depPollInterval: 5 * time.Second,
cancels: make(map[string]context.CancelFunc),
resultCh: make(chan *Result, maxConcurrent*2),
+ workCh: make(chan workItem, maxConcurrent*10+100),
+ doneCh: make(chan struct{}, maxConcurrent),
Questions: NewQuestionRegistry(),
}
+ go p.dispatch()
+ return p
}
-// Submit dispatches a task for execution. Blocks if pool is at capacity.
-func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
- p.mu.Lock()
- if p.active >= p.maxConcurrent {
- active := p.active
- max := p.maxConcurrent
- p.mu.Unlock()
- return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+// dispatch runs for the life of the Pool (workCh is never closed here), reading
+// the internal work queue and launching a worker goroutine as soon as a slot is
+// free, so tasks are buffered rather than rejected when the pool is at capacity.
+func (p *Pool) dispatch() {
+ for item := range p.workCh {
+ for {
+ p.mu.Lock()
+ if p.active < p.maxConcurrent {
+ p.active++
+ p.mu.Unlock()
+ if item.exec != nil {
+ go p.executeResume(item.ctx, item.task, item.exec)
+ } else {
+ go p.execute(item.ctx, item.task)
+ }
+ break
+ }
+ p.mu.Unlock()
+ <-p.doneCh // block until a slot frees; stale buffered signals are harmless — the loop re-checks active
+ }
}
- p.active++
- p.mu.Unlock()
+}
- go p.execute(ctx, t)
- return nil
+// Submit enqueues a task for execution. Returns an error only if the internal
+// work queue is full; once a slot frees, the queued task is dispatched. Note:
+// a ctx canceled while the task is still queued is not checked before dispatch.
+func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+ select {
+ case p.workCh <- workItem{ctx: ctx, task: t}:
+ return nil
+ default:
+ return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
+ }
}
// Results returns the channel for reading execution results.
@@ -107,18 +139,12 @@ func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Exe
if exec.ResumeSessionID == "" {
return fmt.Errorf("resume execution for task %s must have a ResumeSessionID", t.ID)
}
- p.mu.Lock()
- if p.active >= p.maxConcurrent {
- active := p.active
- max := p.maxConcurrent
- p.mu.Unlock()
- return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+ select {
+ case p.workCh <- workItem{ctx: ctx, task: t, exec: exec}:
+ return nil
+ default:
+ return fmt.Errorf("executor work queue full (capacity %d)", cap(p.workCh))
}
- p.active++
- p.mu.Unlock()
-
- go p.executeResume(ctx, t, exec)
- return nil
}
func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Execution) {
@@ -126,6 +152,10 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
p.mu.Lock()
p.active--
p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
}()
// Pre-populate log paths.
@@ -208,6 +238,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.mu.Lock()
p.active--
p.mu.Unlock()
+ select {
+ case p.doneCh <- struct{}{}:
+ default:
+ }
}()
// Wait for all dependencies to complete before starting execution.
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 6d13873..414f852 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -217,26 +217,34 @@ func TestPool_Cancel_UnknownTask_ReturnsFalse(t *testing.T) {
}
}
-func TestPool_AtCapacity(t *testing.T) {
+// TestPool_QueuedWhenAtCapacity verifies that Submit enqueues a task rather than
+// returning an error when the pool is at capacity. Both tasks should eventually complete.
+func TestPool_QueuedWhenAtCapacity(t *testing.T) {
store := testStore(t)
- runner := &mockRunner{delay: time.Second}
+ runner := &mockRunner{delay: 100 * time.Millisecond}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
pool := NewPool(1, runner, store, logger)
- tk1 := makeTask("cap-1")
+ tk1 := makeTask("queue-1")
store.CreateTask(tk1)
- pool.Submit(context.Background(), tk1)
+ if err := pool.Submit(context.Background(), tk1); err != nil {
+ t.Fatalf("first submit: %v", err)
+ }
- // Pool is at capacity, second submit should fail.
- time.Sleep(10 * time.Millisecond) // let goroutine start
- tk2 := makeTask("cap-2")
+ // Second submit must succeed (queued) even though pool slot is taken.
+ tk2 := makeTask("queue-2")
store.CreateTask(tk2)
- err := pool.Submit(context.Background(), tk2)
- if err == nil {
- t.Fatal("expected capacity error")
+ if err := pool.Submit(context.Background(), tk2); err != nil {
+ t.Fatalf("second submit: %v — expected task to be queued, not rejected", err)
}
- <-pool.Results() // drain
+ // Both tasks must complete.
+ for i := 0; i < 2; i++ {
+ r := <-pool.Results()
+ if r.Err != nil {
+ t.Errorf("task %s error: %v", r.TaskID, r.Err)
+ }
+ }
}
// logPatherMockRunner is a mockRunner that also implements LogPather,