diff options
Diffstat (limited to 'internal/storage/db.go')
| -rw-r--r-- | internal/storage/db.go | 569 |
1 files changed, 551 insertions, 18 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go index ce60e2f..4adc1ba 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -8,7 +8,6 @@ import ( "time" "github.com/thepeterstone/claudomator/internal/task" - _ "github.com/mattn/go-sqlite3" ) type DB struct { @@ -20,6 +19,10 @@ func Open(path string) (*DB, error) { if err != nil { return nil, fmt.Errorf("opening database: %w", err) } + // SQLite only allows one concurrent writer. Limiting to one open connection + // prevents "database is locked" errors when multiple goroutines write + // simultaneously via database/sql's connection pool. + db.SetMaxOpenConns(1) s := &DB{db: db} if err := s.migrate(); err != nil { db.Close() @@ -86,6 +89,54 @@ func (s *DB) migrate() error { `ALTER TABLE executions ADD COLUMN changestats_json TEXT`, `ALTER TABLE executions ADD COLUMN commits_json TEXT NOT NULL DEFAULT '[]'`, `ALTER TABLE tasks ADD COLUMN elaboration_input TEXT`, + `ALTER TABLE tasks ADD COLUMN project TEXT`, + `ALTER TABLE tasks ADD COLUMN repository_url TEXT`, + `CREATE TABLE IF NOT EXISTS push_subscriptions ( + id TEXT PRIMARY KEY, + endpoint TEXT NOT NULL UNIQUE, + p256dh_key TEXT NOT NULL, + auth_key TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + )`, + `CREATE TABLE IF NOT EXISTS settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS agent_events ( + id TEXT PRIMARY KEY, + agent TEXT NOT NULL, + event TEXT NOT NULL, + timestamp DATETIME NOT NULL, + until DATETIME, + reason TEXT + )`, + `CREATE INDEX IF NOT EXISTS idx_agent_events_agent ON agent_events(agent)`, + `CREATE INDEX IF NOT EXISTS idx_agent_events_timestamp ON agent_events(timestamp)`, + `CREATE TABLE IF NOT EXISTS projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + remote_url TEXT NOT NULL DEFAULT '', + local_path TEXT NOT NULL DEFAULT '', + type TEXT NOT NULL DEFAULT 'web', + deploy_script TEXT NOT NULL DEFAULT '', + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS stories ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + branch_name TEXT NOT NULL DEFAULT '', + deploy_config TEXT NOT NULL DEFAULT '', + validation_json TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'PENDING', + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + )`, + `ALTER TABLE tasks ADD COLUMN story_id TEXT`, + `ALTER TABLE tasks ADD COLUMN acceptance_criteria TEXT NOT NULL DEFAULT ''`, + `ALTER TABLE tasks ADD COLUMN checker_for_task_id TEXT NOT NULL DEFAULT ''`, + `ALTER TABLE tasks ADD COLUMN checker_report TEXT NOT NULL DEFAULT ''`, `ALTER TABLE executions ADD COLUMN tokens_in INTEGER`, `ALTER TABLE executions ADD COLUMN tokens_out INTEGER`, } @@ -125,24 +176,25 @@ func (s *DB) CreateTask(t *task.Task) error { } _, err = s.db.Exec(` - INSERT INTO tasks (id, name, description, elaboration_input, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - t.ID, t.Name, t.Description, t.ElaborationInput, string(configJSON), string(t.Priority), + INSERT INTO tasks (id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, story_id, acceptance_criteria, checker_for_task_id, checker_report) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + t.ID, t.Name, t.Description, t.ElaborationInput, t.Project, t.RepositoryURL, string(configJSON), string(t.Priority), t.Timeout.Duration.Nanoseconds(), string(retryJSON), string(tagsJSON), string(depsJSON), - t.ParentTaskID, string(t.State), t.CreatedAt.UTC(), t.UpdatedAt.UTC(), + t.ParentTaskID, string(t.State), t.CreatedAt.UTC(), t.UpdatedAt.UTC(), t.StoryID, + t.AcceptanceCriteria, t.CheckerForTaskID, t.CheckerReport, ) return err } // GetTask retrieves a task by ID. func (s *DB) GetTask(id string) (*task.Task, error) { - row := s.db.QueryRow(`SELECT id, name, description, elaboration_input, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id) + row := s.db.QueryRow(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json, story_id, acceptance_criteria, checker_for_task_id, checker_report FROM tasks WHERE id = ?`, id) return scanTask(row) } // ListTasks returns tasks matching the given filter. func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { - query := `SELECT id, name, description, elaboration_input, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE 1=1` + query := `SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json, story_id, acceptance_criteria, checker_for_task_id, checker_report FROM tasks WHERE 1=1` var args []interface{} if filter.State != "" { @@ -178,7 +230,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) { // ListSubtasks returns all tasks whose parent_task_id matches the given ID. func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) { - rows, err := s.db.Query(`SELECT id, name, description, elaboration_input, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) + rows, err := s.db.Query(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json, story_id, acceptance_criteria, checker_for_task_id, checker_report FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID) if err != nil { return nil, err } @@ -231,7 +283,7 @@ func (s *DB) ResetTaskForRetry(id string) (*task.Task, error) { } defer tx.Rollback() //nolint:errcheck - t, err := scanTask(tx.QueryRow(`SELECT id, name, description, elaboration_input, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json FROM tasks WHERE id = ?`, id)) + t, err := scanTask(tx.QueryRow(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json, story_id, acceptance_criteria, checker_for_task_id, checker_report FROM tasks WHERE id = ?`, id)) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("task %q not found", id) @@ -292,9 +344,10 @@ func (s *DB) RejectTask(id, comment string) error { // TaskUpdate holds the fields that UpdateTask may change. type TaskUpdate struct { - Name string - Description string - Config task.AgentConfig + Name string + Description string + RepositoryURL string + Config task.AgentConfig Priority task.Priority TimeoutNS int64 Retry task.RetryConfig @@ -333,13 +386,11 @@ func (s *DB) UpdateTask(id string, u TaskUpdate) error { now := time.Now().UTC() result, err := s.db.Exec(` UPDATE tasks - SET name = ?, description = ?, config_json = ?, priority = ?, timeout_ns = ?, + SET name = ?, description = ?, repository_url = ?, config_json = ?, priority = ?, timeout_ns = ?, retry_json = ?, tags_json = ?, depends_on_json = ?, state = ?, updated_at = ? WHERE id = ?`, - u.Name, u.Description, string(configJSON), string(u.Priority), u.TimeoutNS, - string(retryJSON), string(tagsJSON), string(depsJSON), string(task.StatePending), now, - id, - ) + u.Name, u.Description, u.RepositoryURL, configJSON, string(u.Priority), u.TimeoutNS, + retryJSON, tagsJSON, depsJSON, string(task.StatePending), now, id) if err != nil { return err } @@ -376,6 +427,8 @@ func (s *DB) GetMaxUpdatedAt() (time.Time, error) { "2006-01-02T15:04:05Z07:00", time.RFC3339, "2006-01-02 15:04:05", + "2006-01-02 15:04:05 +0000 UTC", + "2006-01-02 15:04:05.999999999 +0000 UTC", } for _, f := range formats { parsed, err := time.Parse(f, t.String) @@ -417,6 +470,55 @@ type Execution struct { Summary string } +// CreateExecutionAndSetRunning inserts an execution record and transitions the +// task to RUNNING in a single transaction, preventing a crash-window where the +// task stays PENDING with an orphaned RUNNING execution record. +func (s *DB) CreateExecutionAndSetRunning(e *Execution) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + // Validate state transition. + var currentState string + if err := tx.QueryRow(`SELECT state FROM tasks WHERE id = ?`, e.TaskID).Scan(¤tState); err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("task %q not found", e.TaskID) + } + return err + } + if !task.ValidTransition(task.State(currentState), task.StateRunning) { + return fmt.Errorf("invalid state transition %s → RUNNING for task %q", currentState, e.TaskID) + } + + // Insert execution record. + commitsJSON := "[]" + if len(e.Commits) > 0 { + b, err := json.Marshal(e.Commits) + if err != nil { + return fmt.Errorf("marshaling commits: %w", err) + } + commitsJSON = string(b) + } + if _, err := tx.Exec(` + INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id, sandbox_dir, changestats_json, commits_json) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, ?)`, + e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status, + e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID, e.SandboxDir, commitsJSON, + ); err != nil { + return err + } + + // Transition task to RUNNING. + now := time.Now().UTC() + if _, err := tx.Exec(`UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?`, string(task.StateRunning), now, e.TaskID); err != nil { + return err + } + + return tx.Commit() +} + // CreateExecution inserts an execution record. func (s *DB) CreateExecution(e *Execution) error { var changestatsJSON *string @@ -544,6 +646,141 @@ type RecentExecution struct { StdoutPath string `json:"stdout_path"` } +// ThroughputBucket is one time-bucket of execution counts by outcome. +type ThroughputBucket struct { + Hour string `json:"hour"` // RFC3339 truncated to hour + Completed int `json:"completed"` + Failed int `json:"failed"` + Other int `json:"other"` +} + +// BillingDay is the aggregated cost and run count for a calendar day. +type BillingDay struct { + Day string `json:"day"` // YYYY-MM-DD + CostUSD float64 `json:"cost_usd"` + Runs int `json:"runs"` +} + +// FailedExecution is a failed/timed-out/budget-exceeded execution with its error. +type FailedExecution struct { + ID string `json:"id"` + TaskID string `json:"task_id"` + TaskName string `json:"task_name"` + Status string `json:"status"` + ErrorMsg string `json:"error_msg"` + Category string `json:"category"` // quota | timeout | rate_limit | git | failed + StartedAt time.Time `json:"started_at"` +} + +// DashboardStats is returned by QueryDashboardStats. +type DashboardStats struct { + Throughput []ThroughputBucket `json:"throughput"` + Billing []BillingDay `json:"billing"` + Failures []FailedExecution `json:"failures"` +} + +// QueryDashboardStats returns pre-aggregated stats for the given window. +func (s *DB) QueryDashboardStats(since time.Time) (*DashboardStats, error) { + stats := &DashboardStats{ + Throughput: []ThroughputBucket{}, + Billing: []BillingDay{}, + Failures: []FailedExecution{}, + } + + // Throughput: completions per hour bucket + tpRows, err := s.db.Query(` + SELECT strftime('%Y-%m-%dT%H:00:00Z', start_time) as hour, + SUM(CASE WHEN status IN ('COMPLETED','READY') THEN 1 ELSE 0 END), + SUM(CASE WHEN status IN ('FAILED','TIMED_OUT','BUDGET_EXCEEDED') THEN 1 ELSE 0 END), + SUM(CASE WHEN status NOT IN ('COMPLETED','READY','FAILED','TIMED_OUT','BUDGET_EXCEEDED') THEN 1 ELSE 0 END) + FROM executions + WHERE start_time >= ? AND status NOT IN ('RUNNING','QUEUED','PENDING') + GROUP BY hour ORDER BY hour ASC`, since.UTC()) + if err != nil { + return nil, err + } + defer tpRows.Close() + for tpRows.Next() { + var b ThroughputBucket + if err := tpRows.Scan(&b.Hour, &b.Completed, &b.Failed, &b.Other); err != nil { + return nil, err + } + stats.Throughput = append(stats.Throughput, b) + } + if err := tpRows.Err(); err != nil { + return nil, err + } + + // Billing: cost per day + billRows, err := s.db.Query(` + SELECT date(start_time) as day, COALESCE(SUM(cost_usd),0), COUNT(*) + FROM executions + WHERE start_time >= ? + GROUP BY day ORDER BY day ASC`, since.UTC()) + if err != nil { + return nil, err + } + defer billRows.Close() + for billRows.Next() { + var b BillingDay + if err := billRows.Scan(&b.Day, &b.CostUSD, &b.Runs); err != nil { + return nil, err + } + stats.Billing = append(stats.Billing, b) + } + if err := billRows.Err(); err != nil { + return nil, err + } + + // Failures: recent failed executions with error messages + failRows, err := s.db.Query(` + SELECT e.id, e.task_id, t.name, e.status, COALESCE(e.error_msg,''), e.start_time + FROM executions e JOIN tasks t ON e.task_id = t.id + WHERE e.start_time >= ? AND e.status IN ('FAILED','TIMED_OUT','BUDGET_EXCEEDED') + ORDER BY e.start_time DESC LIMIT 50`, since.UTC()) + if err != nil { + return nil, err + } + defer failRows.Close() + for failRows.Next() { + var f FailedExecution + if err := failRows.Scan(&f.ID, &f.TaskID, &f.TaskName, &f.Status, &f.ErrorMsg, &f.StartedAt); err != nil { + return nil, err + } + f.Category = classifyError(f.Status, f.ErrorMsg) + stats.Failures = append(stats.Failures, f) + } + if err := failRows.Err(); err != nil { + return nil, err + } + + return stats, nil +} + +// classifyError maps a status + error message to a human category. +func classifyError(status, msg string) string { + if status == "TIMED_OUT" { + return "timeout" + } + if status == "BUDGET_EXCEEDED" { + return "quota" + } + low := strings.ToLower(msg) + if strings.Contains(low, "quota") || strings.Contains(low, "exhausted") || strings.Contains(low, "terminalquota") { + return "quota" + } + if strings.Contains(low, "rate limit") || strings.Contains(low, "429") || strings.Contains(low, "too many requests") { + return "rate_limit" + } + if strings.Contains(low, "git push") || strings.Contains(low, "git pull") { + return "git" + } + if strings.Contains(low, "timeout") || strings.Contains(low, "deadline") { + return "timeout" + } + return "failed" +} + // ListRecentExecutions returns executions since the given time, joined with task names. // If taskID is non-empty, only executions for that task are returned. func (s *DB) ListRecentExecutions(since time.Time, limit int, taskID string) ([]*RecentExecution, error) { @@ -600,6 +837,24 @@ func (s *DB) UpdateTaskSummary(taskID, summary string) error { return err } +// UpdateTaskCheckerReport sets the checker_report field on a task. +func (s *DB) UpdateTaskCheckerReport(id, report string) error { + now := time.Now().UTC() + _, err := s.db.Exec(`UPDATE tasks SET checker_report = ?, updated_at = ? WHERE id = ?`, report, now, id) + return err +} + +// GetCheckerTask returns the checker task for the given checked task ID, +// or nil if no checker task exists. +func (s *DB) GetCheckerTask(checkedTaskID string) (*task.Task, error) { + row := s.db.QueryRow(`SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json, story_id, acceptance_criteria, checker_for_task_id, checker_report FROM tasks WHERE checker_for_task_id = ? LIMIT 1`, checkedTaskID) + t, err := scanTask(row) + if err == sql.ErrNoRows { + return nil, nil + } + return t, err +} + // AppendTaskInteraction appends a Q&A interaction to the task's interaction history. func (s *DB) AppendTaskInteraction(taskID string, interaction task.Interaction) error { tx, err := s.db.Begin() @@ -682,17 +937,35 @@ func scanTask(row scanner) (*task.Task, error) { timeoutNS int64 parentTaskID sql.NullString elaborationInput sql.NullString + project sql.NullString + repositoryURL sql.NullString rejectionComment sql.NullString questionJSON sql.NullString summary sql.NullString interactionsJSON sql.NullString + storyID sql.NullString + acceptanceCriteria sql.NullString + checkerForTaskID sql.NullString + checkerReport sql.NullString + ) + err := row.Scan( + &t.ID, &t.Name, &t.Description, &elaborationInput, &project, &repositoryURL, + &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, + &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, + &rejectionComment, &questionJSON, &summary, &interactionsJSON, &storyID, + &acceptanceCriteria, &checkerForTaskID, &checkerReport, ) - err := row.Scan(&t.ID, &t.Name, &t.Description, &elaborationInput, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON, &summary, &interactionsJSON) t.ParentTaskID = parentTaskID.String t.ElaborationInput = elaborationInput.String + t.Project = project.String + t.RepositoryURL = repositoryURL.String t.RejectionComment = rejectionComment.String t.QuestionJSON = questionJSON.String t.Summary = summary.String + t.StoryID = storyID.String + t.AcceptanceCriteria = acceptanceCriteria.String + t.CheckerForTaskID = checkerForTaskID.String + t.CheckerReport = checkerReport.String if err != nil { return nil, err } @@ -772,3 +1045,263 @@ func (s *DB) UpdateExecutionChangestats(execID string, stats *task.Changestats) func scanExecutionRows(rows *sql.Rows) (*Execution, error) { return scanExecution(rows) } + +// PushSubscription represents a browser push subscription. +type PushSubscription struct { + ID string `json:"id"` + Endpoint string `json:"endpoint"` + P256DHKey string `json:"p256dh_key"` + AuthKey string `json:"auth_key"` + CreatedAt time.Time `json:"created_at"` +} + +// SavePushSubscription inserts or replaces a push subscription by endpoint. +func (s *DB) SavePushSubscription(sub PushSubscription) error { + _, err := s.db.Exec(` + INSERT INTO push_subscriptions (id, endpoint, p256dh_key, auth_key) + VALUES (?, ?, ?, ?) + ON CONFLICT(endpoint) DO UPDATE SET + id = excluded.id, + p256dh_key = excluded.p256dh_key, + auth_key = excluded.auth_key`, + sub.ID, sub.Endpoint, sub.P256DHKey, sub.AuthKey, + ) + return err +} + +// DeletePushSubscription removes the subscription with the given endpoint. +func (s *DB) DeletePushSubscription(endpoint string) error { + _, err := s.db.Exec(`DELETE FROM push_subscriptions WHERE endpoint = ?`, endpoint) + return err +} + +// ListPushSubscriptions returns all registered push subscriptions. +func (s *DB) ListPushSubscriptions() ([]PushSubscription, error) { + rows, err := s.db.Query(`SELECT id, endpoint, p256dh_key, auth_key, created_at FROM push_subscriptions ORDER BY created_at`) + if err != nil { + return nil, err + } + defer rows.Close() + + var subs []PushSubscription + for rows.Next() { + var sub PushSubscription + var createdAt string + if err := rows.Scan(&sub.ID, &sub.Endpoint, &sub.P256DHKey, &sub.AuthKey, &createdAt); err != nil { + return nil, err + } + // Parse created_at; ignore errors (use zero time on failure). + for _, layout := range []string{time.RFC3339, "2006-01-02 15:04:05", "2006-01-02T15:04:05Z"} { + if t, err := time.Parse(layout, createdAt); err == nil { + sub.CreatedAt = t + break + } + } + subs = append(subs, sub) + } + if subs == nil { + subs = []PushSubscription{} + } + return subs, rows.Err() +} + +// GetSetting returns the value for a key, or ("", nil) if not found. +func (s *DB) GetSetting(key string) (string, error) { + var value string + err := s.db.QueryRow(`SELECT value FROM settings WHERE key = ?`, key).Scan(&value) + if err == sql.ErrNoRows { + return "", nil + } + return value, err +} + +// SetSetting upserts a key/value pair in the settings table. +func (s *DB) SetSetting(key, value string) error { + _, err := s.db.Exec(`INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value`, key, value) + return err +} + +// AgentEvent records a rate-limit state change for an agent. +type AgentEvent struct { + ID string `json:"id"` + Agent string `json:"agent"` + Event string `json:"event"` // "rate_limited" | "available" + Timestamp time.Time `json:"timestamp"` + Until *time.Time `json:"until,omitempty"` // non-nil for "rate_limited" events + Reason string `json:"reason"` // "transient" | "quota" +} + +// RecordAgentEvent inserts an agent rate-limit event. +func (s *DB) RecordAgentEvent(e AgentEvent) error { + _, err := s.db.Exec( + `INSERT INTO agent_events (id, agent, event, timestamp, until, reason) VALUES (?, ?, ?, ?, ?, ?)`, + e.ID, e.Agent, e.Event, e.Timestamp.UTC(), timeOrNull(e.Until), e.Reason, + ) + return err +} + +// ListAgentEvents returns agent events since the given time, newest first. +func (s *DB) ListAgentEvents(since time.Time) ([]AgentEvent, error) { + rows, err := s.db.Query( + `SELECT id, agent, event, timestamp, until, reason FROM agent_events WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 500`, + since.UTC(), + ) + if err != nil { + return nil, err + } + defer rows.Close() + var events []AgentEvent + for rows.Next() { + var e AgentEvent + var until sql.NullTime + var reason sql.NullString + if err := rows.Scan(&e.ID, &e.Agent, &e.Event, &e.Timestamp, &until, &reason); err != nil { + return nil, err + } + if until.Valid { + e.Until = &until.Time + } + e.Reason = reason.String + events = append(events, e) + } + return events, rows.Err() +} + +func timeOrNull(t *time.Time) interface{} { + if t == nil { + return nil + } + return t.UTC() +} + +// CreateProject inserts a new project. +func (s *DB) CreateProject(p *task.Project) error { + now := time.Now().UTC() + _, err := s.db.Exec( + `INSERT INTO projects (id, name, remote_url, local_path, type, deploy_script, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + p.ID, p.Name, p.RemoteURL, p.LocalPath, p.Type, p.DeployScript, now, now, + ) + return err +} + +// GetProject retrieves a project by ID. +func (s *DB) GetProject(id string) (*task.Project, error) { + row := s.db.QueryRow(`SELECT id, name, remote_url, local_path, type, deploy_script FROM projects WHERE id = ?`, id) + p := &task.Project{} + if err := row.Scan(&p.ID, &p.Name, &p.RemoteURL, &p.LocalPath, &p.Type, &p.DeployScript); err != nil { + return nil, err + } + return p, nil +} + +// ListProjects returns all projects. +func (s *DB) ListProjects() ([]*task.Project, error) { + rows, err := s.db.Query(`SELECT id, name, remote_url, local_path, type, deploy_script FROM projects ORDER BY name`) + if err != nil { + return nil, err + } + defer rows.Close() + var projects []*task.Project + for rows.Next() { + p := &task.Project{} + if err := rows.Scan(&p.ID, &p.Name, &p.RemoteURL, &p.LocalPath, &p.Type, &p.DeployScript); err != nil { + return nil, err + } + projects = append(projects, p) + } + return projects, rows.Err() +} + +// UpdateProject updates an existing project. +func (s *DB) UpdateProject(p *task.Project) error { + now := time.Now().UTC() + _, err := s.db.Exec( + `UPDATE projects SET name = ?, remote_url = ?, local_path = ?, type = ?, deploy_script = ?, updated_at = ? WHERE id = ?`, + p.Name, p.RemoteURL, p.LocalPath, p.Type, p.DeployScript, now, p.ID, + ) + return err +} + +// UpsertProject inserts or updates a project by ID (used for seeding). +func (s *DB) UpsertProject(p *task.Project) error { + now := time.Now().UTC() + _, err := s.db.Exec( + `INSERT INTO projects (id, name, remote_url, local_path, type, deploy_script, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET name=excluded.name, remote_url=excluded.remote_url, + local_path=excluded.local_path, type=excluded.type, deploy_script=excluded.deploy_script, updated_at=excluded.updated_at`, + p.ID, p.Name, p.RemoteURL, p.LocalPath, p.Type, p.DeployScript, now, now, + ) + return err +} + +// CreateStory inserts a new story. +func (s *DB) CreateStory(st *task.Story) error { + now := time.Now().UTC() + _, err := s.db.Exec( + `INSERT INTO stories (id, name, project_id, branch_name, deploy_config, validation_json, status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + st.ID, st.Name, st.ProjectID, st.BranchName, st.DeployConfig, st.ValidationJSON, string(st.Status), now, now, + ) + return err +} + +// GetStory retrieves a story by ID. +func (s *DB) GetStory(id string) (*task.Story, error) { + row := s.db.QueryRow(`SELECT id, name, project_id, branch_name, deploy_config, validation_json, status, created_at, updated_at FROM stories WHERE id = ?`, id) + st := &task.Story{} + var status string + if err := row.Scan(&st.ID, &st.Name, &st.ProjectID, &st.BranchName, &st.DeployConfig, &st.ValidationJSON, &status, &st.CreatedAt, &st.UpdatedAt); err != nil { + return nil, err + } + st.Status = task.StoryState(status) + return st, nil +} + +// ListStories returns all stories ordered by creation time descending. +func (s *DB) ListStories() ([]*task.Story, error) { + rows, err := s.db.Query(`SELECT id, name, project_id, branch_name, deploy_config, validation_json, status, created_at, updated_at FROM stories ORDER BY created_at DESC`) + if err != nil { + return nil, err + } + defer rows.Close() + var stories []*task.Story + for rows.Next() { + st := &task.Story{} + var status string + if err := rows.Scan(&st.ID, &st.Name, &st.ProjectID, &st.BranchName, &st.DeployConfig, &st.ValidationJSON, &status, &st.CreatedAt, &st.UpdatedAt); err != nil { + return nil, err + } + st.Status = task.StoryState(status) + stories = append(stories, st) + } + return stories, rows.Err() +} + +// UpdateStoryStatus updates the status of a story. +func (s *DB) UpdateStoryStatus(id string, status task.StoryState) error { + now := time.Now().UTC() + _, err := s.db.Exec(`UPDATE stories SET status = ?, updated_at = ? WHERE id = ?`, string(status), now, id) + return err +} + +// ListTasksByStory returns all tasks associated with a story, ordered by creation time ascending. +func (s *DB) ListTasksByStory(storyID string) ([]*task.Task, error) { + rows, err := s.db.Query( + `SELECT id, name, description, elaboration_input, project, repository_url, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json, summary, interactions_json, story_id, acceptance_criteria, checker_for_task_id, checker_report FROM tasks WHERE story_id = ? ORDER BY created_at ASC`, + storyID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var tasks []*task.Task + for rows.Next() { + t, err := scanTaskRows(rows) + if err != nil { + return nil, err + } + tasks = append(tasks, t) + } + return tasks, rows.Err() +} |
