diff options
Diffstat (limited to 'internal/cli/serve.go')
| -rw-r--r-- | internal/cli/serve.go | 126 |
1 files changed, 106 insertions, 20 deletions
diff --git a/internal/cli/serve.go b/internal/cli/serve.go index 5101b81..459c35b 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -35,6 +35,8 @@ func newServeCmd() *cobra.Command { cmd.Flags().StringVar(&addr, "addr", ":8484", "listen address") cmd.Flags().StringVar(&workspaceRoot, "workspace-root", "/workspace", "root directory for listing workspaces") + cmd.Flags().StringVar(&cfg.ClaudeImage, "claude-image", cfg.ClaudeImage, "docker image for claude agents") + cmd.Flags().StringVar(&cfg.GeminiImage, "gemini-image", cfg.GeminiImage, "docker image for gemini agents") return cmd } @@ -50,25 +52,68 @@ func serve(addr string) error { } defer store.Close() + // Load VAPID keys from DB; generate and persist if missing. + if cfg.VAPIDPublicKey == "" || cfg.VAPIDPrivateKey == "" { + pub, _ := store.GetSetting("vapid_public_key") + priv, _ := store.GetSetting("vapid_private_key") + if pub == "" || priv == "" || !notify.ValidateVAPIDPublicKey(pub) { + pub, priv, err = notify.GenerateVAPIDKeys() + if err != nil { + return fmt.Errorf("generating VAPID keys: %w", err) + } + _ = store.SetSetting("vapid_public_key", pub) + _ = store.SetSetting("vapid_private_key", priv) + } + cfg.VAPIDPublicKey = pub + cfg.VAPIDPrivateKey = priv + } + logger := newLogger(verbose) apiURL := "http://localhost" + addr if len(addr) > 0 && addr[0] != ':' { apiURL = "http://" + addr } - + + // Use configured credentials dir; sync-credentials keeps this populated. + claudeConfigDir := cfg.ClaudeConfigDir + + repoDir, _ := os.Getwd() runners := map[string]executor.Runner{ - "claude": &executor.ClaudeRunner{ - BinaryPath: cfg.ClaudeBinaryPath, - Logger: logger, - LogDir: cfg.LogDir, - APIURL: apiURL, + // ContainerRunner: binaries are resolved via PATH inside the container image, + // so ClaudeBinary/GeminiBinary are left empty (host paths would not exist inside). + "claude": &executor.ContainerRunner{ + Image: cfg.ClaudeImage, + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeConfigDir: claudeConfigDir, + CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"), + Store: store, + }, + "gemini": &executor.ContainerRunner{ + Image: cfg.GeminiImage, + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeConfigDir: claudeConfigDir, + CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"), + Store: store, }, - "gemini": &executor.GeminiRunner{ - BinaryPath: cfg.GeminiBinaryPath, - Logger: logger, - LogDir: cfg.LogDir, - APIURL: apiURL, + "container": &executor.ContainerRunner{ + Image: "claudomator-agent:latest", + Logger: logger, + LogDir: cfg.LogDir, + APIURL: apiURL, + DropsDir: cfg.DropsDir, + SSHAuthSock: cfg.SSHAuthSock, + ClaudeConfigDir: claudeConfigDir, + CredentialSyncCmd: filepath.Join(repoDir, "scripts", "sync-credentials"), + Store: store, }, } @@ -83,6 +128,7 @@ func serve(addr string) error { logger.Info("local runner registered", "endpoint", cfg.LocalModel.Endpoint, "model", cfg.LocalModel.Model) } + pool := executor.NewPool(cfg.MaxConcurrent, runners, store, logger) pool.Classifier = &executor.Classifier{ LLM: localClient, @@ -91,14 +137,36 @@ func serve(addr string) error { if localClient != nil { pool.LLM = localClient } + + if err := store.SeedProjects(); err != nil { + logger.Error("failed to seed projects", "error", err) + } + pool.RecoverStaleRunning(context.Background()) pool.RecoverStaleQueued(context.Background()) pool.RecoverStaleBlocked() srv := api.NewServer(store, pool, logger, cfg.ClaudeBinaryPath, cfg.GeminiBinaryPath) + + // Configure notifiers: combine webhook (if set) with web push. + notifiers := []notify.Notifier{} if cfg.WebhookURL != "" { - srv.SetNotifier(notify.NewWebhookNotifier(cfg.WebhookURL, logger)) + notifiers = append(notifiers, notify.NewWebhookNotifier(cfg.WebhookURL, logger)) + } + webPushNotifier := ¬ify.WebPushNotifier{ + Store: store, + VAPIDPublicKey: cfg.VAPIDPublicKey, + VAPIDPrivateKey: cfg.VAPIDPrivateKey, + VAPIDEmail: cfg.VAPIDEmail, + Logger: logger, } + notifiers = append(notifiers, webPushNotifier) + srv.SetNotifier(notify.NewMultiNotifier(logger, notifiers...)) + + srv.SetVAPIDConfig(cfg.VAPIDPublicKey, cfg.VAPIDPrivateKey, cfg.VAPIDEmail) + srv.SetPushStore(store) + srv.SetDropsDir(cfg.DropsDir) + if cfg.WorkspaceRoot != "" { srv.SetWorkspaceRoot(cfg.WorkspaceRoot) } @@ -115,6 +183,11 @@ func serve(addr string) error { "deploy": filepath.Join(wd, "scripts", "deploy"), }) + // Graceful shutdown. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv.SetContext(ctx) srv.StartHub() httpSrv := &http.Server{ @@ -122,19 +195,31 @@ func serve(addr string) error { Handler: srv.Handler(), } - // Graceful shutdown. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + workerTimeout := 3 * time.Minute + if cfg.ShutdownTimeout > 0 { + workerTimeout = cfg.ShutdownTimeout + } sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) go func() { <-sigCh - logger.Info("shutting down server...") - shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second) - defer shutdownCancel() - if err := httpSrv.Shutdown(shutdownCtx); err != nil { - logger.Warn("shutdown error", "err", err) + logger.Info("shutting down: draining workers...", "timeout", workerTimeout) + + // Stop the HTTP server so no new requests come in. + httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second) + defer httpCancel() + if err := httpSrv.Shutdown(httpCtx); err != nil { + logger.Warn("http shutdown error", "err", err) + } + + // Wait for in-flight task workers to finish. + workerCtx, workerCancel := context.WithTimeout(context.Background(), workerTimeout) + defer workerCancel() + if err := srv.Pool().Shutdown(workerCtx); err != nil { + logger.Warn("worker drain timed out", "err", err) + } else { + logger.Info("all workers finished cleanly") } }() @@ -144,3 +229,4 @@ func serve(addr string) error { } return nil } + |
