package nhook import ( "errors" "fmt" "log" "strings" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) // HookConf defines the vars needed to connect to nats and add the logrus hook type HookConf struct { NatsConfig Subject string `json:"subject"` Dimensions map[string]interface{} `json:"dimensions"` } // NatsHook will emit logs to the subject provided type NatsHook struct { conn *nats.Conn jetStream nats.JetStreamContext subject string extraFields map[string]interface{} dynamicFields map[string]func() interface{} Formatter logrus.Formatter LogLevels []logrus.Level } // 使用JetStreamContext创建流 func createStream(js nats.JetStreamContext, streamName string) error { // 检查流是否存在; 不存在就创建 stream, err := js.StreamInfo(streamName) if err != nil { log.Println(err) } streamSubjects := streamName + ".*" if stream == nil { log.Printf("creating stream %q and subjects %q", streamName, streamSubjects) _, err = js.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: []string{streamSubjects}, }) if err != nil { return err } } return nil } // AddNatsHook will connect to nats, add the hook to logrus, and percolate any errors up func AddNatsHook(conf *HookConf, lgrs *logrus.Logger) (*nats.Conn, *NatsHook, error) { if conf.Subject == "" { return nil, nil, errors.New("Must provide a subject for the nats hook") } nc, err := ConnectToNatsNoTls(&conf.NatsConfig) if err != nil { return nil, nil, err } hook := NewNatsHook(nc, conf.Subject) for k, v := range conf.Dimensions { hook.AddField(k, v) } js, err := nc.JetStream() if err != nil { return nil, nil, err } ns := strings.SplitN(conf.Subject, ".", 2) err = createStream(js, ns[0]) if err != nil { return nil, nil, err } hook.jetStream = js lgrs.AddHook(hook) return nc, hook, nil } // NewNatsHook will create a logrus hook that will automatically send // new info into the channel func NewNatsHook(conn *nats.Conn, subject string) *NatsHook { hook := NatsHook{ conn: conn, subject: subject, extraFields: make(map[string]interface{}), dynamicFields: make(map[string]func() interface{}), Formatter: &logrus.JSONFormatter{}, LogLevels: []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel, logrus.InfoLevel, logrus.DebugLevel, }, } return &hook } // AddField will add a simple value each emission func (hook *NatsHook) AddField(key string, value interface{}) *NatsHook { hook.extraFields[key] = value return hook } // AddDynamicField will call that method on each fire func (hook *NatsHook) AddDynamicField(key string, generator func() interface{}) *NatsHook { hook.dynamicFields[key] = generator return hook } // Fire will use the connection and try to send the message to the right destination func (hook *NatsHook) Fire(entry *logrus.Entry) error { if hook.conn.IsClosed() { return fmt.Errorf("Attempted to log on a closed connection") } // add in the new fields for k, v := range hook.extraFields { entry.Data[k] = v } for k, generator := range hook.dynamicFields { entry.Data[k] = generator() } bytes, err := hook.Formatter.Format(entry) if err != nil { return err } _, err = hook.jetStream.Publish(hook.subject, bytes) // err = hook.conn.Publish(hook.subject, bytes) return err } // Levels will describe what levels the NatsHook is associated with func (hook *NatsHook) Levels() []logrus.Level { return hook.LogLevels } // var conn1 *net.TCPConn // func dial() { // tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:8080") // if err != nil { // log.Printf("Resolve tcp addr failed: %v\n", err) // return // } // conn1, err = net.DialTCP("tcp", nil, tcpAddr) // if err != nil { // log.Printf("Dial to server failed: %v\n", err) // return // } // } // // 向服务器端发消息 // func SendMsg(msg []byte) { // _, err := conn1.Write([]byte(msg)) // if err != nil { // log.Printf("SendMsg failed: %v\n", err) // } // }