queue.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. package cmd
  2. import (
  3. "context"
  4. "gadmin/internal/admin/consts"
  5. "gadmin/internal/admin/library/nats"
  6. "gadmin/internal/queues"
  7. "gadmin/internal/queues/subscribes"
  8. "os"
  9. "os/signal"
  10. "github.com/sirupsen/logrus"
  11. "github.com/spf13/cobra"
  12. )
  13. var queueCmd = &cobra.Command{
  14. Use: "queue",
  15. Short: "消息队列消费者监听",
  16. Long: ``,
  17. Run: func(cmd *cobra.Command, args []string) {
  18. logrus.Info("queue called")
  19. ctx, cancel := context.WithCancel(context.Background())
  20. defer cancel()
  21. go func(ctx context.Context) {
  22. client := nats.New(os.Getenv("NATS_URL"), os.Getenv("NATS_STREAM"))
  23. if client == nil {
  24. logrus.Panic("nats initialization failed")
  25. }
  26. subMap := map[string]queues.SubStrategy{
  27. client.GenTopicLabel(consts.NatsTopicAdminLog): subscribes.AdminLog, // 后台日志
  28. }
  29. client.AddSubscriber(subMap).Subscribe(ctx)
  30. }(ctx)
  31. c := make(chan os.Signal, 1)
  32. signal.Notify(c, os.Interrupt, os.Kill)
  33. sig := <-c
  34. logrus.Printf("queue closing down (signal: %v)", sig)
  35. },
  36. }
  37. func init() {
  38. rootCmd.AddCommand(queueCmd)
  39. }