summaryrefslogtreecommitdiff
path: root/internal/api/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/api/server.go')
-rw-r--r--internal/api/server.go92
1 files changed, 84 insertions, 8 deletions
diff --git a/internal/api/server.go b/internal/api/server.go
index 5b027e4..dd4627c 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -19,23 +19,27 @@ import (
// Server provides the REST API and WebSocket endpoint for Claudomator.
type Server struct {
store *storage.DB
- logStore logStore // injectable for tests; defaults to store
+ logStore logStore // injectable for tests; defaults to store
+ taskLogStore taskLogStore // injectable for tests; defaults to store
pool *executor.Pool
hub *Hub
logger *slog.Logger
mux *http.ServeMux
claudeBinPath string // path to claude binary; defaults to "claude"
elaborateCmdPath string // overrides claudeBinPath; used in tests
+ validateCmdPath string // overrides claudeBinPath for validate; used in tests
startNextTaskScript string // path to start-next-task script; overridden in tests
+ deployScript string // path to deploy script; overridden in tests
workDir string // working directory injected into elaborate system prompt
}
func NewServer(store *storage.DB, pool *executor.Pool, logger *slog.Logger, claudeBinPath string) *Server {
wd, _ := os.Getwd()
s := &Server{
- store: store,
- logStore: store,
- pool: pool,
+ store: store,
+ logStore: store,
+ taskLogStore: store,
+ pool: pool,
hub: NewHub(),
logger: logger,
mux: http.NewServeMux(),
@@ -57,6 +61,7 @@ func (s *Server) StartHub() {
func (s *Server) routes() {
s.mux.HandleFunc("POST /api/tasks/elaborate", s.handleElaborateTask)
+ s.mux.HandleFunc("POST /api/tasks/validate", s.handleValidateTask)
s.mux.HandleFunc("POST /api/tasks", s.handleCreateTask)
s.mux.HandleFunc("GET /api/tasks", s.handleListTasks)
s.mux.HandleFunc("GET /api/tasks/{id}", s.handleGetTask)
@@ -64,11 +69,13 @@ func (s *Server) routes() {
s.mux.HandleFunc("POST /api/tasks/{id}/cancel", s.handleCancelTask)
s.mux.HandleFunc("POST /api/tasks/{id}/accept", s.handleAcceptTask)
s.mux.HandleFunc("POST /api/tasks/{id}/reject", s.handleRejectTask)
+ s.mux.HandleFunc("DELETE /api/tasks/{id}", s.handleDeleteTask)
s.mux.HandleFunc("GET /api/tasks/{id}/subtasks", s.handleListSubtasks)
s.mux.HandleFunc("GET /api/tasks/{id}/executions", s.handleListExecutions)
s.mux.HandleFunc("GET /api/executions", s.handleListRecentExecutions)
s.mux.HandleFunc("GET /api/executions/{id}", s.handleGetExecution)
s.mux.HandleFunc("GET /api/executions/{id}/log", s.handleGetExecutionLog)
+ s.mux.HandleFunc("GET /api/tasks/{id}/logs/stream", s.handleStreamTaskLogs)
s.mux.HandleFunc("GET /api/executions/{id}/logs/stream", s.handleStreamLogs)
s.mux.HandleFunc("GET /api/templates", s.handleListTemplates)
s.mux.HandleFunc("POST /api/templates", s.handleCreateTemplate)
@@ -76,7 +83,9 @@ func (s *Server) routes() {
s.mux.HandleFunc("PUT /api/templates/{id}", s.handleUpdateTemplate)
s.mux.HandleFunc("DELETE /api/templates/{id}", s.handleDeleteTemplate)
s.mux.HandleFunc("POST /api/tasks/{id}/answer", s.handleAnswerQuestion)
+ s.mux.HandleFunc("POST /api/tasks/{id}/resume", s.handleResumeTimedOutTask)
s.mux.HandleFunc("POST /api/scripts/start-next-task", s.handleStartNextTask)
+ s.mux.HandleFunc("POST /api/scripts/deploy", s.handleDeploy)
s.mux.HandleFunc("GET /api/ws", s.handleWebSocket)
s.mux.HandleFunc("GET /api/health", s.handleHealth)
s.mux.Handle("GET /", http.FileServerFS(webui.Files))
@@ -112,17 +121,46 @@ func (s *Server) BroadcastQuestion(taskID, toolUseID string, questionData json.R
s.hub.Broadcast(data)
}
+func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
+ id := r.PathValue("id")
+ t, err := s.store.GetTask(id)
+ if err != nil {
+ writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"})
+ return
+ }
+ if t.State == task.StateRunning || t.State == task.StateQueued {
+ writeJSON(w, http.StatusConflict, map[string]string{"error": "cannot delete a running or queued task"})
+ return
+ }
+ if err := s.store.DeleteTask(id); err != nil {
+ writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
+ return
+ }
+ w.WriteHeader(http.StatusNoContent)
+}
+
func (s *Server) handleCancelTask(w http.ResponseWriter, r *http.Request) {
taskID := r.PathValue("id")
- if _, err := s.store.GetTask(taskID); err != nil {
+ tk, err := s.store.GetTask(taskID)
+ if err != nil {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"})
return
}
- if !s.pool.Cancel(taskID) {
- writeJSON(w, http.StatusConflict, map[string]string{"error": "task is not running"})
+ // If the task is actively running in the pool, cancel it there.
+ if s.pool.Cancel(taskID) {
+ writeJSON(w, http.StatusOK, map[string]string{"status": "cancelling"})
+ return
+ }
+ // For non-running tasks (PENDING, QUEUED), transition directly to CANCELLED.
+ if !task.ValidTransition(tk.State, task.StateCancelled) {
+ writeJSON(w, http.StatusConflict, map[string]string{"error": "task cannot be cancelled from state " + string(tk.State)})
return
}
- writeJSON(w, http.StatusOK, map[string]string{"status": "cancelling"})
+ if err := s.store.UpdateTaskState(taskID, task.StateCancelled); err != nil {
+ writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "failed to cancel task"})
+ return
+ }
+ writeJSON(w, http.StatusOK, map[string]string{"status": "cancelled"})
}
func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) {
@@ -176,6 +214,44 @@ func (s *Server) handleAnswerQuestion(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "queued"})
}
+func (s *Server) handleResumeTimedOutTask(w http.ResponseWriter, r *http.Request) {
+ taskID := r.PathValue("id")
+
+ tk, err := s.store.GetTask(taskID)
+ if err != nil {
+ writeJSON(w, http.StatusNotFound, map[string]string{"error": "task not found"})
+ return
+ }
+ if tk.State != task.StateTimedOut {
+ writeJSON(w, http.StatusConflict, map[string]string{"error": "task is not timed out"})
+ return
+ }
+
+ latest, err := s.store.GetLatestExecution(taskID)
+ if err != nil || latest.SessionID == "" {
+ writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "no resumable session found"})
+ return
+ }
+
+ s.store.UpdateTaskState(taskID, task.StateQueued)
+
+ resumeExec := &storage.Execution{
+ ID: uuid.New().String(),
+ TaskID: taskID,
+ ResumeSessionID: latest.SessionID,
+ ResumeAnswer: "Your previous execution timed out. Please continue where you left off and complete the task.",
+ }
+ if err := s.pool.SubmitResume(context.Background(), tk, resumeExec); err != nil {
+ writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()})
+ return
+ }
+
+ writeJSON(w, http.StatusAccepted, map[string]string{
+ "message": "task queued for resume",
+ "task_id": taskID,
+ })
+}
+
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}