summaryrefslogtreecommitdiff
path: root/internal/api/logs.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-03 21:15:50 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-03 21:15:50 +0000
commit74cc740398cf2d90804ab19db728c844c2e056b7 (patch)
treee8532d1da9273e1613beb7b762b16134da0de286 /internal/api/logs.go
parentf527972f4d8311a09e639ede6c4da4ca669cfd5e (diff)
Add elaborate, logs-stream, templates, and subtask-list endpoints
- POST /api/tasks/elaborate: calls claude to draft a task config from a natural-language prompt - GET /api/executions/{id}/logs/stream: SSE tail of stdout.log - CRUD /api/templates: create/list/get/update/delete reusable task configs - GET /api/tasks/{id}/subtasks: list child tasks - Server.NewServer accepts claudeBinPath for elaborate; injectable elaborateCmdPath and logStore for test isolation - Valid-transition guard added to POST /api/tasks/{id}/run - CLI passes claude binary path through to the server Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/api/logs.go')
-rw-r--r--internal/api/logs.go235
1 file changed, 235 insertions, 0 deletions
diff --git a/internal/api/logs.go b/internal/api/logs.go
new file mode 100644
index 0000000..0354943
--- /dev/null
+++ b/internal/api/logs.go
@@ -0,0 +1,235 @@
package api

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
	"unicode/utf8"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)
+
// logStore is the minimal storage interface needed by handleStreamLogs.
// It is deliberately narrow so tests can inject a stub in place of the
// full storage implementation.
type logStore interface {
	// GetExecution returns the execution record for id, or an error if
	// no such execution exists.
	GetExecution(id string) (*storage.Execution, error)
}
+
// maxTailDuration caps how long a single SSE tail of a RUNNING execution
// may stay open — a safety guard against goroutine leaks if a client never
// disconnects and the execution never reaches a terminal state.
const maxTailDuration = 30 * time.Minute

// terminalStates is the set of execution statuses after which the stdout
// log can no longer grow; streaming stops once one of these is observed.
var terminalStates = map[string]bool{
	string(task.StateCompleted):      true,
	string(task.StateFailed):         true,
	string(task.StateTimedOut):       true,
	string(task.StateCancelled):      true,
	string(task.StateBudgetExceeded): true,
}
+
// logStreamMsg is the subset of a claude stream-json stdout record that the
// log streamer decodes; unrecognized fields are ignored by json.Unmarshal.
type logStreamMsg struct {
	// Type discriminates the record kind; only "assistant" and "result"
	// are handled by emitLogLine.
	Type string `json:"type"`
	// Message carries the content blocks of an "assistant" record.
	Message *logAssistMsg `json:"message,omitempty"`
	// CostUSD is the cost reported on a "result" record.
	CostUSD float64 `json:"cost_usd,omitempty"`
}

// logAssistMsg is the message payload of an "assistant" record.
type logAssistMsg struct {
	Content []logContentBlock `json:"content"`
}

