package nats import ( "context" "encoding/json" "fmt" "gadmin/internal/queues" "sync" "time" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) type NatsClient struct { natsConn *nats.Conn jetStream nats.JetStreamContext streamGroup string streamSubjects string } var ( lock sync.Mutex subMap = map[string]queues.SubStrategy{} ) func New(natsURL string, streamName string) *NatsClient { lock.Lock() defer lock.Unlock() client := new(NatsClient) client.natsConn = getNatsConn(natsURL) client.streamGroup = streamName client.streamSubjects = streamName + ".*" if err := client.createStream(streamName); err != nil { logrus.Panicf("nats createStream err:%v", err) } return client } func getNatsConn(natsURL string) *nats.Conn { c, err := nats.Connect(natsURL) if err != nil { logrus.Panicf("nats connect err:%v", err) return nil } return c } func (client *NatsClient) AddSubscriber(lists map[string]queues.SubStrategy) *NatsClient { if len(lists) == 0 { return client } lock.Lock() defer lock.Unlock() for k, v := range lists { subMap[k] = v } return client } func (client *NatsClient) GenTopicLabel(topic string) string { return client.streamGroup + "." + topic } // Subscribe 订阅 func (client *NatsClient) Subscribe(ctx context.Context) { if len(subMap) == 0 { logrus.Panic("Groups without subscriptions configured") return } _, err := client.jetStream.Subscribe(client.streamSubjects, func(m *nats.Msg) { //logrus.Printf("received nats message, Subject:%v, Data:%+v\n", m.Subject, string(m.Data)) if h, ok := subMap[m.Subject]; ok { if err := h.Handle(m); err != nil { logrus.Errorf("nats Subscribe Handle err: %v", err) return } if err := m.Ack(); err != nil { logrus.Errorf("nats Subscribe m.Ack err: %v", err) return } return } logrus.Warnf("Consumers who have not set the theme to change, Subject: %v", m.Subject) //if err := m.Ack(); err != nil { // logrus.Errorf("nats Subscribe m.Ack err: %v", err) // return //} }, nats.Durable(client.streamGroup), nats.ManualAck()) if err != nil { logrus.Panicf("nats Subscribe err: %v", err) return } <-ctx.Done() logrus.Println("nats Subscribe quit..") } // Publish 发布 func (client *NatsClient) Publish(topic string, v interface{}) error { c, err := nats.NewEncodedConn(client.natsConn, "json") if err != nil { logrus.Warnf("nats NewEncodedConn err:%v", err) return err } if err := c.Publish(client.streamGroup+"."+topic, v); err != nil { logrus.Warnf("nats Publish err:%v", err) return err } return nil } func (client *NatsClient) createStream(streamName string) error { jetStream, err := client.natsConn.JetStream() if err != nil { return err } client.jetStream = jetStream // 检查流是否存在; 不存在就创建 stream, err := client.jetStream.StreamInfo(streamName) if err != nil { logrus.Infof("nats StreamInfo err: %v:", err) } if stream == nil { logrus.Infof("creating stream %q and subjects %q", streamName, client.streamSubjects) _, err = client.jetStream.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: []string{client.streamSubjects}, MaxAge: time.Hour * 24 * 30, }) if err != nil { return err } } return nil } // GmRequest gm请求 func (client *NatsClient) GmRequest(serverId int, method string, v interface{}) (b []byte, err error) { marshal, err := json.Marshal(&v) if err != nil { return } newMsg := new(nats.Msg) newMsg.Subject = fmt.Sprintf("gm%v", serverId) newMsg.Header = make(nats.Header) newMsg.Header.Set("method", method) newMsg.Data = marshal logrus.Warnf("GmRequest subject:%+v, method:%v, data:%v", newMsg.Subject, method, string(marshal)) msg, err := client.natsConn.RequestMsg(newMsg, 10*time.Second) if err != nil { return } logrus.Warnf("GmRequest response:%v", string(msg.Data)) return msg.Data, nil } func (client *NatsClient) GmLoginServRequest(method string, v interface{}) (b []byte, err error) { marshal, err := json.Marshal(&v) if err != nil { return } newMsg := new(nats.Msg) newMsg.Subject = "gmlogin" newMsg.Header = make(nats.Header) newMsg.Header.Set("method", method) newMsg.Data = marshal logrus.Warnf("GmLoginServRequest subject:%+v, method:%v, data:%v", newMsg.Subject, method, string(marshal)) msg, err := client.natsConn.RequestMsg(newMsg, 10*time.Second) if err != nil { return } logrus.Warnf("GmLoginServRequest response:%v", string(msg.Data)) return msg.Data, nil } func (client *NatsClient) GetConn() *nats.Conn { if client == nil { return nil } return client.natsConn }