summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/api/changestats.go15
-rw-r--r--internal/executor/executor.go2
-rw-r--r--internal/executor/executor_test.go1
-rw-r--r--internal/executor/helpers.go3
-rw-r--r--internal/executor/question.go84
-rw-r--r--internal/executor/question_test.go58
6 files changed, 3 insertions, 160 deletions
diff --git a/internal/api/changestats.go b/internal/api/changestats.go
deleted file mode 100644
index 4f18f7f..0000000
--- a/internal/api/changestats.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package api
-
-import "github.com/thepeterstone/claudomator/internal/task"
-
-// parseChangestatFromOutput delegates to task.ParseChangestatFromOutput.
-// Kept as a package-local wrapper for use within the api package.
-func parseChangestatFromOutput(output string) *task.Changestats {
- return task.ParseChangestatFromOutput(output)
-}
-
-// parseChangestatFromFile delegates to task.ParseChangestatFromFile.
-// Kept as a package-local wrapper for use within the api package.
-func parseChangestatFromFile(path string) *task.Changestats {
- return task.ParseChangestatFromFile(path)
-}
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index ac528c6..8257f31 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -84,7 +84,6 @@ type Pool struct {
doneCh chan struct{} // signals when a worker slot is freed
workerWg sync.WaitGroup // tracks in-flight execute/executeResume goroutines
dispatchDone chan struct{} // closed when the dispatch goroutine exits
- Questions *QuestionRegistry
Classifier *Classifier
}
@@ -116,7 +115,6 @@ func NewPool(maxConcurrent int, runners map[string]Runner, store Store, logger *
workCh: make(chan workItem, maxConcurrent*10+100),
doneCh: make(chan struct{}, maxConcurrent),
dispatchDone: make(chan struct{}),
- Questions: NewQuestionRegistry(),
}
go p.dispatch()
return p
diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go
index e947606..94ba65d 100644
--- a/internal/executor/executor_test.go
+++ b/internal/executor/executor_test.go
@@ -1137,7 +1137,6 @@ func newPoolWithMockStore(store Store) *Pool {
resultCh: make(chan *Result, 4),
workCh: make(chan workItem, 4),
doneCh: make(chan struct{}, 2),
- Questions: NewQuestionRegistry(),
}
}
diff --git a/internal/executor/helpers.go b/internal/executor/helpers.go
index aee7da0..bd5d9d5 100644
--- a/internal/executor/helpers.go
+++ b/internal/executor/helpers.go
@@ -88,6 +88,9 @@ Loop:
totalCost = cost
}
}
+ if err := scanner.Err(); err != nil && streamErr == nil {
+ streamErr = fmt.Errorf("reading claude stdout: %w", err)
+ }
return totalCost, sessionID, streamErr
}
diff --git a/internal/executor/question.go b/internal/executor/question.go
index 9a2b55d..0ae1b08 100644
--- a/internal/executor/question.go
+++ b/internal/executor/question.go
@@ -5,92 +5,8 @@ import (
"encoding/json"
"io"
"log/slog"
- "sync"
)
-// QuestionHandler is called when an agent invokes AskUserQuestion.
-// Implementations should broadcast the question and block until an answer arrives.
-type QuestionHandler interface {
- HandleQuestion(taskID, toolUseID string, input json.RawMessage) (string, error)
-}
-
-// PendingQuestion holds state for a question awaiting a user answer.
-type PendingQuestion struct {
- TaskID string `json:"task_id"`
- ToolUseID string `json:"tool_use_id"`
- Input json.RawMessage `json:"input"`
- AnswerCh chan string `json:"-"`
-}
-
-// QuestionRegistry tracks pending questions across running tasks.
-type QuestionRegistry struct {
- mu sync.Mutex
- questions map[string]*PendingQuestion // keyed by toolUseID
-}
-
-// NewQuestionRegistry creates a new registry.
-func NewQuestionRegistry() *QuestionRegistry {
- return &QuestionRegistry{
- questions: make(map[string]*PendingQuestion),
- }
-}
-
-// Register adds a pending question and returns its answer channel.
-func (qr *QuestionRegistry) Register(taskID, toolUseID string, input json.RawMessage) chan string {
- ch := make(chan string, 1)
- qr.mu.Lock()
- qr.questions[toolUseID] = &PendingQuestion{
- TaskID: taskID,
- ToolUseID: toolUseID,
- Input: input,
- AnswerCh: ch,
- }
- qr.mu.Unlock()
- return ch
-}
-
-// Answer delivers an answer for a pending question. Returns false if no such question exists.
-func (qr *QuestionRegistry) Answer(toolUseID, answer string) bool {
- qr.mu.Lock()
- pq, ok := qr.questions[toolUseID]
- if ok {
- delete(qr.questions, toolUseID)
- }
- qr.mu.Unlock()
- if !ok {
- return false
- }
- pq.AnswerCh <- answer
- return true
-}
-
-// Get returns a pending question by tool_use_id, or nil.
-func (qr *QuestionRegistry) Get(toolUseID string) *PendingQuestion {
- qr.mu.Lock()
- defer qr.mu.Unlock()
- return qr.questions[toolUseID]
-}
-
-// PendingForTask returns all pending questions for a given task.
-func (qr *QuestionRegistry) PendingForTask(taskID string) []*PendingQuestion {
- qr.mu.Lock()
- defer qr.mu.Unlock()
- var result []*PendingQuestion
- for _, pq := range qr.questions {
- if pq.TaskID == taskID {
- result = append(result, pq)
- }
- }
- return result
-}
-
-// Remove removes a question without answering it (e.g., on task cancellation).
-func (qr *QuestionRegistry) Remove(toolUseID string) {
- qr.mu.Lock()
- delete(qr.questions, toolUseID)
- qr.mu.Unlock()
-}
-
// extractAskUserQuestion parses a stream-json line and returns the tool_use_id and input
// if the line is an assistant event containing an AskUserQuestion tool_use.
func extractAskUserQuestion(line []byte) (string, json.RawMessage) {
diff --git a/internal/executor/question_test.go b/internal/executor/question_test.go
index d0fbed9..6686c15 100644
--- a/internal/executor/question_test.go
+++ b/internal/executor/question_test.go
@@ -9,64 +9,6 @@ import (
"testing"
)
-func TestQuestionRegistry_RegisterAndAnswer(t *testing.T) {
- qr := NewQuestionRegistry()
-
- ch := qr.Register("task-1", "toolu_abc", json.RawMessage(`{"question":"color?"}`))
-
- // Answer should unblock the channel.
- go func() {
- ok := qr.Answer("toolu_abc", "blue")
- if !ok {
- t.Error("Answer returned false, expected true")
- }
- }()
-
- answer := <-ch
- if answer != "blue" {
- t.Errorf("want 'blue', got %q", answer)
- }
-
- // Question should be removed after answering.
- if qr.Get("toolu_abc") != nil {
- t.Error("question should be removed after answering")
- }
-}
-
-func TestQuestionRegistry_AnswerUnknown(t *testing.T) {
- qr := NewQuestionRegistry()
- ok := qr.Answer("nonexistent", "anything")
- if ok {
- t.Error("expected false for unknown question")
- }
-}
-
-func TestQuestionRegistry_PendingForTask(t *testing.T) {
- qr := NewQuestionRegistry()
- qr.Register("task-1", "toolu_1", json.RawMessage(`{}`))
- qr.Register("task-1", "toolu_2", json.RawMessage(`{}`))
- qr.Register("task-2", "toolu_3", json.RawMessage(`{}`))
-
- pending := qr.PendingForTask("task-1")
- if len(pending) != 2 {
- t.Errorf("want 2 pending for task-1, got %d", len(pending))
- }
-
- pending2 := qr.PendingForTask("task-2")
- if len(pending2) != 1 {
- t.Errorf("want 1 pending for task-2, got %d", len(pending2))
- }
-}
-
-func TestQuestionRegistry_Remove(t *testing.T) {
- qr := NewQuestionRegistry()
- qr.Register("task-1", "toolu_x", json.RawMessage(`{}`))
- qr.Remove("toolu_x")
- if qr.Get("toolu_x") != nil {
- t.Error("question should be removed")
- }
-}
-
func TestExtractAskUserQuestion_DetectsQuestion(t *testing.T) {
// Simulate a stream-json assistant event containing an AskUserQuestion tool_use.
event := map[string]interface{}{