summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/executor/executor.go13
-rw-r--r--internal/executor/executor_test.go5
-rw-r--r--internal/storage/db.go49
3 files changed, 57 insertions, 10 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 384a323..376d62c 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -25,6 +25,7 @@ type Store interface {
ListSubtasks(parentID string) ([]*task.Task, error)
ListExecutions(taskID string) ([]*storage.Execution, error)
CreateExecution(e *storage.Execution) error
+ CreateExecutionAndSetRunning(e *storage.Execution) error
UpdateExecution(e *storage.Execution) error
UpdateTaskState(id string, newState task.State) error
UpdateTaskQuestion(taskID, questionJSON string) error
@@ -301,12 +302,9 @@ func (p *Pool) executeResume(ctx context.Context, t *task.Task, exec *storage.Ex
exec.StartTime = time.Now().UTC()
exec.Status = "RUNNING"
- if err := p.store.CreateExecution(exec); err != nil {
+ if err := p.store.CreateExecutionAndSetRunning(exec); err != nil {
p.logger.Error("failed to create resume execution record", "error", err)
}
- if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
- p.logger.Error("failed to update task state", "error", err)
- }
select {
case p.startedCh <- t.ID:
default:
@@ -1029,13 +1027,10 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
}
}
- // Record execution start.
- if err := p.store.CreateExecution(exec); err != nil {
+ // Record execution start atomically with the RUNNING state transition.
+ if err := p.store.CreateExecutionAndSetRunning(exec); err != nil {
p.logger.Error("failed to create execution record", "error", err)
}
- if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
- p.logger.Error("failed to update task state", "error", err)
- }
select {
case p.startedCh <- t.ID:
default:
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index cb8205d..fac7e9c 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1134,7 +1134,10 @@ func (m *minimalMockStore) ListSubtasks(parentID string) ([]*task.Task, error) {
return nil, nil
}
func (m *minimalMockStore) ListExecutions(_ string) ([]*storage.Execution, error) { return nil, nil }
-func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }
+func (m *minimalMockStore) CreateExecution(e *storage.Execution) error { return nil }
+func (m *minimalMockStore) CreateExecutionAndSetRunning(e *storage.Execution) error {
+ return nil
+}
func (m *minimalMockStore) UpdateExecution(e *storage.Execution) error {
return m.updateExecErr
}
diff --git a/internal/storage/db.go b/internal/storage/db.go
index 37d1ada..3a3e6b2 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -463,6 +463,55 @@ type Execution struct {
Summary string
}
+// CreateExecutionAndSetRunning inserts an execution record and transitions the
+// task to RUNNING in a single transaction, preventing a crash-window where the
+// task stays PENDING with an orphaned RUNNING execution record.
+func (s *DB) CreateExecutionAndSetRunning(e *Execution) error {
+ tx, err := s.db.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback() //nolint:errcheck
+
+ // Validate state transition.
+ var currentState string
+ if err := tx.QueryRow(`SELECT state FROM tasks WHERE id = ?`, e.TaskID).Scan(&currentState); err != nil {
+ if err == sql.ErrNoRows {
+ return fmt.Errorf("task %q not found", e.TaskID)
+ }
+ return err
+ }
+ if !task.ValidTransition(task.State(currentState), task.StateRunning) {
+ return fmt.Errorf("invalid state transition %s → RUNNING for task %q", currentState, e.TaskID)
+ }
+
+ // Insert execution record.
+ commitsJSON := "[]"
+ if len(e.Commits) > 0 {
+ b, err := json.Marshal(e.Commits)
+ if err != nil {
+ return fmt.Errorf("marshaling commits: %w", err)
+ }
+ commitsJSON = string(b)
+ }
+ if _, err := tx.Exec(`
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, ?)`,
+ e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, commitsJSON,
+ ); err != nil {
+ return err
+ }
+
+ // Transition task to RUNNING.
+ now := time.Now().UTC()
+ if _, err := tx.Exec(`UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?`, string(task.StateRunning), now, e.TaskID); err != nil {
+ return err
+ }
+
+ return tx.Commit()
+}
+
// CreateExecution inserts an execution record.
func (s *DB) CreateExecution(e *Execution) error {
var changestatsJSON *string