summaryrefslogtreecommitdiff
path: root/internal/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/executor/executor.go')
-rw-r--r--internal/executor/executor.go138
1 files changed, 138 insertions, 0 deletions
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
new file mode 100644
index 0000000..c6c5124
--- /dev/null
+++ b/internal/executor/executor.go
@@ -0,0 +1,138 @@
+package executor
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "sync"
+ "time"
+
+ "github.com/claudomator/claudomator/internal/storage"
+ "github.com/claudomator/claudomator/internal/task"
+ "github.com/google/uuid"
+)
+
+// Runner executes a single task and returns the result.
+type Runner interface {
+ Run(ctx context.Context, t *task.Task, exec *storage.Execution) error
+}
+
+// Pool manages a bounded set of concurrent task workers.
+type Pool struct {
+ maxConcurrent int
+ runner Runner
+ store *storage.DB
+ logger *slog.Logger
+
+ mu sync.Mutex
+ active int
+ resultCh chan *Result
+}
+
+// Result is emitted when a task execution completes.
+type Result struct {
+ TaskID string
+ Execution *storage.Execution
+ Err error
+}
+
+func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.Logger) *Pool {
+ if maxConcurrent < 1 {
+ maxConcurrent = 1
+ }
+ return &Pool{
+ maxConcurrent: maxConcurrent,
+ runner: runner,
+ store: store,
+ logger: logger,
+ resultCh: make(chan *Result, maxConcurrent*2),
+ }
+}
+
+// Submit dispatches a task for execution. Blocks if pool is at capacity.
+func (p *Pool) Submit(ctx context.Context, t *task.Task) error {
+ p.mu.Lock()
+ if p.active >= p.maxConcurrent {
+ active := p.active
+ max := p.maxConcurrent
+ p.mu.Unlock()
+ return fmt.Errorf("executor pool at capacity (%d/%d)", active, max)
+ }
+ p.active++
+ p.mu.Unlock()
+
+ go p.execute(ctx, t)
+ return nil
+}
+
+// Results returns the channel for reading execution results.
+func (p *Pool) Results() <-chan *Result {
+ return p.resultCh
+}
+
+// ActiveCount returns the number of currently running tasks.
+func (p *Pool) ActiveCount() int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.active
+}
+
+func (p *Pool) execute(ctx context.Context, t *task.Task) {
+ execID := uuid.New().String()
+ exec := &storage.Execution{
+ ID: execID,
+ TaskID: t.ID,
+ StartTime: time.Now().UTC(),
+ Status: "RUNNING",
+ }
+
+ // Record execution start.
+ if err := p.store.CreateExecution(exec); err != nil {
+ p.logger.Error("failed to create execution record", "error", err)
+ }
+ if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil {
+ p.logger.Error("failed to update task state", "error", err)
+ }
+
+ // Apply task timeout.
+ var cancel context.CancelFunc
+ if t.Timeout.Duration > 0 {
+ ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration)
+ } else {
+ ctx, cancel = context.WithCancel(ctx)
+ }
+ defer cancel()
+
+ // Run the task.
+ err := p.runner.Run(ctx, t, exec)
+ exec.EndTime = time.Now().UTC()
+
+ if err != nil {
+ if ctx.Err() == context.DeadlineExceeded {
+ exec.Status = "TIMED_OUT"
+ exec.ErrorMsg = "execution timed out"
+ p.store.UpdateTaskState(t.ID, task.StateTimedOut)
+ } else if ctx.Err() == context.Canceled {
+ exec.Status = "CANCELLED"
+ exec.ErrorMsg = "execution cancelled"
+ p.store.UpdateTaskState(t.ID, task.StateCancelled)
+ } else {
+ exec.Status = "FAILED"
+ exec.ErrorMsg = err.Error()
+ p.store.UpdateTaskState(t.ID, task.StateFailed)
+ }
+ } else {
+ exec.Status = "COMPLETED"
+ p.store.UpdateTaskState(t.ID, task.StateCompleted)
+ }
+
+ if updateErr := p.store.UpdateExecution(exec); updateErr != nil {
+ p.logger.Error("failed to update execution", "error", updateErr)
+ }
+
+ p.mu.Lock()
+ p.active--
+ p.mu.Unlock()
+
+ p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err}
+}