summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-04-10 09:17:31 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-05-03 17:59:18 +0000
commitf01231cc45f41ce2dc37072e77428e467ef3fc15 (patch)
treeed850c116148d86876ed43d0d7598094052c4b18 /internal/executor
parent2196a048b1a980ff6a4d04918cbee69778186c83 (diff)
fix: atomic execution creation + RUNNING state transition
Add CreateExecutionAndSetRunning to storage.DB and Store interface, replacing the two sequential CreateExecution/UpdateTaskState calls in executor.go. Eliminates the crash window where a task stays PENDING with an orphaned RUNNING execution record. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go13
-rw-r--r--internal/executor/executor_test.go5
2 files changed, 8 insertions, 10 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 384a323..376d62c 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -25,6 +25,7 @@ type Store interface {
ListSubtasks(parentID string) ([]*task.Task, error)
ListExecutions(taskID string) ([]*storage.Execution, error)
CreateExecution(e *storage.Execution) error
+ CreateExecutionAndSetRunning(e *storage.Execution) error
UpdateExecution(e *storage.Execution) error
UpdateTaskState(id string, newState task.State) error
UpdateTaskQuestion(taskID, questionJSON string) error
@@ -301,12 +302,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
exec.StartTime = time.Now().UTC()
exec.Status = "RUNNING"
- if err := p.store.CreateExecution(exec); err != nil {
+ if err := p.store.CreateExecutionAndSetRunning(exec); err != nil {
p.logger.Error("failed to create resume execution record", "error", err)
}
- if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
- p.logger.Error("failed to update task state", "error", err)
- }
select {
case p.startedCh <- t.ID:
default:
@@ -1029,13 +1027,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}
- // Record execution start.
- if err := p.store.CreateExecution(exec); err != nil {
+ // Record execution start atomically with the RUNNING state transition.
+ if err := p.store.CreateExecutionAndSetRunning(exec); err != nil {
p.logger.Error("failed to create execution record", "error", err)
}
- if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
- p.logger.Error("failed to update task state", "error", err)
- }
select {
case p.startedCh <- t.ID:
default:
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index cb8205d..fac7e9c 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1134,7 +1134,10 @@ func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) {
return nil, nil
}
func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil }
-func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }
+func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }
+func (m *minimalMockStore) CreateExecutionAndSetRunning(e *storage.Execution) error {
+ return nil
+}
func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error {
return m.updateExecErr
}