123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package natsjs
- import (
- "leafstalk/log"
- "strings"
- "time"
- "github.com/nats-io/nats.go"
- )
- type NatsClient struct {
- Servers []string
- ClientName string
- Conn *nats.Conn
- JetStream nats.JetStreamContext
- dropped int
- }
- // var natsClient *NatsClient
- func New(servers []string, clientName string) *NatsClient {
- client := new(NatsClient)
- client.Servers = servers
- client.ClientName = clientName
- return client
- }
- func (ns *NatsClient) ServerString() string {
- return strings.Join(ns.Servers, ",")
- }
- func (ns *NatsClient) ConnectServer(opts ...nats.Option) error {
- conn, err := nats.Connect(
- strings.Join(ns.Servers, ","),
- append([]nats.Option{
- nats.Name(ns.ClientName),
- nats.MaxReconnects(-1),
- nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
- log.Warnf("disconnected from nats! %v", err)
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- log.Warnf("reconnected to nats %s!", nc.ConnectedUrl())
- }),
- nats.ClosedHandler(func(nc *nats.Conn) {
- err := nc.LastError()
- if err == nil {
- log.Warn("nats connection closed with no error.")
- return
- }
- log.Errorf("nats connection closed. reason: %q", nc.LastError())
- }),
- nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
- log.Warnf("nats ErrorHandler: %v", err)
- }),
- }, opts...)...,
- )
- if err != nil {
- return err
- }
- ns.Conn = conn
- js, err := ns.Conn.JetStream()
- if err != nil {
- return err
- }
- ns.JetStream = js
- return nil
- }
- func (ns *NatsClient) Stop() {
- if ns.Conn != nil {
- ns.Conn.Close()
- ns.Conn = nil
- }
- }
- func (ns *NatsClient) CreateStream(streamName string, subjects ...string) error {
- // 检查流是否存在; 不存在就创建
- stream, err := ns.JetStream.StreamInfo(streamName)
- if err != nil {
- //不存在就创建
- log.Info(err)
- }
- var lst []string
- if len(subjects) == 0 {
- subject := streamName + ".>"
- subject = strings.ToLower(subject)
- lst = append(lst, subject)
- } else {
- lst = append(lst, subjects...)
- }
- if stream == nil {
- log.Infof("creating stream %q and subjects %q", streamName, subjects)
- _, err = ns.JetStream.AddStream(&nats.StreamConfig{
- Name: streamName,
- Subjects: lst,
- MaxAge: time.Hour * 24 * 30,
- })
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (ns *NatsClient) AddConsumer(streamName string, consumer string) error {
- _, err := ns.JetStream.AddConsumer(streamName, &nats.ConsumerConfig{
- Durable: consumer,
- // DeliverSubject: subject, 不需要指定,会自动生成
- AckPolicy: nats.AckExplicitPolicy,
- })
- return err
- }
- func (ns *NatsClient) Publish(topic string, v []byte) error {
- _, err := ns.JetStream.Publish(topic, v)
- return err
- }
- // subscribe 订阅主题
- func (ns *NatsClient) chanSubscribe(topic string, subChan chan *nats.Msg) (*nats.Subscription, error) {
- sub, err := ns.JetStream.ChanSubscribe(topic, subChan)
- // if err == nil {
- // ns.subscriptions[topic] = sub
- // }
- return sub, err
- }
- func (ns *NatsClient) Subscribe(topic string, cb func(string, []byte), opts ...nats.SubOpt) (*nats.Subscription, error) {
- var sub *nats.Subscription
- var err error
- sub, err = ns.JetStream.Subscribe(topic, func(msg *nats.Msg) {
- dropped, err := sub.Dropped()
- if err != nil {
- log.Errorf("error getting number of dropped messages: %s", err.Error())
- }
- if dropped > ns.dropped {
- log.Warnf("[rpc server] some messages were dropped! numDropped: %d", dropped)
- ns.dropped = dropped
- }
- log.Debugf("subs channel dropped: %d", dropped)
- cb(msg.Subject, msg.Data)
- }, opts...)
- // if err == nil {
- // ns.subscriptions[topic] = sub
- // }
- // sub.Drain()
- return sub, err
- }
- // 例子
- // nsClient.Subscribe("players.*", func(subject string, b []byte) {
- // fmt.Println(subject, string(b))
- // switch subject {
- // case "players.levelChanged":
- // m.HandlerServer.Go("LevelChanged", b)
- // case "players.lastLoginServer":
- // m.HandlerServer.Go("LastLoginServer", b)
- // }
- // }, nats.Durable("login"), nats.AckExplicit()) //, nats.Durable("login")
|