diff options
| -rw-r--r-- | internal/cli/serve.go | 1 | ||||
| -rw-r--r-- | internal/executor/executor.go | 17 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 58 |
3 files changed, 76 insertions, 0 deletions
diff --git a/internal/cli/serve.go b/internal/cli/serve.go index e5bd873..fd9fda8 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -77,6 +77,7 @@ func serve(addr string) error { pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath} } pool.RecoverStaleRunning() + pool.RecoverStaleQueued(context.Background()) srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath) if cfg.WebhookURL != "" { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index bafacd2..7ae4e2d 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -586,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() { } } +// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from +// a previous server instance. Call this once on server startup, after +// RecoverStaleRunning. +func (p *Pool) RecoverStaleQueued(ctx context.Context) { + tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued}) + if err != nil { + p.logger.Error("RecoverStaleQueued: list tasks", "error", err) + return + } + for _, t := range tasks { + p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name) + if err := p.Submit(ctx, t); err != nil { + p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID) + } + } +} + // terminalFailureStates are dependency states that cause the waiting task to fail immediately. var terminalFailureStates = map[task.State]bool{ task.StateFailed: true, diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index 9448816..7e676eb 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -614,6 +614,60 @@ func TestPool_RecoverStaleRunning(t *testing.T) { } } +func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) + + // Create a task already in QUEUED state (persisted from before a server restart). + tk := makeTask("stale-queued-1") + tk.State = task.StateQueued + store.CreateTask(tk) + + pool.RecoverStaleQueued(context.Background()) + + // Wait for the pool to pick it up and complete it. + select { + case result := <-pool.Results(): + if result.TaskID != tk.ID { + t.Errorf("unexpected task in results: %s", result.TaskID) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for stale QUEUED task to complete") + } + + got, err := store.GetTask(tk.ID) + if err != nil { + t.Fatalf("get task: %v", err) + } + if got.State != task.StateCompleted && got.State != task.StateReady { + t.Errorf("state: want COMPLETED or READY, got %q", got.State) + } + if runner.callCount() != 1 { + t.Errorf("runner call count: want 1, got %d", runner.callCount()) + } +} + +func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) { + store := testStore(t) + runner := &mockRunner{} + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger) + + // PENDING task should NOT be resubmitted. + tk := makeTask("pending-1") + tk.State = task.StatePending + store.CreateTask(tk) + + pool.RecoverStaleQueued(context.Background()) + time.Sleep(50 * time.Millisecond) + + if runner.callCount() != 0 { + t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount()) + } +} + func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) { store := testStore(t) runner := &mockRunner{} @@ -906,6 +960,10 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error m.mu.Unlock() return nil } +func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil } +func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { + return nil +} func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) { m.mu.Lock() |
