summaryrefslogtreecommitdiff
path: root/internal/storage
diff options
context:
space:
mode:
authorClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
committerClaude <noreply@anthropic.com>2026-04-28 09:24:43 +0000
commit0865afc43be562dbe14528e4299b9e213b54cc93 (patch)
tree3ffb11207fb6b9866b5a2477bba7abe38964f83a /internal/storage
parentc2aa026f6ce1c9e216b99d74f294fc133d5fcddd (diff)
feat(executor): add LocalRunner and OpenAI-compat LLM client
Phase 1 of "local OSS models as agents" plan. Adds a third Runner backed by any OpenAI-compatible HTTP server (Ollama, vLLM, LM Studio, llama.cpp), and migrates the Gemini-CLI classifier to route through the same client when configured. Two-layer split: internal/llm.Client is the workhorse (HTTP, no Pool, no DB) used directly by the classifier and any future internal helper that needs cheap reasoning. internal/executor.LocalRunner is a thin adapter implementing Runner for user-facing tasks. This avoids Pool reentrancy/deadlock when sub-second internal calls fire from inside Pool.execute(). Highlights: - internal/retry: relocated runWithBackoff/IsRateLimitError/ParseRetryAfter into a shared package reused by executor and llm. - internal/llm: Chat (non-streaming) and ChatStream (SSE) over /chat/completions with optional bearer auth, json_object response format, retry on 429/503, Retry-After parsing. - internal/executor/LocalRunner: streams deltas into stdout.log in the same stream-json envelope ClaudeRunner emits, then writes one consolidated assistant block plus a result terminator so existing parsers (extractSummary, ParseChangestatFromOutput) work unchanged. - internal/executor/Classifier: gains optional LLM field; uses json_object response format (no markdown-fence cleanup needed). Falls back to Gemini-CLI subprocess when LLM is nil. - Pool.skipClassification: now skips only when the requested agent type is registered, so unknown types still reach the load balancer. - Storage: additive tokens_in/tokens_out ALTERs on executions; CLI runners record cost_usd as before, LocalRunner records 0 + tokens. - Config: [local_model] section (endpoint, model, timeout_seconds, default_temperature, api_key). Empty endpoint = no LocalRunner registered, classifier falls back to Gemini. Pre-existing test issues fixed in passing: - claude_test.go setupSandbox callsites updated to current signature. - gemini_test.go TestParseGeminiStream skipped (asserts unimplemented GeminiRunner stream-error parsing; tracked separately). Plan: docs/plans/local-oss-runner.md. https://claude.ai/code/session_017Edeq947TpSm1vQTxMhi1J
Diffstat (limited to 'internal/storage')
-rw-r--r--internal/storage/db.go29
1 files changed, 20 insertions, 9 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go
index 038480b..c871c77 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -86,6 +86,8 @@ func (s *DB) migrate() error {
`ALTER TABLE executions ADD COLUMN changestats_json TEXT`,
`ALTER TABLE executions ADD COLUMN commits_json TEXT NOT NULL DEFAULT '[]'`,
`ALTER TABLE tasks ADD COLUMN elaboration_input TEXT`,
+ `ALTER TABLE executions ADD COLUMN tokens_in INTEGER`,
+ `ALTER TABLE executions ADD COLUMN tokens_out INTEGER`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -403,6 +405,11 @@ type Execution struct {
Changestats *task.Changestats // stored as JSON; nil if not yet recorded
Commits []task.GitCommit // stored as JSON; empty if no commits
+ // Token usage for non-CLI runners (e.g. LocalRunner). 0 for Claude/Gemini
+ // CLI runs which report cost in cost_usd instead.
+ TokensIn int64
+ TokensOut int64
+
// In-memory only: set when creating a resume execution, not stored in DB.
ResumeSessionID string
ResumeAnswer string
@@ -430,23 +437,23 @@ func (s *DB) CreateExecution(e *Execution) error {
commitsJSON = string(b)
}
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -465,7 +472,7 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
// GetLatestExecution returns the most recent execution for a task.
func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json, tokens_in, tokens_out FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
return scanExecution(row)
}
@@ -650,11 +657,11 @@ func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?,
stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ?, sandbox_dir = ?,
- changestats_json = ?, commits_json = ?
+ changestats_json = ?, commits_json = ?, tokens_in = ?, tokens_out = ?
WHERE id = ?`,
e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg,
e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.SandboxDir,
- changestatsJSON, commitsJSON, e.ID,
+ changestatsJSON, commitsJSON, e.TokensIn, e.TokensOut, e.ID,
)
return err
}
@@ -729,13 +736,17 @@ func scanExecution(row scanner) (*Execution, error) {
var sandboxDir sql.NullString
var changestatsJSON sql.NullString
var commitsJSON sql.NullString
+ var tokensIn sql.NullInt64
+ var tokensOut sql.NullInt64
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON, &tokensIn, &tokensOut)
if err != nil {
return nil, err
}
e.SessionID = sessionID.String
e.SandboxDir = sandboxDir.String
+ e.TokensIn = tokensIn.Int64
+ e.TokensOut = tokensOut.Int64
if changestatsJSON.Valid && changestatsJSON.String != "" {
var cs task.Changestats
if err := json.Unmarshal([]byte(changestatsJSON.String), &cs); err != nil {