diff options
Diffstat (limited to 'internal/executor')
| -rw-r--r-- | internal/executor/executor.go | 13 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 5 |
2 files changed, 8 insertions, 10 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 384a323..376d62c 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -25,6 +25,7 @@ type Store interface { ListSubtasks(parentID string) ([]*task.Task, error) ListExecutions(taskID string) ([]*storage.Execution, error) CreateExecution(e *storage.Execution) error + CreateExecutionAndSetRunning(e *storage.Execution) error UpdateExecution(e *storage.Execution) error UpdateTaskState(id string, newState task.State) error UpdateTaskQuestion(taskID, questionJSON string) error @@ -301,12 +302,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex exec.StartTime = time.Now().UTC() exec.Status = "RUNNING" - if err := p.store.CreateExecution(exec); err != nil { + if err := p.store.CreateExecutionAndSetRunning(exec); err != nil { p.logger.Error("failed to create resume execution record", "error", err) } - if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { - p.logger.Error("failed to update task state", "error", err) - } select { case p.startedCh <- t.ID: default: @@ -1029,13 +1027,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } } - // Record execution start. - if err := p.store.CreateExecution(exec); err != nil { + // Record execution start atomically with the RUNNING state transition. + if err := p.store.CreateExecutionAndSetRunning(exec); err != nil { p.logger.Error("failed to create execution record", "error", err) } - if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { - p.logger.Error("failed to update task state", "error", err) - } select { case p.startedCh <- t.ID: default: diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index cb8205d..fac7e9c 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1134,7 +1134,10 @@ func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) { return nil, nil } func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil } -func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil } +func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil } +func (m *minimalMockStore) CreateExecutionAndSetRunning(e *storage.Execution) error { + return nil +} func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error { return m.updateExecErr } |
