diff options
Diffstat (limited to 'internal/cli')
| -rw-r--r-- | internal/cli/list.go | 6 | ||||
| -rw-r--r-- | internal/cli/project_test.go | 102 | ||||
| -rw-r--r-- | internal/cli/root.go | 2 | ||||
| -rw-r--r-- | internal/cli/run.go | 32 | ||||
| -rw-r--r-- | internal/cli/serve.go | 126 | ||||
| -rw-r--r-- | internal/cli/status.go | 3 | ||||
| -rw-r--r-- | internal/cli/version.go | 18 |
7 files changed, 258 insertions, 31 deletions
diff --git a/internal/cli/list.go b/internal/cli/list.go index 3425388..ab80868 100644 --- a/internal/cli/list.go +++ b/internal/cli/list.go @@ -49,10 +49,10 @@ func listTasks(state string) error { } w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) - fmt.Fprintln(w, "ID\tNAME\tSTATE\tPRIORITY\tCREATED") + fmt.Fprintln(w, "ID\tNAME\tPROJECT\tSTATE\tPRIORITY\tCREATED") for _, t := range tasks { - fmt.Fprintf(w, "%.8s\t%s\t%s\t%s\t%s\n", - t.ID, t.Name, t.State, t.Priority, t.CreatedAt.Format("2006-01-02 15:04")) + fmt.Fprintf(w, "%.8s\t%s\t%s\t%s\t%s\t%s\n", + t.ID, t.Name, t.Project, t.State, t.Priority, t.CreatedAt.Format("2006-01-02 15:04")) } w.Flush() return nil diff --git a/internal/cli/project_test.go b/internal/cli/project_test.go new file mode 100644 index 0000000..c62e181 --- /dev/null +++ b/internal/cli/project_test.go @@ -0,0 +1,102 @@ +package cli + +import ( + "bytes" + "io" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/thepeterstone/claudomator/internal/config" + "github.com/thepeterstone/claudomator/internal/storage" + "github.com/thepeterstone/claudomator/internal/task" +) + +func makeProjectTask(t *testing.T, dir string) *task.Task { + t.Helper() + db, err := storage.Open(filepath.Join(dir, "test.db")) + if err != nil { + t.Fatalf("storage.Open: %v", err) + } + defer db.Close() + + now := time.Now().UTC() + tk := &task.Task{ + ID: "proj-task-id", + Name: "Project Task", + Project: "test-project", + Agent: task.AgentConfig{Type: "claude", Instructions: "do it", Model: "sonnet"}, + Priority: task.PriorityNormal, + Tags: []string{}, + DependsOn: []string{}, + Retry: task.RetryConfig{MaxAttempts: 1, Backoff: "exponential"}, + State: task.StatePending, + CreatedAt: now, + UpdatedAt: now, + } + if err := db.CreateTask(tk); err != nil { + t.Fatalf("CreateTask: %v", err) + } + return tk +} + +func captureStdout(fn func()) string { + old := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + fn() + + w.Close() + os.Stdout = old + var buf bytes.Buffer + io.Copy(&buf, r) + return buf.String() +} + +func withDB(t *testing.T, dbPath string, fn func()) { + t.Helper() + origCfg := cfg + if cfg == nil { + cfg = &config.Config{} + } + cfg.DBPath = dbPath + defer func() { cfg = origCfg }() + fn() +} + +func TestListTasks_ShowsProject(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + makeProjectTask(t, dir) + + withDB(t, dbPath, func() { + out := captureStdout(func() { + if err := listTasks(""); err != nil { + t.Fatalf("listTasks: %v", err) + } + }) + if !strings.Contains(out, "test-project") { + t.Errorf("list output missing project 'test-project':\n%s", out) + } + }) +} + +func TestStatusCmd_ShowsProject(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + tk := makeProjectTask(t, dir) + + withDB(t, dbPath, func() { + out := captureStdout(func() { + if err := showStatus(tk.ID); err != nil { + t.Fatalf("showStatus: %v", err) + } + }) + if !strings.Contains(out, "test-project") { + t.Errorf("status output missing project 'test-project':\n%s", out) + } + }) +} diff --git a/internal/cli/root.go b/internal/cli/root.go index 7c4f2ff..e57a9d9 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -60,6 +60,7 @@ func NewRootCmd() *cobra.Command { } cfg.DBPath = filepath.Join(cfg.DataDir, "claudomator.db") cfg.LogDir = filepath.Join(cfg.DataDir, "executions") + cfg.DropsDir = filepath.Join(cfg.DataDir, "drops") return nil } @@ -73,6 +74,7 @@ func NewRootCmd() *cobra.Command { newStartCmd(), newCreateCmd(), newReportCmd(), + newVersionCmd(), ) return cmd diff --git a/internal/cli/run.go b/internal/cli/run.go index 2d7c3d7..48f34b7 100644 --- a/internal/cli/run.go +++ b/internal/cli/run.go @@ -72,16 +72,31 @@ func runTasks(file string, parallel int, dryRun bool) error { logger := newLogger(verbose) + apiURL := "http://localhost" + cfg.ServerAddr + if len(cfg.ServerAddr) > 0 && cfg.ServerAddr[0] != ':' { + apiURL = "http://" + cfg.ServerAddr + } + runners := map[string]executor.Runner{ - "claude": &executor.ClaudeRunner{ - BinaryPath: cfg.ClaudeBinaryPath, - Logger: logger, - LogDir: cfg.LogDir, + "claude": &executor.ContainerRunner{ + Image: cfg.ClaudeImage, + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeBinary: cfg.ClaudeBinaryPath, + GeminiBinary: cfg.GeminiBinaryPath, }, - "gemini": &executor.GeminiRunner{ - BinaryPath: cfg.GeminiBinaryPath, - Logger: logger, - LogDir: cfg.LogDir, + "gemini": &executor.ContainerRunner{ + Image: cfg.GeminiImage, + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeBinary: cfg.ClaudeBinaryPath, + GeminiBinary: cfg.GeminiBinaryPath, }, } @@ -95,6 +110,7 @@ func runTasks(file string, parallel int, dryRun bool) error { } } + pool := executor.NewPool(parallel, runners, store, logger) pool.Classifier = &executor.Classifier{ LLM: localClient, diff --git a/internal/cli/serve.go b/internal/cli/serve.go index 5101b81..459c35b 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -35,6 +35,8 @@ func newServeCmd() *cobra.Command { cmd.Flags().StringVar(&addr, "addr", ":8484", "listen address") cmd.Flags().StringVar(&workspaceRoot, "workspace-root", "/workspace", "root directory for listing workspaces") + cmd.Flags().StringVar(&cfg.ClaudeImage, "claude-image", cfg.ClaudeImage, "docker image for claude agents") + cmd.Flags().StringVar(&cfg.GeminiImage, "gemini-image", cfg.GeminiImage, "docker image for gemini agents") return cmd } @@ -50,25 +52,68 @@ func serve(addr string) error { } defer store.Close() + // Load VAPID keys from DB; generate and persist if missing. + if cfg.VAPIDPublicKey == "" || cfg.VAPIDPrivateKey == "" { + pub, _ := store.GetSetting("vapid_public_key") + priv, _ := store.GetSetting("vapid_private_key") + if pub == "" || priv == "" || !notify.ValidateVAPIDPublicKey(pub) { + pub, priv, err = notify.GenerateVAPIDKeys() + if err != nil { + return fmt.Errorf("generating VAPID keys: %w", err) + } + _ = store.SetSetting("vapid_public_key", pub) + _ = store.SetSetting("vapid_private_key", priv) + } + cfg.VAPIDPublicKey = pub + cfg.VAPIDPrivateKey = priv + } + logger := newLogger(verbose) apiURL := "http://localhost" + addr if len(addr) > 0 && addr[0] != ':' { apiURL = "http://" + addr } - + + // Use configured credentials dir; sync-credentials keeps this populated. + claudeConfigDir := cfg.ClaudeConfigDir + + repoDir, _ := os.Getwd() runners := map[string]executor.Runner{ - "claude": &executor.ClaudeRunner{ - BinaryPath: cfg.ClaudeBinaryPath, - Logger: logger, - LogDir: cfg.LogDir, - APIURL: apiURL, + // ContainerRunner: binaries are resolved via PATH inside the container image, + // so ClaudeBinary/GeminiBinary are left empty (host paths would not exist inside). + "claude": &executor.ContainerRunner{ + Image: cfg.ClaudeImage, + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeConfigDir: claudeConfigDir, + CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"), + Store: store, + }, + "gemini": &executor.ContainerRunner{ + Image: cfg.GeminiImage, + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeConfigDir: claudeConfigDir, + CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"), + Store: store, }, - "gemini": &executor.GeminiRunner{ - BinaryPath: cfg.GeminiBinaryPath, - Logger: logger, - LogDir: cfg.LogDir, - APIURL: apiURL, + "container": &executor.ContainerRunner{ + Image: "claudomator-agent:latest", + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeConfigDir: claudeConfigDir, + CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"), + Store: store, }, } @@ -83,6 +128,7 @@ func serve(addr string) error { logger.Info("local runner registered", "endpoint", cfg.LocalModel.Endpoint, "model", cfg.LocalModel.Model) } + pool := executor.NewPool(cfg.MaxConcurrent, runners, store, logger) pool.Classifier = &executor.Classifier{ LLM: localClient, @@ -91,14 +137,36 @@ func serve(addr string) error { if localClient != nil { pool.LLM = localClient } + + if err := store.SeedProjects(); err != nil { + logger.Error("failed to seed projects", "error", err) + } + pool.RecoverStaleRunning(context.Background()) pool.RecoverStaleQueued(context.Background()) pool.RecoverStaleBlocked() srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath) + + // Configure notifiers: combine webhook (if set) with web push. + notifiers := []notify.Notifier{} if cfg.WebhookURL != "" { - srv.SetNotifier(notify.NewWebhookNotifier(cfg.WebhookURL, logger)) + notifiers = append(notifiers, notify.NewWebhookNotifier(cfg.WebhookURL, logger)) + } + webPushNotifier := ¬ify.WebPushNotifier{ + Store: store, + VAPIDPublicKey: cfg.VAPIDPublicKey, + VAPIDPrivateKey: cfg.VAPIDPrivateKey, + VAPIDEmail: cfg.VAPIDEmail, + Logger: logger, } + notifiers = append(notifiers, webPushNotifier) + srv.SetNotifier(notify.NewMultiNotifier(logger, notifiers...)) + + srv.SetVAPIDConfig(cfg.VAPIDPublicKey, cfg.VAPIDPrivateKey, cfg.VAPIDEmail) + srv.SetPushStore(store) + srv.SetDropsDir(cfg.DropsDir) + if cfg.WorkspaceRoot != "" { srv.SetWorkspaceRoot(cfg.WorkspaceRoot) } @@ -115,6 +183,11 @@ func serve(addr string) error { "deploy": filepath.Join(wd, "scripts", "deploy"), }) + // Graceful shutdown. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv.SetContext(ctx) srv.StartHub() httpSrv := &http.Server{ @@ -122,19 +195,31 @@ func serve(addr string) error { Handler: srv.Handler(), } - // Graceful shutdown. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + workerTimeout := 3 * time.Minute + if cfg.ShutdownTimeout > 0 { + workerTimeout = cfg.ShutdownTimeout + } sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) go func() { <-sigCh - logger.Info("shutting down server...") - shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second) - defer shutdownCancel() - if err := httpSrv.Shutdown(shutdownCtx); err != nil { - logger.Warn("shutdown error", "err", err) + logger.Info("shutting down: draining workers...", "timeout", workerTimeout) + + // Stop the HTTP server so no new requests come in. + httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second) + defer httpCancel() + if err := httpSrv.Shutdown(httpCtx); err != nil { + logger.Warn("http shutdown error", "err", err) + } + + // Wait for in-flight task workers to finish. + workerCtx, workerCancel := context.WithTimeout(context.Background(), workerTimeout) + defer workerCancel() + if err := srv.Pool().Shutdown(workerCtx); err != nil { + logger.Warn("worker drain timed out", "err", err) + } else { + logger.Info("all workers finished cleanly") } }() @@ -144,3 +229,4 @@ func serve(addr string) error { } return nil } + diff --git a/internal/cli/status.go b/internal/cli/status.go index 16b88b0..77a30d5 100644 --- a/internal/cli/status.go +++ b/internal/cli/status.go @@ -39,6 +39,9 @@ func showStatus(id string) error { fmt.Printf("State: %s\n", t.State) fmt.Printf("Priority: %s\n", t.Priority) fmt.Printf("Model: %s\n", t.Agent.Model) + if t.Project != "" { + fmt.Printf("Project: %s\n", t.Project) + } if t.Description != "" { fmt.Printf("Description: %s\n", t.Description) } diff --git a/internal/cli/version.go b/internal/cli/version.go new file mode 100644 index 0000000..789416a --- /dev/null +++ b/internal/cli/version.go @@ -0,0 +1,18 @@ +package cli + +import ( + "fmt" + + "github.com/thepeterstone/claudomator/internal/version" + "github.com/spf13/cobra" +) + +func newVersionCmd() *cobra.Command { + return &cobra.Command{ + Use: "version", + Short: "Show the version of claudomator", + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("claudomator version %s\n", version.Version()) + }, + } +} |
