summaryrefslogtreecommitdiff
path: root/internal/storage/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/storage/db.go')
-rw-r--r--internal/storage/db.go45
1 files changed, 35 insertions, 10 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go
index e656f98..0e4d6f1 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -88,6 +88,8 @@ func (s *DB) migrate() error {
migrations := []string{
`ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`,
`ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`,
+ `ALTER TABLE tasks ADD COLUMN question_json TEXT`,
+ `ALTER TABLE executions ADD COLUMN session_id TEXT`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -136,13 +138,13 @@ func (s *DB) CreateTask(t *task.Task) error {
// GetTask retrieves a task by ID.
func (s *DB) GetTask(id string) (*task.Task, error) {
- row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id)
return scanTask(row)
}
// ListTasks returns tasks matching the given filter.
func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
- query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1`
+ query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1`
var args []interface{}
if filter.State != "" {
@@ -174,7 +176,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
// ListSubtasks returns all tasks whose parent_task_id matches the given ID.
func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) {
- rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
+ rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
if err != nil {
return nil, err
}
@@ -308,28 +310,33 @@ type Execution struct {
ArtifactDir string
CostUSD float64
ErrorMsg string
+ SessionID string // claude --session-id; persisted for resume
+
+ // In-memory only: set when creating a resume execution, not stored in DB.
+ ResumeSessionID string
+ ResumeAnswer string
}
// CreateExecution inserts an execution record.
func (s *DB) CreateExecution(e *Execution) error {
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -346,6 +353,20 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
return execs, rows.Err()
}
+// GetLatestExecution returns the most recent execution for a task.
+func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ return scanExecution(row)
+}
+
+// UpdateTaskQuestion stores the pending question JSON on a task.
+// Pass empty string to clear the question after it has been answered.
+func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error {
+ _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`,
+ questionJSON, time.Now().UTC(), taskID)
+ return err
+}
+
// UpdateExecution updates a completed execution.
func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
@@ -374,10 +395,12 @@ func scanTask(row scanner) (*task.Task, error) {
timeoutNS int64
parentTaskID sql.NullString
rejectionComment sql.NullString
+ questionJSON sql.NullString
)
- err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment)
+ err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON)
t.ParentTaskID = parentTaskID.String
t.RejectionComment = rejectionComment.String
+ t.QuestionJSON = questionJSON.String
if err != nil {
return nil, err
}
@@ -405,11 +428,13 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) {
func scanExecution(row scanner) (*Execution, error) {
var e Execution
+ var sessionID sql.NullString
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID)
if err != nil {
return nil, err
}
+ e.SessionID = sessionID.String
return &e, nil
}