// Package json implements a message processor that maps JSON-encoded
// messages to registered Go types and dispatches them to handlers, routers
// and subscribers.
package json

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"

	jsoniter "github.com/json-iterator/go"

	"leafstalk/log"
	"leafstalk/module/handler"
)

// Processor keeps the registry of known messages, keyed by message ID.
//
// The registry is a plain map without locking: the registration and Set*
// methods write to it, while the routing and (un)marshaling methods only
// read it.
type Processor struct {
	msgInfo map[string]*MsgInfo
}

// MsgInfo holds everything registered for a single message ID.
type MsgInfo struct {
	msgType       reflect.Type      // pointer type passed to Register
	msgRouter     *handler.Server   // receives the decoded message in Route/RouteById
	msgHandler    MsgHandler        // called with {msg, userData}
	msgRawHandler MsgHandler        // called with {msgID, raw payload, userData}
	msgSubscribe  []*handler.Server // additional servers that receive the decoded message
}

// MsgHandler is a message callback. The argument slice is
// {msg, userData} for decoded messages and {msgID, msgRawData, userData}
// for raw messages.
type MsgHandler func([]interface{})

// MsgRaw carries a message whose payload was left undecoded because a raw
// handler is registered for its ID.
type MsgRaw struct {
	msgID      string
	msgRawData json.RawMessage
}

// NewProcessor returns a Processor with an empty message registry.
func NewProcessor() *Processor {
	p := new(Processor)
	p.msgInfo = make(map[string]*MsgInfo)
	return p
}

// json2 is the jsoniter configuration used by Unmarshal and Marshal.
// NOTE(review): Unmarshal3, MarshalOnly and GetRawMsgId use encoding/json
// instead; confirm whether that split is intentional.
var json2 = jsoniter.ConfigCompatibleWithStandardLibrary

// ptrMsgType returns the reflect.Type of msg, or an error when msg is nil
// or is not a pointer.
func ptrMsgType(msg interface{}) (reflect.Type, error) {
	msgType := reflect.TypeOf(msg)
	if msgType == nil || msgType.Kind() != reflect.Ptr {
		return nil, errors.New("json message pointer required")
	}
	return msgType, nil
}

// mustInfo returns the registry entry for the pointer message msg, using
// the name of the pointed-to type as the message ID. An invalid or
// unregistered message is reported through log.Fatal / log.Fatalf
// (presumably terminating the process; confirm against leafstalk/log).
func (p *Processor) mustInfo(msg interface{}) *MsgInfo {
	msgType := reflect.TypeOf(msg)
	if msgType == nil || msgType.Kind() != reflect.Ptr {
		log.Fatal("json message pointer required")
	}
	msgID := msgType.Elem().Name()
	i, ok := p.msgInfo[msgID]
	if !ok {
		log.Fatalf("message %v not registered", msgID)
	}
	return i
}

// Register declares a message the system needs to handle and returns its
// message ID, which is the name of the type msg points to. msg must be a
// pointer to a named type that has not been registered before.
// It's dangerous to call the method on routing or marshaling (unmarshaling)
func (p *Processor) Register(msg interface{}) string {
	msgType := reflect.TypeOf(msg)
	if msgType == nil || msgType.Kind() != reflect.Ptr {
		log.Fatal("json message pointer required")
	}
	msgID := msgType.Elem().Name()
	if msgID == "" {
		log.Fatal("unnamed json message")
	}
	if _, ok := p.msgInfo[msgID]; ok {
		log.Fatalf("message %v is already registered", msgID)
	}

	i := new(MsgInfo)
	i.msgType = msgType
	p.msgInfo[msgID] = i
	return msgID
}

// SetRouter sets the server that the registered message msg is sent to.
// It's dangerous to call the method on routing or marshaling (unmarshaling)
func (p *Processor) SetRouter(msg interface{}, msgRouter *handler.Server) {
	p.mustInfo(msg).msgRouter = msgRouter
}

