From 91595726d696ef8bdc8c3ef0a39a1d3a7be34408 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Mon, 9 Mar 2026 04:24:06 +0000 Subject: executor: recover stale RUNNING tasks on server startup On restart, any tasks in RUNNING state have no active goroutine. RecoverStaleRunning() marks them FAILED (retryable) and closes their open execution records with an appropriate error message. Called once from serve.go after the pool is created. --- internal/cli/serve.go | 1 + internal/executor/executor.go | 31 +++++++++++++++++++++++++++++++ internal/executor/executor_test.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+) (limited to 'internal') diff --git a/internal/cli/serve.go b/internal/cli/serve.go index e90c3e2..4253d56 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -71,6 +71,7 @@ func serve(addr string) error { if cfg.GeminiBinaryPath != "" { pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath} } + pool.RecoverStaleRunning() srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath) if cfg.WebhookURL != "" { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f404ddf..9dd37ae 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -483,6 +483,37 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } +// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous +// server crash or restart) as FAILED. It also closes any open RUNNING execution +// records for those tasks. Call this once on server startup. +func (p *Pool) RecoverStaleRunning() { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning}) + if err != nil { + p.logger.Error("RecoverStaleRunning: list tasks", "error", err) + return + } + for _, t := range tasks { + p.logger.Warn("recovering stale RUNNING task", "taskID", t.ID, "name", t.Name) + // Close any open execution records. + execs, err := p.store.ListExecutions(t.ID) + if err == nil { + for _, e := range execs { + if e.Status == "RUNNING" { + e.Status = "FAILED" + e.ErrorMsg = "server restarted while task was running" + e.EndTime = time.Now().UTC() + if updateErr := p.store.UpdateExecution(e); updateErr != nil { + p.logger.Error("RecoverStaleRunning: update execution", "error", updateErr, "execID", e.ID) + } + } + } + } + if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { + p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID) + } + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index e6b8f0b..2292aa5 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -461,6 +461,41 @@ func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) { } } +func TestPool_RecoverStaleRunning(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + // Create a task already in RUNNING state (simulating a crashed server). + tk := makeTask("stale-1") + tk.State = task.StateRunning + store.CreateTask(tk) + + // Add an open execution record (no end time, status RUNNING). + store.CreateExecution(&storage.Execution{ + ID: "exec-stale-1", TaskID: tk.ID, + StartTime: time.Now().Add(-5 * time.Minute), + Status: "RUNNING", + }) + + pool.RecoverStaleRunning() + + recovered, err := store.GetTask(tk.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if recovered.State != task.StateFailed { + t.Errorf("state: want FAILED, got %q", recovered.State) + } + execs, _ := store.ListExecutions(tk.ID) + if len(execs) == 0 || execs[0].Status != "FAILED" { + t.Errorf("execution status: want FAILED, got %+v", execs) + } + if execs[0].ErrorMsg == "" { + t.Error("expected non-empty error message on recovered execution") + } +} + func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} -- cgit v1.2.3