summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-26 09:09:19 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-26 09:14:14 +0000
commit3f9843b34d7ae9df2dd9c69427ecab45744b97e9 (patch)
tree1c667c17d77b43a1e5fbcae464068a74c2857fb5 /internal/executor/executor.go
parentdac676e8284725c8ec6de08282fe08a9b519ccc8 (diff)
feat: graceful shutdown — drain workers before exit (default 3m timeout)
- Add workerWg to Pool; Shutdown() closes workCh and waits for all in-flight execute/executeResume goroutines to finish - Signal handler now shuts down HTTP first, then drains the pool - ShutdownTimeout config field (toml: shutdown_timeout); default 3m - Tests: WaitsForWorkers and TimesOut Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go35
1 file changed, 33 insertions, 2 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 6aef736..ae040c2 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -80,6 +80,8 @@ type Pool struct {
startedCh chan string // task IDs that just transitioned to RUNNING
workCh chan workItem // internal bounded queue; Submit enqueues here
doneCh chan struct{} // signals when a worker slot is freed
+ workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
+ dispatchDone chan struct{} // closed when the dispatch goroutine exits
Questions *QuestionRegistry
Classifier *Classifier
}
@@ -112,6 +114,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
startedCh: make(chan string, maxConcurrent*2),
workCh: make(chan workItem, maxConcurrent*10+100),
doneCh: make(chan struct{}, maxConcurrent),
+ dispatchDone: make(chan struct{}),
Questions: NewQuestionRegistry(),
}
go p.dispatch()
@@ -122,6 +125,7 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
// and launches goroutines as soon as a pool slot is available. This prevents
// tasks from being rejected when the pool is temporarily at capacity.
func (p *Pool) dispatch() {
+ defer close(p.dispatchDone)
for item := range p.workCh {
for {
p.mu.Lock()
@@ -129,9 +133,9 @@ func (p *Pool) dispatch() {
p.active++
p.mu.Unlock()
if item.exec != nil {
- go p.executeResume(item.ctx, item.task, item.exec)
+ p.workerWg.Add(1); go func(i workItem) { defer p.workerWg.Done(); p.executeResume(i.ctx, i.task, i.exec) }(item)
} else {
- go p.execute(item.ctx, item.task)
+ p.workerWg.Add(1); go func(i workItem) { defer p.workerWg.Done(); p.execute(i.ctx, i.task) }(item)
}
break
}
@@ -163,6 +167,33 @@ func (p *Pool) Results() <-chan *Result {
return p.resultCh
}
+// Shutdown stops accepting new work and waits for all in-flight workers to
+// finish. Returns ctx.Err() if the context deadline is exceeded before all
+// workers complete.
+func (p *Pool) Shutdown(ctx context.Context) error {
+ // Stop the dispatch goroutine. We must wait for it to exit before calling
+ // workerWg.Wait() to avoid a race between dispatch's Add(1) and Wait().
+ close(p.workCh)
+ select {
+ case <-p.dispatchDone:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ done := make(chan struct{})
+ go func() {
+ p.workerWg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
// Cancel requests cancellation of a running task. Returns false if the task
// is not currently running in this pool.
func (p *Pool) Cancel(taskID string) bool {