diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-13 03:15:02 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-13 03:15:02 +0000 |
| commit | d5f83f8662c9f9c0fb52b206b06d4dd54a7788b4 (patch) | |
| tree | 780da46be8f8f80cc7697546e8361cd405d6f3fa /internal/executor/executor_test.go | |
| parent | b9aba3d242482fa9cd42f2a49b2767a73d4d2213 (diff) | |
fix: resubmit QUEUED tasks on server startup to prevent them getting stuck
Add Pool.RecoverStaleQueued() that lists all QUEUED tasks from the DB on
startup and re-submits them to the in-memory pool. Previously, tasks that
were QUEUED when the server restarted would remain stuck indefinitely since
only RUNNING tasks were recovered (and marked FAILED).
Called in serve.go immediately after RecoverStaleRunning().
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/executor_test.go')
| -rw-r--r-- | internal/executor/executor_test.go | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 9448816..7e676eb 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -614,6 +614,60 @@ func TestPool_RecoverStaleRunning(t *testing.T) { } } +func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) + + // Create a task already in QUEUED state (persisted from before a server restart). + tk := makeTask("stale-queued-1") + tk.State = task.StateQueued + store.CreateTask(tk) + + pool.RecoverStaleQueued(context.Background()) + + // Wait for the pool to pick it up and complete it. + select { + case result := <-pool.Results(): + if result.TaskID != tk.ID { + t.Errorf("unexpected task in results: %s", result.TaskID) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for stale QUEUED task to complete") + } + + got, err := store.GetTask(tk.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateCompleted && got.State != task.StateReady { + t.Errorf("state: want COMPLETED or READY, got %q", got.State) + } + if runner.callCount() != 1 { + t.Errorf("runner call count: want 1, got %d", runner.callCount()) + } +} + +func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) + + // PENDING task should NOT be resubmitted. + tk := makeTask("pending-1") + tk.State = task.StatePending + store.CreateTask(tk) + + pool.RecoverStaleQueued(context.Background()) + time.Sleep(50 * time.Millisecond) + + if runner.callCount() != 0 { + t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount()) + } +} + func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} @@ -906,6 +960,10 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error m.mu.Unlock() return nil } +func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil } +func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { + return nil +} func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() |
