package storage import ( "database/sql" "encoding/json" "fmt" "strings" "time" "github.com/thepeterstone/claudomator/internal/task" _ "github.com/mattn/go-sqlite3" ) type DB struct { db *sql.DB } func Open(path string) (*DB, error) { db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("opening database: %w", err) } s := &DB{db: db} if err := s.migrate(); err != nil { db.Close() return nil, fmt.Errorf("running migrations: %w", err) } return s, nil } func (s *DB) Close() error { return s.db.Close() } func (s *DB) migrate() error { schema := ` CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, config_json TEXT NOT NULL, priority TEXT NOT NULL DEFAULT 'normal', timeout_ns INTEGER NOT NULL DEFAULT 0, retry_json TEXT NOT NULL DEFAULT '{}', tags_json TEXT NOT NULL DEFAULT '[]', depends_on_json TEXT NOT NULL DEFAULT '[]', parent_task_id TEXT, state TEXT NOT NULL DEFAULT 'PENDING', created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); CREATE TABLE IF NOT EXISTS executions ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL, start_time DATETIME NOT NULL, end_time DATETIME, exit_code INTEGER, status TEXT NOT NULL, stdout_path TEXT, stderr_path TEXT, artifact_dir TEXT, cost_usd REAL, error_msg TEXT, FOREIGN KEY (task_id) REFERENCES tasks(id) ); CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); CREATE INDEX IF NOT EXISTS idx_tasks_parent_task_id ON tasks(parent_task_id); CREATE INDEX IF NOT EXISTS idx_executions_status ON executions(status); CREATE INDEX IF NOT EXISTS idx_executions_task_id ON executions(task_id); CREATE INDEX IF NOT EXISTS idx_executions_start_time ON executions(start_time); ` if _, err := s.db.Exec(schema); err != nil { return err } // Additive migrations for columns added after initial schema. 
migrations := []string{ `ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`, `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, `ALTER TABLE tasks ADD COLUMN question_json TEXT`, `ALTER TABLE executions ADD COLUMN session_id TEXT`, `ALTER TABLE executions ADD COLUMN sandbox_dir TEXT`, `ALTER TABLE tasks ADD COLUMN summary TEXT`, `ALTER TABLE tasks ADD COLUMN interactions_json TEXT NOT NULL DEFAULT '[]'`, `ALTER TABLE executions ADD COLUMN changestats_json TEXT`, `ALTER TABLE executions ADD COLUMN commits_json TEXT NOT NULL DEFAULT '[]'`, `ALTER TABLE tasks ADD COLUMN elaboration_input TEXT`, `ALTER TABLE tasks ADD COLUMN project TEXT`, `ALTER TABLE tasks ADD COLUMN repository_url TEXT`, `CREATE TABLE IF NOT EXISTS push_subscriptions ( id TEXT PRIMARY KEY, endpoint TEXT NOT NULL UNIQUE, p256dh_key TEXT NOT NULL, auth_key TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP )`, `CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL )`, `CREATE TABLE IF NOT EXISTS agent_events ( id TEXT PRIMARY KEY, agent TEXT NOT NULL, event TEXT NOT NULL, timestamp DATETIME NOT NULL, until DATETIME, reason TEXT )`, `CREATE INDEX IF NOT EXISTS idx_agent_events_agent ON agent_events(agent)`, `CREATE INDEX IF NOT EXISTS idx_agent_events_timestamp ON agent_events(timestamp)`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { // SQLite returns an error if the column already exists; ignore it. if !isColumnExistsError(err) { return err } } } return nil } func isColumnExistsError(err error) bool { msg := err.Error() return strings.Contains(msg, "duplicate column name") || strings.Contains(msg, "already exists") } // CreateTask inserts a task into the database. 
func (s *DB) CreateTask(t *task.Task) error { configJSON, err := json.Marshal(t.Agent) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(t.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tagsJSON, err := json.Marshal(t.Tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } depsJSON, err := json.Marshal(t.DependsOn) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } _, err = s.db.Exec(` INSERT INTO tasks (id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, t.ID, t.Name, t.Description, t.ElaborationInput, t.Project, t.RepositoryURL, string(configJSON), string(t.Priority), t.Timeout.Duration.Nanoseconds(), string(retryJSON), string(tagsJSON), string(depsJSON), t.ParentTaskID, string(t.State), t.CreatedAt.UTC(), t.UpdatedAt.UTC(), ) return err } // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { row := s.db.QueryRow(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { query := `SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { query += " AND state = ?" 
args = append(args, string(filter.State)) } if !filter.Since.IsZero() { query += " AND updated_at > ?" args = append(args, filter.Since.UTC()) } query += " ORDER BY created_at DESC" if filter.Limit > 0 { query += " LIMIT ?" args = append(args, filter.Limit) } rows, err := s.db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { rows, err := s.db.Query(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // UpdateTaskState atomically updates a task's state, enforcing valid transitions. func (s *DB) UpdateTaskState(id string, newState task.State) error { tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() //nolint:errcheck var currentState string if err := tx.QueryRow(`SELECT state FROM tasks WHERE id = ?`, id).Scan(¤tState); err != nil { if err == sql.ErrNoRows { return fmt.Errorf("task %q not found", id) } return err } if !task.ValidTransition(task.State(currentState), newState) { return fmt.Errorf("invalid state transition %s → %s for task %q", currentState, newState, id) } now := time.Now().UTC() if _, err := tx.Exec(`UPDATE tasks SET state = ?, updated_at = ? 
WHERE id = ?`, string(newState), now, id); err != nil { return err } return tx.Commit() } // ResetTaskForRetry sets a task to QUEUED and clears its agent type/model // so it can be re-classified. func (s *DB) ResetTaskForRetry(id string) (*task.Task, error) { tx, err := s.db.Begin() if err != nil { return nil, err } defer tx.Rollback() //nolint:errcheck t, err := scanTask(tx.QueryRow(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id)) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("task %q not found", id) } return nil, err } if !task.ValidTransition(t.State, task.StateQueued) { return nil, fmt.Errorf("invalid state transition %s → %s for task %q", t.State, task.StateQueued, id) } t.Agent.Type = "" t.Agent.Model = "" configJSON, _ := json.Marshal(t.Agent) now := time.Now().UTC() if _, err := tx.Exec(`UPDATE tasks SET state = ?, config_json = ?, question_json = NULL, updated_at = ? WHERE id = ?`, string(task.StateQueued), string(configJSON), now, id); err != nil { return nil, err } if err := tx.Commit(); err != nil { return nil, err } t.State = task.StateQueued return t, nil } // UpdateTaskAgent updates only the agent configuration of a task. func (s *DB) UpdateTaskAgent(id string, agent task.AgentConfig) error { configJSON, err := json.Marshal(agent) if err != nil { return fmt.Errorf("marshaling agent config: %w", err) } now := time.Now().UTC() _, err = s.db.Exec(`UPDATE tasks SET config_json = ?, updated_at = ? WHERE id = ?`, string(configJSON), now, id) return err } // RejectTask sets a task's state to PENDING and stores the rejection comment. func (s *DB) RejectTask(id, comment string) error { now := time.Now().UTC() result, err := s.db.Exec(`UPDATE tasks SET state = ?, rejection_comment = ?, updated_at = ? 
WHERE id = ?`, string(task.StatePending), comment, now, id) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskUpdate holds the fields that UpdateTask may change. type TaskUpdate struct { Name string Description string Config task.AgentConfig Priority task.Priority TimeoutNS int64 Retry task.RetryConfig Tags []string DependsOn []string } // UpdateTask replaces editable fields on a task and resets its state to PENDING. // Returns an error if the task does not exist. func (s *DB) UpdateTask(id string, u TaskUpdate) error { configJSON, err := json.Marshal(u.Config) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(u.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tags := u.Tags if tags == nil { tags = []string{} } tagsJSON, err := json.Marshal(tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } deps := u.DependsOn if deps == nil { deps = []string{} } depsJSON, err := json.Marshal(deps) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } now := time.Now().UTC() result, err := s.db.Exec(` UPDATE tasks SET name = ?, description = ?, config_json = ?, priority = ?, timeout_ns = ?, retry_json = ?, tags_json = ?, depends_on_json = ?, state = ?, updated_at = ? WHERE id = ?`, u.Name, u.Description, string(configJSON), string(u.Priority), u.TimeoutNS, string(retryJSON), string(tagsJSON), string(depsJSON), string(task.StatePending), now, id, ) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskFilter specifies criteria for listing tasks. type TaskFilter struct { State task.State Limit int Since time.Time } // GetMaxUpdatedAt returns the most recent updated_at timestamp across all tasks. 
func (s *DB) GetMaxUpdatedAt() (time.Time, error) { var t sql.NullString err := s.db.QueryRow(`SELECT MAX(updated_at) FROM tasks`).Scan(&t) if err != nil { return time.Time{}, err } if !t.Valid || t.String == "" { return time.Time{}, nil } // Try parsing different formats SQLite might return formats := []string{ "2006-01-02 15:04:05.999999999-07:00", "2006-01-02T15:04:05Z07:00", time.RFC3339, "2006-01-02 15:04:05", } for _, f := range formats { parsed, err := time.Parse(f, t.String) if err == nil { return parsed.UTC(), nil } } return time.Time{}, fmt.Errorf("could not parse max updated_at: %q", t.String) } // Execution represents a single run of a task. type Execution struct { ID string TaskID string StartTime time.Time EndTime time.Time ExitCode int Status string StdoutPath string StderrPath string ArtifactDir string CostUSD float64 ErrorMsg string SessionID string // claude --session-id; persisted for resume SandboxDir string // preserved sandbox path when task is BLOCKED; resume must run here Changestats *task.Changestats // stored as JSON; nil if not yet recorded Commits []task.GitCommit // stored as JSON; empty if no commits // In-memory only: set when creating a resume execution, not stored in DB. ResumeSessionID string ResumeAnswer string // In-memory only: populated by the runner after successful execution. Summary string } // CreateExecution inserts an execution record. 
func (s *DB) CreateExecution(e *Execution) error { var changestatsJSON *string if e.Changestats != nil { b, err := json.Marshal(e.Changestats) if err != nil { return fmt.Errorf("marshaling changestats: %w", err) } s := string(b) changestatsJSON = &s } commitsJSON := "[]" if len(e.Commits) > 0 { b, err := json.Marshal(e.Commits) if err != nil { return fmt.Errorf("marshaling commits: %w", err) } commitsJSON = string(b) } _, err := s.db.Exec(` INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } defer rows.Close() var execs []*Execution for rows.Next() { e, err := scanExecutionRows(rows) if err != nil { return nil, err } execs = append(execs, e) } return execs, rows.Err() } // GetLatestExecution returns the most recent execution for a task. 
func (s *DB) GetLatestExecution(taskID string) (*Execution, error) { row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) return scanExecution(row) } // DeleteTask removes a task and all its executions and subtasks (recursively). // Returns an error if the task does not exist. All deletes run in a single // transaction so a partial failure cannot leave orphaned executions. func (s *DB) DeleteTask(id string) error { tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() //nolint:errcheck // Collect all task IDs (root + all descendants) via recursive CTE. rows, err := tx.Query(` WITH RECURSIVE subtasks(id) AS ( SELECT id FROM tasks WHERE id = ? UNION ALL SELECT t.id FROM tasks t JOIN subtasks s ON t.parent_task_id = s.id ) SELECT id FROM subtasks`, id) if err != nil { return fmt.Errorf("collecting subtask ids: %w", err) } var toDelete []string for rows.Next() { var tid string if err := rows.Scan(&tid); err != nil { rows.Close() return err } toDelete = append(toDelete, tid) } rows.Close() if err := rows.Err(); err != nil { return err } if len(toDelete) == 0 { return fmt.Errorf("task %q not found", id) } // Delete executions for all collected tasks, then the tasks themselves. for _, tid := range toDelete { if _, err := tx.Exec(`DELETE FROM executions WHERE task_id = ?`, tid); err != nil { return fmt.Errorf("deleting executions for task %q: %w", tid, err) } } for _, tid := range toDelete { if _, err := tx.Exec(`DELETE FROM tasks WHERE id = ?`, tid); err != nil { return fmt.Errorf("deleting task %q: %w", tid, err) } } return tx.Commit() } // RecentExecution is returned by ListRecentExecutions (JOIN with tasks for name). 
type RecentExecution struct {
	ID       string `json:"id"`
	TaskID   string `json:"task_id"`
	TaskName string `json:"task_name"` // tasks.name of the joined task
	// State is filled from the executions.status column.
	State     string    `json:"state"`
	StartedAt time.Time `json:"started_at"`
	// FinishedAt and DurationMS are set together, and only when the stored
	// end time is non-zero; otherwise both are nil and omitted from JSON.
	FinishedAt *time.Time `json:"finished_at,omitempty"`
	DurationMS *int64     `json:"duration_ms,omitempty"` // end minus start, in milliseconds
	ExitCode   int        `json:"exit_code"`
	CostUSD    float64    `json:"cost_usd"`
	StdoutPath string     `json:"stdout_path"` // empty when the column is NULL
}

