// Package storage persists tasks and their executions in a SQLite database.
package storage

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/thepeterstone/claudomator/internal/task"

	_ "github.com/mattn/go-sqlite3" // imported for its side effect of registering the "sqlite3" driver
)

// DB wraps the *sql.DB handle through which all task and execution queries run.
type DB struct {
	db *sql.DB
}

// Open opens the SQLite database at path, applies the schema, and returns a
// ready-to-use DB.
//
// The journal-mode and busy-timeout parameters are appended to path to form
// the DSN. path may be a plain file name or may already carry a query string
// (for example "file:app.db?cache=shared"); in the latter case the parameters
// are joined with "&" so the existing options are preserved.
//
// If the schema cannot be applied, the handle is closed and the error is
// returned wrapped.
func Open(path string) (*DB, error) {
	const params = "_journal_mode=WAL&_busy_timeout=5000"

	// A second "?" would fold our parameters into the value of the caller's
	// last option, so continue an existing query string with "&" instead.
	sep := "?"
	if strings.Contains(path, "?") {
		sep = "&"
	}

	db, err := sql.Open("sqlite3", path+sep+params)
	if err != nil {
		return nil, fmt.Errorf("opening database: %w", err)
	}

	s := &DB{db: db}
	if err := s.migrate(); err != nil {
		// The migration error is the one worth reporting; a failure to close
		// the half-initialised handle is deliberately dropped.
		_ = db.Close()
		return nil, fmt.Errorf("running migrations: %w", err)
	}
	return s, nil
}

// Close closes the underlying database handle.
func (s *DB) Close() error {
	return s.db.Close()
}

// migrate creates the tasks and executions tables and their indexes. Every
// statement uses IF NOT EXISTS, so it can run on each Open against an
// already-initialised database.
func (s *DB) migrate() error {
	schema := `
	CREATE TABLE IF NOT EXISTS tasks (
		id TEXT PRIMARY KEY,
		name TEXT NOT NULL,
		description TEXT,
		config_json TEXT NOT NULL,
		priority TEXT NOT NULL DEFAULT 'normal',
		timeout_ns INTEGER NOT NULL DEFAULT 0,
		retry_json TEXT NOT NULL DEFAULT '{}',
		tags_json TEXT NOT NULL DEFAULT '[]',
		depends_on_json TEXT NOT NULL DEFAULT '[]',
		state TEXT NOT NULL DEFAULT 'PENDING',
		created_at DATETIME NOT NULL,
		updated_at DATETIME NOT NULL
	);

	CREATE TABLE IF NOT EXISTS executions (
		id TEXT PRIMARY KEY,
		task_id TEXT NOT NULL,
		start_time DATETIME NOT NULL,
		end_time DATETIME,
		exit_code INTEGER,
		status TEXT NOT NULL,
		stdout_path TEXT,
		stderr_path TEXT,
		artifact_dir TEXT,
		cost_usd REAL,
		error_msg TEXT,
		FOREIGN KEY (task_id) REFERENCES tasks(id)
	);

	CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state);
	CREATE INDEX IF NOT EXISTS idx_executions_status ON executions(status);
	CREATE INDEX IF NOT EXISTS idx_executions_task_id ON executions(task_id);
	`
	_, err := s.db.Exec(schema)
	return err
}

