package api import ( "encoding/json" "errors" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "sync/atomic" "testing" "time" "github.com/thepeterstone/claudomator/internal/storage" ) // mockTaskLogStore implements taskLogStore for testing handleStreamTaskLogs. type mockTaskLogStore struct { getExecution func(id string) (*storage.Execution, error) getLatestExecution func(taskID string) (*storage.Execution, error) } func (m *mockTaskLogStore) GetExecution(id string) (*storage.Execution, error) { return m.getExecution(id) } func (m *mockTaskLogStore) GetLatestExecution(taskID string) (*storage.Execution, error) { return m.getLatestExecution(taskID) } func taskLogsMux(srv *Server) *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("GET /api/tasks/{id}/logs/stream", srv.handleStreamTaskLogs) return mux } // mockLogStore implements logStore for testing. type mockLogStore struct { fn func(id string) (*storage.Execution, error) } func (m *mockLogStore) GetExecution(id string) (*storage.Execution, error) { return m.fn(id) } func logsMux(srv *Server) *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("GET /api/executions/{id}/logs", srv.handleStreamLogs) return mux } // TestHandleStreamLogs_NotFound verifies that an unknown execution ID yields 404. func TestHandleStreamLogs_NotFound(t *testing.T) { srv := &Server{ logStore: &mockLogStore{fn: func(id string) (*storage.Execution, error) { return nil, errors.New("not found") }}, } req := httptest.NewRequest("GET", "/api/executions/nonexistent/logs", nil) w := httptest.NewRecorder() logsMux(srv).ServeHTTP(w, req) if w.Code != http.StatusNotFound { t.Errorf("status: want 404, got %d; body: %s", w.Code, w.Body.String()) } } // TestHandleStreamLogs_TerminalState_EmitsEventsAndDone verifies that a COMPLETED // execution with a populated stdout.log streams SSE events and terminates with a done event. func TestHandleStreamLogs_TerminalState_EmitsEventsAndDone(t *testing.T) { dir := t.TempDir() logPath := filepath.Join(dir, "stdout.log") lines := strings.Join([]string{ `{"type":"assistant","message":{"content":[{"type":"text","text":"Hello world"}]}}`, `{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Bash","input":{"command":"ls"}}]}}`, `{"type":"result","cost_usd":0.0042}`, }, "\n") + "\n" if err := os.WriteFile(logPath, []byte(lines), 0600); err != nil { t.Fatal(err) } exec := &storage.Execution{ ID: "exec-terminal-1", TaskID: "task-terminal-1", StartTime: time.Now(), Status: "COMPLETED", StdoutPath: logPath, } srv := &Server{ logStore: &mockLogStore{fn: func(id string) (*storage.Execution, error) { return exec, nil }}, } req := httptest.NewRequest("GET", "/api/executions/exec-terminal-1/logs", nil) w := httptest.NewRecorder() logsMux(srv).ServeHTTP(w, req) if ct := w.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") { t.Errorf("Content-Type: want text/event-stream, got %q", ct) } body := w.Body.String() if !strings.Contains(body, "data: ") { t.Error("expected at least one SSE 'data: ' event in body") } if !strings.Contains(body, "\n\n") { t.Error("expected SSE double-newline event termination") } if !strings.HasSuffix(body, "event: done\ndata: {}\n\n") { t.Errorf("body does not end with done event; got:\n%s", body) } } // TestHandleStreamLogs_EmptyLog verifies that a COMPLETED execution with no stdout path // responds with only the done sentinel event. func TestHandleStreamLogs_EmptyLog(t *testing.T) { exec := &storage.Execution{ ID: "exec-empty-1", TaskID: "task-empty-1", StartTime: time.Now(), Status: "COMPLETED", // StdoutPath intentionally empty } srv := &Server{ logStore: &mockLogStore{fn: func(id string) (*storage.Execution, error) { return exec, nil }}, } req := httptest.NewRequest("GET", "/api/executions/exec-empty-1/logs", nil) w := httptest.NewRecorder() logsMux(srv).ServeHTTP(w, req) body := w.Body.String() if body != "event: done\ndata: {}\n\n" { t.Errorf("want only done event, got:\n%s", body) } } // TestHandleStreamLogs_RunningState_LiveTail verifies that a RUNNING execution streams // initial log content and emits a done event once it transitions to a terminal state. func TestHandleStreamLogs_RunningState_LiveTail(t *testing.T) { dir := t.TempDir() logPath := filepath.Join(dir, "stdout.log") logLines := strings.Join([]string{ `{"type":"assistant","message":{"content":[{"type":"text","text":"Working..."}]}}`, `{"type":"result","cost_usd":0.001}`, }, "\n") + "\n" if err := os.WriteFile(logPath, []byte(logLines), 0600); err != nil { t.Fatal(err) } runningExec := &storage.Execution{ ID: "exec-running-1", TaskID: "task-running-1", StartTime: time.Now(), Status: "RUNNING", StdoutPath: logPath, } // callCount tracks how many times GetExecution has been called. // Call 1: initial fetch in handleStreamLogs → RUNNING // Call 2+: poll in tailRunningExecution → COMPLETED var callCount atomic.Int32 mock := &mockLogStore{fn: func(id string) (*storage.Execution, error) { n := callCount.Add(1) if n <= 1 { return runningExec, nil } completed := *runningExec completed.Status = "COMPLETED" return &completed, nil }} srv := &Server{logStore: mock} req := httptest.NewRequest("GET", "/api/executions/exec-running-1/logs", nil) w := httptest.NewRecorder() logsMux(srv).ServeHTTP(w, req) body := w.Body.String() if !strings.Contains(body, `"Working..."`) { t.Errorf("expected initial text event in body; got:\n%s", body) } if !strings.Contains(body, `"type":"cost"`) { t.Errorf("expected cost event in body; got:\n%s", body) } if !strings.HasSuffix(body, "event: done\ndata: {}\n\n") { t.Errorf("body does not end with done event; got:\n%s", body) } } // --- Task-level SSE log streaming tests (handleStreamTaskLogs) --- // TestHandleStreamTaskLogs_TaskNotFound verifies that a task with no executions yields 404. func TestHandleStreamTaskLogs_TaskNotFound(t *testing.T) { srv := &Server{ taskLogStore: &mockTaskLogStore{ getLatestExecution: func(taskID string) (*storage.Execution, error) { return nil, errors.New("not found") }, }, } req := httptest.NewRequest("GET", "/api/tasks/nonexistent/logs/stream", nil) w := httptest.NewRecorder() taskLogsMux(srv).ServeHTTP(w, req) if w.Code != http.StatusNotFound { t.Errorf("status: want 404, got %d; body: %s", w.Code, w.Body.String()) } } // TestHandleStreamTaskLogs_NoStdoutPath_EmitsDone verifies that a completed execution with no // stdout log path emits only the done sentinel event. func TestHandleStreamTaskLogs_NoStdoutPath_EmitsDone(t *testing.T) { exec := &storage.Execution{ ID: "exec-task-empty", TaskID: "task-no-log", StartTime: time.Now(), Status: "COMPLETED", // StdoutPath intentionally empty } srv := &Server{ taskLogStore: &mockTaskLogStore{ getLatestExecution: func(taskID string) (*storage.Execution, error) { return exec, nil }, getExecution: func(id string) (*storage.Execution, error) { return exec, nil }, }, } req := httptest.NewRequest("GET", "/api/tasks/task-no-log/logs/stream", nil) w := httptest.NewRecorder() taskLogsMux(srv).ServeHTTP(w, req) if ct := w.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") { t.Errorf("Content-Type: want text/event-stream, got %q", ct) } body := w.Body.String() if body != "event: done\ndata: {}\n\n" { t.Errorf("want only done event, got:\n%s", body) } } // TestHandleStreamTaskLogs_TerminalExecution_EmitsEventsAndDone verifies that a COMPLETED // execution streams SSE events and ends with a done event. func TestHandleStreamTaskLogs_TerminalExecution_EmitsEventsAndDone(t *testing.T) { dir := t.TempDir() logPath := filepath.Join(dir, "stdout.log") lines := strings.Join([]string{ `{"type":"assistant","message":{"content":[{"type":"text","text":"Task output here"}]}}`, `{"type":"result","cost_usd":0.007}`, }, "\n") + "\n" if err := os.WriteFile(logPath, []byte(lines), 0600); err != nil { t.Fatal(err) } exec := &storage.Execution{ ID: "exec-task-done", TaskID: "task-done", StartTime: time.Now(), Status: "COMPLETED", StdoutPath: logPath, } srv := &Server{ taskLogStore: &mockTaskLogStore{ getLatestExecution: func(taskID string) (*storage.Execution, error) { return exec, nil }, getExecution: func(id string) (*storage.Execution, error) { return exec, nil }, }, } req := httptest.NewRequest("GET", "/api/tasks/task-done/logs/stream", nil) w := httptest.NewRecorder() taskLogsMux(srv).ServeHTTP(w, req) body := w.Body.String() if !strings.Contains(body, `"Task output here"`) { t.Errorf("expected text event content in body; got:\n%s", body) } if !strings.Contains(body, `"type":"cost"`) { t.Errorf("expected cost event in body; got:\n%s", body) } if !strings.HasSuffix(body, "event: done\ndata: {}\n\n") { t.Errorf("body does not end with done event; got:\n%s", body) } } // TestEmitLogLine_ToolUse_EmitsNameField verifies that emitLogLine emits a tool_use SSE event // with a "name" field matching the tool name so the web UI can display it as "[ToolName]". func TestEmitLogLine_ToolUse_EmitsNameField(t *testing.T) { line := []byte(`{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Bash","input":{"command":"ls -la"}}]}}`) w := httptest.NewRecorder() emitLogLine(w, w, line) body := w.Body.String() var found bool for _, chunk := range strings.Split(body, "\n\n") { chunk = strings.TrimSpace(chunk) if !strings.HasPrefix(chunk, "data: ") { continue } jsonStr := strings.TrimPrefix(chunk, "data: ") var e map[string]interface{} if err := json.Unmarshal([]byte(jsonStr), &e); err != nil { continue } if e["type"] == "tool_use" { if e["name"] != "Bash" { t.Errorf("tool_use event name: want Bash, got %v", e["name"]) } if e["input"] == nil { t.Error("tool_use event input: expected non-nil") } found = true } } if !found { t.Errorf("no tool_use event found in SSE output:\n%s", body) } } // TestEmitLogLine_Cost_EmitsTotalCostField verifies that emitLogLine emits a cost SSE event // with a numeric "total_cost" field so the web UI can display it correctly. func TestEmitLogLine_Cost_EmitsTotalCostField(t *testing.T) { line := []byte(`{"type":"result","total_cost_usd":0.0042}`) w := httptest.NewRecorder() emitLogLine(w, w, line) body := w.Body.String() var found bool for _, chunk := range strings.Split(body, "\n\n") { chunk = strings.TrimSpace(chunk) if !strings.HasPrefix(chunk, "data: ") { continue } jsonStr := strings.TrimPrefix(chunk, "data: ") var e map[string]interface{} if err := json.Unmarshal([]byte(jsonStr), &e); err != nil { continue } if e["type"] == "cost" { if e["total_cost"] == nil { t.Error("cost event total_cost: expected non-nil numeric field") } if v, ok := e["total_cost"].(float64); !ok || v != 0.0042 { t.Errorf("cost event total_cost: want 0.0042, got %v", e["total_cost"]) } found = true } } if !found { t.Errorf("no cost event found in SSE output:\n%s", body) } } // TestHandleStreamTaskLogs_RunningExecution_LiveTails verifies that a RUNNING execution is // live-tailed and a done event is emitted once it transitions to a terminal state. func TestHandleStreamTaskLogs_RunningExecution_LiveTails(t *testing.T) { dir := t.TempDir() logPath := filepath.Join(dir, "stdout.log") logLines := strings.Join([]string{ `{"type":"assistant","message":{"content":[{"type":"text","text":"Still running..."}]}}`, `{"type":"result","cost_usd":0.003}`, }, "\n") + "\n" if err := os.WriteFile(logPath, []byte(logLines), 0600); err != nil { t.Fatal(err) } runningExec := &storage.Execution{ ID: "exec-task-running", TaskID: "task-running", StartTime: time.Now(), Status: "RUNNING", StdoutPath: logPath, } // getLatestExecution is called once (initial lookup); getExecution polls for state change. var pollCount atomic.Int32 srv := &Server{ taskLogStore: &mockTaskLogStore{ getLatestExecution: func(taskID string) (*storage.Execution, error) { return runningExec, nil }, getExecution: func(id string) (*storage.Execution, error) { n := pollCount.Add(1) if n <= 1 { return runningExec, nil } completed := *runningExec completed.Status = "COMPLETED" return &completed, nil }, }, } req := httptest.NewRequest("GET", "/api/tasks/task-running/logs/stream", nil) w := httptest.NewRecorder() taskLogsMux(srv).ServeHTTP(w, req) body := w.Body.String() if !strings.Contains(body, `"Still running..."`) { t.Errorf("expected live-tail text in body; got:\n%s", body) } if !strings.HasSuffix(body, "event: done\ndata: {}\n\n") { t.Errorf("body does not end with done event; got:\n%s", body) } }