// ThroughputBucket is one time-bucket of execution counts by outcome.
// QueryDashboardStats buckets by the hour of start_time and excludes
// executions whose status is RUNNING, QUEUED or PENDING.
type ThroughputBucket struct {
	Hour      string `json:"hour"`      // RFC3339 truncated to hour
	Completed int    `json:"completed"` // status COMPLETED or READY
	Failed    int    `json:"failed"`    // status FAILED, TIMED_OUT or BUDGET_EXCEEDED
	Other     int    `json:"other"`     // any remaining status
}

// BillingDay is the aggregated cost and run count for a calendar day.
type BillingDay struct {
	Day     string  `json:"day"`      // YYYY-MM-DD
	CostUSD float64 `json:"cost_usd"` // sum of cost_usd; 0 when the sum is NULL
	Runs    int     `json:"runs"`     // number of executions started that day
}

// FailedExecution is a failed/timed-out/budget-exceeded execution with its error.
type FailedExecution struct {
	ID        string    `json:"id"`
	TaskID    string    `json:"task_id"`
	TaskName  string    `json:"task_name"`
	Status    string    `json:"status"`
	ErrorMsg  string    `json:"error_msg"` // empty when the column is NULL
	Category  string    `json:"category"`  // quota | timeout | rate_limit | git | failed
	StartedAt time.Time `json:"started_at"`
}

