diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-04-10 09:17:31 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-05-03 17:59:18 +0000 |
| commit | f01231cc45f41ce2dc37072e77428e467ef3fc15 (patch) | |
| tree | ed850c116148d86876ed43d0d7598094052c4b18 | |
| parent | 2196a048b1a980ff6a4d04918cbee69778186c83 (diff) | |
fix: atomic execution creation + RUNNING state transition
Add CreateExecutionAndSetRunning to storage.DB and Store interface,
replacing the two sequential CreateExecution/UpdateTaskState calls in
executor.go. Eliminates the crash window where a task stays PENDING
with an orphaned RUNNING execution record.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| -rw-r--r-- | internal/executor/executor.go | 13 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 5 | ||||
| -rw-r--r-- | internal/storage/db.go | 49 |
3 files changed, 57 insertions, 10 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 384a323..376d62c 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -25,6 +25,7 @@ type Store interface { ListSubtasks(parentID string) ([]*task.Task, error) ListExecutions(taskID string) ([]*storage.Execution, error) CreateExecution(e *storage.Execution) error + CreateExecutionAndSetRunning(e *storage.Execution) error UpdateExecution(e *storage.Execution) error UpdateTaskState(id string, newState task.State) error UpdateTaskQuestion(taskID, questionJSON string) error @@ -301,12 +302,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex exec.StartTime = time.Now().UTC() exec.Status = "RUNNING" - if err := p.store.CreateExecution(exec); err != nil { + if err := p.store.CreateExecutionAndSetRunning(exec); err != nil { p.logger.Error("failed to create resume execution record", "error", err) } - if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { - p.logger.Error("failed to update task state", "error", err) - } select { case p.startedCh <- t.ID: default: @@ -1029,13 +1027,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { } } - // Record execution start. - if err := p.store.CreateExecution(exec); err != nil { + // Record execution start atomically with the RUNNING state transition. + if err := p.store.CreateExecutionAndSetRunning(exec); err != nil { p.logger.Error("failed to create execution record", "error", err) } - if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { - p.logger.Error("failed to update task state", "error", err) - } select { case p.startedCh <- t.ID: default: diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index cb8205d..fac7e9c 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1134,7 +1134,10 @@ func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) { return nil, nil } func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil } -func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil } +func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil } +func (m *minimalMockStore) CreateExecutionAndSetRunning(e *storage.Execution) error { + return nil +} func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error { return m.updateExecErr } diff --git a/internal/storage/db.go b/internal/storage/db.go index 37d1ada..3a3e6b2 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -463,6 +463,55 @@ type Execution struct { Summary string } +// CreateExecutionAndSetRunning inserts an execution record and transitions the +// task to RUNNING in a single transaction, preventing a crash-window where the +// task stays PENDING with an orphaned RUNNING execution record. +func (s *DB) CreateExecutionAndSetRunning(e *Execution) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + // Validate state transition. + var currentState string + if err := tx.QueryRow(`SELECT state FROM tasks WHERE id = ?`, e.TaskID).Scan(¤tState); err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("task %q not found", e.TaskID) + } + return err + } + if !task.ValidTransition(task.State(currentState), task.StateRunning) { + return fmt.Errorf("invalid state transition %s → RUNNING for task %q", currentState, e.TaskID) + } + + // Insert execution record. + commitsJSON := "[]" + if len(e.Commits) > 0 { + b, err := json.Marshal(e.Commits) + if err != nil { + return fmt.Errorf("marshaling commits: %w", err) + } + commitsJSON = string(b) + } + if _, err := tx.Exec(` + INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, ?)`, + e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, commitsJSON, + ); err != nil { + return err + } + + // Transition task to RUNNING. + now := time.Now().UTC() + if _, err := tx.Exec(`UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?`, string(task.StateRunning), now, e.TaskID); err != nil { + return err + } + + return tx.Commit() +} + // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { var changestatsJSON *string |
