package executor

import (
	"bufio"
	"encoding/json"
	"io"
	"log/slog"
	"sync"
)

// QuestionHandler receives the AskUserQuestion invocations made by an agent.
// Implementations are expected to broadcast the question and block until an
// answer arrives, returning that answer or an error.
type QuestionHandler interface {
	HandleQuestion(taskID, toolUseID string, input json.RawMessage) (string, error)
}

// PendingQuestion is the state kept for one question that is still waiting
// for a user answer.
type PendingQuestion struct {
	TaskID    string          `json:"task_id"`
	ToolUseID string          `json:"tool_use_id"`
	Input     json.RawMessage `json:"input"`
	// AnswerCh carries the answer to whoever is waiting; it is left out of
	// the JSON encoding.
	AnswerCh chan string `json:"-"`
}

// QuestionRegistry is a mutex-guarded index of the questions that are
// currently pending, across all running tasks.
type QuestionRegistry struct {
	mu        sync.Mutex
	questions map[string]*PendingQuestion // keyed by toolUseID
}

// NewQuestionRegistry returns an empty registry that is ready for use.
func NewQuestionRegistry() *QuestionRegistry {
	qr := new(QuestionRegistry)
	qr.questions = map[string]*PendingQuestion{}
	return qr
}

// Register records a pending question under toolUseID and returns the channel
// on which its answer will be delivered. The channel has a buffer of one.
// Registering the same toolUseID again replaces the earlier entry.
func (qr *QuestionRegistry) Register(taskID, toolUseID string, input json.RawMessage) chan string {
	pending := &PendingQuestion{
		TaskID:    taskID,
		ToolUseID: toolUseID,
		Input:     input,
		AnswerCh:  make(chan string, 1),
	}

	qr.mu.Lock()
	defer qr.mu.Unlock()
	qr.questions[toolUseID] = pending
	return pending.AnswerCh
}

// Answer hands answer to the question registered under toolUseID and removes
// that question from the registry. It reports false when no such question is
// pending.
func (qr *QuestionRegistry) Answer(toolUseID, answer string) bool {
	qr.mu.Lock()
	pending, found := qr.questions[toolUseID]
	delete(qr.questions, toolUseID) // no-op when the key is absent
	qr.mu.Unlock()

	// The send happens after the lock is released.
	if found {
		pending.AnswerCh <- answer
	}
	return found
}

// Get looks up the pending question for toolUseID; the result is nil when
// there is none.
func (qr *QuestionRegistry) Get(toolUseID string) *PendingQuestion {
	qr.mu.Lock()
	pending := qr.questions[toolUseID]
	qr.mu.Unlock()
	return pending
}

// PendingForTask returns all pending questions for a given task.
func (qr *QuestionRegistry) PendingForTask(taskID string) []*PendingQuestion { qr.mu.Lock() defer qr.mu.Unlock() var result []*PendingQuestion for _, pq := range qr.questions { if pq.TaskID == taskID { result = append(result, pq) } } return result } // Remove removes a question without answering it (e.g., on task cancellation). func (qr *QuestionRegistry) Remove(toolUseID string) { qr.mu.Lock() delete(qr.questions, toolUseID) qr.mu.Unlock() } // extractAskUserQuestion parses a stream-json line and returns the tool_use_id and input // if the line is an assistant event containing an AskUserQuestion tool_use. func extractAskUserQuestion(line []byte) (string, json.RawMessage) { var event struct { Type string `json:"type"` Message struct { Content []struct { Type string `json:"type"` ID string `json:"id"` Name string `json:"name"` Input json.RawMessage `json:"input"` } `json:"content"` } `json:"message"` } if err := json.Unmarshal(line, &event); err != nil { return "", nil } if event.Type != "assistant" { return "", nil } for _, block := range event.Message.Content { if block.Type == "tool_use" && block.Name == "AskUserQuestion" { return block.ID, block.Input } } return "", nil } // streamAndParseWithQuestions reads streaming JSON, writes to w, parses cost, // and calls onQuestion for each detected AskUserQuestion tool_use. 
func streamAndParseWithQuestions(r io.Reader, w io.Writer, _ *slog.Logger, onQuestion func(string, json.RawMessage)) float64 { tee := io.TeeReader(r, w) scanner := bufio.NewScanner(tee) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) var totalCost float64 for scanner.Scan() { line := scanner.Bytes() if toolUseID, input := extractAskUserQuestion(line); toolUseID != "" { if onQuestion != nil { onQuestion(toolUseID, input) } } var msg map[string]interface{} if err := json.Unmarshal(line, &msg); err != nil { continue } if costData, ok := msg["cost_usd"]; ok { if cost, ok := costData.(float64); ok { totalCost = cost } } } return totalCost } // buildToolResultMessage builds a tool_result message to feed back to Claude // as the answer to an AskUserQuestion tool_use. func buildToolResultMessage(toolUseID, answer string) []byte { answerJSON, _ := json.Marshal(map[string]interface{}{ "answers": map[string]string{"answer": answer}, }) msg := map[string]interface{}{ "message": map[string]interface{}{ "role": "user", "content": []map[string]interface{}{ { "type": "tool_result", "tool_use_id": toolUseID, "content": string(answerJSON), }, }, }, } result, _ := json.Marshal(msg) return result }