summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Peter Stone <thepeterstone@gmail.com> 2026-03-13 03:15:02 +0000
committer: Peter Stone <thepeterstone@gmail.com> 2026-03-13 03:15:02 +0000
commit: d5f83f8662c9f9c0fb52b206b06d4dd54a7788b4 (patch)
tree: 780da46be8f8f80cc7697546e8361cd405d6f3fa
parent: b9aba3d242482fa9cd42f2a49b2767a73d4d2213 (diff)
fix: resubmit QUEUED tasks on server startup to prevent them getting stuck
Add Pool.RecoverStaleQueued(), which lists all QUEUED tasks from the DB on startup and re-submits them to the in-memory pool. Previously, tasks that were QUEUED when the server restarted would remain stuck indefinitely, since only RUNNING tasks were recovered (and marked FAILED). RecoverStaleQueued() is called in serve.go immediately after RecoverStaleRunning().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
-rw-r--r-- internal/cli/serve.go | 1
-rw-r--r-- internal/executor/executor.go | 17
-rw-r--r-- internal/executor/executor_test.go | 58
3 files changed, 76 insertions, 0 deletions
diff --git a/internal/cli/serve.go b/internal/cli/serve.go
index e5bd873..fd9fda8 100644
--- a/internal/cli/serve.go
+++ b/internal/cli/serve.go
@@ -77,6 +77,7 @@ func serve(addr string) error {
pool.Classifier = &executor.Classifier{GeminiBinaryPath: cfg.GeminiBinaryPath}
}
pool.RecoverStaleRunning()
+ pool.RecoverStaleQueued(context.Background())
srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath)
if cfg.WebhookURL != "" {
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index bafacd2..7ae4e2d 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -586,6 +586,23 @@ func (p *Pool) RecoverStaleRunning() {
}
}
+ // RecoverStaleQueued lists every task the store still reports as QUEUED and
+ // passes each one to Submit again; list and submit failures are logged, not
+ // returned. Call it once on server startup, after RecoverStaleRunning.
+ func (p *Pool) RecoverStaleQueued(ctx context.Context) {
+ 	queued, listErr := p.store.ListTasks(storage.TaskFilter{State: task.StateQueued})
+ 	if listErr != nil {
+ 		p.logger.Error("RecoverStaleQueued: list tasks", "error", listErr)
+ 		return
+ 	}
+ 	for _, tk := range queued {
+ 		p.logger.Info("resubmitting stale QUEUED task", "taskID", tk.ID, "name", tk.Name)
+ 		if submitErr := p.Submit(ctx, tk); submitErr != nil {
+ 			p.logger.Error("RecoverStaleQueued: submit", "error", submitErr, "taskID", tk.ID)
+ 		}
+ 	}
+ }
+
// terminalFailureStates are dependency states that cause the waiting task to fail immediately.
var terminalFailureStates = map[task.State]bool{
task.StateFailed: true,
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index 9448816..7e676eb 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -614,6 +614,60 @@ func TestPool_RecoverStaleRunning(t *testing.T) {
}
}
+ // TestPool_RecoverStaleQueued_ResubmitsToPool checks that a stored QUEUED task is run exactly once.
+ func TestPool_RecoverStaleQueued_ResubmitsToPool(t *testing.T) {
+ 	store := testStore(t)
+ 	runner := &mockRunner{}
+ 	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ 	pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger)
+
+ 	// Create a task already in QUEUED state (persisted from before a server restart).
+ 	tk := makeTask("stale-queued-1")
+ 	tk.State = task.StateQueued
+ 	if err := store.CreateTask(tk); err != nil {
+ 		t.Fatalf("create task: %v", err)
+ 	}
+ 	pool.RecoverStaleQueued(context.Background())
+ 	// Wait for the pool to pick it up and complete it.
+ 	select {
+ 	case result := <-pool.Results():
+ 		if result.TaskID != tk.ID {
+ 			t.Errorf("unexpected task in results: %s", result.TaskID)
+ 		}
+ 	case <-time.After(2 * time.Second):
+ 		t.Fatal("timed out waiting for stale QUEUED task to complete")
+ 	}
+ 	got, err := store.GetTask(tk.ID)
+ 	if err != nil {
+ 		t.Fatalf("get task: %v", err)
+ 	}
+ 	if got.State != task.StateCompleted && got.State != task.StateReady {
+ 		t.Errorf("state: want COMPLETED or READY, got %q", got.State)
+ 	}
+ 	if runner.callCount() != 1 {
+ 		t.Errorf("runner call count: want 1, got %d", runner.callCount())
+ 	}
+ }
+
+ // TestPool_RecoverStaleQueued_SkipsNonQueuedTasks checks that a PENDING task is NOT resubmitted.
+ func TestPool_RecoverStaleQueued_SkipsNonQueuedTasks(t *testing.T) {
+ 	store := testStore(t)
+ 	runner := &mockRunner{}
+ 	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+ 	pool := NewPool(2, map[string]Runner{"claude": runner}, store, logger)
+ 	tk := makeTask("pending-1")
+ 	tk.State = task.StatePending
+ 	// Without this check a failed insert would let the test pass vacuously.
+ 	if err := store.CreateTask(tk); err != nil {
+ 		t.Fatalf("create task: %v", err)
+ 	}
+ 	pool.RecoverStaleQueued(context.Background())
+ 	time.Sleep(50 * time.Millisecond)
+ 	if runner.callCount() != 0 {
+ 		t.Errorf("runner should not have been called for PENDING task, got %d calls", runner.callCount())
+ 	}
+ }
+
func TestPool_ActivePerAgent_DeletesZeroEntries(t *testing.T) {
store := testStore(t)
runner := &mockRunner{}
@@ -906,6 +960,10 @@ func (m *minimalMockStore) UpdateTaskQuestion(taskID, questionJSON string) error
m.mu.Unlock()
return nil
}
+ // UpdateTaskSummary and AppendTaskInteraction are no-op stubs that ignore their
+ // arguments and return nil (presumably to satisfy the store interface; confirm).
+ func (m *minimalMockStore) UpdateTaskSummary(taskID, summary string) error { return nil }
+ func (m *minimalMockStore) AppendTaskInteraction(taskID string, _ task.Interaction) error { return nil }
func (m *minimalMockStore) lastStateUpdate() (string, task.State, bool) {
m.mu.Lock()