// SetSubscribe adds msgRouter to the subscribers of the registered message
// msg. A server that is already subscribed is not added a second time.
// It writes to the registry entry, so the same caveat as the other Set*
// methods applies.
func (p *Processor) SetSubscribe(msg interface{}, msgRouter *handler.Server) {
	i := p.mustInfo(msg)
	for _, v := range i.msgSubscribe {
		if v == msgRouter {
			return // already subscribed
		}
	}
	i.msgSubscribe = append(i.msgSubscribe, msgRouter)
}

// SetHandler sets the handler function for the registered message msg.
// It's dangerous to call the method on routing or marshaling (unmarshaling)
func (p *Processor) SetHandler(msg interface{}, msgHandler MsgHandler) {
	p.mustInfo(msg).msgHandler = msgHandler
}

// SetRawHandler sets the raw handler function for the message registered
// under msgID. Once a raw handler is set, Unmarshal and Unmarshal3 return
// the payload as a MsgRaw instead of decoding it.
// It's dangerous to call the method on routing or marshaling (unmarshaling)
func (p *Processor) SetRawHandler(msgID string, msgRawHandler MsgHandler) {
	i, ok := p.msgInfo[msgID]
	if !ok {
		log.Fatalf("message %v not registered", msgID)
	}
	i.msgRawHandler = msgRawHandler
}

// GetMsgInfo returns the registry entry for msgID, or nil when msgID is not
// registered.
// It's dangerous to call the method on routing or marshaling (unmarshaling)
func (p *Processor) GetMsgInfo(msgID string) *MsgInfo {
	return p.msgInfo[msgID]
}

// Route dispatches msg. A raw message (MsgRaw) is passed to the raw handler
// registered for its ID. A decoded message (pointer to a registered struct)
// is passed to the handler first, then to the router, then to every
// subscriber. It returns an error when msg is neither a MsgRaw nor a
// pointer, or when the message is not registered.
// goroutine safe
func (p *Processor) Route(msg interface{}, userData interface{}) error {
	// raw
	if msgRaw, ok := msg.(MsgRaw); ok {
		//log.Debugf("route message %s", msgRaw.msgID)
		i, ok := p.msgInfo[msgRaw.msgID]
		if !ok {
			return fmt.Errorf("message %v not registered", msgRaw.msgID)
		}
		if i.msgRawHandler != nil {
			i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
		}
		return nil
	}

	// json
	msgType, err := ptrMsgType(msg)
	if err != nil {
		return err
	}
	msgID := msgType.Elem().Name()
	i, ok := p.msgInfo[msgID]
	if !ok {
		return fmt.Errorf("message %v not registered", msgID)
	}
	if i.msgHandler != nil {
		i.msgHandler([]interface{}{msg, userData})
	}
	if i.msgRouter != nil {
		i.msgRouter.Go(msgType, msg, userData)
	}
	for _, v := range i.msgSubscribe {
		v.Go(msgType, msg, userData)
	}
	return nil
}

// RouteById dispatches a decoded message using the registry entry stored
// under "ResponseRpc:<nodeId>" rather than under the message's type name;
// the type name only appears in the error text. getUserData is invoked
// separately for the handler, the router and each subscriber. Raw messages
// are not routed: a warning is logged and nil is returned.
//
// NOTE(review): Register keys entries by type name, so an entry with a
// "ResponseRpc:" key must be inserted elsewhere in the package — confirm.
func (p *Processor) RouteById(msg interface{}, getUserData func() interface{}, nodeId int64) error {
	// raw
	if msgRaw, ok := msg.(MsgRaw); ok {
		log.Warnf("RouteById message found error. %s", msgRaw.msgID)
		return nil
	}

	// json
	msgType, err := ptrMsgType(msg)
	if err != nil {
		return err
	}
	msgID := fmt.Sprintf("ResponseRpc:%v", nodeId)
	msgName := msgType.Elem().Name()
	i, ok := p.msgInfo[msgID]
	if !ok {
		return fmt.Errorf("message %v %v not registered", msgID, msgName)
	}
	if i.msgHandler != nil {
		i.msgHandler([]interface{}{msg, getUserData()})
	}
	if i.msgRouter != nil {
		i.msgRouter.Go(msgType, msg, getUserData())
	}
	for _, v := range i.msgSubscribe {
		v.Go(msgType, msg, getUserData())
	}
	return nil
}

