123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package nhook
- import (
- "errors"
- "fmt"
- "log"
- "strings"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
- )
- // HookConf defines the vars needed to connect to nats and add the logrus hook
- type HookConf struct {
- NatsConfig
- Subject string `json:"subject"`
- Dimensions map[string]interface{} `json:"dimensions"`
- }
- // NatsHook will emit logs to the subject provided
- type NatsHook struct {
- conn *nats.Conn
- jetStream nats.JetStreamContext
- subject string
- extraFields map[string]interface{}
- dynamicFields map[string]func() interface{}
- Formatter logrus.Formatter
- LogLevels []logrus.Level
- }
- // 使用JetStreamContext创建流
- func createStream(js nats.JetStreamContext, streamName string) error {
- // 检查流是否存在; 不存在就创建
- stream, err := js.StreamInfo(streamName)
- if err != nil {
- log.Println(err)
- }
- streamSubjects := streamName + ".*"
- if stream == nil {
- log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
- _, err = js.AddStream(&nats.StreamConfig{
- Name: streamName,
- Subjects: []string{streamSubjects},
- })
- if err != nil {
- return err
- }
- }
- return nil
- }
- // AddNatsHook will connect to nats, add the hook to logrus, and percolate any errors up
- func AddNatsHook(conf *HookConf, lgrs *logrus.Logger) (*nats.Conn, *NatsHook, error) {
- if conf.Subject == "" {
- return nil, nil, errors.New("Must provide a subject for the nats hook")
- }
- nc, err := ConnectToNatsNoTls(&conf.NatsConfig)
- if err != nil {
- return nil, nil, err
- }
- hook := NewNatsHook(nc, conf.Subject)
- for k, v := range conf.Dimensions {
- hook.AddField(k, v)
- }
- js, err := nc.JetStream()
- if err != nil {
- return nil, nil, err
- }
- ns := strings.SplitN(conf.Subject, ".", 2)
- err = createStream(js, ns[0])
- if err != nil {
- return nil, nil, err
- }
- hook.jetStream = js
- lgrs.AddHook(hook)
- return nc, hook, nil
- }
- // NewNatsHook will create a logrus hook that will automatically send
- // new info into the channel
- func NewNatsHook(conn *nats.Conn, subject string) *NatsHook {
- hook := NatsHook{
- conn: conn,
- subject: subject,
- extraFields: make(map[string]interface{}),
- dynamicFields: make(map[string]func() interface{}),
- Formatter: &logrus.JSONFormatter{},
- LogLevels: []logrus.Level{
- logrus.PanicLevel,
- logrus.FatalLevel,
- logrus.ErrorLevel,
- logrus.WarnLevel,
- logrus.InfoLevel,
- logrus.DebugLevel,
- },
- }
- return &hook
- }
- // AddField will add a simple value each emission
- func (hook *NatsHook) AddField(key string, value interface{}) *NatsHook {
- hook.extraFields[key] = value
- return hook
- }
- // AddDynamicField will call that method on each fire
- func (hook *NatsHook) AddDynamicField(key string, generator func() interface{}) *NatsHook {
- hook.dynamicFields[key] = generator
- return hook
- }
- // Fire will use the connection and try to send the message to the right destination
- func (hook *NatsHook) Fire(entry *logrus.Entry) error {
- if hook.conn.IsClosed() {
- return fmt.Errorf("Attempted to log on a closed connection")
- }
- // add in the new fields
- for k, v := range hook.extraFields {
- entry.Data[k] = v
- }
- for k, generator := range hook.dynamicFields {
- entry.Data[k] = generator()
- }
- bytes, err := hook.Formatter.Format(entry)
- if err != nil {
- return err
- }
- _, err = hook.jetStream.Publish(hook.subject, bytes)
- // err = hook.conn.Publish(hook.subject, bytes)
- return err
- }
- // Levels will describe what levels the NatsHook is associated with
- func (hook *NatsHook) Levels() []logrus.Level {
- return hook.LogLevels
- }
- // var conn1 *net.TCPConn
- // func dial() {
- // tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:8080")
- // if err != nil {
- // log.Printf("Resolve tcp addr failed: %v\n", err)
- // return
- // }
- // conn1, err = net.DialTCP("tcp", nil, tcpAddr)
- // if err != nil {
- // log.Printf("Dial to server failed: %v\n", err)
- // return
- // }
- // }
- // // SendMsg sends a message to the server
- // func SendMsg(msg []byte) {
- // _, err := conn1.Write([]byte(msg))
- // if err != nil {
- // log.Printf("SendMsg failed: %v\n", err)
- // }
- // }
|