summaryrefslogtreecommitdiff
path: root/internal/cli
diff options
context:
space:
mode:
Diffstat (limited to 'internal/cli')
-rw-r--r--internal/cli/list.go6
-rw-r--r--internal/cli/project_test.go102
-rw-r--r--internal/cli/root.go2
-rw-r--r--internal/cli/run.go32
-rw-r--r--internal/cli/serve.go126
-rw-r--r--internal/cli/status.go3
-rw-r--r--internal/cli/version.go18
7 files changed, 258 insertions, 31 deletions
diff --git a/internal/cli/list.go b/internal/cli/list.go
index 3425388..ab80868 100644
--- a/internal/cli/list.go
+++ b/internal/cli/list.go
@@ -49,10 +49,10 @@ func listTasks(state string) error {
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
- fmt.Fprintln(w, "ID\tNAME\tSTATE\tPRIORITY\tCREATED")
+ fmt.Fprintln(w, "ID\tNAME\tPROJECT\tSTATE\tPRIORITY\tCREATED")
for _, t := range tasks {
- fmt.Fprintf(w, "%.8s\t%s\t%s\t%s\t%s\n",
- t.ID, t.Name, t.State, t.Priority, t.CreatedAt.Format("2006-01-02 15:04"))
+ fmt.Fprintf(w, "%.8s\t%s\t%s\t%s\t%s\t%s\n",
+ t.ID, t.Name, t.Project, t.State, t.Priority, t.CreatedAt.Format("2006-01-02 15:04"))
}
w.Flush()
return nil
diff --git a/internal/cli/project_test.go b/internal/cli/project_test.go
new file mode 100644
index 0000000..c62e181
--- /dev/null
+++ b/internal/cli/project_test.go
@@ -0,0 +1,102 @@
+package cli
+
+import (
+ "bytes"
+ "io"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/thepeterstone/claudomator/internal/config"
+ "github.com/thepeterstone/claudomator/internal/storage"
+ "github.com/thepeterstone/claudomator/internal/task"
+)
+
+func makeProjectTask(t *testing.T, dir string) *task.Task {
+ t.Helper()
+ db, err := storage.Open(filepath.Join(dir, "test.db"))
+ if err != nil {
+ t.Fatalf("storage.Open: %v", err)
+ }
+ defer db.Close()
+
+ now := time.Now().UTC()
+ tk := &task.Task{
+ ID: "proj-task-id",
+ Name: "Project Task",
+ Project: "test-project",
+ Agent: task.AgentConfig{Type: "claude", Instructions: "do it", Model: "sonnet"},
+ Priority: task.PriorityNormal,
+ Tags: []string{},
+ DependsOn: []string{},
+ Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "exponential"},
+ State: task.StatePending,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }
+ if err := db.CreateTask(tk); err != nil {
+ t.Fatalf("CreateTask: %v", err)
+ }
+ return tk
+}
+
+func captureStdout(fn func()) string {
+ old := os.Stdout
+ r, w, _ := os.Pipe()
+ os.Stdout = w
+
+ fn()
+
+ w.Close()
+ os.Stdout = old
+ var buf bytes.Buffer
+ io.Copy(&buf, r)
+ return buf.String()
+}
+
+func withDB(t *testing.T, dbPath string, fn func()) {
+ t.Helper()
+ origCfg := cfg
+ if cfg == nil {
+ cfg = &config.Config{}
+ }
+ cfg.DBPath = dbPath
+ defer func() { cfg = origCfg }()
+ fn()
+}
+
+func TestListTasks_ShowsProject(t *testing.T) {
+ dir := t.TempDir()
+ dbPath := filepath.Join(dir, "test.db")
+ makeProjectTask(t, dir)
+
+ withDB(t, dbPath, func() {
+ out := captureStdout(func() {
+ if err := listTasks(""); err != nil {
+ t.Fatalf("listTasks: %v", err)
+ }
+ })
+ if !strings.Contains(out, "test-project") {
+ t.Errorf("list output missing project 'test-project':\n%s", out)
+ }
+ })
+}
+
+func TestStatusCmd_ShowsProject(t *testing.T) {
+ dir := t.TempDir()
+ dbPath := filepath.Join(dir, "test.db")
+ tk := makeProjectTask(t, dir)
+
+ withDB(t, dbPath, func() {
+ out := captureStdout(func() {
+ if err := showStatus(tk.ID); err != nil {
+ t.Fatalf("showStatus: %v", err)
+ }
+ })
+ if !strings.Contains(out, "test-project") {
+ t.Errorf("status output missing project 'test-project':\n%s", out)
+ }
+ })
+}
diff --git a/internal/cli/root.go b/internal/cli/root.go
index 7c4f2ff..e57a9d9 100644
--- a/internal/cli/root.go
+++ b/internal/cli/root.go
@@ -60,6 +60,7 @@ func NewRootCmd() *cobra.Command {
}
cfg.DBPath = filepath.Join(cfg.DataDir, "claudomator.db")
cfg.LogDir = filepath.Join(cfg.DataDir, "executions")
+ cfg.DropsDir = filepath.Join(cfg.DataDir, "drops")
return nil
}
@@ -73,6 +74,7 @@ func NewRootCmd() *cobra.Command {
newStartCmd(),
newCreateCmd(),
newReportCmd(),
+ newVersionCmd(),
)
return cmd
diff --git a/internal/cli/run.go b/internal/cli/run.go
index 2d7c3d7..48f34b7 100644
--- a/internal/cli/run.go
+++ b/internal/cli/run.go
@@ -72,16 +72,31 @@ func runTasks(file string, parallel int, dryRun bool) error {
logger := newLogger(verbose)
+ apiURL := "http://localhost" + cfg.ServerAddr
+ if len(cfg.ServerAddr) > 0 && cfg.ServerAddr[0] != ':' {
+ apiURL = "http://" + cfg.ServerAddr
+ }
+
runners := map[string]executor.Runner{
- "claude": &executor.ClaudeRunner{
- BinaryPath: cfg.ClaudeBinaryPath,
- Logger: logger,
- LogDir: cfg.LogDir,
+ "claude": &executor.ContainerRunner{
+ Image: cfg.ClaudeImage,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ APIURL: apiURL,
+ DropsDir: cfg.DropsDir,
+ SSHAuthSock: cfg.SSHAuthSock,
+ ClaudeBinary: cfg.ClaudeBinaryPath,
+ GeminiBinary: cfg.GeminiBinaryPath,
},
- "gemini": &executor.GeminiRunner{
- BinaryPath: cfg.GeminiBinaryPath,
- Logger: logger,
- LogDir: cfg.LogDir,
+ "gemini": &executor.ContainerRunner{
+ Image: cfg.GeminiImage,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ APIURL: apiURL,
+ DropsDir: cfg.DropsDir,
+ SSHAuthSock: cfg.SSHAuthSock,
+ ClaudeBinary: cfg.ClaudeBinaryPath,
+ GeminiBinary: cfg.GeminiBinaryPath,
},
}
@@ -95,6 +110,7 @@ func runTasks(file string, parallel int, dryRun bool) error {
}
}
+
pool := executor.NewPool(parallel, runners, store, logger)
pool.Classifier = &executor.Classifier{
LLM: localClient,
diff --git a/internal/cli/serve.go b/internal/cli/serve.go
index 5101b81..459c35b 100644
--- a/internal/cli/serve.go
+++ b/internal/cli/serve.go
@@ -35,6 +35,8 @@ func newServeCmd() *cobra.Command {
cmd.Flags().StringVar(&addr, "addr", ":8484", "listen address")
cmd.Flags().StringVar(&workspaceRoot, "workspace-root", "/workspace", "root directory for listing workspaces")
+ cmd.Flags().StringVar(&cfg.ClaudeImage, "claude-image", cfg.ClaudeImage, "docker image for claude agents")
+ cmd.Flags().StringVar(&cfg.GeminiImage, "gemini-image", cfg.GeminiImage, "docker image for gemini agents")
return cmd
}
@@ -50,25 +52,68 @@ func serve(addr string) error {
}
defer store.Close()
+ // Load VAPID keys from DB; generate and persist if missing.
+ if cfg.VAPIDPublicKey == "" || cfg.VAPIDPrivateKey == "" {
+ pub, _ := store.GetSetting("vapid_public_key")
+ priv, _ := store.GetSetting("vapid_private_key")
+ if pub == "" || priv == "" || !notify.ValidateVAPIDPublicKey(pub) {
+ pub, priv, err = notify.GenerateVAPIDKeys()
+ if err != nil {
+ return fmt.Errorf("generating VAPID keys: %w", err)
+ }
+ _ = store.SetSetting("vapid_public_key", pub)
+ _ = store.SetSetting("vapid_private_key", priv)
+ }
+ cfg.VAPIDPublicKey = pub
+ cfg.VAPIDPrivateKey = priv
+ }
+
logger := newLogger(verbose)
apiURL := "http://localhost" + addr
if len(addr) > 0 && addr[0] != ':' {
apiURL = "http://" + addr
}
-
+
+ // Use configured credentials dir; sync-credentials keeps this populated.
+ claudeConfigDir := cfg.ClaudeConfigDir
+
+ repoDir, _ := os.Getwd()
runners := map[string]executor.Runner{
- "claude": &executor.ClaudeRunner{
- BinaryPath: cfg.ClaudeBinaryPath,
- Logger: logger,
- LogDir: cfg.LogDir,
- APIURL: apiURL,
+ // ContainerRunner: binaries are resolved via PATH inside the container image,
+ // so ClaudeBinary/GeminiBinary are left empty (host paths would not exist inside).
+ "claude": &executor.ContainerRunner{
+ Image: cfg.ClaudeImage,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ APIURL: apiURL,
+ DropsDir: cfg.DropsDir,
+ SSHAuthSock: cfg.SSHAuthSock,
+ ClaudeConfigDir: claudeConfigDir,
+ CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"),
+ Store: store,
+ },
+ "gemini": &executor.ContainerRunner{
+ Image: cfg.GeminiImage,
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ APIURL: apiURL,
+ DropsDir: cfg.DropsDir,
+ SSHAuthSock: cfg.SSHAuthSock,
+ ClaudeConfigDir: claudeConfigDir,
+ CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"),
+ Store: store,
},
- "gemini": &executor.GeminiRunner{
- BinaryPath: cfg.GeminiBinaryPath,
- Logger: logger,
- LogDir: cfg.LogDir,
- APIURL: apiURL,
+ "container": &executor.ContainerRunner{
+ Image: "claudomator-agent:latest",
+ Logger: logger,
+ LogDir: cfg.LogDir,
+ APIURL: apiURL,
+ DropsDir: cfg.DropsDir,
+ SSHAuthSock: cfg.SSHAuthSock,
+ ClaudeConfigDir: claudeConfigDir,
+ CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"),
+ Store: store,
},
}
@@ -83,6 +128,7 @@ func serve(addr string) error {
logger.Info("local runner registered", "endpoint", cfg.LocalModel.Endpoint, "model", cfg.LocalModel.Model)
}
+
pool := executor.NewPool(cfg.MaxConcurrent, runners, store, logger)
pool.Classifier = &executor.Classifier{
LLM: localClient,
@@ -91,14 +137,36 @@ func serve(addr string) error {
if localClient != nil {
pool.LLM = localClient
}
+
+ if err := store.SeedProjects(); err != nil {
+ logger.Error("failed to seed projects", "error", err)
+ }
+
pool.RecoverStaleRunning(context.Background())
pool.RecoverStaleQueued(context.Background())
pool.RecoverStaleBlocked()
srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath)
+
+ // Configure notifiers: combine webhook (if set) with web push.
+ notifiers := []notify.Notifier{}
if cfg.WebhookURL != "" {
- srv.SetNotifier(notify.NewWebhookNotifier(cfg.WebhookURL, logger))
+ notifiers = append(notifiers, notify.NewWebhookNotifier(cfg.WebhookURL, logger))
+ }
+ webPushNotifier := &notify.WebPushNotifier{
+ Store: store,
+ VAPIDPublicKey: cfg.VAPIDPublicKey,
+ VAPIDPrivateKey: cfg.VAPIDPrivateKey,
+ VAPIDEmail: cfg.VAPIDEmail,
+ Logger: logger,
}
+ notifiers = append(notifiers, webPushNotifier)
+ srv.SetNotifier(notify.NewMultiNotifier(logger, notifiers...))
+
+ srv.SetVAPIDConfig(cfg.VAPIDPublicKey, cfg.VAPIDPrivateKey, cfg.VAPIDEmail)
+ srv.SetPushStore(store)
+ srv.SetDropsDir(cfg.DropsDir)
+
if cfg.WorkspaceRoot != "" {
srv.SetWorkspaceRoot(cfg.WorkspaceRoot)
}
@@ -115,6 +183,11 @@ func serve(addr string) error {
"deploy": filepath.Join(wd, "scripts", "deploy"),
})
+ // Graceful shutdown.
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ srv.SetContext(ctx)
srv.StartHub()
httpSrv := &http.Server{
@@ -122,19 +195,31 @@ func serve(addr string) error {
Handler: srv.Handler(),
}
- // Graceful shutdown.
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ workerTimeout := 3 * time.Minute
+ if cfg.ShutdownTimeout > 0 {
+ workerTimeout = cfg.ShutdownTimeout
+ }
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
- logger.Info("shutting down server...")
- shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second)
- defer shutdownCancel()
- if err := httpSrv.Shutdown(shutdownCtx); err != nil {
- logger.Warn("shutdown error", "err", err)
+ logger.Info("shutting down: draining workers...", "timeout", workerTimeout)
+
+ // Stop the HTTP server so no new requests come in.
+ httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
+ defer httpCancel()
+ if err := httpSrv.Shutdown(httpCtx); err != nil {
+ logger.Warn("http shutdown error", "err", err)
+ }
+
+ // Wait for in-flight task workers to finish.
+ workerCtx, workerCancel := context.WithTimeout(context.Background(), workerTimeout)
+ defer workerCancel()
+ if err := srv.Pool().Shutdown(workerCtx); err != nil {
+ logger.Warn("worker drain timed out", "err", err)
+ } else {
+ logger.Info("all workers finished cleanly")
}
}()
@@ -144,3 +229,4 @@ func serve(addr string) error {
}
return nil
}
+
diff --git a/internal/cli/status.go b/internal/cli/status.go
index 16b88b0..77a30d5 100644
--- a/internal/cli/status.go
+++ b/internal/cli/status.go
@@ -39,6 +39,9 @@ func showStatus(id string) error {
fmt.Printf("State: %s\n", t.State)
fmt.Printf("Priority: %s\n", t.Priority)
fmt.Printf("Model: %s\n", t.Agent.Model)
+ if t.Project != "" {
+ fmt.Printf("Project: %s\n", t.Project)
+ }
if t.Description != "" {
fmt.Printf("Description: %s\n", t.Description)
}
diff --git a/internal/cli/version.go b/internal/cli/version.go
new file mode 100644
index 0000000..789416a
--- /dev/null
+++ b/internal/cli/version.go
@@ -0,0 +1,18 @@
+package cli
+
+import (
+ "fmt"
+
+ "github.com/thepeterstone/claudomator/internal/version"
+ "github.com/spf13/cobra"
+)
+
+func newVersionCmd() *cobra.Command {
+ return &cobra.Command{
+ Use: "version",
+ Short: "Show the version of claudomator",
+ Run: func(cmd *cobra.Command, args []string) {
+ fmt.Printf("claudomator version %s\n", version.Version())
+ },
+ }
+}