package natsjs import ( "leafstalk/log" "strings" "time" "github.com/nats-io/nats.go" ) type NatsClient struct { Servers []string ClientName string Conn *nats.Conn JetStream nats.JetStreamContext dropped int } // var natsClient *NatsClient func New(servers []string, clientName string) *NatsClient { client := new(NatsClient) client.Servers = servers client.ClientName = clientName return client } func (ns *NatsClient) ServerString() string { return strings.Join(ns.Servers, ",") } func (ns *NatsClient) ConnectServer(opts ...nats.Option) error { conn, err := nats.Connect( strings.Join(ns.Servers, ","), append([]nats.Option{ nats.Name(ns.ClientName), nats.MaxReconnects(-1), nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { log.Warnf("disconnected from nats! %v", err) }), nats.ReconnectHandler(func(nc *nats.Conn) { log.Warnf("reconnected to nats %s!", nc.ConnectedUrl()) }), nats.ClosedHandler(func(nc *nats.Conn) { err := nc.LastError() if err == nil { log.Warn("nats connection closed with no error.") return } log.Errorf("nats connection closed. reason: %q", nc.LastError()) }), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { log.Warnf("nats ErrorHandler: %v", err) }), }, opts...)..., ) if err != nil { return err } ns.Conn = conn js, err := ns.Conn.JetStream() if err != nil { return err } ns.JetStream = js return nil } func (ns *NatsClient) Stop() { if ns.Conn != nil { ns.Conn.Close() ns.Conn = nil } } func (ns *NatsClient) CreateStream(streamName string, subjects ...string) error { // 检查流是否存在; 不存在就创建 stream, err := ns.JetStream.StreamInfo(streamName) if err != nil { //不存在就创建 log.Info(err) } var lst []string if len(subjects) == 0 { subject := streamName + ".>" subject = strings.ToLower(subject) lst = append(lst, subject) } else { lst = append(lst, subjects...) } if stream == nil { log.Infof("creating stream %q and subjects %q", streamName, subjects) _, err = ns.JetStream.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: lst, MaxAge: time.Hour * 24 * 30, }) if err != nil { return err } } return nil } func (ns *NatsClient) AddConsumer(streamName string, consumer string) error { _, err := ns.JetStream.AddConsumer(streamName, &nats.ConsumerConfig{ Durable: consumer, // DeliverSubject: subject, 不需要指定,会自动生成 AckPolicy: nats.AckExplicitPolicy, }) return err } func (ns *NatsClient) Publish(topic string, v []byte) error { _, err := ns.JetStream.Publish(topic, v) return err } // subscribe 订阅主题 func (ns *NatsClient) chanSubscribe(topic string, subChan chan *nats.Msg) (*nats.Subscription, error) { sub, err := ns.JetStream.ChanSubscribe(topic, subChan) // if err == nil { // ns.subscriptions[topic] = sub // } return sub, err } func (ns *NatsClient) Subscribe(topic string, cb func(string, []byte), opts ...nats.SubOpt) (*nats.Subscription, error) { var sub *nats.Subscription var err error sub, err = ns.JetStream.Subscribe(topic, func(msg *nats.Msg) { dropped, err := sub.Dropped() if err != nil { log.Errorf("error getting number of dropped messages: %s", err.Error()) } if dropped > ns.dropped { log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped) ns.dropped = dropped } log.Debugf("subs channel dropped: %d", dropped) cb(msg.Subject, msg.Data) }, opts...) // if err == nil { // ns.subscriptions[topic] = sub // } // sub.Drain() return sub, err } // 例子 // nsClient.Subscribe("players.*", func(subject string, b []byte) { // fmt.Println(subject, string(b)) // switch subject { // case "players.levelChanged": // m.HandlerServer.Go("LevelChanged", b) // case "players.lastLoginServer": // m.HandlerServer.Go("LastLoginServer", b) // } // }, nats.Durable("login"), nats.AckExplicit()) //, nats.Durable("login")