diff options
| -rw-r--r-- | internal/cli/serve.go | 1 | ||||
| -rw-r--r-- | internal/executor/executor.go | 31 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 35 |
3 files changed, 67 insertions, 0 deletions
diff --git a/internal/cli/serve.go b/internal/cli/serve.go index e90c3e2..4253d56 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -71,6 +71,7 @@ func serve(addr string) error { if cfg.GeminiBinaryPath != "" { pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath} } + pool.RecoverStaleRunning() srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath) if cfg.WebhookURL != "" { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index f404ddf..9dd37ae 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -483,6 +483,37 @@ func (p *Pool) execute(ctx context.Context, t *task.Task) { p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} } +// RecoverStaleRunning marks any tasks stuck in RUNNING state (from a previous +// server crash or restart) as FAILED. It also closes any open RUNNING execution +// records for those tasks. Call this once on server startup. +func (p *Pool) RecoverStaleRunning() { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateRunning}) + if err != nil { + p.logger.Error("RecoverStaleRunning: list tasks", "error", err) + return + } + for _, t := range tasks { + p.logger.Warn("recovering stale RUNNING task", "taskID", t.ID, "name", t.Name) + // Close any open execution records. 
+ execs, err := p.store.ListExecutions(t.ID) + if err == nil { + for _, e := range execs { + if e.Status == "RUNNING" { + e.Status = "FAILED" + e.ErrorMsg = "server restarted while task was running" + e.EndTime = time.Now().UTC() + if updateErr := p.store.UpdateExecution(e); updateErr != nil { + p.logger.Error("RecoverStaleRunning: update execution", "error", updateErr, "execID", e.ID) + } + } + } + } + if err := p.store.UpdateTaskState(t.ID, task.StateFailed); err != nil { + p.logger.Error("RecoverStaleRunning: update task state", "error", err, "taskID", t.ID) + } + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index e6b8f0b..2292aa5 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -461,6 +461,41 @@ func TestPool_FailureHistoryInjectedOnRetry(t *testing.T) { } } +func TestPool_RecoverStaleRunning(t *testing.T) { + store := testStore(t) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": &mockRunner{}}, store, logger) + + // Create a task already in RUNNING state (simulating a crashed server). + tk := makeTask("stale-1") + tk.State = task.StateRunning + store.CreateTask(tk) + + // Add an open execution record (no end time, status RUNNING). 
+ store.CreateExecution(&storage.Execution{ + ID: "exec-stale-1", TaskID: tk.ID, + StartTime: time.Now().Add(-5 * time.Minute), + Status: "RUNNING", + }) + + pool.RecoverStaleRunning() + + recovered, err := store.GetTask(tk.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if recovered.State != task.StateFailed { + t.Errorf("state: want FAILED, got %q", recovered.State) + } + execs, _ := store.ListExecutions(tk.ID) + if len(execs) == 0 || execs[0].Status != "FAILED" { + t.Fatalf("execution status: want FAILED, got %+v", execs) + } + if execs[0].ErrorMsg == "" { + t.Error("expected non-empty error message on recovered execution") + } +} + func TestPool_UnsupportedAgent(t *testing.T) { store := testStore(t) runners := map[string]Runner{"claude": &mockRunner{}} |
