123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- package nats
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"gadmin/internal/queues"

	"github.com/nats-io/nats.go"
	"github.com/sirupsen/logrus"
)
- type NatsClient struct {
- natsConn *nats.Conn
- jetStream nats.JetStreamContext
- streamGroup string
- streamSubjects string
- }
- var (
- lock sync.Mutex
- subMap = map[string]queues.SubStrategy{}
- )
- func New(natsURL string, streamName string) *NatsClient {
- lock.Lock()
- defer lock.Unlock()
- client := new(NatsClient)
- client.natsConn = getNatsConn(natsURL)
- client.streamGroup = streamName
- client.streamSubjects = streamName + ".*"
- if err := client.createStream(streamName); err != nil {
- logrus.Panicf("nats createStream err:%v", err)
- }
- return client
- }
- func getNatsConn(natsURL string) *nats.Conn {
- c, err := nats.Connect(natsURL)
- if err != nil {
- logrus.Panicf("nats connect err:%v", err)
- return nil
- }
- return c
- }
- func (client *NatsClient) AddSubscriber(lists map[string]queues.SubStrategy) *NatsClient {
- if len(lists) == 0 {
- return client
- }
- lock.Lock()
- defer lock.Unlock()
- for k, v := range lists {
- subMap[k] = v
- }
- return client
- }
- func (client *NatsClient) GenTopicLabel(topic string) string {
- return client.streamGroup + "." + topic
- }
- // Subscribe 订阅
- func (client *NatsClient) Subscribe(ctx context.Context) {
- if len(subMap) == 0 {
- logrus.Panic("Groups without subscriptions configured")
- return
- }
- _, err := client.jetStream.Subscribe(client.streamSubjects, func(m *nats.Msg) {
- //logrus.Printf("received nats message, Subject:%v, Data:%+v\n", m.Subject, string(m.Data))
- if h, ok := subMap[m.Subject]; ok {
- if err := h.Handle(m); err != nil {
- logrus.Errorf("nats Subscribe Handle err: %v", err)
- return
- }
- if err := m.Ack(); err != nil {
- logrus.Errorf("nats Subscribe m.Ack err: %v", err)
- return
- }
- return
- }
- logrus.Warnf("Consumers who have not set the theme to change, Subject: %v", m.Subject)
- //if err := m.Ack(); err != nil {
- // logrus.Errorf("nats Subscribe m.Ack err: %v", err)
- // return
- //}
- }, nats.Durable(client.streamGroup), nats.ManualAck())
- if err != nil {
- logrus.Panicf("nats Subscribe err: %v", err)
- return
- }
- <-ctx.Done()
- logrus.Println("nats Subscribe quit..")
- }
- // Publish 发布
- func (client *NatsClient) Publish(topic string, v interface{}) error {
- c, err := nats.NewEncodedConn(client.natsConn, "json")
- if err != nil {
- logrus.Warnf("nats NewEncodedConn err:%v", err)
- return err
- }
- if err := c.Publish(client.streamGroup+"."+topic, v); err != nil {
- logrus.Warnf("nats Publish err:%v", err)
- return err
- }
- return nil
- }
- func (client *NatsClient) createStream(streamName string) error {
- jetStream, err := client.natsConn.JetStream()
- if err != nil {
- return err
- }
- client.jetStream = jetStream
- // 检查流是否存在; 不存在就创建
- stream, err := client.jetStream.StreamInfo(streamName)
- if err != nil {
- logrus.Infof("nats StreamInfo err: %v:", err)
- }
- if stream == nil {
- logrus.Infof("creating stream %q and subjects %q", streamName, client.streamSubjects)
- _, err = client.jetStream.AddStream(&nats.StreamConfig{
- Name: streamName,
- Subjects: []string{client.streamSubjects},
- MaxAge: time.Hour * 24 * 30,
- })
- if err != nil {
- return err
- }
- }
- return nil
- }
- // GmRequest gm请求
- func (client *NatsClient) GmRequest(serverId int, method string, v interface{}) (b []byte, err error) {
- marshal, err := json.Marshal(&v)
- if err != nil {
- return
- }
- newMsg := new(nats.Msg)
- newMsg.Subject = fmt.Sprintf("gm%v", serverId)
- newMsg.Header = make(nats.Header)
- newMsg.Header.Set("method", method)
- newMsg.Data = marshal
- logrus.Warnf("GmRequest subject:%+v, method:%v, data:%v", newMsg.Subject, method, string(marshal))
- msg, err := client.natsConn.RequestMsg(newMsg, 10*time.Second)
- if err != nil {
- return
- }
- logrus.Warnf("GmRequest response:%v", string(msg.Data))
- return msg.Data, nil
- }
- func (client *NatsClient) GmLoginServRequest(method string, v interface{}) (b []byte, err error) {
- marshal, err := json.Marshal(&v)
- if err != nil {
- return
- }
- newMsg := new(nats.Msg)
- newMsg.Subject = "gmlogin"
- newMsg.Header = make(nats.Header)
- newMsg.Header.Set("method", method)
- newMsg.Data = marshal
- logrus.Warnf("GmLoginServRequest subject:%+v, method:%v, data:%v", newMsg.Subject, method, string(marshal))
- msg, err := client.natsConn.RequestMsg(newMsg, 10*time.Second)
- if err != nil {
- return
- }
- logrus.Warnf("GmLoginServRequest response:%v", string(msg.Data))
- return msg.Data, nil
- }
- func (client *NatsClient) GetConn() *nats.Conn {
- if client == nil {
- return nil
- }
- return client.natsConn
- }
|