summaryrefslogtreecommitdiff
path: root/internal/storage
diff options
context:
space:
mode:
Diffstat (limited to 'internal/storage')
-rw-r--r--internal/storage/db.go45
-rw-r--r--internal/storage/db_test.go68
2 files changed, 103 insertions, 10 deletions
diff --git a/internal/storage/db.go b/internal/storage/db.go
index e656f98..0e4d6f1 100644
--- a/internal/storage/db.go
+++ b/internal/storage/db.go
@@ -88,6 +88,8 @@ func (s *DB) migrate() error {
migrations := []string{
`ALTER TABLE tasks ADD COLUMN parent_task_id TEXT`,
`ALTER TABLE tasks ADD COLUMN rejection_comment TEXT`,
+ `ALTER TABLE tasks ADD COLUMN question_json TEXT`,
+ `ALTER TABLE executions ADD COLUMN session_id TEXT`,
}
for _, m := range migrations {
if _, err := s.db.Exec(m); err != nil {
@@ -136,13 +138,13 @@ func (s *DB) CreateTask(t *task.Task) error {
// GetTask retrieves a task by ID.
func (s *DB) GetTask(id string) (*task.Task, error) {
- row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE id = ?`, id)
return scanTask(row)
}
// ListTasks returns tasks matching the given filter.
func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
- query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE 1=1`
+ query := `SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE 1=1`
var args []interface{}
if filter.State != "" {
@@ -174,7 +176,7 @@ func (s *DB) ListTasks(filter TaskFilter) ([]*task.Task, error) {
// ListSubtasks returns all tasks whose parent_task_id matches the given ID.
func (s *DB) ListSubtasks(parentID string) ([]*task.Task, error) {
- rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
+ rows, err := s.db.Query(`SELECT id, name, description, config_json, priority, timeout_ns, retry_json, tags_json, depends_on_json, parent_task_id, state, created_at, updated_at, rejection_comment, question_json FROM tasks WHERE parent_task_id = ? ORDER BY created_at ASC`, parentID)
if err != nil {
return nil, err
}
@@ -308,28 +310,33 @@ type Execution struct {
ArtifactDir string
CostUSD float64
ErrorMsg string
+ SessionID string // claude --session-id; persisted for resume
+
+ // In-memory only: set when creating a resume execution, not stored in DB.
+ ResumeSessionID string
+ ResumeAnswer string
}
// CreateExecution inserts an execution record.
func (s *DB) CreateExecution(e *Execution) error {
_, err := s.db.Exec(`
- INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ INSERT INTO executions (id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.TaskID, e.StartTime.UTC(), e.EndTime.UTC(), e.ExitCode, e.Status,
- e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg,
+ e.StdoutPath, e.StderrPath, e.ArtifactDir, e.CostUSD, e.ErrorMsg, e.SessionID,
)
return err
}
// GetExecution retrieves an execution by ID.
func (s *DB) GetExecution(id string) (*Execution, error) {
- row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE id = ?`, id)
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE id = ?`, id)
return scanExecution(row)
}
// ListExecutions returns executions for a task.
func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
- rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
+ rows, err := s.db.Query(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC`, taskID)
if err != nil {
return nil, err
}
@@ -346,6 +353,20 @@ func (s *DB) ListExecutions(taskID string) ([]*Execution, error) {
return execs, rows.Err()
}
+// GetLatestExecution returns the most recent execution for a task.
+func (s *DB) GetLatestExecution(taskID string) (*Execution, error) {
+ row := s.db.QueryRow(`SELECT id, task_id, start_time, end_time, exit_code, status, stdout_path, stderr_path, artifact_dir, cost_usd, error_msg, session_id FROM executions WHERE task_id = ? ORDER BY start_time DESC LIMIT 1`, taskID)
+ return scanExecution(row)
+}
+
+// UpdateTaskQuestion stores the pending question JSON on a task.
+// Pass empty string to clear the question after it has been answered.
+func (s *DB) UpdateTaskQuestion(taskID, questionJSON string) error {
+ _, err := s.db.Exec(`UPDATE tasks SET question_json = ?, updated_at = ? WHERE id = ?`,
+ questionJSON, time.Now().UTC(), taskID)
+ return err
+}
+
// UpdateExecution updates a completed execution.
func (s *DB) UpdateExecution(e *Execution) error {
_, err := s.db.Exec(`
@@ -374,10 +395,12 @@ func scanTask(row scanner) (*task.Task, error) {
timeoutNS int64
parentTaskID sql.NullString
rejectionComment sql.NullString
+ questionJSON sql.NullString
)
- err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment)
+ err := row.Scan(&t.ID, &t.Name, &t.Description, &configJSON, &priority, &timeoutNS, &retryJSON, &tagsJSON, &depsJSON, &parentTaskID, &state, &t.CreatedAt, &t.UpdatedAt, &rejectionComment, &questionJSON)
t.ParentTaskID = parentTaskID.String
t.RejectionComment = rejectionComment.String
+ t.QuestionJSON = questionJSON.String
if err != nil {
return nil, err
}
@@ -405,11 +428,13 @@ func scanTaskRows(rows *sql.Rows) (*task.Task, error) {
func scanExecution(row scanner) (*Execution, error) {
var e Execution
+ var sessionID sql.NullString
err := row.Scan(&e.ID, &e.TaskID, &e.StartTime, &e.EndTime, &e.ExitCode, &e.Status,
- &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg)
+ &e.StdoutPath, &e.StderrPath, &e.ArtifactDir, &e.CostUSD, &e.ErrorMsg, &sessionID)
if err != nil {
return nil, err
}
+ e.SessionID = sessionID.String
return &e, nil
}
diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go
index 4f9069a..395574c 100644
--- a/internal/storage/db_test.go
+++ b/internal/storage/db_test.go
@@ -421,3 +421,71 @@ func TestUpdateExecution(t *testing.T) {
t.Errorf("artifact_dir: want /tmp/exec, got %q", got.ArtifactDir)
}
}
+
+func makeTestTask(id string, now time.Time) *task.Task {
+ return &task.Task{
+ ID: id, Name: "T-" + id, Claude: task.ClaudeConfig{Instructions: "x"},
+ Priority: task.PriorityNormal, Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "linear"},
+ Tags: []string{}, DependsOn: []string{},
+ State: task.StatePending, CreatedAt: now, UpdatedAt: now,
+ }
+}
+
+func TestStorage_SessionID_RoundTrip(t *testing.T) {
+ db := testDB(t)
+ now := time.Now().UTC()
+ db.CreateTask(makeTestTask("sid-task", now))
+
+ exec := &Execution{
+ ID: "sid-exec", TaskID: "sid-task", StartTime: now, Status: "RUNNING",
+ SessionID: "550e8400-e29b-41d4-a716-446655440000",
+ }
+ if err := db.CreateExecution(exec); err != nil {
+ t.Fatalf("create: %v", err)
+ }
+
+ got, err := db.GetExecution("sid-exec")
+ if err != nil {
+ t.Fatalf("get: %v", err)
+ }
+ if got.SessionID != exec.SessionID {
+ t.Errorf("session_id: want %q, got %q", exec.SessionID, got.SessionID)
+ }
+}
+
+func TestStorage_UpdateTaskQuestion(t *testing.T) {
+ db := testDB(t)
+ now := time.Now().UTC()
+ tk := makeTestTask("q-task", now)
+ db.CreateTask(tk)
+
+ q := `{"text":"Which branch?","options":["main","develop"]}`
+ if err := db.UpdateTaskQuestion("q-task", q); err != nil {
+ t.Fatalf("update question: %v", err)
+ }
+
+ got, err := db.GetTask("q-task")
+ if err != nil {
+ t.Fatalf("get: %v", err)
+ }
+ if got.QuestionJSON != q {
+ t.Errorf("question_json: want %q, got %q", q, got.QuestionJSON)
+ }
+}
+
+func TestStorage_GetLatestExecution(t *testing.T) {
+ db := testDB(t)
+ now := time.Now().UTC()
+ db.CreateTask(makeTestTask("le-task", now))
+
+ db.CreateExecution(&Execution{ID: "le-1", TaskID: "le-task", StartTime: now, Status: "COMPLETED"})
+ db.CreateExecution(&Execution{ID: "le-2", TaskID: "le-task", StartTime: now.Add(time.Minute), Status: "RUNNING"})
+
+ got, err := db.GetLatestExecution("le-task")
+ if err != nil {
+ t.Fatalf("get latest: %v", err)
+ }
+ if got.ID != "le-2" {
+ t.Errorf("want le-2, got %q", got.ID)
+ }
+}