client.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package natsjs
  2. import (
  3. "leafstalk/log"
  4. "strings"
  5. "time"
  6. "github.com/nats-io/nats.go"
  7. )
  8. type NatsClient struct {
  9. Servers []string
  10. ClientName string
  11. Conn *nats.Conn
  12. JetStream nats.JetStreamContext
  13. dropped int
  14. }
  15. // var natsClient *NatsClient
  16. func New(servers []string, clientName string) *NatsClient {
  17. client := new(NatsClient)
  18. client.Servers = servers
  19. client.ClientName = clientName
  20. return client
  21. }
  22. func (ns *NatsClient) ServerString() string {
  23. return strings.Join(ns.Servers, ",")
  24. }
  25. func (ns *NatsClient) ConnectServer(opts ...nats.Option) error {
  26. conn, err := nats.Connect(
  27. strings.Join(ns.Servers, ","),
  28. append([]nats.Option{
  29. nats.Name(ns.ClientName),
  30. nats.MaxReconnects(-1),
  31. nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
  32. log.Warnf("disconnected from nats! %v", err)
  33. }),
  34. nats.ReconnectHandler(func(nc *nats.Conn) {
  35. log.Warnf("reconnected to nats %s!", nc.ConnectedUrl())
  36. }),
  37. nats.ClosedHandler(func(nc *nats.Conn) {
  38. err := nc.LastError()
  39. if err == nil {
  40. log.Warn("nats connection closed with no error.")
  41. return
  42. }
  43. log.Errorf("nats connection closed. reason: %q", nc.LastError())
  44. }),
  45. nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
  46. log.Warnf("nats ErrorHandler: %v", err)
  47. }),
  48. }, opts...)...,
  49. )
  50. if err != nil {
  51. return err
  52. }
  53. ns.Conn = conn
  54. js, err := ns.Conn.JetStream()
  55. if err != nil {
  56. return err
  57. }
  58. ns.JetStream = js
  59. return nil
  60. }
  61. func (ns *NatsClient) Stop() {
  62. if ns.Conn != nil {
  63. ns.Conn.Close()
  64. ns.Conn = nil
  65. }
  66. }
  67. func (ns *NatsClient) CreateStream(streamName string, subjects ...string) error {
  68. // 检查流是否存在; 不存在就创建
  69. stream, err := ns.JetStream.StreamInfo(streamName)
  70. if err != nil {
  71. //不存在就创建
  72. log.Info(err)
  73. }
  74. var lst []string
  75. if len(subjects) == 0 {
  76. subject := streamName + ".>"
  77. subject = strings.ToLower(subject)
  78. lst = append(lst, subject)
  79. } else {
  80. lst = append(lst, subjects...)
  81. }
  82. if stream == nil {
  83. log.Infof("creating stream %q and subjects %q", streamName, subjects)
  84. _, err = ns.JetStream.AddStream(&nats.StreamConfig{
  85. Name: streamName,
  86. Subjects: lst,
  87. MaxAge: time.Hour * 24 * 30,
  88. })
  89. if err != nil {
  90. return err
  91. }
  92. }
  93. return nil
  94. }
  95. func (ns *NatsClient) AddConsumer(streamName string, consumer string) error {
  96. _, err := ns.JetStream.AddConsumer(streamName, &nats.ConsumerConfig{
  97. Durable: consumer,
  98. // DeliverSubject: subject, 不需要指定,会自动生成
  99. AckPolicy: nats.AckExplicitPolicy,
  100. })
  101. return err
  102. }
  103. func (ns *NatsClient) Publish(topic string, v []byte) error {
  104. _, err := ns.JetStream.Publish(topic, v)
  105. return err
  106. }
  107. // subscribe 订阅主题
  108. func (ns *NatsClient) chanSubscribe(topic string, subChan chan *nats.Msg) (*nats.Subscription, error) {
  109. sub, err := ns.JetStream.ChanSubscribe(topic, subChan)
  110. // if err == nil {
  111. // ns.subscriptions[topic] = sub
  112. // }
  113. return sub, err
  114. }
  115. func (ns *NatsClient) Subscribe(topic string, cb func(string, []byte), opts ...nats.SubOpt) (*nats.Subscription, error) {
  116. var sub *nats.Subscription
  117. var err error
  118. sub, err = ns.JetStream.Subscribe(topic, func(msg *nats.Msg) {
  119. dropped, err := sub.Dropped()
  120. if err != nil {
  121. log.Errorf("error getting number of dropped messages: %s", err.Error())
  122. }
  123. if dropped > ns.dropped {
  124. log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
  125. ns.dropped = dropped
  126. }
  127. log.Debugf("subs channel dropped: %d", dropped)
  128. cb(msg.Subject, msg.Data)
  129. }, opts...)
  130. // if err == nil {
  131. // ns.subscriptions[topic] = sub
  132. // }
  133. // sub.Drain()
  134. return sub, err
  135. }
  136. // 例子
  137. // nsClient.Subscribe("players.*", func(subject string, b []byte) {
  138. // fmt.Println(subject, string(b))
  139. // switch subject {
  140. // case "players.levelChanged":
  141. // m.HandlerServer.Go("LevelChanged", b)
  142. // case "players.lastLoginServer":
  143. // m.HandlerServer.Go("LastLoginServer", b)
  144. // }
  145. // }, nats.Durable("login"), nats.AckExplicit()) //, nats.Durable("login")