diff options
Diffstat (limited to 'internal/api/logs.go')
| -rw-r--r-- | internal/api/logs.go | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/internal/api/logs.go b/internal/api/logs.go index 0354943..1ba4b00 100644 --- a/internal/api/logs.go +++ b/internal/api/logs.go @@ -19,6 +19,12 @@ type logStore interface { GetExecution(id string) (*storage.Execution, error) } +// taskLogStore is the minimal storage interface needed by handleStreamTaskLogs. +type taskLogStore interface { + GetExecution(id string) (*storage.Execution, error) + GetLatestExecution(taskID string) (*storage.Execution, error) +} + const maxTailDuration = 30 * time.Minute var terminalStates = map[string]bool{ @@ -46,6 +52,52 @@ type logContentBlock struct { Input json.RawMessage `json:"input,omitempty"` } +// handleStreamTaskLogs streams the latest execution log for a task via SSE. +// GET /api/tasks/{id}/logs/stream +func (s *Server) handleStreamTaskLogs(w http.ResponseWriter, r *http.Request) { + taskID := r.PathValue("id") + exec, err := s.taskLogStore.GetLatestExecution(taskID) + if err != nil { + http.Error(w, "task not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("X-Accel-Buffering", "no") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + ctx := r.Context() + + if terminalStates[exec.Status] { + if exec.StdoutPath != "" { + if f, err := os.Open(exec.StdoutPath); err == nil { + defer f.Close() + var offset int64 + for _, line := range readNewLines(f, &offset) { + select { + case <-ctx.Done(): + return + default: + } + emitLogLine(w, flusher, line) + } + } + } + } else if exec.Status == string(task.StateRunning) { + tailRunningExecution(ctx, w, flusher, s.taskLogStore, exec) + return + } + + fmt.Fprintf(w, "event: done\ndata: {}\n\n") + flusher.Flush() +} + // handleStreamLogs streams parsed execution log content via SSE. // GET /api/executions/{id}/logs/stream func (s *Server) handleStreamLogs(w http.ResponseWriter, r *http.Request) { |