// logContentBlock is one content block within an assistant message.
// "text" blocks populate Text; "tool_use" blocks populate Name and Input.
type logContentBlock struct {
	Type string `json:"type"`
	Text string `json:"text,omitempty"`
	Name string `json:"name,omitempty"`
	// Input is kept raw; emitLogLine only previews it as a string.
	Input json.RawMessage `json:"input,omitempty"`
}
+
+// handleStreamLogs streams parsed execution log content via SSE.
+// GET /api/executions/{id}/logs/stream
+func (s *Server) handleStreamLogs(w http.ResponseWriter, r *http.Request) {
+ id := r.PathValue("id")
+ exec, err := s.logStore.GetExecution(id)
+ if err != nil {
+ http.Error(w, "execution not found", http.StatusNotFound)
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("X-Accel-Buffering", "no")
+
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ http.Error(w, "streaming not supported", http.StatusInternalServerError)
+ return
+ }
+
+ ctx := r.Context()
+
+ if terminalStates[exec.Status] {
+ if exec.StdoutPath != "" {
+ if f, err := os.Open(exec.StdoutPath); err == nil {
+ defer f.Close()
+ var offset int64
+ for _, line := range readNewLines(f, &offset) {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ emitLogLine(w, flusher, line)
+ }
+ }
+ }
+ } else if exec.Status == string(task.StateRunning) {
+ tailRunningExecution(ctx, w, flusher, s.logStore, exec)
+ return // tailRunningExecution sends the done event
+ }
+
+ fmt.Fprintf(w, "event: done\ndata: {}\n\n")
+ flusher.Flush()
+}
+
+// tailRunningExecution live-tails the stdout log for a RUNNING execution, polling every 250ms.
+// It returns when the execution transitions to a terminal state, the context is cancelled,
+// or after maxTailDuration (safety guard against goroutine leaks).
+func tailRunningExecution(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, store logStore, exec *storage.Execution) {
+ var (
+ f *os.File
+ offset int64
+ )
+
+ // Emit any content already written to the log file.
+ if exec.StdoutPath != "" {
+ var err error
+ f, err = os.Open(exec.StdoutPath)
+ if err == nil {
+ defer f.Close()
+ for _, line := range readNewLines(f, &offset) {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ emitLogLine(w, flusher, line)
+ }
+ }
+ }
+
+ ticker := time.NewTicker(250 * time.Millisecond)
+ defer ticker.Stop()
+ deadline := time.NewTimer(maxTailDuration)
+ defer deadline.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-deadline.C:
+ fmt.Fprintf(w, "event: done\ndata: {}\n\n")
+ flusher.Flush()
+ return
+ case <-ticker.C:
+ // If the log file wasn't open yet, try now (may have been created since).
+ if f == nil && exec.StdoutPath != "" {
+ var err error
+ f, err = os.Open(exec.StdoutPath)
+ if err != nil {
+ f = nil
+ } else {
+ defer f.Close()
+ }
+ }
+
+ // Emit any new complete lines.
+ if f != nil {
+ for _, line := range readNewLines(f, &offset) {
+ emitLogLine(w, flusher, line)
+ }
+ }
+
+ // Re-fetch execution status from DB.
+ updated, err := store.GetExecution(exec.ID)
+ if err != nil {
+ fmt.Fprintf(w, "event: done\ndata: {}\n\n")
+ flusher.Flush()
+ return
+ }
+ if terminalStates[updated.Status] {
+ // Flush any remaining content written between the last read and now.
+ if f != nil {
+ for _, line := range readNewLines(f, &offset) {
+ emitLogLine(w, flusher, line)
+ }
+ }
+ fmt.Fprintf(w, "event: done\ndata: {}\n\n")
+ flusher.Flush()
+ return
+ }
+ }
+ }
+}
+
+// readNewLines reads all complete lines from f starting at *offset.
+// It advances *offset past the last newline so partial trailing lines
+// are deferred to the next call (safe for live-tailing growing files).
+func readNewLines(f *os.File, offset *int64) [][]byte {
+ if _, err := f.Seek(*offset, io.SeekStart); err != nil {
+ return nil
+ }
+ data, err := io.ReadAll(f)
+ if err != nil || len(data) == 0 {
+ return nil
+ }
+ lastNL := bytes.LastIndexByte(data, '\n')
+ if lastNL < 0 {
+ return nil // no complete line yet
+ }
+ *offset += int64(lastNL + 1)
+ return bytes.Split(data[:lastNL], []byte("\n"))
+}
+
+// emitLogLine parses a single stream-json line and emits corresponding SSE events.
+func emitLogLine(w http.ResponseWriter, flusher http.Flusher, line []byte) {
+ if len(line) == 0 {
+ return
+ }
+ var msg logStreamMsg
+ if err := json.Unmarshal(line, &msg); err != nil {
+ return
+ }
+ switch msg.Type {
+ case "assistant":
+ if msg.Message == nil {
+ return
+ }
+ for _, block := range msg.Message.Content {
+ var event map[string]string
+ switch block.Type {
+ case "text":
+ event = map[string]string{"type": "text", "content": block.Text}
+ case "tool_use":
+ summary := string(block.Input)
+ if len(summary) > 80 {
+ summary = summary[:80]
+ }
+ event = map[string]string{"type": "tool_use", "content": fmt.Sprintf("%s(%s)", block.Name, summary)}
+ default:
+ continue
+ }
+ data, _ := json.Marshal(event)
+ fmt.Fprintf(w, "data: %s\n\n", data)
+ flusher.Flush()
+ }
+ case "result":
+ event := map[string]string{
+ "type": "cost",
+ "content": fmt.Sprintf("%g", msg.CostUSD),
+ }
+ data, _ := json.Marshal(event)
+ fmt.Fprintf(w, "data: %s\n\n", data)
+ flusher.Flush()
+ }
+}