summaryrefslogtreecommitdiff
path: root/internal/executor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor')
-rw-r--r--internal/executor/executor.go17
-rw-r--r--internal/executor/executor_test.go58
2 files changed, 75 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index bafacd2..7ae4e2d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -586,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() {
}
}
+// RecoverStaleQueued re-submits any tasks that are stuck in QUEUED state from
+// a previous server instance. Call this once on server startup, after
+// RecoverStaleRunning.
+func (p *Pool) RecoverStaleQueued(ctx context.Context) {
+ tasks, err := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued})
+ if err != nil {
+ p.logger.Error("RecoverStaleQueued: list tasks", "error", err)
+ return
+ }
+ for _, t := range tasks {
+ p.logger.Info("resubmitting stale QUEUED task", "taskID", t.ID, "name", t.Name)
+ if err := p.Submit(ctx, t); err != nil {
+ p.logger.Error("RecoverStaleQueued: submit", "error", err, "taskID", t.ID)
+ }
+ }
+}
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 9448816..7e676eb 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -614,6 +614,60 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
}
}
+func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger)
+
+ // Create a task already in QUEUED state (persisted from before a server restart).
+ tk := makeTask("stale-queued-1")
+ tk.State = task.StateQueued
+ store.CreateTask(tk)
+
+ pool.RecoverStaleQueued(context.Background())
+
+ // Wait for the pool to pick it up and complete it.
+ select {
+ case result := <-pool.Results():
+ if result.TaskID != tk.ID {
+ t.Errorf("unexpected task in results: %s", result.TaskID)
+ }
+ case <-time.After(2 * time.Second):
+ t.Fatal("timed out waiting for stale QUEUED task to complete")
+ }
+
+ got, err := store.GetTask(tk.ID)
+ if err != nil {
+ t.Fatalf("get task: %v", err)
+ }
+ if got.State != task.StateCompleted && got.State != task.StateReady {
+ t.Errorf("state: want COMPLETED or READY, got %q", got.State)
+ }
+ if runner.callCount() != 1 {
+ t.Errorf("runner call count: want 1, got %d", runner.callCount())
+ }
+}
+
+func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) {
+ store := testStore(t)
+ runner := &mockRunner{}
+ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger)
+
+ // PENDING task should NOT be resubmitted.
+ tk := makeTask("pending-1")
+ tk.State = task.StatePending
+ store.CreateTask(tk)
+
+ pool.RecoverStaleQueued(context.Background())
+ time.Sleep(50 * time.Millisecond)
+
+ if runner.callCount() != 0 {
+ t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount())
+ }
+}
+
func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
@@ -906,6 +960,10 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error
m.mu.Unlock()
return nil
}
+func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil }
+func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error {
+ return nil
+}
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()