diff options
| author | Peter Stone <thepeterstone@gmail.com> | 2026-03-03 21:15:50 +0000 |
|---|---|---|
| committer | Peter Stone <thepeterstone@gmail.com> | 2026-03-03 21:15:50 +0000 |
| commit | 74cc740398cf2d90804ab19db728c844c2e056b7 (patch) | |
| tree | e8532d1da9273e1613beb7b762b16134da0de286 /internal/api/logs.go | |
| parent | f527972f4d8311a09e639ede6c4da4ca669cfd5e (diff) | |
Add elaborate, logs-stream, templates, and subtask-list endpoints
- POST /api/tasks/elaborate: calls claude to draft a task config from
a natural-language prompt
- GET /api/executions/{id}/logs/stream: SSE tail of stdout.log
- CRUD /api/templates: create/list/get/update/delete reusable task configs
- GET /api/tasks/{id}/subtasks: list child tasks
- Server.NewServer accepts claudeBinPath for elaborate; injectable
elaborateCmdPath and logStore for test isolation
- Valid-transition guard added to POST /api/tasks/{id}/run
- CLI passes claude binary path through to the server
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/api/logs.go')
| -rw-r--r-- | internal/api/logs.go | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/internal/api/logs.go b/internal/api/logs.go new file mode 100644 index 0000000..0354943 --- /dev/null +++ b/internal/api/logs.go @@ -0,0 +1,235 @@ +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "time" + + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +// logStore is the minimal storage interface needed by handleStreamLogs. +type logStore interface { + GetExecution(id string) (*storage.Execution, error) +} + +const maxTailDuration = 30 * time.Minute + +var terminalStates = map[string]bool{ + string(task.StateCompleted): true, + string(task.StateFailed): true, + string(task.StateTimedOut): true, + string(task.StateCancelled): true, + string(task.StateBudgetExceeded): true, +} + +type logStreamMsg struct { + Type string `json:"type"` + Message *logAssistMsg `json:"message,omitempty"` + CostUSD float64 `json:"cost_usd,omitempty"` +} + +type logAssistMsg struct { + Content []logContentBlock `json:"content"` +} + +type logContentBlock struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` +} + +// handleStreamLogs streams parsed execution log content via SSE. +// GET /api/executions/{id}/logs/stream +func (s *Server) handleStreamLogs(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + exec, err := s.logStore.GetExecution(id) + if err != nil { + http.Error(w, "execution not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("X-Accel-Buffering", "no") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + ctx := r.Context() + + if terminalStates[exec.Status] { + if exec.StdoutPath != "" { + if f, err := os.Open(exec.StdoutPath); err == nil { + defer f.Close() + var offset int64 + for _, line := range readNewLines(f, &offset) { + select { + case <-ctx.Done(): + return + default: + } + emitLogLine(w, flusher, line) + } + } + } + } else if exec.Status == string(task.StateRunning) { + tailRunningExecution(ctx, w, flusher, s.logStore, exec) + return // tailRunningExecution sends the done event + } + + fmt.Fprintf(w, "event: done\ndata: {}\n\n") + flusher.Flush() +} + +// tailRunningExecution live-tails the stdout log for a RUNNING execution, polling every 250ms. +// It returns when the execution transitions to a terminal state, the context is cancelled, +// or after maxTailDuration (safety guard against goroutine leaks). +func tailRunningExecution(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, store logStore, exec *storage.Execution) { + var ( + f *os.File + offset int64 + ) + + // Emit any content already written to the log file. + if exec.StdoutPath != "" { + var err error + f, err = os.Open(exec.StdoutPath) + if err == nil { + defer f.Close() + for _, line := range readNewLines(f, &offset) { + select { + case <-ctx.Done(): + return + default: + } + emitLogLine(w, flusher, line) + } + } + } + + ticker := time.NewTicker(250 * time.Millisecond) + defer ticker.Stop() + deadline := time.NewTimer(maxTailDuration) + defer deadline.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-deadline.C: + fmt.Fprintf(w, "event: done\ndata: {}\n\n") + flusher.Flush() + return + case <-ticker.C: + // If the log file wasn't open yet, try now (may have been created since). + if f == nil && exec.StdoutPath != "" { + var err error + f, err = os.Open(exec.StdoutPath) + if err != nil { + f = nil + } else { + defer f.Close() + } + } + + // Emit any new complete lines. + if f != nil { + for _, line := range readNewLines(f, &offset) { + emitLogLine(w, flusher, line) + } + } + + // Re-fetch execution status from DB. + updated, err := store.GetExecution(exec.ID) + if err != nil { + fmt.Fprintf(w, "event: done\ndata: {}\n\n") + flusher.Flush() + return + } + if terminalStates[updated.Status] { + // Flush any remaining content written between the last read and now. + if f != nil { + for _, line := range readNewLines(f, &offset) { + emitLogLine(w, flusher, line) + } + } + fmt.Fprintf(w, "event: done\ndata: {}\n\n") + flusher.Flush() + return + } + } + } +} + +// readNewLines reads all complete lines from f starting at *offset. +// It advances *offset past the last newline so partial trailing lines +// are deferred to the next call (safe for live-tailing growing files). +func readNewLines(f *os.File, offset *int64) [][]byte { + if _, err := f.Seek(*offset, io.SeekStart); err != nil { + return nil + } + data, err := io.ReadAll(f) + if err != nil || len(data) == 0 { + return nil + } + lastNL := bytes.LastIndexByte(data, '\n') + if lastNL < 0 { + return nil // no complete line yet + } + *offset += int64(lastNL + 1) + return bytes.Split(data[:lastNL], []byte("\n")) +} + +// emitLogLine parses a single stream-json line and emits corresponding SSE events. +func emitLogLine(w http.ResponseWriter, flusher http.Flusher, line []byte) { + if len(line) == 0 { + return + } + var msg logStreamMsg + if err := json.Unmarshal(line, &msg); err != nil { + return + } + switch msg.Type { + case "assistant": + if msg.Message == nil { + return + } + for _, block := range msg.Message.Content { + var event map[string]string + switch block.Type { + case "text": + event = map[string]string{"type": "text", "content": block.Text} + case "tool_use": + summary := string(block.Input) + if len(summary) > 80 { + summary = summary[:80] + } + event = map[string]string{"type": "tool_use", "content": fmt.Sprintf("%s(%s)", block.Name, summary)} + default: + continue + } + data, _ := json.Marshal(event) + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + case "result": + event := map[string]string{ + "type": "cost", + "content": fmt.Sprintf("%g", msg.CostUSD), + } + data, _ := json.Marshal(event) + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } +} |
