diff options
| -rw-r--r-- | internal/api/server.go | 39 | ||||
| -rw-r--r-- | internal/api/server_test.go | 67 | ||||
| -rwxr-xr-x | scripts/reset-failed-tasks | 48 |
3 files changed, 152 insertions, 2 deletions
diff --git a/internal/api/server.go b/internal/api/server.go index 38108de..8eba829 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -571,6 +571,41 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) { } writeJSON(w, http.StatusOK, s.enrichTask(t)) } +// retryableDepStates are the states from which a dependency can be retried +// when cascading a retry from a dependent task. +var retryableDepStates = map[task.State]bool{ + task.StateFailed: true, + task.StateTimedOut: true, + task.StateCancelled: true, + task.StateBudgetExceeded: true, +} + +// cascadeRetryDeps resets any dependency (recursively) that is in a retryable +// terminal state, and submits it to the pool. This ensures that retrying a +// CANCELLED task that was blocked by a failed dep will also restart that dep. +func (s *Server) cascadeRetryDeps(ctx context.Context, t *task.Task) { + for _, depID := range t.DependsOn { + dep, err := s.store.GetTask(depID) + if err != nil { + s.logger.Warn("cascadeRetryDeps: dep not found", "depID", depID) + continue + } + if !retryableDepStates[dep.State] { + continue + } + // Recursively cascade first (depth-first so root deps go first). + s.cascadeRetryDeps(ctx, dep) + reset, err := s.store.ResetTaskForRetry(depID) + if err != nil { + s.logger.Warn("cascadeRetryDeps: reset failed", "depID", depID, "error", err) + continue + } + if submitErr := s.pool.Submit(ctx, reset); submitErr != nil { + s.logger.Warn("cascadeRetryDeps: submit failed", "depID", depID, "error", submitErr) + } + } +} + func (s *Server) handleRunTask(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") agentParam := r.URL.Query().Get("agent") // Use a different name to avoid confusion @@ -619,6 +654,10 @@ func (s *Server) handleRunTask(w http.ResponseWriter, r *http.Request) { } // The task `t` now has the correct agent configuration. + // 6. Cascade-retry any deps that are in a terminal failure state so the + // task isn't immediately re-cancelled by checkDepsReady. + s.cascadeRetryDeps(r.Context(), originalTask) + if err := s.pool.Submit(context.Background(), t); err != nil { writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": fmt.Sprintf("executor pool: %v", err)}) return diff --git a/internal/api/server_test.go b/internal/api/server_test.go index 27fc645..4b45f25 100644 --- a/internal/api/server_test.go +++ b/internal/api/server_test.go @@ -2014,3 +2014,70 @@ func TestProjects_CRUD(t *testing.T) { t.Error("expected at least one project in list") } } + +func TestHandleRunTask_CascadesRetryToFailedDeps(t *testing.T) { + srv, store := testServer(t) + + now := time.Now().UTC() + + // Task A: the dependency, in FAILED state. + taskA := &task.Task{ + ID: "cascade-dep-a", + Name: "Dep A", + State: task.StateFailed, + DependsOn: []string{}, + Priority: task.PriorityNormal, + Tags: []string{}, + Agent: task.AgentConfig{Type: "claude", Instructions: "do A"}, + Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "exponential"}, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateTask(taskA); err != nil { + t.Fatalf("CreateTask A: %v", err) + } + + // Task B: depends on A, in CANCELLED state (was cancelled because A failed). + taskB := &task.Task{ + ID: "cascade-task-b", + Name: "Task B", + State: task.StateCancelled, + DependsOn: []string{taskA.ID}, + Priority: task.PriorityNormal, + Tags: []string{}, + Agent: task.AgentConfig{Type: "claude", Instructions: "do B"}, + Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "exponential"}, + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateTask(taskB); err != nil { + t.Fatalf("CreateTask B: %v", err) + } + + // Run task B — should cascade-retry dep A. + req := httptest.NewRequest("POST", "/api/tasks/cascade-task-b/run", nil) + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected 202, got %d: %s", w.Code, w.Body.String()) + } + + // Dep A should now be QUEUED. + a, err := store.GetTask(taskA.ID) + if err != nil { + t.Fatalf("GetTask A: %v", err) + } + if a.State != task.StateQueued { + t.Errorf("dep A: want QUEUED after cascade, got %s", a.State) + } + + // Task B itself should be QUEUED. + b, err := store.GetTask(taskB.ID) + if err != nil { + t.Fatalf("GetTask B: %v", err) + } + if b.State != task.StateQueued { + t.Errorf("task B: want QUEUED, got %s", b.State) + } +} diff --git a/scripts/reset-failed-tasks b/scripts/reset-failed-tasks index eddfff0..1f3b6d5 100755 --- a/scripts/reset-failed-tasks +++ b/scripts/reset-failed-tasks @@ -1,5 +1,49 @@ #!/bin/bash +# Reset FAILED and CANCELLED tasks to PENDING and delete their preserved workspaces. +# Usage: reset-failed-tasks [--dry-run] -DB_PATH="/site/doot.terst.org/data/claudomator.db" +set -euo pipefail -sqlite3 "$DB_PATH" "UPDATE tasks SET state = 'PENDING' WHERE state = 'FAILED';" +DB_PATH="${CLAUDOMATOR_DB:-/site/doot.terst.org/data/claudomator.db}" +DRY_RUN=false +[[ "${1:-}" == "--dry-run" ]] && DRY_RUN=true + +# Collect preserved sandbox dirs before resetting so we can clean them up. +SANDBOX_DIRS=$(sqlite3 "$DB_PATH" " + SELECT DISTINCT e.sandbox_dir + FROM executions e + JOIN tasks t ON t.id = e.task_id + WHERE t.state IN ('FAILED','CANCELLED') + AND e.sandbox_dir IS NOT NULL + AND e.sandbox_dir != ''; +") + +TASK_COUNT=$(sqlite3 "$DB_PATH" "SELECT COUNT(*) FROM tasks WHERE state IN ('FAILED','CANCELLED');") + +echo "Tasks to reset: $TASK_COUNT" + +if [[ "$DRY_RUN" == "true" ]]; then + echo "[dry-run] Would reset $TASK_COUNT task(s) to PENDING." + if [[ -n "$SANDBOX_DIRS" ]]; then + echo "[dry-run] Workspaces to delete:" + echo "$SANDBOX_DIRS" + else + echo "[dry-run] No preserved workspaces to delete." + fi + exit 0 +fi + +sqlite3 "$DB_PATH" "UPDATE tasks SET state = 'PENDING' WHERE state IN ('FAILED','CANCELLED');" +echo "Reset $TASK_COUNT task(s) to PENDING." + +DELETED=0 +while IFS= read -r dir; do + [[ -z "$dir" ]] && continue + if [[ -d "$dir" ]]; then + rm -rf "$dir" + echo "Deleted workspace: $dir" + DELETED=$((DELETED + 1)) + fi +done <<< "$SANDBOX_DIRS" + +echo "Deleted $DELETED workspace(s)." |
