123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- package json
- import (
- "encoding/json"
- "errors"
- "fmt"
- "leafstalk/module/handler"
- "reflect"
- jsoniter "github.com/json-iterator/go"
- "leafstalk/log"
- )
- type Processor struct {
- msgInfo map[string]*MsgInfo
- }
- type MsgInfo struct {
- msgType reflect.Type
- msgRouter *handler.Server
- msgHandler MsgHandler
- msgRawHandler MsgHandler
- msgSubscribe []*handler.Server
- }
- type MsgHandler func([]interface{})
- type MsgRaw struct {
- msgID string
- msgRawData json.RawMessage
- }
- func NewProcessor() *Processor {
- p := new(Processor)
- p.msgInfo = make(map[string]*MsgInfo)
- return p
- }
- var json2 = jsoniter.ConfigCompatibleWithStandardLibrary
- // Register 注册系统中有哪些需要处理的消息
- // It's dangerous to call the method on routing or marshaling (unmarshaling)
- func (p *Processor) Register(msg interface{}) string {
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- log.Fatal("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- if msgID == "" {
- log.Fatal("unnamed json message")
- }
- if _, ok := p.msgInfo[msgID]; ok {
- log.Fatalf("message %v is already registered", msgID)
- }
- i := new(MsgInfo)
- i.msgType = msgType
- p.msgInfo[msgID] = i
- return msgID
- }
- // SetRouter 设置消息发送目的
- // It's dangerous to call the method on routing or marshaling (unmarshaling)
- func (p *Processor) SetRouter(msg interface{}, msgRouter *handler.Server) {
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- log.Fatal("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- i, ok := p.msgInfo[msgID]
- if !ok {
- log.Fatalf("message %v not registered", msgID)
- }
- i.msgRouter = msgRouter
- }
- func (p *Processor) SetSubscribe(msg interface{}, msgRouter *handler.Server) {
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- log.Fatal("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- i, ok := p.msgInfo[msgID]
- if !ok {
- log.Fatalf("message %v not registered", msgID)
- }
- found := false
- for _, v := range i.msgSubscribe {
- if v == msgRouter {
- found = true
- }
- }
- if !found {
- i.msgSubscribe = append(i.msgSubscribe, msgRouter)
- }
- }
- // SetHandler 设置消息处理函数
- // It's dangerous to call the method on routing or marshaling (unmarshaling)
- func (p *Processor) SetHandler(msg interface{}, msgHandler MsgHandler) {
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- log.Fatal("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- i, ok := p.msgInfo[msgID]
- if !ok {
- log.Fatalf("message %v not registered", msgID)
- }
- i.msgHandler = msgHandler
- }
- // SetRawHandler 设置消息处理函数
- // It's dangerous to call the method on routing or marshaling (unmarshaling)
- func (p *Processor) SetRawHandler(msgID string, msgRawHandler MsgHandler) {
- i, ok := p.msgInfo[msgID]
- if !ok {
- log.Fatalf("message %v not registered", msgID)
- }
- i.msgRawHandler = msgRawHandler
- }
- // GetMsgInfo
- // It's dangerous to call the method on routing or marshaling (unmarshaling)
- func (p *Processor) GetMsgInfo(msgID string) *MsgInfo {
- i, _ := p.msgInfo[msgID]
- return i
- }
- // Route 原始消息-》调用对应处理函数;结构体-》先调用处理函数,再发给路由
- // goroutine safe
- func (p *Processor) Route(msg interface{}, userData interface{}) error {
- // raw
- if msgRaw, ok := msg.(MsgRaw); ok {
- //log.Debugf("route message %s", msgRaw.msgID)
- i, ok := p.msgInfo[msgRaw.msgID]
- if !ok {
- return fmt.Errorf("message %v not registered", msgRaw.msgID)
- }
- if i.msgRawHandler != nil {
- i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
- }
- return nil
- }
- // json
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- return errors.New("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- i, ok := p.msgInfo[msgID]
- if !ok {
- return fmt.Errorf("message %v not registered", msgID)
- }
- if i.msgHandler != nil {
- i.msgHandler([]interface{}{msg, userData})
- }
- if i.msgRouter != nil {
- i.msgRouter.Go(msgType, msg, userData)
- }
- for _, v := range i.msgSubscribe {
- v.Go(msgType, msg, userData)
- }
- return nil
- }
- func (p *Processor) RouteById(msg interface{}, getUserData func() interface{}, nodeId int64) error {
- // // raw
- if msgRaw, ok := msg.(MsgRaw); ok {
- log.Warnf("RouteById message found error. %s", msgRaw.msgID)
- return nil
- }
- // json
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- return errors.New("json message pointer required")
- }
- msgID := fmt.Sprintf("ResponseRpc:%v", nodeId)
- msgName := msgType.Elem().Name()
- i, ok := p.msgInfo[msgID]
- if !ok {
- return fmt.Errorf("message %v %v not registered", msgID, msgName)
- }
- if i.msgHandler != nil {
- i.msgHandler([]interface{}{msg, getUserData()})
- }
- if i.msgRouter != nil {
- i.msgRouter.Go(msgType, msg, getUserData())
- }
- for _, v := range i.msgSubscribe {
- v.Go(msgType, msg, getUserData())
- }
- return nil
- }
- // Unmarshal 字节切片转结构体
- // goroutine safe
- func (p *Processor) Unmarshal(data []byte) (interface{}, error) {
- var m map[string]json.RawMessage
- err := json2.Unmarshal(data, &m)
- if err != nil {
- return nil, err
- }
- if len(m) != 1 {
- return nil, errors.New("invalid json data")
- }
- for msgID, data := range m {
- i, ok := p.msgInfo[msgID]
- if !ok {
- return nil, fmt.Errorf("message %v not registered", msgID)
- }
- // msg
- if i.msgRawHandler != nil {
- return MsgRaw{msgID, data}, nil
- } else {
- msg := reflect.New(i.msgType.Elem()).Interface()
- return msg, json2.Unmarshal(data, msg)
- }
- }
- panic("bug")
- }
- // gate打包的数据包不带消息头,通过其他方式传输
- func (p *Processor) Unmarshal3(msgID string, data []byte) (interface{}, error) {
- i, ok := p.msgInfo[msgID]
- if !ok {
- return nil, fmt.Errorf("message %v not registered", msgID)
- }
- // msg
- if i.msgRawHandler != nil {
- return MsgRaw{msgID, data}, nil
- } else {
- msg := reflect.New(i.msgType.Elem()).Interface()
- return msg, json.Unmarshal(data, msg)
- }
- panic("bug")
- }
- // Marshal 结构体转字节切片
- // goroutine safe
- func (p *Processor) Marshal(msg interface{}) ([][]byte, error) {
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- return nil, errors.New("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- if _, ok := p.msgInfo[msgID]; !ok {
- return nil, fmt.Errorf("message %v not registered", msgID)
- }
- // data
- m := map[string]interface{}{msgID: msg}
- data, err := json2.Marshal(m)
- return [][]byte{data}, err
- }
- func (p *Processor) MarshalOnly(msg interface{}) (string, []byte, error) {
- msgType := reflect.TypeOf(msg)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- return "", nil, errors.New("json message pointer required")
- }
- msgID := msgType.Elem().Name()
- if _, ok := p.msgInfo[msgID]; !ok {
- return "", nil, fmt.Errorf("message %v not registered", msgID)
- }
- // data
- //m := map[string]interface{}{msgID: msg}
- data, err := json.Marshal(msg)
- return msgID, data, err
- }
- func (p *Processor) GetRawMsgId(data []byte) string {
- var m map[string]json.RawMessage
- err := json.Unmarshal(data, &m)
- if err != nil {
- return ""
- }
- if len(m) != 1 {
- return ""
- }
- for msgID, _ := range m {
- return msgID
- }
- return ""
- }
|