1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
package executor
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
"github.com/thepeterstone/claudomator/internal/llm"
"github.com/thepeterstone/claudomator/internal/storage"
"github.com/thepeterstone/claudomator/internal/task"
)
// LocalRunner runs a task by sending it to a local OpenAI-compatible LLM
// endpoint.
//
// In contrast to ClaudeRunner/GeminiRunner, it spawns no subprocess, creates
// no git sandbox, and does not edit files in project_dir. Its only output is
// the text completion, which is written to stdout.log using the same
// stream-json envelope Claude uses so that existing parsers (extractSummary,
// ParseChangestat) keep working unchanged.
type LocalRunner struct {
	// Client talks to the LLM endpoint. Run returns an error when it is nil.
	Client *llm.Client

	// Logger receives a completion record after a successful run. A nil
	// Logger disables that logging.
	Logger *slog.Logger

	// LogDir is the root directory under which one sub-directory per
	// execution ID is created. Run returns an error when it is empty.
	LogDir string

	// DefaultTemperature is used when the task does not specify a
	// temperature. It is only applied when greater than zero.
	DefaultTemperature float64
}
// ExecLogDir returns the log directory for the execution identified by
// execID, i.e. LogDir joined with execID. When LogDir is not configured it
// returns the empty string.
//
// It implements LogPather so the pool can persist log paths before execution
// starts.
func (r *LocalRunner) ExecLogDir(execID string) string {
	if root := r.LogDir; root != "" {
		return filepath.Join(root, execID)
	}
	return ""
}
// Run sends the task's instructions to the configured LLM endpoint as a
// streaming chat completion and records the output in stdout.log inside the
// execution's log directory (see ExecLogDir).
//
// Each non-empty streamed delta is written as its own stream-json assistant
// line as it arrives. Once the stream has finished, the full response text is
// written again as one consolidated assistant line, followed by a result
// line: extractSummary and ParseChangestatFromOutput operate per-line, so a
// single envelope with the full text is what they expect to find.
//
// Run sets e.StdoutPath and e.StderrPath before the request is sent. On
// success it sets e.CostUSD to 0 and fills e.TokensIn / e.TokensOut from the
// response.
//
// An error is returned when no client is configured, the instructions are
// empty, LogDir is unset, the log directory or stdout.log cannot be created,
// or the chat request fails. In the last case an "error" result line is
// appended to stdout.log before returning.
func (r *LocalRunner) Run(ctx context.Context, t *task.Task, e *storage.Execution) error {
	if r.Client == nil {
		return fmt.Errorf("local runner: no LLM client configured")
	}
	if t.Agent.Instructions == "" {
		return fmt.Errorf("local runner: empty instructions")
	}

	logDir := r.ExecLogDir(e.ID)
	if logDir == "" {
		return fmt.Errorf("local runner: LogDir not set")
	}
	if err := os.MkdirAll(logDir, 0o700); err != nil {
		return fmt.Errorf("local runner: mkdir log: %w", err)
	}

	stdoutPath := filepath.Join(logDir, "stdout.log")
	stderrPath := filepath.Join(logDir, "stderr.log")
	// NOTE(review): stderr.log is advertised here but never created by this
	// runner — confirm that consumers of e.StderrPath tolerate a missing file.
	e.StdoutPath = stdoutPath
	e.StderrPath = stderrPath

	stdout, err := os.Create(stdoutPath)
	if err != nil {
		return fmt.Errorf("local runner: create stdout: %w", err)
	}
	defer func() {
		// A failed Close on a file we wrote to can mean the log is
		// incomplete. Report it instead of dropping it silently, but do not
		// change the outcome of the run: logging stays best-effort.
		if cerr := stdout.Close(); cerr != nil && r.Logger != nil {
			r.Logger.Warn("local runner: close stdout log",
				"path", stdoutPath,
				"error", cerr,
			)
		}
	}()

	// At most two messages: an optional system prompt and the user prompt.
	messages := make([]llm.Message, 0, 2)
	if sys := strings.TrimSpace(t.Agent.SystemPromptAppend); sys != "" {
		messages = append(messages, llm.Message{Role: "system", Content: sys})
	}
	messages = append(messages, llm.Message{Role: "user", Content: t.Agent.Instructions})

	// The task's own temperature wins; the runner default is only a fallback
	// and only when it is positive.
	temperature := t.Agent.Temperature
	if temperature == nil && r.DefaultTemperature > 0 {
		v := r.DefaultTemperature
		temperature = &v
	}

	req := llm.ChatRequest{
		Model:       t.Agent.Model,
		Messages:    messages,
		Temperature: temperature,
		MaxTokens:   t.Agent.MaxTokens,
	}

	start := time.Now()
	resp, err := r.Client.ChatStream(ctx, req, func(delta string) {
		if delta == "" {
			return
		}
		writeAssistantTextLine(stdout, delta)
	})
	if err != nil {
		writeResultLine(stdout, "error", err.Error(), 0, 0)
		return fmt.Errorf("local runner: chat: %w", err)
	}
	elapsed := time.Since(start)

	// Write one consolidated assistant envelope containing the full response.
	// extractSummary and ParseChangestatFromOutput operate per-line, so a
	// single envelope with the full text is what they expect to find.
	if resp.Content != "" {
		writeAssistantTextLine(stdout, resp.Content)
	}
	writeResultLine(stdout, "success", "", resp.PromptTokens, resp.OutputTokens)

	e.CostUSD = 0
	e.TokensIn = int64(resp.PromptTokens)
	e.TokensOut = int64(resp.OutputTokens)

	if r.Logger != nil {
		r.Logger.Info("local runner completed",
			"taskID", t.ID,
			"model", resp.Model,
			"tokens_in", resp.PromptTokens,
			"tokens_out", resp.OutputTokens,
			"finish_reason", resp.FinishReason,
			"elapsed_ms", elapsed.Milliseconds(),
		)
	}
	return nil
}
// writeAssistantTextLine appends one stream-json line to w that wraps text as
// an assistant message with a single text content block:
//
//	{"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}}
//
// The format matches what ClaudeRunner emits, so extractSummary and
// ParseChangestatFromFile read it transparently.
//
// The function is best-effort: it has no way to report failures, so marshal
// and write errors are dropped.
func writeAssistantTextLine(w *os.File, text string) {
	type contentBlock struct {
		Type string `json:"type"`
		Text string `json:"text"`
	}
	type message struct {
		Content []contentBlock `json:"content"`
	}
	type envelope struct {
		Type    string  `json:"type"`
		Message message `json:"message"`
	}

	b, err := json.Marshal(envelope{
		Type:    "assistant",
		Message: message{Content: []contentBlock{{Type: "text", Text: text}}},
	})
	if err != nil {
		return
	}
	// Emit payload and newline with a single Write so the log never ends up
	// with a JSON object missing its terminator or a stray empty line.
	// The error is intentionally ignored: see the function comment.
	_, _ = w.Write(append(b, '\n'))
}
// writeResultLine appends the final stream-json terminator line to w so that
// downstream parsers can recognise the end of the run. It mirrors the shape of
// the result line ClaudeRunner emits.
//
// The line always carries "type":"result", the given subtype, the token
// counts and a total_cost_usd of 0. is_error is true exactly when errMsg is
// non-empty, and in that case errMsg is included under the "result" key.
//
// The function is best-effort: it has no way to report failures, so marshal
// and write errors are dropped.
func writeResultLine(w *os.File, subtype, errMsg string, promptTokens, outputTokens int) {
	failed := errMsg != ""
	fields := map[string]any{
		"type":           "result",
		"subtype":        subtype,
		"is_error":       failed,
		"prompt_tokens":  promptTokens,
		"output_tokens":  outputTokens,
		"total_cost_usd": 0.0,
	}
	if failed {
		fields["result"] = errMsg
	}

	b, err := json.Marshal(fields)
	if err != nil {
		return
	}
	// Emit payload and newline with a single Write so the log never ends up
	// with a JSON object missing its terminator or a stray empty line.
	// The error is intentionally ignored: see the function comment.
	_, _ = w.Write(append(b, '\n'))
}
|