diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:09:34 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-06 00:09:34 +0000 |
| commit | 3672981c4d225bb5b7d965a8945bc7bc6e8b4e9d (patch) | |
| tree | 0e40902052405b9cf0063f859a017ff3ab62ef3c /internal/executor/executor.go | |
| parent | 7466b1751c4126735769a3304e1db80dab166a9e (diff) | |
fix: implement cancel endpoint and pool cancel mechanism
POST /api/tasks/{id}/cancel now works. Pool tracks a cancel func per
running task ID; Cancel(taskID) calls it and returns false if the task
isn't running. The execute goroutine registers/deregisters the cancel
func around the runner call.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
| -rw-r--r-- | internal/executor/executor.go | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f6932f4..62fed2e 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -35,6 +35,7 @@ type Pool struct { mu sync.Mutex active int + cancels map[string]context.CancelFunc // taskID → cancel resultCh chan *Result Questions *QuestionRegistry } @@ -55,6 +56,7 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L runner: runner, store: store, logger: logger, + cancels: make(map[string]context.CancelFunc), resultCh: make(chan *Result, maxConcurrent*2), Questions: NewQuestionRegistry(), } @@ -81,6 +83,19 @@ func (p *Pool) Results() <-chan *Result { return p.resultCh } +// Cancel requests cancellation of a running task. Returns false if the task +// is not currently running in this pool. +func (p *Pool) Cancel(taskID string) bool { + p.mu.Lock() + cancel, ok := p.cancels[taskID] + p.mu.Unlock() + if !ok { + return false + } + cancel() + return true +} + // SubmitResume re-queues a blocked task using the provided resume execution. // The execution must have ResumeSessionID and ResumeAnswer set. func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error { @@ -230,14 +245,22 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.logger.Error("failed to update task state", "error", err) } - // Apply task timeout. + // Apply task timeout and register cancel so callers can stop this task. var cancel context.CancelFunc if t.Timeout.Duration > 0 { ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) } else { ctx, cancel = context.WithCancel(ctx) } - defer cancel() + p.mu.Lock() + p.cancels[t.ID] = cancel + p.mu.Unlock() + defer func() { + cancel() + p.mu.Lock() + delete(p.cancels, t.ID) + p.mu.Unlock() + }() // Run the task. err := p.runner.Run(ctx, t, exec) |