// CreateTask inserts a task into the database.
func (s *DB) CreateTask(t *task.Task) error { configJSON, err := json.Marshal(t.Claude) if err != nil { return fmt.Errorf("marshaling config: %w", err) } retryJSON, err := json.Marshal(t.Retry) if err != nil { return fmt.Errorf("marshaling retry: %w", err) } tagsJSON, err := json.Marshal(t.Tags) if err != nil { return fmt.Errorf("marshaling tags: %w", err) } depsJSON, err := json.Marshal(t.DependsOn) if err != nil { return fmt.Errorf("marshaling depends_on: %w", err) } _, err = s.db.Exec(` INSERT INTO tasks (id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, state, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, t.ID, t.Name, t.Description, string(configJSON), string(t.Priority), t.Timeout.Duration.Nanoseconds(), string(retryJSON), string(tagsJSON), string(depsJSON), string(t.State), t.CreatedAt.UTC(), t.UpdatedAt.UTC(), ) return err } // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, state, created_at, updated_at FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, state, created_at, updated_at FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { query += " AND state = ?" args = append(args, string(filter.State)) } query += " ORDER BY created_at DESC" if filter.Limit > 0 { query += " LIMIT ?" args = append(args, filter.Limit) } rows, err := s.db.Query(query, args...) 
if err != nil { return nil, err } defer rows.Close() var tasks []*task.Task for rows.Next() { t, err := scanTaskRows(rows) if err != nil { return nil, err } tasks = append(tasks, t) } return tasks, rows.Err() } // UpdateTaskState atomically updates a task's state. func (s *DB) UpdateTaskState(id string, newState task.State) error { now := time.Now().UTC() result, err := s.db.Exec(`UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?`, string(newState), now, id) if err != nil { return err } n, err := result.RowsAffected() if err != nil { return err } if n == 0 { return fmt.Errorf("task %q not found", id) } return nil } // TaskFilter specifies criteria for listing tasks. type TaskFilter struct { State task.State Limit int } // Execution represents a single run of a task. type Execution struct { ID string TaskID string StartTime time.Time EndTime time.Time ExitCode int Status string StdoutPath string StderrPath string ArtifactDir string CostUSD float64 ErrorMsg string } // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { _, err := s.db.Exec(` INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, ) return err } // GetExecution retrieves an execution by ID. func (s *DB) GetExecution(id string) (*Execution, error) { row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id) return scanExecution(row) } // ListExecutions returns executions for a task. 
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) { rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID) if err != nil { return nil, err } defer rows.Close() var execs []*Execution for rows.Next() { e, err := scanExecutionRows(rows) if err != nil { return nil, err } execs = append(execs, e) } return execs, rows.Err() } // UpdateExecution updates a completed execution. func (s *DB) UpdateExecution(e *Execution) error { _, err := s.db.Exec(` UPDATE executions SET end_time = ?, exit_code = ?, status = ?, cost_usd = ?, error_msg = ? WHERE id = ?`, e.EndTime.UTC(), e.ExitCode, e.Status, e.CostUSD, e.ErrorMsg, e.ID, ) return err } type scanner interface { Scan(dest ...interface{}) error } func scanTask(row scanner) (*task.Task, error) { var ( t task.Task configJSON string retryJSON string tagsJSON string depsJSON string state string priority string timeoutNS int64 ) err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &state, &t.CreatedAt, &t.UpdatedAt) if err != nil { return nil, err } t.State = task.State(state) t.Priority = task.Priority(priority) t.Timeout.Duration = time.Duration(timeoutNS) if err := json.Unmarshal([]byte(configJSON), &t.Claude); err != nil { return nil, fmt.Errorf("unmarshaling config: %w", err) } if err := json.Unmarshal([]byte(retryJSON), &t.Retry); err != nil { return nil, fmt.Errorf("unmarshaling retry: %w", err) } if err := json.Unmarshal([]byte(tagsJSON), &t.Tags); err != nil { return nil, fmt.Errorf("unmarshaling tags: %w", err) } if err := json.Unmarshal([]byte(depsJSON), &t.DependsOn); err != nil { return nil, fmt.Errorf("unmarshaling depends_on: %w", err) } return &t, nil } func scanTaskRows(rows *sql.Rows) (*task.Task, error) { return scanTask(rows) } func scanExecution(row scanner) (*Execution, 
error) { var e Execution err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status, &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg) if err != nil { return nil, err } return &e, nil } func scanExecutionRows(rows *sql.Rows) (*Execution, error) { return scanExecution(rows) }