// DashboardStats is returned by QueryDashboardStats. All three slices are
// initialised non-nil, so they encode as [] rather than null when empty.
type DashboardStats struct {
	Throughput []ThroughputBucket `json:"throughput"` // ordered by hour ascending
	Billing    []BillingDay       `json:"billing"`    // ordered by day ascending
	Failures   []FailedExecution  `json:"failures"`   // newest first, at most 50
}

// QueryDashboardStats returns pre-aggregated stats for the given window.
func (s *DB) QueryDashboardStats(since time.Time) (*DashboardStats, error) { stats := &DashboardStats{ Throughput: []ThroughputBucket{}, Billing: []BillingDay{}, Failures: []FailedExecution{}, } // Throughput: completions per hour bucket tpRows, err := s.db.Query(` SELECT strftime('%Y-%m-%dT%H:00:00Z', start_time) as hour, SUM(CASE WHEN status IN ('COMPLETED','READY') THEN 1 ELSE 0 END), SUM(CASE WHEN status IN ('FAILED','TIMED_OUT','BUDGET_EXCEEDED') THEN 1 ELSE 0 END), SUM(CASE WHEN status NOT IN ('COMPLETED','READY','FAILED','TIMED_OUT','BUDGET_EXCEEDED') THEN 1 ELSE 0 END) FROM executions WHERE start_time >= ? AND status NOT IN ('RUNNING','QUEUED','PENDING') GROUP BY hour ORDER BY hour ASC`, since.UTC()) if err != nil { return nil, err } defer tpRows.Close() for tpRows.Next() { var b ThroughputBucket if err := tpRows.Scan(&b.Hour, &b.Completed, &b.Failed, &b.Other); err != nil { return nil, err } stats.Throughput = append(stats.Throughput, b) } if err := tpRows.Err(); err != nil { return nil, err } // Billing: cost per day billRows, err := s.db.Query(` SELECT date(start_time) as day, COALESCE(SUM(cost_usd),0), COUNT(*) FROM executions WHERE start_time >= ? GROUP BY day ORDER BY day ASC`, since.UTC()) if err != nil { return nil, err } defer billRows.Close() for billRows.Next() { var b BillingDay if err := billRows.Scan(&b.Day, &b.CostUSD, &b.Runs); err != nil { return nil, err } stats.Billing = append(stats.Billing, b) } if err := billRows.Err(); err != nil { return nil, err } // Failures: recent failed executions with error messages failRows, err := s.db.Query(` SELECT e.id, e.task_id, t.name, e.status, COALESCE(e.error_msg,''), e.start_time FROM executions e JOIN tasks t ON e.task_id = t.id WHERE e.start_time >= ? 
AND e.status IN ('FAILED','TIMED_OUT','BUDGET_EXCEEDED') ORDER BY e.start_time DESC LIMIT 50`, since.UTC()) if err != nil { return nil, err } defer failRows.Close() for failRows.Next() { var f FailedExecution if err := failRows.Scan(&f.ID, &f.TaskID, &f.TaskName, &f.Status, &f.ErrorMsg, &f.StartedAt); err != nil { return nil, err } f.Category = classifyError(f.Status, f.ErrorMsg) stats.Failures = append(stats.Failures, f) } if err := failRows.Err(); err != nil { return nil, err } return stats, nil } // classifyError maps a status + error message to a human category. func classifyError(status, msg string) string { if status == "TIMED_OUT" { return "timeout" } if status == "BUDGET_EXCEEDED" { return "quota" } low := strings.ToLower(msg) if strings.Contains(low, "quota") || strings.Contains(low, "exhausted") || strings.Contains(low, "terminalquota") { return "quota" } if strings.Contains(low, "rate limit") || strings.Contains(low, "429") || strings.Contains(low, "too many requests") { return "rate_limit" } if strings.Contains(low, "git push") || strings.Contains(low, "git pull") { return "git" } if strings.Contains(low, "timeout") || strings.Contains(low, "deadline") { return "timeout" } return "failed" } // ListRecentExecutions returns executions since the given time, joined with task names. // If taskID is non-empty, only executions for that task are returned. func (s *DB) ListRecentExecutions(since time.Time, limit int, taskID string) ([]*RecentExecution, error) { query := `SELECT e.id, e.task_id, t.name, e.status, e.start_time, e.end_time, e.exit_code, e.cost_usd, e.stdout_path FROM executions e JOIN tasks t ON e.task_id = t.id WHERE e.start_time >= ?` args := []interface{}{since.UTC()} if taskID != "" { query += " AND e.task_id = ?" args = append(args, taskID) } query += " ORDER BY e.start_time DESC LIMIT ?" args = append(args, limit) rows, err := s.db.Query(query, args...) 
if err != nil { return nil, err } defer rows.Close() var results []*RecentExecution for rows.Next() { var re RecentExecution var endTime time.Time var stdoutPath sql.NullString if err := rows.Scan(&re.ID, &re.TaskID, &re.TaskName, &re.State, &re.StartedAt, &endTime, &re.ExitCode, &re.CostUSD, &stdoutPath); err != nil { return nil, err } re.StdoutPath = stdoutPath.String if !endTime.IsZero() { re.FinishedAt = &endTime ms := endTime.Sub(re.StartedAt).Milliseconds() re.DurationMS = &ms } results = append(results, &re) } return results, rows.Err() } // UpdateTaskQuestion stores the pending question JSON on a task. // Pass empty string to clear the question after it has been answered. func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`, questionJSON, time.Now().UTC(), taskID) return err } // UpdateTaskSummary stores the agent's final summary paragraph on a task. func (s *DB) UpdateTaskSummary(taskID, summary string) error { _, err := s.db.Exec(`UPDATE tasks SET summary = ?, updated_at = ? WHERE id = ?`, summary, time.Now().UTC(), taskID) return err } // AppendTaskInteraction appends a Q&A interaction to the task's interaction history. 
func (s *DB) AppendTaskInteraction(taskID string, interaction task.Interaction) error { tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() //nolint:errcheck var raw sql.NullString if err := tx.QueryRow(`SELECT interactions_json FROM tasks WHERE id = ?`, taskID).Scan(&raw); err != nil { if err == sql.ErrNoRows { return fmt.Errorf("task %q not found", taskID) } return err } existing := raw.String if existing == "" { existing = "[]" } var interactions []task.Interaction if err := json.Unmarshal([]byte(existing), &interactions); err != nil { return fmt.Errorf("unmarshaling interactions: %w", err) } interactions = append(interactions, interaction) updated, err := json.Marshal(interactions) if err != nil { return fmt.Errorf("marshaling interactions: %w", err) } if _, err := tx.Exec(`UPDATE tasks SET interactions_json = ?, updated_at = ? WHERE id = ?`, string(updated), time.Now().UTC(), taskID); err != nil { return err } return tx.Commit() } // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { var changestatsJSON *string if e.Changestats != nil { b, err := json.Marshal(e.Changestats) if err != nil { return fmt.Errorf("marshaling changestats: %w", err) } s := string(b) changestatsJSON = &s } commitsJSON := "[]" if len(e.Commits) > 0 { b, err := json.Marshal(e.Commits) if err != nil { return fmt.Errorf("marshaling commits: %w", err) } commitsJSON = string(b) } _, err := s.db.Exec(` UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?, stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ?, sandbox_dir = ?, changestats_json = ?, commits_json = ? 
WHERE id = ?`, e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.SandboxDir, changestatsJSON, commitsJSON, e.ID, ) return err } type scanner interface { Scan(dest ...interface{}) error } func scanTask(row scanner) (*task.Task, error) { var ( t task.Task configJSON string retryJSON string tagsJSON string depsJSON string state string priority string timeoutNS int64 parentTaskID sql.NullString elaborationInput sql.NullString project sql.NullString repositoryURL sql.NullString rejectionComment sql.NullString questionJSON sql.NullString summary sql.NullString interactionsJSON sql.NullString ) err := row.Scan(&t.ID, &t.Name, &t.Description, &elaborationInput, &project, &repositoryURL, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON, &summary, &interactionsJSON) t.ParentTaskID = parentTaskID.String t.ElaborationInput = elaborationInput.String t.Project = project.String t.RepositoryURL = repositoryURL.String t.RejectionComment = rejectionComment.String t.QuestionJSON = questionJSON.String t.Summary = summary.String if err != nil { return nil, err } t.State = task.State(state) t.Priority = task.Priority(priority) t.Timeout.Duration = time.Duration(timeoutNS) // Add debug log for configJSON // The logger is not available directly in db.go, so I'll use fmt.Printf for now. // For production code, a logger should be injected. // fmt.Printf("DEBUG: configJSON from DB: %s\n", configJSON) // TODO: Replace with proper logger when available. 
if err := json.Unmarshal([]byte(configJSON), &t.Agent); err != nil { return nil, fmt.Errorf("unmarshaling agent config: %w", err) } if err := json.Unmarshal([]byte(retryJSON), &t.Retry); err != nil { return nil, fmt.Errorf("unmarshaling retry: %w", err) } if err := json.Unmarshal([]byte(tagsJSON), &t.Tags); err != nil { return nil, fmt.Errorf("unmarshaling tags: %w", err) } if err := json.Unmarshal([]byte(depsJSON), &t.DependsOn); err != nil { return nil, fmt.Errorf("unmarshaling depends_on: %w", err) } raw := interactionsJSON.String if raw == "" { raw = "[]" } if err := json.Unmarshal([]byte(raw), &t.Interactions); err != nil { return nil, fmt.Errorf("unmarshaling interactions: %w", err) } return &t, nil } func scanTaskRows(rows *sql.Rows) (*task.Task, error) { return scanTask(rows) } func scanExecution(row scanner) (*Execution, error) { var e Execution var sessionID sql.NullString var sandboxDir sql.NullString var changestatsJSON sql.NullString var commitsJSON sql.NullString err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID, &sandboxDir, &changestatsJSON, &commitsJSON) if err != nil { return nil, err } e.SessionID = sessionID.String e.SandboxDir = sandboxDir.String if changestatsJSON.Valid && changestatsJSON.String != "" { var cs task.Changestats if err := json.Unmarshal([]byte(changestatsJSON.String), &cs); err != nil { return nil, fmt.Errorf("unmarshaling changestats: %w", err) } e.Changestats = &cs } if commitsJSON.Valid && commitsJSON.String != "" { if err := json.Unmarshal([]byte(commitsJSON.String), &e.Commits); err != nil { return nil, fmt.Errorf("unmarshaling commits: %w", err) } } else { e.Commits = []task.GitCommit{} } return &e, nil } // UpdateExecutionChangestats stores git change metrics for a completed execution. 
func (s *DB) UpdateExecutionChangestats(execID string, stats *task.Changestats) error { b, err := json.Marshal(stats) if err != nil { return fmt.Errorf("marshaling changestats: %w", err) } _, err = s.db.Exec(`UPDATE executions SET changestats_json = ? WHERE id = ?`, string(b), execID) return err } func scanExecutionRows(rows *sql.Rows) (*Execution, error) { return scanExecution(rows) } // PushSubscription represents a browser push subscription. type PushSubscription struct { ID string `json:"id"` Endpoint string `json:"endpoint"` P256DHKey string `json:"p256dh_key"` AuthKey string `json:"auth_key"` CreatedAt time.Time `json:"created_at"` } // SavePushSubscription inserts or replaces a push subscription by endpoint. func (s *DB) SavePushSubscription(sub PushSubscription) error { _, err := s.db.Exec(` INSERT INTO push_subscriptions (id, endpoint, p256dh_key, auth_key) VALUES (?, ?, ?, ?) ON CONFLICT(endpoint) DO UPDATE SET id = excluded.id, p256dh_key = excluded.p256dh_key, auth_key = excluded.auth_key`, sub.ID, sub.Endpoint, sub.P256DHKey, sub.AuthKey, ) return err } // DeletePushSubscription removes the subscription with the given endpoint. func (s *DB) DeletePushSubscription(endpoint string) error { _, err := s.db.Exec(`DELETE FROM push_subscriptions WHERE endpoint = ?`, endpoint) return err } // ListPushSubscriptions returns all registered push subscriptions. func (s *DB) ListPushSubscriptions() ([]PushSubscription, error) { rows, err := s.db.Query(`SELECT id, endpoint, p256dh_key, auth_key, created_at FROM push_subscriptions ORDER BY created_at`) if err != nil { return nil, err } defer rows.Close() var subs []PushSubscription for rows.Next() { var sub PushSubscription var createdAt string if err := rows.Scan(&sub.ID, &sub.Endpoint, &sub.P256DHKey, &sub.AuthKey, &createdAt); err != nil { return nil, err } // Parse created_at; ignore errors (use zero time on failure). 
for _, layout := range []string{time.RFC3339, "2006-01-02 15:04:05", "2006-01-02T15:04:05Z"} { if t, err := time.Parse(layout, createdAt); err == nil { sub.CreatedAt = t break } } subs = append(subs, sub) } if subs == nil { subs = []PushSubscription{} } return subs, rows.Err() } // GetSetting returns the value for a key, or ("", nil) if not found. func (s *DB) GetSetting(key string) (string, error) { var value string err := s.db.QueryRow(`SELECT value FROM settings WHERE key = ?`, key).Scan(&value) if err == sql.ErrNoRows { return "", nil } return value, err } // SetSetting upserts a key/value pair in the settings table. func (s *DB) SetSetting(key, value string) error { _, err := s.db.Exec(`INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value`, key, value) return err } // AgentEvent records a rate-limit state change for an agent. type AgentEvent struct { ID string Agent string Event string // "rate_limited" | "available" Timestamp time.Time Until *time.Time // non-nil for "rate_limited" events Reason string // "transient" | "quota" } // RecordAgentEvent inserts an agent rate-limit event. func (s *DB) RecordAgentEvent(e AgentEvent) error { _, err := s.db.Exec( `INSERT INTO agent_events (id, agent, event, timestamp, until, reason) VALUES (?, ?, ?, ?, ?, ?)`, e.ID, e.Agent, e.Event, e.Timestamp.UTC(), timeOrNull(e.Until), e.Reason, ) return err } // ListAgentEvents returns agent events since the given time, newest first. func (s *DB) ListAgentEvents(since time.Time) ([]AgentEvent, error) { rows, err := s.db.Query( `SELECT id, agent, event, timestamp, until, reason FROM agent_events WHERE timestamp >= ? 
ORDER BY timestamp DESC LIMIT 500`, since.UTC(), ) if err != nil { return nil, err } defer rows.Close() var events []AgentEvent for rows.Next() { var e AgentEvent var until sql.NullTime var reason sql.NullString if err := rows.Scan(&e.ID, &e.Agent, &e.Event, &e.Timestamp, &until, &reason); err != nil { return nil, err } if until.Valid { e.Until = &until.Time } e.Reason = reason.String events = append(events, e) } return events, rows.Err() } func timeOrNull(t *time.Time) interface{} { if t == nil { return nil } return t.UTC() }