package api

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"

	"github.com/thepeterstone/claudomator/internal/storage"
	"github.com/thepeterstone/claudomator/internal/task"
)

// logStore is the minimal storage interface needed by handleStreamLogs.
type logStore interface {
	GetExecution(id string) (*storage.Execution, error)
}

// taskLogStore is the minimal storage interface needed by handleStreamTaskLogs.
type taskLogStore interface {
	GetExecution(id string) (*storage.Execution, error)
	GetLatestExecution(taskID string) (*storage.Execution, error)
}

// maxTailDuration bounds how long a single live tail may run; it is the
// safety guard against leaking the tail loop when an execution never
// reaches a terminal state.
const maxTailDuration = 30 * time.Minute

// terminalStates enumerates execution statuses that will never change again.
// Logs for executions in these states are replayed once rather than tailed.
var terminalStates = map[string]bool{
	string(task.StateCompleted):      true,
	string(task.StateFailed):         true,
	string(task.StateTimedOut):       true,
	string(task.StateCancelled):      true,
	string(task.StateBudgetExceeded): true,
}

// logStreamMsg is one line of the execution's stream-json stdout log.
// Only the fields this package inspects are declared.
type logStreamMsg struct {
	Type    string        `json:"type"`
	Message *logAssistMsg `json:"message,omitempty"` // set for "assistant" messages
	CostUSD float64       `json:"cost_usd,omitempty"` // set for "result" messages
}

// logAssistMsg carries the content blocks of an "assistant" message.
type logAssistMsg struct {
	Content []logContentBlock `json:"content"`
}

// logContentBlock is a single assistant content block ("text" or "tool_use").
type logContentBlock struct {
	Type  string          `json:"type"`
	Text  string          `json:"text,omitempty"`  // populated for "text" blocks
	Name  string          `json:"name,omitempty"`  // tool name for "tool_use" blocks
	Input json.RawMessage `json:"input,omitempty"` // raw tool input; kept undecoded
}

// handleStreamTaskLogs streams the latest execution log for a task via SSE.
// GET /api/tasks/{id}/logs/stream func (s *Server) handleStreamTaskLogs(w http.ResponseWriter, r *http.Request) { taskID := r.PathValue("id") exec, err := s.taskLogStore.GetLatestExecution(taskID) if err != nil { http.Error(w, "task not found", http.StatusNotFound) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("X-Accel-Buffering", "no") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } ctx := r.Context() if terminalStates[exec.Status] { if exec.StdoutPath != "" { if f, err := os.Open(exec.StdoutPath); err == nil { defer f.Close() var offset int64 for _, line := range readNewLines(f, &offset) { select { case <-ctx.Done(): return default: } emitLogLine(w, flusher, line) } } } } else if exec.Status == string(task.StateRunning) { tailRunningExecution(ctx, w, flusher, s.taskLogStore, exec) return } fmt.Fprintf(w, "event: done\ndata: {}\n\n") flusher.Flush() } // handleStreamLogs streams parsed execution log content via SSE. 
// GET /api/executions/{id}/logs/stream
func (s *Server) handleStreamLogs(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")
	exec, err := s.logStore.GetExecution(id)
	if err != nil {
		http.Error(w, "execution not found", http.StatusNotFound)
		return
	}
	// Standard SSE response headers; X-Accel-Buffering disables proxy buffering.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("X-Accel-Buffering", "no")
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}
	ctx := r.Context()
	if terminalStates[exec.Status] {
		// Finished execution: replay the recorded log once.
		if exec.StdoutPath != "" {
			if f, err := os.Open(exec.StdoutPath); err == nil {
				defer f.Close()
				var offset int64
				for _, line := range readNewLines(f, &offset) {
					// Stop replaying as soon as the client disconnects.
					select {
					case <-ctx.Done():
						return
					default:
					}
					emitLogLine(w, flusher, line)
				}
			}
		}
	} else if exec.Status == string(task.StateRunning) {
		tailRunningExecution(ctx, w, flusher, s.logStore, exec)
		return // tailRunningExecution sends the done event
	}
	// Non-running, non-terminal states (and missing/unreadable logs) get an
	// immediate done event with no content.
	fmt.Fprintf(w, "event: done\ndata: {}\n\n")
	flusher.Flush()
}

