diff options
Diffstat (limited to 'internal/api/server.go')
| -rw-r--r-- | internal/api/server.go | 131 |
1 files changed, 115 insertions, 16 deletions
diff --git a/internal/api/server.go b/internal/api/server.go index 33048e4..28cfe4a 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -16,6 +16,7 @@ import ( "github.com/thepeterstone/claudomator/internal/notify" "github.com/thepeterstone/claudomator/internal/storage" "github.com/thepeterstone/claudomator/internal/task" + "github.com/thepeterstone/claudomator/internal/version" webui "github.com/thepeterstone/claudomator/web" "github.com/google/uuid" ) @@ -31,6 +32,7 @@ type questionStore interface { // Server provides the REST API and WebSocket endpoint for Claudomator. type Server struct { + ctx context.Context // server lifecycle context; used for pool submissions store *storage.DB logStore logStore // injectable for tests; defaults to store taskLogStore taskLogStore // injectable for tests; defaults to store @@ -51,7 +53,12 @@ type Server struct { elaborateLimiter *ipRateLimiter // per-IP rate limiter for elaborate/validate endpoints webhookSecret string // HMAC-SHA256 secret for GitHub webhook validation projects []config.Project // configured projects for webhook routing - llm *llm.Client // optional local LLM client; when set, elaboration prefers it + vapidPublicKey string + vapidPrivateKey string + vapidEmail string + pushStore pushSubscriptionStore + dropsDir string + llm *llm.Client } // SetAPIToken configures a bearer token that must be supplied to access the API. @@ -59,6 +66,12 @@ func (s *Server) SetAPIToken(token string) { s.apiToken = token } +// SetContext replaces the server's lifecycle context used for pool submissions. +// Call this before StartHub to tie task submissions to the server's shutdown signal. +func (s *Server) SetContext(ctx context.Context) { + s.ctx = ctx +} + // SetNotifier configures a notifier that is called on every task completion. func (s *Server) SetNotifier(n notify.Notifier) { s.notifier = n @@ -75,6 +88,9 @@ func (s *Server) SetWorkspaceRoot(path string) { s.workspaceRoot = path } +// Pool returns the executor pool, for graceful shutdown by the caller. +func (s *Server) Pool() *executor.Pool { return s.pool } + // SetLLM wires a local OpenAI-compatible LLM client for use by elaboration // (and future internal helpers). When non-nil, elaboration will prefer it // over the Claude CLI; on failure it falls back to claude → gemini. @@ -82,9 +98,11 @@ func (s *Server) SetLLM(c *llm.Client) { s.llm = c } + func NewServer(store *storage.DB, pool *executor.Pool, logger *slog.Logger, claudeBinPath, geminiBinPath string) *Server { wd, _ := os.Getwd() s := &Server{ + ctx: context.Background(), store: store, logStore: store, taskLogStore: store, @@ -125,6 +143,8 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /api/tasks/{id}/subtasks", s.handleListSubtasks) s.mux.HandleFunc("GET /api/tasks/{id}/executions", s.handleListExecutions) s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions) + s.mux.HandleFunc("GET /api/stats", s.handleGetDashboardStats) + s.mux.HandleFunc("GET /api/agents/status", s.handleGetAgentStatus) s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution) s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog) s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs) @@ -135,29 +155,53 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /api/ws", s.handleWebSocket) s.mux.HandleFunc("GET /api/workspaces", s.handleListWorkspaces) s.mux.HandleFunc("GET /api/tasks/{id}/deployment-status", s.handleGetDeploymentStatus) + s.mux.HandleFunc("GET /api/projects", s.handleListProjects) + s.mux.HandleFunc("POST /api/projects", s.handleCreateProject) + s.mux.HandleFunc("GET /api/projects/{id}", s.handleGetProject) + s.mux.HandleFunc("PUT /api/projects/{id}", s.handleUpdateProject) + s.mux.HandleFunc("POST /api/stories/elaborate", s.handleElaborateStory) + s.mux.HandleFunc("POST /api/stories/approve", s.handleApproveStory) + s.mux.HandleFunc("GET /api/stories", s.handleListStories) + s.mux.HandleFunc("POST /api/stories", s.handleCreateStory) + s.mux.HandleFunc("GET /api/stories/{id}", s.handleGetStory) + s.mux.HandleFunc("GET /api/stories/{id}/tasks", s.handleListStoryTasks) + s.mux.HandleFunc("POST /api/stories/{id}/tasks", s.handleAddTaskToStory) + s.mux.HandleFunc("PUT /api/stories/{id}/status", s.handleUpdateStoryStatus) + s.mux.HandleFunc("POST /api/stories/{id}/ship", s.handleShipStory) + s.mux.HandleFunc("GET /api/stories/{id}/deployment-status", s.handleStoryDeploymentStatus) s.mux.HandleFunc("GET /api/health", s.handleHealth) + s.mux.HandleFunc("GET /api/version", s.handleVersion) s.mux.HandleFunc("POST /api/webhooks/github", s.handleGitHubWebhook) + s.mux.HandleFunc("GET /api/push/vapid-key", s.handleGetVAPIDKey) + s.mux.HandleFunc("GET /api/push/sw.js", s.handleServiceWorker) + s.mux.HandleFunc("POST /api/push/subscribe", s.handlePushSubscribe) + s.mux.HandleFunc("DELETE /api/push/subscribe", s.handlePushUnsubscribe) + s.mux.HandleFunc("GET /api/drops", s.handleListDrops) + s.mux.HandleFunc("GET /api/drops/{filename}", s.handleGetDrop) + s.mux.HandleFunc("POST /api/drops", s.handlePostDrop) s.mux.Handle("GET /", http.FileServerFS(webui.Files)) } -// forwardResults listens on the executor pool's result channel and broadcasts via WebSocket. +// forwardResults listens on the executor pool's result and started channels and broadcasts via WebSocket. func (s *Server) forwardResults() { + go func() { + for taskID := range s.pool.Started() { + event := map[string]interface{}{ + "type": "task_started", + "task_id": taskID, + "timestamp": time.Now().UTC(), + } + data, _ := json.Marshal(event) + s.hub.Broadcast(data) + } + }() for result := range s.pool.Results() { s.processResult(result) } } // processResult broadcasts a task completion event via WebSocket and calls the notifier if set. -// It also parses git diff stats from the execution stdout log and persists them. func (s *Server) processResult(result *executor.Result) { - if result.Execution.StdoutPath != "" { - if stats := parseChangestatFromFile(result.Execution.StdoutPath); stats != nil { - if err := s.store.UpdateExecutionChangestats(result.Execution.ID, stats); err != nil { - s.logger.Error("failed to store changestats", "execID", result.Execution.ID, "error", err) - } - } - } - event := map[string]interface{}{ "type": "task_completed", "task_id": result.TaskID, @@ -318,7 +362,7 @@ func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) { ResumeAnswer: input.Answer, SandboxDir: latest.SandboxDir, } - if err := s.pool.SubmitResume(context.Background(), tk, resumeExec); err != nil { + if err := s.pool.SubmitResume(s.ctx, tk, resumeExec); err != nil { writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()}) return } @@ -363,7 +407,7 @@ func (s *Server) handleResumeTimedOutTask(w http.ResponseWriter, r *http.Request ResumeSessionID: latest.SessionID, ResumeAnswer: resumeMsg, } - if err := s.pool.SubmitResume(context.Background(), tk, resumeExec); err != nil { + if err := s.pool.SubmitResume(s.ctx, tk, resumeExec); err != nil { writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()}) return } @@ -415,11 +459,17 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { }) } +func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"version": version.Version()}) +} + func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) { var input struct { Name string `json:"name"` Description string `json:"description"` ElaborationInput string `json:"elaboration_input"` + Project string `json:"project"` + RepositoryURL string `json:"repository_url"` Agent task.AgentConfig `json:"agent"` Claude task.AgentConfig `json:"claude"` // legacy alias Timeout string `json:"timeout"` @@ -443,6 +493,8 @@ func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) { Name: input.Name, Description: input.Description, ElaborationInput: input.ElaborationInput, + Project: input.Project, + RepositoryURL: input.RepositoryURL, Agent: input.Agent, Priority: task.Priority(input.Priority), Tags: input.Tags, @@ -453,6 +505,7 @@ func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) { UpdatedAt: now, ParentTaskID: input.ParentTaskID, } + if t.Agent.Type == "" { t.Agent.Type = "claude" } @@ -523,7 +576,11 @@ func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) { if tasks == nil { tasks = []*task.Task{} } - writeJSON(w, http.StatusOK, tasks) + views := make([]*taskView, len(tasks)) + for i, tk := range tasks { + views[i] = s.enrichTask(tk) + } + writeJSON(w, http.StatusOK, views) } func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) { @@ -533,8 +590,43 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"}) return } - writeJSON(w, http.StatusOK, t) + writeJSON(w, http.StatusOK, s.enrichTask(t)) +} +// retryableDepStates are the states from which a dependency can be retried +// when cascading a retry from a dependent task. +var retryableDepStates = map[task.State]bool{ + task.StateFailed: true, + task.StateTimedOut: true, + task.StateCancelled: true, + task.StateBudgetExceeded: true, +} + +// cascadeRetryDeps resets any dependency (recursively) that is in a retryable +// terminal state, and submits it to the pool. This ensures that retrying a +// CANCELLED task that was blocked by a failed dep will also restart that dep. +func (s *Server) cascadeRetryDeps(ctx context.Context, t *task.Task) { + for _, depID := range t.DependsOn { + dep, err := s.store.GetTask(depID) + if err != nil { + s.logger.Warn("cascadeRetryDeps: dep not found", "depID", depID) + continue + } + if !retryableDepStates[dep.State] { + continue + } + // Recursively cascade first (depth-first so root deps go first). + s.cascadeRetryDeps(ctx, dep) + reset, err := s.store.ResetTaskForRetry(depID) + if err != nil { + s.logger.Warn("cascadeRetryDeps: reset failed", "depID", depID, "error", err) + continue + } + if submitErr := s.pool.Submit(ctx, reset); submitErr != nil { + s.logger.Warn("cascadeRetryDeps: submit failed", "depID", depID, "error", submitErr) + } + } } + func (s *Server) handleRunTask(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") agentParam := r.URL.Query().Get("agent") // Use a different name to avoid confusion @@ -583,7 +675,11 @@ func (s *Server) handleRunTask(w http.ResponseWriter, r *http.Request) { } // The task `t` now has the correct agent configuration. - if err := s.pool.Submit(context.Background(), t); err != nil { + // 6. Cascade-retry any deps that are in a terminal failure state so the + // task isn't immediately re-cancelled by checkDepsReady. + s.cascadeRetryDeps(r.Context(), originalTask) + + if err := s.pool.Submit(s.ctx, t); err != nil { writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": fmt.Sprintf("executor pool: %v", err)}) return } @@ -611,6 +707,9 @@ func (s *Server) handleAcceptTask(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } + if t.StoryID != "" { + go s.pool.CheckStoryCompletion(r.Context(), t.StoryID) + } writeJSON(w, http.StatusOK, map[string]string{"message": "task accepted", "task_id": id}) } |
