summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go31
-rw-r--r--internal/executor/executor_test.go35
2 files changed, 66 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index f404ddf..9dd37ae 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -483,6 +483,37 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) {
p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
}
+// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous
+// server crash or restart) as FAILED. It also closes any open RUNNING execution
+// records for those tasks. Call this once on server startup.
+func (p *Pool) RecoverStaleRunning() {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning})
+ if err != nil {
+ p.logger.Error("RecoverStaleRunning: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.logger.Warn("recovering stale RUNNING task", "taskID", t.ID, "name", t.Name)
+ // Close any open execution records.
+ execs, err := p.store.ListExecutions(t.ID)
+ if err == nil {
+ for _, e := range execs {
+ if e.Status == "RUNNING" {
+ e.Status = "FAILED"
+ e.ErrorMsg = "server restarted while task was running"
+ e.EndTime = time.Now().UTC()
+ if updateErr := p.store.UpdateExecution(e); updateErr != nil {
+ p.logger.Error("RecoverStaleRunning: update execution", "error", updateErr, "execID", e.ID)
+ }
+ }
+ }
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil {
+ p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID)
+ }
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index e6b8f0b..2292aa5 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -461,6 +461,41 @@ func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) {
}
}
+func TestPool_RecoverStaleRunning(t *testing.T) {
+ store := testStore(t)
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger)
+
+ // Create a task already in RUNNING state (simulating a crashed server).
+ tk := makeTask("stale-1")
+ tk.State = task.StateRunning
+ store.CreateTask(tk)
+
+ // Add an open execution record (no end time, status RUNNING).
+ store.CreateExecution(&storage.Execution{
+ ID: "exec-stale-1", TaskID: tk.ID,
+ StartTime: time.Now().Add(-5 * time.Minute),
+ Status: "RUNNING",
+ })
+
+ pool.RecoverStaleRunning()
+
+ recovered, err := store.GetTask(tk.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if recovered.State != task.StateFailed {
+ t.Errorf("state: want FAILED, got %q", recovered.State)
+ }
+ execs, _ := store.ListExecutions(tk.ID)
+ if len(execs) == 0 || execs[0].Status != "FAILED" {
+ t.Errorf("execution status: want FAILED, got %+v", execs)
+ }
+ if execs[0].ErrorMsg == "" {
+ t.Error("expected non-empty error message on recovered execution")
+ }
+}
+
func TestPool_UnsupportedAgent(t *testing.T) {
store := testStore(t)
runners := map[string]Runner{"claude": &mockRunner{}}