summaryrefslogtreecommitdiff
path: root/internal/executor/question.go
diff options
context:
space:
mode:
authorPeter Stone <thepeterstone@gmail.com>2026-03-05 18:51:50 +0000
committerPeter Stone <thepeterstone@gmail.com>2026-03-05 18:51:50 +0000
commitcf83444a9d341ae362e65a9f995100c69176887c (patch)
tree0dc12aea9510d10d9e60e9c58473cbdb9db5db47 /internal/executor/question.go
parent680e5f668637248073c1f8f7e3547810ab1ada36 (diff)
Rescue work from claudomator-work: question/answer, ratelimit, start-next-task
Merges features developed in /site/doot.terst.org/claudomator-work (a stale clone) into the canonical repo: - executor: QuestionRegistry for human-in-the-loop answers, rate limit detection and exponential backoff retry (ratelimit.go, question.go) - executor/claude.go: process group isolation (SIGKILL orphans on cancel), os.Pipe for reliable stdout drain, backoff retry on rate limits - api/scripts.go: POST /api/scripts/start-next-task handler - api/server.go: startNextTaskScript field, answer-question route, BroadcastQuestion for WebSocket question events - web: Cancel/Restart buttons, question banner UI, log viewer, validate section, WebSocket auto-connect All tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/executor/question.go')
-rw-r--r--internal/executor/question.go172
1 files changed, 172 insertions, 0 deletions
diff --git a/internal/executor/question.go b/internal/executor/question.go
new file mode 100644
index 0000000..9a2b55d
--- /dev/null
+++ b/internal/executor/question.go
@@ -0,0 +1,172 @@
+package executor
+
+import (
+ "bufio"
+ "encoding/json"
+ "io"
+ "log/slog"
+ "sync"
+)
+
+// QuestionHandler is called when an agent invokes AskUserQuestion.
+// Implementations should broadcast the question and block until an answer arrives.
+type QuestionHandler interface {
+ HandleQuestion(taskID, toolUseID string, input json.RawMessage) (string, error)
+}
+
+// PendingQuestion holds state for a question awaiting a user answer.
+type PendingQuestion struct {
+ TaskID string `json:"task_id"`
+ ToolUseID string `json:"tool_use_id"`
+ Input json.RawMessage `json:"input"`
+ AnswerCh chan string `json:"-"`
+}
+
+// QuestionRegistry tracks pending questions across running tasks.
+type QuestionRegistry struct {
+ mu sync.Mutex
+ questions map[string]*PendingQuestion // keyed by toolUseID
+}
+
+// NewQuestionRegistry creates a new registry.
+func NewQuestionRegistry() *QuestionRegistry {
+ return &QuestionRegistry{
+ questions: make(map[string]*PendingQuestion),
+ }
+}
+
+// Register adds a pending question and returns its answer channel.
+func (qr *QuestionRegistry) Register(taskID, toolUseID string, input json.RawMessage) chan string {
+ ch := make(chan string, 1)
+ qr.mu.Lock()
+ qr.questions[toolUseID] = &PendingQuestion{
+ TaskID: taskID,
+ ToolUseID: toolUseID,
+ Input: input,
+ AnswerCh: ch,
+ }
+ qr.mu.Unlock()
+ return ch
+}
+
+// Answer delivers an answer for a pending question. Returns false if no such question exists.
+func (qr *QuestionRegistry) Answer(toolUseID, answer string) bool {
+ qr.mu.Lock()
+ pq, ok := qr.questions[toolUseID]
+ if ok {
+ delete(qr.questions, toolUseID)
+ }
+ qr.mu.Unlock()
+ if !ok {
+ return false
+ }
+ pq.AnswerCh <- answer
+ return true
+}
+
+// Get returns a pending question by tool_use_id, or nil.
+func (qr *QuestionRegistry) Get(toolUseID string) *PendingQuestion {
+ qr.mu.Lock()
+ defer qr.mu.Unlock()
+ return qr.questions[toolUseID]
+}
+
+// PendingForTask returns all pending questions for a given task.
+func (qr *QuestionRegistry) PendingForTask(taskID string) []*PendingQuestion {
+ qr.mu.Lock()
+ defer qr.mu.Unlock()
+ var result []*PendingQuestion
+ for _, pq := range qr.questions {
+ if pq.TaskID == taskID {
+ result = append(result, pq)
+ }
+ }
+ return result
+}
+
+// Remove removes a question without answering it (e.g., on task cancellation).
+func (qr *QuestionRegistry) Remove(toolUseID string) {
+ qr.mu.Lock()
+ delete(qr.questions, toolUseID)
+ qr.mu.Unlock()
+}
+
+// extractAskUserQuestion parses a stream-json line and returns the tool_use_id and input
+// if the line is an assistant event containing an AskUserQuestion tool_use.
+func extractAskUserQuestion(line []byte) (string, json.RawMessage) {
+ var event struct {
+ Type string `json:"type"`
+ Message struct {
+ Content []struct {
+ Type string `json:"type"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+ Input json.RawMessage `json:"input"`
+ } `json:"content"`
+ } `json:"message"`
+ }
+ if err := json.Unmarshal(line, &event); err != nil {
+ return "", nil
+ }
+ if event.Type != "assistant" {
+ return "", nil
+ }
+ for _, block := range event.Message.Content {
+ if block.Type == "tool_use" && block.Name == "AskUserQuestion" {
+ return block.ID, block.Input
+ }
+ }
+ return "", nil
+}
+
+// streamAndParseWithQuestions reads streaming JSON, writes to w, parses cost,
+// and calls onQuestion for each detected AskUserQuestion tool_use.
+func streamAndParseWithQuestions(r io.Reader, w io.Writer, _ *slog.Logger, onQuestion func(string, json.RawMessage)) float64 {
+ tee := io.TeeReader(r, w)
+ scanner := bufio.NewScanner(tee)
+ scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
+
+ var totalCost float64
+ for scanner.Scan() {
+ line := scanner.Bytes()
+
+ if toolUseID, input := extractAskUserQuestion(line); toolUseID != "" {
+ if onQuestion != nil {
+ onQuestion(toolUseID, input)
+ }
+ }
+
+ var msg map[string]interface{}
+ if err := json.Unmarshal(line, &msg); err != nil {
+ continue
+ }
+ if costData, ok := msg["cost_usd"]; ok {
+ if cost, ok := costData.(float64); ok {
+ totalCost = cost
+ }
+ }
+ }
+ return totalCost
+}
+
+// buildToolResultMessage builds a tool_result message to feed back to Claude
+// as the answer to an AskUserQuestion tool_use.
+func buildToolResultMessage(toolUseID, answer string) []byte {
+ answerJSON, _ := json.Marshal(map[string]interface{}{
+ "answers": map[string]string{"answer": answer},
+ })
+ msg := map[string]interface{}{
+ "message": map[string]interface{}{
+ "role": "user",
+ "content": []map[string]interface{}{
+ {
+ "type": "tool_result",
+ "tool_use_id": toolUseID,
+ "content": string(answerJSON),
+ },
+ },
+ },
+ }
+ result, _ := json.Marshal(msg)
+ return result
+}