summaryrefslogtreecommitdiff
path: root/internal/api/logs.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/api/logs.go')
-rw-r--r--internal/api/logs.go52
1 files changed, 52 insertions, 0 deletions
diff --git a/internal/api/logs.go b/internal/api/logs.go
index 0354943..1ba4b00 100644
--- a/internal/api/logs.go
+++ b/internal/api/logs.go
@@ -19,6 +19,12 @@ type logStore interface {
GetExecution(id string) (*storage.Execution, error)
}
+// taskLogStore is the minimal storage interface needed by handleStreamTaskLogs.
+type taskLogStore interface {
+ GetExecution(id string) (*storage.Execution, error)
+ GetLatestExecution(taskID string) (*storage.Execution, error)
+}
+
const maxTailDuration = 30 * time.Minute
var terminalStates = map[string]bool{
@@ -46,6 +52,52 @@ type logContentBlock struct {
Input json.RawMessage `json:"input,omitempty"`
}
+// handleStreamTaskLogs streams the latest execution log for a task via SSE.
+// GET /api/tasks/{id}/logs/stream
+func (s *Server) handleStreamTaskLogs(w http.ResponseWriter, r *http.Request) {
+ taskID := r.PathValue("id")
+ exec, err := s.taskLogStore.GetLatestExecution(taskID)
+ if err != nil {
+ http.Error(w, "task not found", http.StatusNotFound)
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("X-Accel-Buffering", "no")
+
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ http.Error(w, "streaming not supported", http.StatusInternalServerError)
+ return
+ }
+
+ ctx := r.Context()
+
+ if terminalStates[exec.Status] {
+ if exec.StdoutPath != "" {
+ if f, err := os.Open(exec.StdoutPath); err == nil {
+ defer f.Close()
+ var offset int64
+ for _, line := range readNewLines(f, &offset) {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ emitLogLine(w, flusher, line)
+ }
+ }
+ }
+ } else if exec.Status == string(task.StateRunning) {
+ tailRunningExecution(ctx, w, flusher, s.taskLogStore, exec)
+ return
+ }
+
+ fmt.Fprintf(w, "event: done\ndata: {}\n\n")
+ flusher.Flush()
+}
+
// handleStreamLogs streams parsed execution log content via SSE.
// GET /api/executions/{id}/logs/stream
func (s *Server) handleStreamLogs(w http.ResponseWriter, r *http.Request) {