From cf83444a9d341ae362e65a9f995100c69176887c Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Thu, 5 Mar 2026 18:51:50 +0000 Subject: Rescue work from claudomator-work: question/answer, ratelimit, start-next-task Merges features developed in /site/doot.terst.org/claudomator-work (a stale clone) into the canonical repo: - executor: QuestionRegistry for human-in-the-loop answers, rate limit detection and exponential backoff retry (ratelimit.go, question.go) - executor/claude.go: process group isolation (SIGKILL orphans on cancel), os.Pipe for reliable stdout drain, backoff retry on rate limits - api/scripts.go: POST /api/scripts/start-next-task handler - api/server.go: startNextTaskScript field, answer-question route, BroadcastQuestion for WebSocket question events - web: Cancel/Restart buttons, question banner UI, log viewer, validate section, WebSocket auto-connect All tests pass. Co-Authored-By: Claude Sonnet 4.6 --- internal/executor/question.go | 172 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 internal/executor/question.go (limited to 'internal/executor/question.go') diff --git a/internal/executor/question.go b/internal/executor/question.go new file mode 100644 index 0000000..9a2b55d --- /dev/null +++ b/internal/executor/question.go @@ -0,0 +1,172 @@ +package executor + +import ( + "bufio" + "encoding/json" + "io" + "log/slog" + "sync" +) + +// QuestionHandler is called when an agent invokes AskUserQuestion. +// Implementations should broadcast the question and block until an answer arrives. +type QuestionHandler interface { + HandleQuestion(taskID, toolUseID string, input json.RawMessage) (string, error) +} + +// PendingQuestion holds state for a question awaiting a user answer. +type PendingQuestion struct { + TaskID string `json:"task_id"` + ToolUseID string `json:"tool_use_id"` + Input json.RawMessage `json:"input"` + AnswerCh chan string `json:"-"` +} + +// QuestionRegistry tracks pending questions across running tasks. +type QuestionRegistry struct { + mu sync.Mutex + questions map[string]*PendingQuestion // keyed by toolUseID +} + +// NewQuestionRegistry creates a new registry. +func NewQuestionRegistry() *QuestionRegistry { + return &QuestionRegistry{ + questions: make(map[string]*PendingQuestion), + } +} + +// Register adds a pending question and returns its answer channel. +func (qr *QuestionRegistry) Register(taskID, toolUseID string, input json.RawMessage) chan string { + ch := make(chan string, 1) + qr.mu.Lock() + qr.questions[toolUseID] = &PendingQuestion{ + TaskID: taskID, + ToolUseID: toolUseID, + Input: input, + AnswerCh: ch, + } + qr.mu.Unlock() + return ch +} + +// Answer delivers an answer for a pending question. Returns false if no such question exists. +func (qr *QuestionRegistry) Answer(toolUseID, answer string) bool { + qr.mu.Lock() + pq, ok := qr.questions[toolUseID] + if ok { + delete(qr.questions, toolUseID) + } + qr.mu.Unlock() + if !ok { + return false + } + pq.AnswerCh <- answer + return true +} + +// Get returns a pending question by tool_use_id, or nil. +func (qr *QuestionRegistry) Get(toolUseID string) *PendingQuestion { + qr.mu.Lock() + defer qr.mu.Unlock() + return qr.questions[toolUseID] +} + +// PendingForTask returns all pending questions for a given task. +func (qr *QuestionRegistry) PendingForTask(taskID string) []*PendingQuestion { + qr.mu.Lock() + defer qr.mu.Unlock() + var result []*PendingQuestion + for _, pq := range qr.questions { + if pq.TaskID == taskID { + result = append(result, pq) + } + } + return result +} + +// Remove removes a question without answering it (e.g., on task cancellation). +func (qr *QuestionRegistry) Remove(toolUseID string) { + qr.mu.Lock() + delete(qr.questions, toolUseID) + qr.mu.Unlock() +} + +// extractAskUserQuestion parses a stream-json line and returns the tool_use_id and input +// if the line is an assistant event containing an AskUserQuestion tool_use. +func extractAskUserQuestion(line []byte) (string, json.RawMessage) { + var event struct { + Type string `json:"type"` + Message struct { + Content []struct { + Type string `json:"type"` + ID string `json:"id"` + Name string `json:"name"` + Input json.RawMessage `json:"input"` + } `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(line, &event); err != nil { + return "", nil + } + if event.Type != "assistant" { + return "", nil + } + for _, block := range event.Message.Content { + if block.Type == "tool_use" && block.Name == "AskUserQuestion" { + return block.ID, block.Input + } + } + return "", nil +} + +// streamAndParseWithQuestions reads streaming JSON, writes to w, parses cost, +// and calls onQuestion for each detected AskUserQuestion tool_use. +func streamAndParseWithQuestions(r io.Reader, w io.Writer, _ *slog.Logger, onQuestion func(string, json.RawMessage)) float64 { + tee := io.TeeReader(r, w) + scanner := bufio.NewScanner(tee) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + + var totalCost float64 + for scanner.Scan() { + line := scanner.Bytes() + + if toolUseID, input := extractAskUserQuestion(line); toolUseID != "" { + if onQuestion != nil { + onQuestion(toolUseID, input) + } + } + + var msg map[string]interface{} + if err := json.Unmarshal(line, &msg); err != nil { + continue + } + if costData, ok := msg["cost_usd"]; ok { + if cost, ok := costData.(float64); ok { + totalCost = cost + } + } + } + return totalCost +} + +// buildToolResultMessage builds a tool_result message to feed back to Claude +// as the answer to an AskUserQuestion tool_use. +func buildToolResultMessage(toolUseID, answer string) []byte { + answerJSON, _ := json.Marshal(map[string]interface{}{ + "answers": map[string]string{"answer": answer}, + }) + msg := map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "content": []map[string]interface{}{ + { + "type": "tool_result", + "tool_use_id": toolUseID, + "content": string(answerJSON), + }, + }, + }, + } + result, _ := json.Marshal(msg) + return result +} -- cgit v1.2.3