diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/api/changestats.go | 15 | ||||
| -rw-r--r-- | internal/executor/executor.go | 2 | ||||
| -rw-r--r-- | internal/executor/executor_test.go | 1 | ||||
| -rw-r--r-- | internal/executor/helpers.go | 3 | ||||
| -rw-r--r-- | internal/executor/question.go | 84 | ||||
| -rw-r--r-- | internal/executor/question_test.go | 58 |
6 files changed, 3 insertions, 160 deletions
diff --git a/internal/api/changestats.go b/internal/api/changestats.go deleted file mode 100644 index 4f18f7f..0000000 --- a/internal/api/changestats.go +++ /dev/null @@ -1,15 +0,0 @@ -package api - -import "github.com/thepeterstone/claudomator/internal/task" - -// parseChangestatFromOutput delegates to task.ParseChangestatFromOutput. -// Kept as a package-local wrapper for use within the api package. -func parseChangestatFromOutput(output string) *task.Changestats { - return task.ParseChangestatFromOutput(output) -} - -// parseChangestatFromFile delegates to task.ParseChangestatFromFile. -// Kept as a package-local wrapper for use within the api package. -func parseChangestatFromFile(path string) *task.Changestats { - return task.ParseChangestatFromFile(path) -} diff --git a/internal/executor/executor.go b/internal/executor/executor.go index ac528c6..8257f31 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -84,7 +84,6 @@ type Pool struct { doneCh chan struct{} // signals when a worker slot is freed workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines dispatchDone chan struct{} // closed when the dispatch goroutine exits - Questions *QuestionRegistry Classifier *Classifier } @@ -116,7 +115,6 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger * workCh: make(chan workItem, maxConcurrent*10+100), doneCh: make(chan struct{}, maxConcurrent), dispatchDone: make(chan struct{}), - Questions: NewQuestionRegistry(), } go p.dispatch() return p diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index e947606..94ba65d 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -1137,7 +1137,6 @@ func newPoolWithMockStore(store Store) *Pool { resultCh: make(chan *Result, 4), workCh: make(chan workItem, 4), doneCh: make(chan struct{}, 2), - Questions: NewQuestionRegistry(), } } diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go index aee7da0..bd5d9d5 100644 --- a/internal/executor/helpers.go +++ b/internal/executor/helpers.go @@ -88,6 +88,9 @@ Loop: totalCost = cost } } + if err := scanner.Err(); err != nil && streamErr == nil { + streamErr = fmt.Errorf("reading claude stdout: %w", err) + } return totalCost, sessionID, streamErr } diff --git a/internal/executor/question.go b/internal/executor/question.go index 9a2b55d..0ae1b08 100644 --- a/internal/executor/question.go +++ b/internal/executor/question.go @@ -5,92 +5,8 @@ import ( "encoding/json" "io" "log/slog" - "sync" ) -// QuestionHandler is called when an agent invokes AskUserQuestion. -// Implementations should broadcast the question and block until an answer arrives. -type QuestionHandler interface { - HandleQuestion(taskID, toolUseID string, input json.RawMessage) (string, error) -} - -// PendingQuestion holds state for a question awaiting a user answer. -type PendingQuestion struct { - TaskID string `json:"task_id"` - ToolUseID string `json:"tool_use_id"` - Input json.RawMessage `json:"input"` - AnswerCh chan string `json:"-"` -} - -// QuestionRegistry tracks pending questions across running tasks. -type QuestionRegistry struct { - mu sync.Mutex - questions map[string]*PendingQuestion // keyed by toolUseID -} - -// NewQuestionRegistry creates a new registry. -func NewQuestionRegistry() *QuestionRegistry { - return &QuestionRegistry{ - questions: make(map[string]*PendingQuestion), - } -} - -// Register adds a pending question and returns its answer channel. -func (qr *QuestionRegistry) Register(taskID, toolUseID string, input json.RawMessage) chan string { - ch := make(chan string, 1) - qr.mu.Lock() - qr.questions[toolUseID] = &PendingQuestion{ - TaskID: taskID, - ToolUseID: toolUseID, - Input: input, - AnswerCh: ch, - } - qr.mu.Unlock() - return ch -} - -// Answer delivers an answer for a pending question. Returns false if no such question exists. -func (qr *QuestionRegistry) Answer(toolUseID, answer string) bool { - qr.mu.Lock() - pq, ok := qr.questions[toolUseID] - if ok { - delete(qr.questions, toolUseID) - } - qr.mu.Unlock() - if !ok { - return false - } - pq.AnswerCh <- answer - return true -} - -// Get returns a pending question by tool_use_id, or nil. -func (qr *QuestionRegistry) Get(toolUseID string) *PendingQuestion { - qr.mu.Lock() - defer qr.mu.Unlock() - return qr.questions[toolUseID] -} - -// PendingForTask returns all pending questions for a given task. -func (qr *QuestionRegistry) PendingForTask(taskID string) []*PendingQuestion { - qr.mu.Lock() - defer qr.mu.Unlock() - var result []*PendingQuestion - for _, pq := range qr.questions { - if pq.TaskID == taskID { - result = append(result, pq) - } - } - return result -} - -// Remove removes a question without answering it (e.g., on task cancellation). -func (qr *QuestionRegistry) Remove(toolUseID string) { - qr.mu.Lock() - delete(qr.questions, toolUseID) - qr.mu.Unlock() -} - // extractAskUserQuestion parses a stream-json line and returns the tool_use_id and input // if the line is an assistant event containing an AskUserQuestion tool_use. func extractAskUserQuestion(line []byte) (string, json.RawMessage) { diff --git a/internal/executor/question_test.go b/internal/executor/question_test.go index d0fbed9..6686c15 100644 --- a/internal/executor/question_test.go +++ b/internal/executor/question_test.go @@ -9,64 +9,6 @@ import ( "testing" ) -func TestQuestionRegistry_RegisterAndAnswer(t *testing.T) { - qr := NewQuestionRegistry() - - ch := qr.Register("task-1", "toolu_abc", json.RawMessage(`{"question":"color?"}`)) - - // Answer should unblock the channel. - go func() { - ok := qr.Answer("toolu_abc", "blue") - if !ok { - t.Error("Answer returned false, expected true") - } - }() - - answer := <-ch - if answer != "blue" { - t.Errorf("want 'blue', got %q", answer) - } - - // Question should be removed after answering. - if qr.Get("toolu_abc") != nil { - t.Error("question should be removed after answering") - } -} - -func TestQuestionRegistry_AnswerUnknown(t *testing.T) { - qr := NewQuestionRegistry() - ok := qr.Answer("nonexistent", "anything") - if ok { - t.Error("expected false for unknown question") - } -} - -func TestQuestionRegistry_PendingForTask(t *testing.T) { - qr := NewQuestionRegistry() - qr.Register("task-1", "toolu_1", json.RawMessage(`{}`)) - qr.Register("task-1", "toolu_2", json.RawMessage(`{}`)) - qr.Register("task-2", "toolu_3", json.RawMessage(`{}`)) - - pending := qr.PendingForTask("task-1") - if len(pending) != 2 { - t.Errorf("want 2 pending for task-1, got %d", len(pending)) - } - - pending2 := qr.PendingForTask("task-2") - if len(pending2) != 1 { - t.Errorf("want 1 pending for task-2, got %d", len(pending2)) - } -} - -func TestQuestionRegistry_Remove(t *testing.T) { - qr := NewQuestionRegistry() - qr.Register("task-1", "toolu_x", json.RawMessage(`{}`)) - qr.Remove("toolu_x") - if qr.Get("toolu_x") != nil { - t.Error("question should be removed") - } -} - func TestExtractAskUserQuestion_DetectsQuestion(t *testing.T) { // Simulate a stream-json assistant event containing an AskUserQuestion tool_use. event := map[string]interface{}{ |
