package storage import ( "database/sql" "encoding/json" "fmt" "strings" "time" "github.com/thepeterstone/claudomator/internal/task" _ "github.com/mattn/go-sqlite3" ) type DB struct { db *sql.DB } func Open(path string) (*DB, error) { db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("opening database: %w", err) } s := &DB{db: db} if err := s.migrate(); err != nil { db.Close() return nil, fmt.Errorf("running migrations: %w", err) } return s, nil } func (s *DB) Close() error { return s.db.Close() } func (s *DB) migrate() error { schema := ` CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, config_json TEXT NOT NULL, priority TEXT NOT NULL DEFAULT 'normal', timeout_ns INTEGER NOT NULL DEFAULT 0, retry_json TEXT NOT NULL DEFAULT '{}', tags_json TEXT NOT NULL DEFAULT '[]', depends_on_json TEXT NOT NULL DEFAULT '[]', parent_task_id TEXT, state TEXT NOT NULL DEFAULT 'PENDING', created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); CREATE TABLE IF NOT EXISTS executions ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL, start_time DATETIME NOT NULL, end_time DATETIME, exit_code INTEGER, status TEXT NOT NULL, stdout_path TEXT, stderr_path TEXT, artifact_dir TEXT, cost_usd REAL, error_msg TEXT, FOREIGN KEY (task_id) REFERENCES tasks(id) ); CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); CREATE INDEX IF NOT EXISTS idx_tasks_parent_task_id ON tasks(parent_task_id); CREATE INDEX IF NOT EXISTS idx_executions_status ON executions(status); CREATE INDEX IF NOT EXISTS idx_executions_task_id ON executions(task_id); CREATE INDEX IF NOT EXISTS idx_executions_start_time ON executions(start_time); ` if _, err := s.db.Exec(schema); err != nil { return err } // Additive migrations for columns added after initial schema. migrations := []string{ `ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`, `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, `ALTER TABLE tasks ADD COLUMN question_json TEXT`, `ALTER TABLE executions ADD COLUMN session_id TEXT`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { // SQLite returns an error if the column already exists; ignore it. if !isColumnExistsError(err) { return err } } } return nil } func isColumnExistsError(err error) bool { msg := err.Error() return strings.Contains(msg, "duplicate column name") || strings.Contains(msg, "already exists") } // CreateTask inserts a task into the database. func (s *DB) CreateTask(t *task.Task) error { configJSON, err := json.Marshal(t.Agent) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(t.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tagsJSON, err := json.Marshal(t.Tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } depsJSON, err := json.Marshal(t.DependsOn) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } _, err = s.db.Exec(` INSERT INTO tasks (id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, t.ID, t.Name, t.Description, string(configJSON), string(t.Priority), t.Timeout.Duration.Nanoseconds(), string(retryJSON), string(tagsJSON), string(depsJSON), t.ParentTaskID, string(t.State), t.CreatedAt.UTC(), t.UpdatedAt.UTC(), ) return err } // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { query += " AND state = ?" args = append(args, string(filter.State)) } query += " ORDER BY created_at DESC" if filter.Limit > 0 { query += " LIMIT ?" args = append(args, filter.Limit) } rows, err := s.db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // UpdateTaskState atomically updates a task's state, enforcing valid transitions. func (s *DB) UpdateTaskState(id string, newState task.State) error { tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() //nolint:errcheck var currentState string if err := tx.QueryRow(`SELECT state FROM tasks WHERE id = ?`, id).Scan(¤tState); err != nil { if err == sql.ErrNoRows { return fmt.Errorf("task %q not found", id) } return err } if !task.ValidTransition(task.State(currentState), newState) { return fmt.Errorf("invalid state transition %s → %s for task %q", currentState, newState, id) } now := time.Now().UTC() if _, err := tx.Exec(`UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?`, string(newState), now, id); err != nil { return err } return tx.Commit() } // RejectTask sets a task's state to PENDING and stores the rejection comment. func (s *DB) RejectTask(id, comment string) error { now := time.Now().UTC() result, err := s.db.Exec(`UPDATE tasks SET state = ?, rejection_comment = ?, updated_at = ? WHERE id = ?`, string(task.StatePending), comment, now, id) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskUpdate holds the fields that UpdateTask may change. type TaskUpdate struct { Name string Description string Config task.AgentConfig Priority task.Priority TimeoutNS int64 Retry task.RetryConfig Tags []string DependsOn []string } // UpdateTask replaces editable fields on a task and resets its state to PENDING. // Returns an error if the task does not exist. func (s *DB) UpdateTask(id string, u TaskUpdate) error { configJSON, err := json.Marshal(u.Config) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(u.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tags := u.Tags if tags == nil { tags = []string{} } tagsJSON, err := json.Marshal(tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } deps := u.DependsOn if deps == nil { deps = []string{} } depsJSON, err := json.Marshal(deps) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } now := time.Now().UTC() result, err := s.db.Exec(` UPDATE tasks SET name = ?, description = ?, config_json = ?, priority = ?, timeout_ns = ?, retry_json = ?, tags_json = ?, depends_on_json = ?, state = ?, updated_at = ? WHERE id = ?`, u.Name, u.Description, string(configJSON), string(u.Priority), u.TimeoutNS, string(retryJSON), string(tagsJSON), string(depsJSON), string(task.StatePending), now, id, ) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskFilter specifies criteria for listing tasks. type TaskFilter struct { State task.State Limit int } // Execution represents a single run of a task. type Execution struct { ID string TaskID string StartTime time.Time EndTime time.Time ExitCode int Status string StdoutPath string StderrPath string ArtifactDir string CostUSD float64 ErrorMsg string SessionID string // claude --session-id; persisted for resume // In-memory only: set when creating a resume execution, not stored in DB. ResumeSessionID string ResumeAnswer string } // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { _, err := s.db.Exec(` INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } defer rows.Close() var execs []*Execution for rows.Next() { e, err := scanExecutionRows(rows) if err != nil { return nil, err } execs = append(execs, e) } return execs, rows.Err() } // GetLatestExecution returns the most recent execution for a task. func (s *DB) GetLatestExecution(taskID string) (*Execution, error) { row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID) return scanExecution(row) } // DeleteTask removes a task and all its executions and subtasks (recursively). // Returns an error if the task does not exist. All deletes run in a single // transaction so a partial failure cannot leave orphaned executions. func (s *DB) DeleteTask(id string) error { tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() //nolint:errcheck // Collect all task IDs (root + all descendants) via recursive CTE. rows, err := tx.Query(` WITH RECURSIVE subtasks(id) AS ( SELECT id FROM tasks WHERE id = ? UNION ALL SELECT t.id FROM tasks t JOIN subtasks s ON t.parent_task_id = s.id ) SELECT id FROM subtasks`, id) if err != nil { return fmt.Errorf("collecting subtask ids: %w", err) } var toDelete []string for rows.Next() { var tid string if err := rows.Scan(&tid); err != nil { rows.Close() return err } toDelete = append(toDelete, tid) } rows.Close() if err := rows.Err(); err != nil { return err } if len(toDelete) == 0 { return fmt.Errorf("task %q not found", id) } // Delete executions for all collected tasks, then the tasks themselves. for _, tid := range toDelete { if _, err := tx.Exec(`DELETE FROM executions WHERE task_id = ?`, tid); err != nil { return fmt.Errorf("deleting executions for task %q: %w", tid, err) } } for _, tid := range toDelete { if _, err := tx.Exec(`DELETE FROM tasks WHERE id = ?`, tid); err != nil { return fmt.Errorf("deleting task %q: %w", tid, err) } } return tx.Commit() } // RecentExecution is returned by ListRecentExecutions (JOIN with tasks for name). type RecentExecution struct { ID string `json:"id"` TaskID string `json:"task_id"` TaskName string `json:"task_name"` State string `json:"state"` StartedAt time.Time `json:"started_at"` FinishedAt *time.Time `json:"finished_at,omitempty"` DurationMS *int64 `json:"duration_ms,omitempty"` ExitCode int `json:"exit_code"` CostUSD float64 `json:"cost_usd"` StdoutPath string `json:"stdout_path"` } // ListRecentExecutions returns executions since the given time, joined with task names. // If taskID is non-empty, only executions for that task are returned. func (s *DB) ListRecentExecutions(since time.Time, limit int, taskID string) ([]*RecentExecution, error) { query := `SELECT e.id, e.task_id, t.name, e.status, e.start_time, e.end_time, e.exit_code, e.cost_usd, e.stdout_path FROM executions e JOIN tasks t ON e.task_id = t.id WHERE e.start_time >= ?` args := []interface{}{since.UTC()} if taskID != "" { query += " AND e.task_id = ?" args = append(args, taskID) } query += " ORDER BY e.start_time DESC LIMIT ?" args = append(args, limit) rows, err := s.db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() var results []*RecentExecution for rows.Next() { var re RecentExecution var endTime time.Time var stdoutPath sql.NullString if err := rows.Scan(&re.ID, &re.TaskID, &re.TaskName, &re.State, &re.StartedAt, &endTime, &re.ExitCode, &re.CostUSD, &stdoutPath); err != nil { return nil, err } re.StdoutPath = stdoutPath.String if !endTime.IsZero() { re.FinishedAt = &endTime ms := endTime.Sub(re.StartedAt).Milliseconds() re.DurationMS = &ms } results = append(results, &re) } return results, rows.Err() } // UpdateTaskQuestion stores the pending question JSON on a task. // Pass empty string to clear the question after it has been answered. func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error { _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`, questionJSON, time.Now().UTC(), taskID) return err } // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?, stdout_path = ?, stderr_path = ?, artifact_dir = ?, session_id = ? WHERE id = ?`, e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.SessionID, e.ID, ) return err } type scanner interface { Scan(dest ...interface{}) error } func scanTask(row scanner) (*task.Task, error) { var ( t task.Task configJSON string retryJSON string tagsJSON string depsJSON string state string priority string timeoutNS int64 parentTaskID sql.NullString rejectionComment sql.NullString questionJSON sql.NullString ) err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON) t.ParentTaskID = parentTaskID.String t.RejectionComment = rejectionComment.String t.QuestionJSON = questionJSON.String if err != nil { return nil, err } t.State = task.State(state) t.Priority = task.Priority(priority) t.Timeout.Duration = time.Duration(timeoutNS) if err := json.Unmarshal([]byte(configJSON), &t.Agent); err != nil { return nil, fmt.Errorf("unmarshaling agent config: %w", err) } // Fallback for legacy 'claude' field if t.Agent.Instructions == "" { var legacy struct { Claude task.AgentConfig `json:"claude"` } if err := json.Unmarshal([]byte(configJSON), &legacy); err == nil && legacy.Claude.Instructions != "" { t.Agent = legacy.Claude } } if err := json.Unmarshal([]byte(retryJSON), &t.Retry); err != nil { return nil, fmt.Errorf("unmarshaling retry: %w", err) } if err := json.Unmarshal([]byte(tagsJSON), &t.Tags); err != nil { return nil, fmt.Errorf("unmarshaling tags: %w", err) } if err := json.Unmarshal([]byte(depsJSON), &t.DependsOn); err != nil { return nil, fmt.Errorf("unmarshaling depends_on: %w", err) } return &t, nil } func scanTaskRows(rows *sql.Rows) (*task.Task, error) { return scanTask(rows) } func scanExecution(row scanner) (*Execution, error) { var e Execution var sessionID sql.NullString err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID) if err != nil { return nil, err } e.SessionID = sessionID.String return &e, nil } func scanExecutionRows(rows *sql.Rows) (*Execution, error) { return scanExecution(rows) }