// tailRunningExecution live-tails the stdout log for a RUNNING execution, polling every 250ms.
// It returns when the execution transitions to a terminal state, the context is cancelled,
// or after maxTailDuration (safety guard against goroutine leaks). It sends the SSE done
// event in every case except context cancellation (the client is gone then anyway).
func tailRunningExecution(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, store logStore, exec *storage.Execution) {
	var (
		f      *os.File // nil until the log file can be opened
		offset int64    // byte offset of the first unread line in f
	)
	// Emit any content already written to the log file.
	if exec.StdoutPath != "" {
		var err error
		f, err = os.Open(exec.StdoutPath)
		if err == nil {
			defer f.Close()
			for _, line := range readNewLines(f, &offset) {
				select {
				case <-ctx.Done():
					return
				default:
				}
				emitLogLine(w, flusher, line)
			}
		}
	}
	ticker := time.NewTicker(250 * time.Millisecond)
	defer ticker.Stop()
	deadline := time.NewTimer(maxTailDuration)
	defer deadline.Stop()
	for {
		select {
		case <-ctx.Done():
			// Client disconnected; nothing left to send.
			return
		case <-deadline.C:
			// Safety cutoff reached: end the stream even though the
			// execution is still running.
			fmt.Fprintf(w, "event: done\ndata: {}\n\n")
			flusher.Flush()
			return
		case <-ticker.C:
			// If the log file wasn't open yet, try now (may have been created since).
			// The file is opened at most once, so this defer fires at most once.
			if f == nil && exec.StdoutPath != "" {
				var err error
				f, err = os.Open(exec.StdoutPath)
				if err != nil {
					f = nil
				} else {
					defer f.Close()
				}
			}
			// Emit any new complete lines.
			if f != nil {
				for _, line := range readNewLines(f, &offset) {
					emitLogLine(w, flusher, line)
				}
			}
			// Re-fetch execution status from DB.
			if err != nil {
				// Treat a store error as end-of-stream rather than
				// leaving the client hanging.
				fmt.Fprintf(w, "event: done\ndata: {}\n\n")
				flusher.Flush()
				return
			}
			if terminalStates[updated.Status] {
				// Flush any remaining content written between the last read and now.
				if f != nil {
					for _, line := range readNewLines(f, &offset) {
						emitLogLine(w, flusher, line)
					}
				}
				fmt.Fprintf(w, "event: done\ndata: {}\n\n")
				flusher.Flush()
				return
			}
		}
	}
}

// readNewLines reads all complete lines from f starting at *offset.
// It advances *offset past the last newline so partial trailing lines
// are deferred to the next call (safe for live-tailing growing files).
// It returns nil on seek/read errors and when no complete new line exists.
func readNewLines(f *os.File, offset *int64) [][]byte {
	if _, err := f.Seek(*offset, io.SeekStart); err != nil {
		return nil
	}
	data, err := io.ReadAll(f)
	if err != nil || len(data) == 0 {
		return nil
	}
	lastNL := bytes.LastIndexByte(data, '\n')
	if lastNL < 0 {
		return nil // no complete line yet
	}
	// Only consume up to (and including) the last newline; the partial
	// tail, if any, is re-read on the next call once it is complete.
	*offset += int64(lastNL + 1)
	return bytes.Split(data[:lastNL], []byte("\n"))
}

// emitLogLine parses a single stream-json line and emits corresponding SSE events.
func emitLogLine(w http.ResponseWriter, flusher http.Flusher, line []byte) { if len(line) == 0 { return } var msg logStreamMsg if err := json.Unmarshal(line, &msg); err != nil { return } switch msg.Type { case "assistant": if msg.Message == nil { return } for _, block := range msg.Message.Content { var event map[string]string switch block.Type { case "text": event = map[string]string{"type": "text", "content": block.Text} case "tool_use": summary := string(block.Input) if len(summary) > 80 { summary = summary[:80] } event = map[string]string{"type": "tool_use", "content": fmt.Sprintf("%s(%s)", block.Name, summary)} default: continue } data, _ := json.Marshal(event) fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() } case "result": event := map[string]string{ "type": "cost", "content": fmt.Sprintf("%g", msg.CostUSD), } data, _ := json.Marshal(event) fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() } }