summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-06 00:09:34 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-06 00:09:34 +0000
commit3672981c4d225bb5b7d965a8945bc7bc6e8b4e9d (patch)
tree0e40902052405b9cf0063f859a017ff3ab62ef3c /internal/executor/executor.go
parent7466b1751c4126735769a3304e1db80dab166a9e (diff)
fix: implement cancel endpoint and pool cancel mechanism
POST /api/tasks/{id}/cancel now works. Pool tracks a cancel func per running task ID; Cancel(taskID) calls it and returns false if the task isn't running. The execute goroutine registers/deregisters the cancel func around the runner call. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go27
1 files changed, 25 insertions, 2 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index f6932f4..62fed2e 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -35,6 +35,7 @@ type Pool struct {
mu sync.Mutex
active int
+ cancels map[string]context.CancelFunc // taskID → cancel
resultCh chan *Result
Questions *QuestionRegistry
}
@@ -55,6 +56,7 @@ func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.L
runner: runner,
store: store,
logger: logger,
+ cancels: make(map[string]context.CancelFunc),
resultCh: make(chan *Result, maxConcurrent*2),
Questions: NewQuestionRegistry(),
}
@@ -81,6 +83,19 @@ func (p *Pool) Results() <-chan *Result {
return p.resultCh
}
+// Cancel requests cancellation of a running task. Returns false if the task
+// is not currently running in this pool.
+func (p *Pool) Cancel(taskID string) bool {
+ p.mu.Lock()
+ cancel, ok := p.cancels[taskID]
+ p.mu.Unlock()
+ if !ok {
+ return false
+ }
+ cancel()
+ return true
+}
+
// SubmitResume re-queues a blocked task using the provided resume execution.
// The execution must have ResumeSessionID and ResumeAnswer set.
func (p *Pool) SubmitResume(ctx context.Context, t *task.Task, exec *storage.Execution) error {
@@ -230,14 +245,22 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.logger.Error("failed to update task state", "error", err)
}
- // Apply task timeout.
+ // Apply task timeout and register cancel so callers can stop this task.
var cancel context.CancelFunc
if t.Timeout.Duration > 0 {
ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
} else {
ctx, cancel = context.WithCancel(ctx)
}
- defer cancel()
+ p.mu.Lock()
+ p.cancels[t.ID] = cancel
+ p.mu.Unlock()
+ defer func() {
+ cancel()
+ p.mu.Lock()
+ delete(p.cancels, t.ID)
+ p.mu.Unlock()
+ }()
// Run the task.
err := p.runner.Run(ctx, t, exec)