nats_logrus_hook.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package nhook
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "github.com/nats-io/nats.go"
  8. "github.com/sirupsen/logrus"
  9. )
  10. // HookConf defines the vars needed to connect to nats and add the logrus hook
  11. type HookConf struct {
  12. NatsConfig
  13. Subject string `json:"subject"`
  14. Dimensions map[string]interface{} `json:"dimensions"`
  15. }
  16. // NatsHook will emit logs to the subject provided
  17. type NatsHook struct {
  18. conn *nats.Conn
  19. jetStream nats.JetStreamContext
  20. subject string
  21. extraFields map[string]interface{}
  22. dynamicFields map[string]func() interface{}
  23. Formatter logrus.Formatter
  24. LogLevels []logrus.Level
  25. }
  26. // 使用JetStreamContext创建流
  27. func createStream(js nats.JetStreamContext, streamName string) error {
  28. // 检查流是否存在; 不存在就创建
  29. stream, err := js.StreamInfo(streamName)
  30. if err != nil {
  31. log.Println(err)
  32. }
  33. streamSubjects := streamName + ".*"
  34. if stream == nil {
  35. log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
  36. _, err = js.AddStream(&nats.StreamConfig{
  37. Name: streamName,
  38. Subjects: []string{streamSubjects},
  39. })
  40. if err != nil {
  41. return err
  42. }
  43. }
  44. return nil
  45. }
  46. // AddNatsHook will connect to nats, add the hook to logrus, and percolate any errors up
  47. func AddNatsHook(conf *HookConf, lgrs *logrus.Logger) (*nats.Conn, *NatsHook, error) {
  48. if conf.Subject == "" {
  49. return nil, nil, errors.New("Must provide a subject for the nats hook")
  50. }
  51. nc, err := ConnectToNatsNoTls(&conf.NatsConfig)
  52. if err != nil {
  53. return nil, nil, err
  54. }
  55. hook := NewNatsHook(nc, conf.Subject)
  56. for k, v := range conf.Dimensions {
  57. hook.AddField(k, v)
  58. }
  59. js, err := nc.JetStream()
  60. if err != nil {
  61. return nil, nil, err
  62. }
  63. ns := strings.SplitN(conf.Subject, ".", 2)
  64. err = createStream(js, ns[0])
  65. if err != nil {
  66. return nil, nil, err
  67. }
  68. hook.jetStream = js
  69. lgrs.AddHook(hook)
  70. return nc, hook, nil
  71. }
  72. // NewNatsHook will create a logrus hook that will automatically send
  73. // new info into the channel
  74. func NewNatsHook(conn *nats.Conn, subject string) *NatsHook {
  75. hook := NatsHook{
  76. conn: conn,
  77. subject: subject,
  78. extraFields: make(map[string]interface{}),
  79. dynamicFields: make(map[string]func() interface{}),
  80. Formatter: &logrus.JSONFormatter{},
  81. LogLevels: []logrus.Level{
  82. logrus.PanicLevel,
  83. logrus.FatalLevel,
  84. logrus.ErrorLevel,
  85. logrus.WarnLevel,
  86. logrus.InfoLevel,
  87. logrus.DebugLevel,
  88. },
  89. }
  90. return &hook
  91. }
  92. // AddField will add a simple value each emission
  93. func (hook *NatsHook) AddField(key string, value interface{}) *NatsHook {
  94. hook.extraFields[key] = value
  95. return hook
  96. }
  97. // AddDynamicField will call that method on each fire
  98. func (hook *NatsHook) AddDynamicField(key string, generator func() interface{}) *NatsHook {
  99. hook.dynamicFields[key] = generator
  100. return hook
  101. }
  102. // Fire will use the connection and try to send the message to the right destination
  103. func (hook *NatsHook) Fire(entry *logrus.Entry) error {
  104. if hook.conn.IsClosed() {
  105. return fmt.Errorf("Attempted to log on a closed connection")
  106. }
  107. // add in the new fields
  108. for k, v := range hook.extraFields {
  109. entry.Data[k] = v
  110. }
  111. for k, generator := range hook.dynamicFields {
  112. entry.Data[k] = generator()
  113. }
  114. bytes, err := hook.Formatter.Format(entry)
  115. if err != nil {
  116. return err
  117. }
  118. _, err = hook.jetStream.Publish(hook.subject, bytes)
  119. // err = hook.conn.Publish(hook.subject, bytes)
  120. return err
  121. }
  122. // Levels will describe what levels the NatsHook is associated with
  123. func (hook *NatsHook) Levels() []logrus.Level {
  124. return hook.LogLevels
  125. }
  126. // var conn1 *net.TCPConn
  127. // func dial() {
  128. // tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:8080")
  129. // if err != nil {
  130. // log.Printf("Resolve tcp addr failed: %v\n", err)
  131. // return
  132. // }
  133. // conn1, err = net.DialTCP("tcp", nil, tcpAddr)
  134. // if err != nil {
  135. // log.Printf("Dial to server failed: %v\n", err)
  136. // return
  137. // }
  138. // }
// // Send a message to the server
  140. // func SendMsg(msg []byte) {
  141. // _, err := conn1.Write([]byte(msg))
  142. // if err != nil {
  143. // log.Printf("SendMsg failed: %v\n", err)
  144. // }
  145. // }