From 2e2b2187b957e9af78797a67ec5c6874615fae02 Mon Sep 17 00:00:00 2001 From: Peter Stone Date: Sun, 8 Feb 2026 21:35:45 -1000 Subject: Initial project: task model, executor, API server, CLI, storage, reporter Claudomator automation toolkit for Claude Code with: - Task model with YAML parsing, validation, state machine (49 tests, 0 races) - SQLite storage for tasks and executions - Executor pool with bounded concurrency, timeout, cancellation - REST API + WebSocket for mobile PWA integration - Webhook/multi-notifier system - CLI: init, run, serve, list, status commands - Console, JSON, HTML reporters with cost tracking Co-Authored-By: Claude Opus 4.6 --- internal/executor/executor.go | 138 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 internal/executor/executor.go (limited to 'internal/executor/executor.go') diff --git a/internal/executor/executor.go b/internal/executor/executor.go new file mode 100644 index 0000000..c6c5124 --- /dev/null +++ b/internal/executor/executor.go @@ -0,0 +1,138 @@ +package executor + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/claudomator/claudomator/internal/storage" + "github.com/claudomator/claudomator/internal/task" + "github.com/google/uuid" +) + +// Runner executes a single task and returns the result. +type Runner interface { + Run(ctx context.Context, t *task.Task, exec *storage.Execution) error +} + +// Pool manages a bounded set of concurrent task workers. +type Pool struct { + maxConcurrent int + runner Runner + store *storage.DB + logger *slog.Logger + + mu sync.Mutex + active int + resultCh chan *Result +} + +// Result is emitted when a task execution completes. +type Result struct { + TaskID string + Execution *storage.Execution + Err error +} + +func NewPool(maxConcurrent int, runner Runner, store *storage.DB, logger *slog.Logger) *Pool { + if maxConcurrent < 1 { + maxConcurrent = 1 + } + return &Pool{ + maxConcurrent: maxConcurrent, + runner: runner, + store: store, + logger: logger, + resultCh: make(chan *Result, maxConcurrent*2), + } +} + +// Submit dispatches a task for execution. Blocks if pool is at capacity. +func (p *Pool) Submit(ctx context.Context, t *task.Task) error { + p.mu.Lock() + if p.active >= p.maxConcurrent { + active := p.active + max := p.maxConcurrent + p.mu.Unlock() + return fmt.Errorf("executor pool at capacity (%d/%d)", active, max) + } + p.active++ + p.mu.Unlock() + + go p.execute(ctx, t) + return nil +} + +// Results returns the channel for reading execution results. +func (p *Pool) Results() <-chan *Result { + return p.resultCh +} + +// ActiveCount returns the number of currently running tasks. +func (p *Pool) ActiveCount() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.active +} + +func (p *Pool) execute(ctx context.Context, t *task.Task) { + execID := uuid.New().String() + exec := &storage.Execution{ + ID: execID, + TaskID: t.ID, + StartTime: time.Now().UTC(), + Status: "RUNNING", + } + + // Record execution start. + if err := p.store.CreateExecution(exec); err != nil { + p.logger.Error("failed to create execution record", "error", err) + } + if err := p.store.UpdateTaskState(t.ID, task.StateRunning); err != nil { + p.logger.Error("failed to update task state", "error", err) + } + + // Apply task timeout. + var cancel context.CancelFunc + if t.Timeout.Duration > 0 { + ctx, cancel = context.WithTimeout(ctx, t.Timeout.Duration) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + // Run the task. + err := p.runner.Run(ctx, t, exec) + exec.EndTime = time.Now().UTC() + + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + exec.Status = "TIMED_OUT" + exec.ErrorMsg = "execution timed out" + p.store.UpdateTaskState(t.ID, task.StateTimedOut) + } else if ctx.Err() == context.Canceled { + exec.Status = "CANCELLED" + exec.ErrorMsg = "execution cancelled" + p.store.UpdateTaskState(t.ID, task.StateCancelled) + } else { + exec.Status = "FAILED" + exec.ErrorMsg = err.Error() + p.store.UpdateTaskState(t.ID, task.StateFailed) + } + } else { + exec.Status = "COMPLETED" + p.store.UpdateTaskState(t.ID, task.StateCompleted) + } + + if updateErr := p.store.UpdateExecution(exec); updateErr != nil { + p.logger.Error("failed to update execution", "error", updateErr) + } + + p.mu.Lock() + p.active-- + p.mu.Unlock() + + p.resultCh <- &Result{TaskID: t.ID, Execution: exec, Err: err} +} -- cgit v1.2.3