summaryrefslogtreecommitdiff
path: root/internal/api/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/api/server.go')
-rw-r--r--internal/api/server.go131
1 files changed, 115 insertions, 16 deletions
diff --git a/internal/api/server.go b/internal/api/server.go
index 33048e4..28cfe4a 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -16,6 +16,7 @@ import (
"github.com/thepeterstone/claudomator/internal/notify"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
+ "github.com/thepeterstone/claudomator/internal/version"
webui "github.com/thepeterstone/claudomator/web"
"github.com/google/uuid"
)
@@ -31,6 +32,7 @@ type questionStore interface {
// Server provides the REST API and WebSocket endpoint for Claudomator.
type Server struct {
+ ctx context.Context // server lifecycle context; used for pool submissions
store *storage.DB
logStore logStore // injectable for tests; defaults to store
taskLogStore taskLogStore // injectable for tests; defaults to store
@@ -51,7 +53,12 @@ type Server struct {
elaborateLimiter *ipRateLimiter // per-IP rate limiter for elaborate/validate endpoints
webhookSecret string // HMAC-SHA256 secret for GitHub webhook validation
projects []config.Project // configured projects for webhook routing
- llm *llm.Client // optional local LLM client; when set, elaboration prefers it
+ vapidPublicKey string
+ vapidPrivateKey string
+ vapidEmail string
+ pushStore pushSubscriptionStore
+ dropsDir string
+ llm *llm.Client
}
// SetAPIToken configures a bearer token that must be supplied to access the API.
@@ -59,6 +66,12 @@ func (s *Server) SetAPIToken(token string) {
s.apiToken = token
}
+// SetContext replaces the server's lifecycle context used for pool submissions.
+// Call this before StartHub to tie task submissions to the server's shutdown signal.
+func (s *Server) SetContext(ctx context.Context) {
+ s.ctx = ctx
+}
+
// SetNotifier configures a notifier that is called on every task completion.
func (s *Server) SetNotifier(n notify.Notifier) {
s.notifier = n
@@ -75,6 +88,9 @@ func (s *Server) SetWorkspaceRoot(path string) {
s.workspaceRoot = path
}
+// Pool returns the executor pool, for graceful shutdown by the caller.
+func (s *Server) Pool() *executor.Pool { return s.pool }
+
// SetLLM wires a local OpenAI-compatible LLM client for use by elaboration
// (and future internal helpers). When non-nil, elaboration will prefer it
// over the Claude CLI; on failure it falls back to claude → gemini.
@@ -82,9 +98,11 @@ func (s *Server) SetLLM(c *llm.Client) {
s.llm = c
}
+
func NewServer(store *storage.DB, pool *executor.Pool, logger *slog.Logger, claudeBinPath, geminiBinPath string) *Server {
wd, _ := os.Getwd()
s := &Server{
+ ctx: context.Background(),
store: store,
logStore: store,
taskLogStore: store,
@@ -125,6 +143,8 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /api/tasks/{id}/subtasks", s.handleListSubtasks)
s.mux.HandleFunc("GET /api/tasks/{id}/executions", s.handleListExecutions)
s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions)
+ s.mux.HandleFunc("GET /api/stats", s.handleGetDashboardStats)
+ s.mux.HandleFunc("GET /api/agents/status", s.handleGetAgentStatus)
s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution)
s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog)
s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs)
@@ -135,29 +155,53 @@ func (s *Server) routes() {
s.mux.HandleFunc("GET /api/ws", s.handleWebSocket)
s.mux.HandleFunc("GET /api/workspaces", s.handleListWorkspaces)
s.mux.HandleFunc("GET /api/tasks/{id}/deployment-status", s.handleGetDeploymentStatus)
+ s.mux.HandleFunc("GET /api/projects", s.handleListProjects)
+ s.mux.HandleFunc("POST /api/projects", s.handleCreateProject)
+ s.mux.HandleFunc("GET /api/projects/{id}", s.handleGetProject)
+ s.mux.HandleFunc("PUT /api/projects/{id}", s.handleUpdateProject)
+ s.mux.HandleFunc("POST /api/stories/elaborate", s.handleElaborateStory)
+ s.mux.HandleFunc("POST /api/stories/approve", s.handleApproveStory)
+ s.mux.HandleFunc("GET /api/stories", s.handleListStories)
+ s.mux.HandleFunc("POST /api/stories", s.handleCreateStory)
+ s.mux.HandleFunc("GET /api/stories/{id}", s.handleGetStory)
+ s.mux.HandleFunc("GET /api/stories/{id}/tasks", s.handleListStoryTasks)
+ s.mux.HandleFunc("POST /api/stories/{id}/tasks", s.handleAddTaskToStory)
+ s.mux.HandleFunc("PUT /api/stories/{id}/status", s.handleUpdateStoryStatus)
+ s.mux.HandleFunc("POST /api/stories/{id}/ship", s.handleShipStory)
+ s.mux.HandleFunc("GET /api/stories/{id}/deployment-status", s.handleStoryDeploymentStatus)
s.mux.HandleFunc("GET /api/health", s.handleHealth)
+ s.mux.HandleFunc("GET /api/version", s.handleVersion)
s.mux.HandleFunc("POST /api/webhooks/github", s.handleGitHubWebhook)
+ s.mux.HandleFunc("GET /api/push/vapid-key", s.handleGetVAPIDKey)
+ s.mux.HandleFunc("GET /api/push/sw.js", s.handleServiceWorker)
+ s.mux.HandleFunc("POST /api/push/subscribe", s.handlePushSubscribe)
+ s.mux.HandleFunc("DELETE /api/push/subscribe", s.handlePushUnsubscribe)
+ s.mux.HandleFunc("GET /api/drops", s.handleListDrops)
+ s.mux.HandleFunc("GET /api/drops/{filename}", s.handleGetDrop)
+ s.mux.HandleFunc("POST /api/drops", s.handlePostDrop)
s.mux.Handle("GET /", http.FileServerFS(webui.Files))
}
-// forwardResults listens on the executor pool's result channel and broadcasts via WebSocket.
+// forwardResults listens on the executor pool's result and started channels and broadcasts via WebSocket.
func (s *Server) forwardResults() {
+ go func() {
+ for taskID := range s.pool.Started() {
+ event := map[string]interface{}{
+ "type": "task_started",
+ "task_id": taskID,
+ "timestamp": time.Now().UTC(),
+ }
+ data, _ := json.Marshal(event)
+ s.hub.Broadcast(data)
+ }
+ }()
for result := range s.pool.Results() {
s.processResult(result)
}
}
// processResult broadcasts a task completion event via WebSocket and calls the notifier if set.
-// It also parses git diff stats from the execution stdout log and persists them.
func (s *Server) processResult(result *executor.Result) {
- if result.Execution.StdoutPath != "" {
- if stats := parseChangestatFromFile(result.Execution.StdoutPath); stats != nil {
- if err := s.store.UpdateExecutionChangestats(result.Execution.ID, stats); err != nil {
- s.logger.Error("failed to store changestats", "execID", result.Execution.ID, "error", err)
- }
- }
- }
-
event := map[string]interface{}{
"type": "task_completed",
"task_id": result.TaskID,
@@ -318,7 +362,7 @@ func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) {
ResumeAnswer: input.Answer,
SandboxDir: latest.SandboxDir,
}
- if err := s.pool.SubmitResume(context.Background(), tk, resumeExec); err != nil {
+ if err := s.pool.SubmitResume(s.ctx, tk, resumeExec); err != nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()})
return
}
@@ -363,7 +407,7 @@ func (s *Server) handleResumeTimedOutTask(w http.ResponseWriter, r *http.Request
ResumeSessionID: latest.SessionID,
ResumeAnswer: resumeMsg,
}
- if err := s.pool.SubmitResume(context.Background(), tk, resumeExec); err != nil {
+ if err := s.pool.SubmitResume(s.ctx, tk, resumeExec); err != nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()})
return
}
@@ -415,11 +459,17 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
})
}
+func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
+ writeJSON(w, http.StatusOK, map[string]string{"version": version.Version()})
+}
+
func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
var input struct {
Name string `json:"name"`
Description string `json:"description"`
ElaborationInput string `json:"elaboration_input"`
+ Project string `json:"project"`
+ RepositoryURL string `json:"repository_url"`
Agent task.AgentConfig `json:"agent"`
Claude task.AgentConfig `json:"claude"` // legacy alias
Timeout string `json:"timeout"`
@@ -443,6 +493,8 @@ func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
Name: input.Name,
Description: input.Description,
ElaborationInput: input.ElaborationInput,
+ Project: input.Project,
+ RepositoryURL: input.RepositoryURL,
Agent: input.Agent,
Priority: task.Priority(input.Priority),
Tags: input.Tags,
@@ -453,6 +505,7 @@ func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
UpdatedAt: now,
ParentTaskID: input.ParentTaskID,
}
+
if t.Agent.Type == "" {
t.Agent.Type = "claude"
}
@@ -523,7 +576,11 @@ func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) {
if tasks == nil {
tasks = []*task.Task{}
}
- writeJSON(w, http.StatusOK, tasks)
+ views := make([]*taskView, len(tasks))
+ for i, tk := range tasks {
+ views[i] = s.enrichTask(tk)
+ }
+ writeJSON(w, http.StatusOK, views)
}
func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
@@ -533,8 +590,43 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"})
return
}
- writeJSON(w, http.StatusOK, t)
+ writeJSON(w, http.StatusOK, s.enrichTask(t))
+}
+// retryableDepStates are the states from which a dependency can be retried
+// when cascading a retry from a dependent task.
+var retryableDepStates = map[task.State]bool{
+ task.StateFailed: true,
+ task.StateTimedOut: true,
+ task.StateCancelled: true,
+ task.StateBudgetExceeded: true,
+}
+
+// cascadeRetryDeps resets any dependency (recursively) that is in a retryable
+// terminal state, and submits it to the pool. This ensures that retrying a
+// CANCELLED task that was blocked by a failed dep will also restart that dep.
+func (s *Server) cascadeRetryDeps(ctx context.Context, t *task.Task) {
+ for _, depID := range t.DependsOn {
+ dep, err := s.store.GetTask(depID)
+ if err != nil {
+ s.logger.Warn("cascadeRetryDeps: dep not found", "depID", depID)
+ continue
+ }
+ if !retryableDepStates[dep.State] {
+ continue
+ }
+ // Recursively cascade first (depth-first so root deps go first).
+ s.cascadeRetryDeps(ctx, dep)
+ reset, err := s.store.ResetTaskForRetry(depID)
+ if err != nil {
+ s.logger.Warn("cascadeRetryDeps: reset failed", "depID", depID, "error", err)
+ continue
+ }
+ if submitErr := s.pool.Submit(ctx, reset); submitErr != nil {
+ s.logger.Warn("cascadeRetryDeps: submit failed", "depID", depID, "error", submitErr)
+ }
+ }
}
+
func (s *Server) handleRunTask(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
agentParam := r.URL.Query().Get("agent") // Use a different name to avoid confusion
@@ -583,7 +675,11 @@ func (s *Server) handleRunTask(w http.ResponseWriter, r *http.Request) {
}
// The task `t` now has the correct agent configuration.
- if err := s.pool.Submit(context.Background(), t); err != nil {
+ // 6. Cascade-retry any deps that are in a terminal failure state so the
+ // task isn't immediately re-cancelled by checkDepsReady.
+ s.cascadeRetryDeps(r.Context(), originalTask)
+
+ if err := s.pool.Submit(s.ctx, t); err != nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": fmt.Sprintf("executor pool: %v", err)})
return
}
@@ -611,6 +707,9 @@ func (s *Server) handleAcceptTask(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
+ if t.StoryID != "" {
+ go s.pool.CheckStoryCompletion(r.Context(), t.StoryID)
+ }
writeJSON(w, http.StatusOK, map[string]string{"message": "task accepted", "task_id": id})
}