diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-13 03:15:02 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-13 03:15:02 +0000 |
| commit | d5f83f8662c9f9c0fb52b206b06d4dd54a7788b4 (patch) | |
| tree | 780da46be8f8f80cc7697546e8361cd405d6f3fa /internal | |
| parent | b9aba3d242482fa9cd42f2a49b2767a73d4d2213 (diff) | |
fix: resubmit QUEUED tasks on server startup to prevent them getting stuck
Add Pool.RecoverStaleQueued() that lists all QUEUED tasks from the DB on
startup and re-submits them to the in-memory pool. Previously, tasks that
were QUEUED when the server restarted would remain stuck indefinitely since
only RUNNING tasks were recovered (and marked FAILED).
Called in serve.go immediately after RecoverStaleRunning().
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/cli/serve.go | 1 | ||||
| -rw-r--r-- | internal/executor/executor.go | 17 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 58 |
3 files changed, 76 insertions, 0 deletions
diff --git a/internal/cli/serve.go b/internal/cli/serve.go index e5bd873..fd9fda8 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -77,6 +77,7 @@ func serve(addr string) error { pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath} } pool.RecoverStaleRunning() + pool.RecoverStaleQueued(context.Background()) srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath) if cfg.WebhookURL != "" { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index bafacd2..7ae4e2d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -586,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() { } } +// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from +// a previous server instance. Call this once on server startup, after +// RecoverStaleRunning. +func (p *Pool) RecoverStaleQueued(ctx context.Context) { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued}) + if err != nil { + p.logger.Error("RecoverStaleQueued: list tasks", "error", err) + return + } + for _, t := range tasks { + p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name) + if err := p.Submit(ctx, t); err != nil { + p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID) + } + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 9448816..7e676eb 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -614,6 +614,60 @@ func TestPool_RecoverStaleRunning(t *testing.T) { } } +func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) + + // Create a task already in QUEUED state (persisted from before a server restart). + tk := makeTask("stale-queued-1") + tk.State = task.StateQueued + store.CreateTask(tk) + + pool.RecoverStaleQueued(context.Background()) + + // Wait for the pool to pick it up and complete it. + select { + case result := <-pool.Results(): + if result.TaskID != tk.ID { + t.Errorf("unexpected task in results: %s", result.TaskID) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for stale QUEUED task to complete") + } + + got, err := store.GetTask(tk.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateCompleted && got.State != task.StateReady { + t.Errorf("state: want COMPLETED or READY, got %q", got.State) + } + if runner.callCount() != 1 { + t.Errorf("runner call count: want 1, got %d", runner.callCount()) + } +} + +func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) + + // PENDING task should NOT be resubmitted. + tk := makeTask("pending-1") + tk.State = task.StatePending + store.CreateTask(tk) + + pool.RecoverStaleQueued(context.Background()) + time.Sleep(50 * time.Millisecond) + + if runner.callCount() != 0 { + t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount()) + } +} + func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} @@ -906,6 +960,10 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error m.mu.Unlock() return nil } +func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil } +func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { + return nil +} func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() |