// Unmarshal converts a byte slice into a message. data must be a JSON
// object with exactly one key, the message ID, whose value is the payload.
// When a raw handler is registered for that ID the payload is returned
// undecoded as a MsgRaw; otherwise it is decoded into a new value of the
// registered type and a pointer to it is returned.
// goroutine safe
func (p *Processor) Unmarshal(data []byte) (interface{}, error) {
	var m map[string]json.RawMessage
	if err := json2.Unmarshal(data, &m); err != nil {
		return nil, err
	}
	if len(m) != 1 {
		return nil, errors.New("invalid json data")
	}

	// m holds exactly one entry, so the first iteration always returns.
	for msgID, raw := range m {
		i, ok := p.msgInfo[msgID]
		if !ok {
			return nil, fmt.Errorf("message %v not registered", msgID)
		}

		// msg
		if i.msgRawHandler != nil {
			return MsgRaw{msgID: msgID, msgRawData: raw}, nil
		}
		msg := reflect.New(i.msgType.Elem()).Interface()
		return msg, json2.Unmarshal(raw, msg)
	}

	panic("bug")
}

// Unmarshal3 converts a payload into a message when the message ID is
// supplied separately: packets packed by the gate carry no message header,
// and the ID is transmitted by other means. The raw-handler rule is the
// same as in Unmarshal.
func (p *Processor) Unmarshal3(msgID string, data []byte) (interface{}, error) {
	i, ok := p.msgInfo[msgID]
	if !ok {
		return nil, fmt.Errorf("message %v not registered", msgID)
	}

	// msg
	if i.msgRawHandler != nil {
		return MsgRaw{msgID: msgID, msgRawData: data}, nil
	}
	msg := reflect.New(i.msgType.Elem()).Interface()
	return msg, json.Unmarshal(data, msg)
}

// Marshal converts a registered message (pointer to struct) into a byte
// slice. The output is a JSON object with a single key, the message ID,
// wrapped in a one-element [][]byte.
// goroutine safe
func (p *Processor) Marshal(msg interface{}) ([][]byte, error) {
	msgType, err := ptrMsgType(msg)
	if err != nil {
		return nil, err
	}
	msgID := msgType.Elem().Name()
	if _, ok := p.msgInfo[msgID]; !ok {
		return nil, fmt.Errorf("message %v not registered", msgID)
	}

	// data
	m := map[string]interface{}{msgID: msg}
	data, err := json2.Marshal(m)
	return [][]byte{data}, err
}

// MarshalOnly encodes a registered message without the message-ID wrapper
// and returns the message ID alongside the encoded payload.
func (p *Processor) MarshalOnly(msg interface{}) (string, []byte, error) {
	msgType, err := ptrMsgType(msg)
	if err != nil {
		return "", nil, err
	}
	msgID := msgType.Elem().Name()
	if _, ok := p.msgInfo[msgID]; !ok {
		return "", nil, fmt.Errorf("message %v not registered", msgID)
	}

	// data
	//m := map[string]interface{}{msgID: msg}
	data, err := json.Marshal(msg)
	return msgID, data, err
}

// GetRawMsgId extracts the message ID, i.e. the single top-level key, from
// data. It returns "" when data is not a JSON object or does not have
// exactly one key. The ID is not checked against the registry.
func (p *Processor) GetRawMsgId(data []byte) string {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(data, &m); err != nil {
		return ""
	}
	if len(m) != 1 {
		return ""
	}

	for msgID := range m {
		return msgID
	}
	return ""
}