package storage import ( "database/sql" "encoding/json" "fmt" "strings" "time" "github.com/thepeterstone/claudomator/internal/task" _ "github.com/mattn/go-sqlite3" ) type DB struct { db *sql.DB } func Open(path string) (*DB, error) { db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("opening database: %w", err) } s := &DB{db: db} if err := s.migrate(); err != nil { db.Close() return nil, fmt.Errorf("running migrations: %w", err) } return s, nil } func (s *DB) Close() error { return s.db.Close() } func (s *DB) migrate() error { schema := ` CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, config_json TEXT NOT NULL, priority TEXT NOT NULL DEFAULT 'normal', timeout_ns INTEGER NOT NULL DEFAULT 0, retry_json TEXT NOT NULL DEFAULT '{}', tags_json TEXT NOT NULL DEFAULT '[]', depends_on_json TEXT NOT NULL DEFAULT '[]', parent_task_id TEXT, state TEXT NOT NULL DEFAULT 'PENDING', created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); CREATE TABLE IF NOT EXISTS executions ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL, start_time DATETIME NOT NULL, end_time DATETIME, exit_code INTEGER, status TEXT NOT NULL, stdout_path TEXT, stderr_path TEXT, artifact_dir TEXT, cost_usd REAL, error_msg TEXT, FOREIGN KEY (task_id) REFERENCES tasks(id) ); CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); CREATE INDEX IF NOT EXISTS idx_executions_status ON executions(status); CREATE INDEX IF NOT EXISTS idx_executions_task_id ON executions(task_id); CREATE TABLE IF NOT EXISTS templates ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, config_json TEXT NOT NULL DEFAULT '{}', timeout TEXT, priority TEXT NOT NULL DEFAULT 'normal', tags_json TEXT NOT NULL DEFAULT '[]', created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); ` if _, err := s.db.Exec(schema); err != nil { return err } // Additive migrations for columns added after initial schema. migrations := []string{ `ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`, `ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`, } for _, m := range migrations { if _, err := s.db.Exec(m); err != nil { // SQLite returns an error if the column already exists; ignore it. if !isColumnExistsError(err) { return err } } } return nil } func isColumnExistsError(err error) bool { msg := err.Error() return strings.Contains(msg, "duplicate column name") || strings.Contains(msg, "already exists") } // CreateTask inserts a task into the database. func (s *DB) CreateTask(t *task.Task) error { configJSON, err := json.Marshal(t.Claude) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(t.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tagsJSON, err := json.Marshal(t.Tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } depsJSON, err := json.Marshal(t.DependsOn) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } _, err = s.db.Exec(` INSERT INTO tasks (id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, t.ID, t.Name, t.Description, string(configJSON), string(t.Priority), t.Timeout.Duration.Nanoseconds(), string(retryJSON), string(tagsJSON), string(depsJSON), t.ParentTaskID, string(t.State), t.CreatedAt.UTC(), t.UpdatedAt.UTC(), ) return err } // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { query += " AND state = ?" args = append(args, string(filter.State)) } query += " ORDER BY created_at DESC" if filter.Limit > 0 { query += " LIMIT ?" args = append(args, filter.Limit) } rows, err := s.db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // UpdateTaskState atomically updates a task's state. func (s *DB) UpdateTaskState(id string, newState task.State) error { now := time.Now().UTC() result, err := s.db.Exec(`UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?`, string(newState), now, id) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // RejectTask sets a task's state to PENDING and stores the rejection comment. func (s *DB) RejectTask(id, comment string) error { now := time.Now().UTC() result, err := s.db.Exec(`UPDATE tasks SET state = ?, rejection_comment = ?, updated_at = ? WHERE id = ?`, string(task.StatePending), comment, now, id) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskUpdate holds the fields that UpdateTask may change. type TaskUpdate struct { Name string Description string Config task.ClaudeConfig Priority task.Priority TimeoutNS int64 Retry task.RetryConfig Tags []string DependsOn []string } // UpdateTask replaces editable fields on a task and resets its state to PENDING. // Returns an error if the task does not exist. func (s *DB) UpdateTask(id string, u TaskUpdate) error { configJSON, err := json.Marshal(u.Config) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(u.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tags := u.Tags if tags == nil { tags = []string{} } tagsJSON, err := json.Marshal(tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } deps := u.DependsOn if deps == nil { deps = []string{} } depsJSON, err := json.Marshal(deps) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } now := time.Now().UTC() result, err := s.db.Exec(` UPDATE tasks SET name = ?, description = ?, config_json = ?, priority = ?, timeout_ns = ?, retry_json = ?, tags_json = ?, depends_on_json = ?, state = ?, updated_at = ? WHERE id = ?`, u.Name, u.Description, string(configJSON), string(u.Priority), u.TimeoutNS, string(retryJSON), string(tagsJSON), string(depsJSON), string(task.StatePending), now, id, ) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskFilter specifies criteria for listing tasks. type TaskFilter struct { State task.State Limit int } // Execution represents a single run of a task. type Execution struct { ID string TaskID string StartTime time.Time EndTime time.Time ExitCode int Status string StdoutPath string StderrPath string ArtifactDir string CostUSD float64 ErrorMsg string } // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { _, err := s.db.Exec(` INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } defer rows.Close() var execs []*Execution for rows.Next() { e, err := scanExecutionRows(rows) if err != nil { return nil, err } execs = append(execs, e) } return execs, rows.Err() } // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ?, stdout_path = ?, stderr_path = ?, artifact_dir = ? WHERE id = ?`, e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.ID, ) return err } type scanner interface { Scan(dest ...interface{}) error } func scanTask(row scanner) (*task.Task, error) { var ( t task.Task configJSON string retryJSON string tagsJSON string depsJSON string state string priority string timeoutNS int64 parentTaskID sql.NullString rejectionComment sql.NullString ) err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment) t.ParentTaskID = parentTaskID.String t.RejectionComment = rejectionComment.String if err != nil { return nil, err } t.State = task.State(state) t.Priority = task.Priority(priority) t.Timeout.Duration = time.Duration(timeoutNS) if err := json.Unmarshal([]byte(configJSON), &t.Claude); err != nil { return nil, fmt.Errorf("unmarshaling config: %w", err) } if err := json.Unmarshal([]byte(retryJSON), &t.Retry); err != nil { return nil, fmt.Errorf("unmarshaling retry: %w", err) } if err := json.Unmarshal([]byte(tagsJSON), &t.Tags); err != nil { return nil, fmt.Errorf("unmarshaling tags: %w", err) } if err := json.Unmarshal([]byte(depsJSON), &t.DependsOn); err != nil { return nil, fmt.Errorf("unmarshaling depends_on: %w", err) } return &t, nil } func scanTaskRows(rows *sql.Rows) (*task.Task, error) { return scanTask(rows) } func scanExecution(row scanner) (*Execution, error) { var e Execution err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg) if err != nil { return nil, err } return &e, nil } func scanExecutionRows(rows *sql.Rows) (*Execution, error) { return scanExecution(rows) }