json.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package json
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "leafstalk/module/handler"
  7. "reflect"
  8. jsoniter "github.com/json-iterator/go"
  9. "leafstalk/log"
  10. )
  11. type Processor struct {
  12. msgInfo map[string]*MsgInfo
  13. }
  14. type MsgInfo struct {
  15. msgType reflect.Type
  16. msgRouter *handler.Server
  17. msgHandler MsgHandler
  18. msgRawHandler MsgHandler
  19. msgSubscribe []*handler.Server
  20. }
  21. type MsgHandler func([]interface{})
  22. type MsgRaw struct {
  23. msgID string
  24. msgRawData json.RawMessage
  25. }
  26. func NewProcessor() *Processor {
  27. p := new(Processor)
  28. p.msgInfo = make(map[string]*MsgInfo)
  29. return p
  30. }
  31. var json2 = jsoniter.ConfigCompatibleWithStandardLibrary
  32. // Register 注册系统中有哪些需要处理的消息
  33. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  34. func (p *Processor) Register(msg interface{}) string {
  35. msgType := reflect.TypeOf(msg)
  36. if msgType == nil || msgType.Kind() != reflect.Ptr {
  37. log.Fatal("json message pointer required")
  38. }
  39. msgID := msgType.Elem().Name()
  40. if msgID == "" {
  41. log.Fatal("unnamed json message")
  42. }
  43. if _, ok := p.msgInfo[msgID]; ok {
  44. log.Fatalf("message %v is already registered", msgID)
  45. }
  46. i := new(MsgInfo)
  47. i.msgType = msgType
  48. p.msgInfo[msgID] = i
  49. return msgID
  50. }
  51. // SetRouter 设置消息发送目的
  52. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  53. func (p *Processor) SetRouter(msg interface{}, msgRouter *handler.Server) {
  54. msgType := reflect.TypeOf(msg)
  55. if msgType == nil || msgType.Kind() != reflect.Ptr {
  56. log.Fatal("json message pointer required")
  57. }
  58. msgID := msgType.Elem().Name()
  59. i, ok := p.msgInfo[msgID]
  60. if !ok {
  61. log.Fatalf("message %v not registered", msgID)
  62. }
  63. i.msgRouter = msgRouter
  64. }
  65. func (p *Processor) SetSubscribe(msg interface{}, msgRouter *handler.Server) {
  66. msgType := reflect.TypeOf(msg)
  67. if msgType == nil || msgType.Kind() != reflect.Ptr {
  68. log.Fatal("json message pointer required")
  69. }
  70. msgID := msgType.Elem().Name()
  71. i, ok := p.msgInfo[msgID]
  72. if !ok {
  73. log.Fatalf("message %v not registered", msgID)
  74. }
  75. found := false
  76. for _, v := range i.msgSubscribe {
  77. if v == msgRouter {
  78. found = true
  79. }
  80. }
  81. if !found {
  82. i.msgSubscribe = append(i.msgSubscribe, msgRouter)
  83. }
  84. }
  85. // SetHandler 设置消息处理函数
  86. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  87. func (p *Processor) SetHandler(msg interface{}, msgHandler MsgHandler) {
  88. msgType := reflect.TypeOf(msg)
  89. if msgType == nil || msgType.Kind() != reflect.Ptr {
  90. log.Fatal("json message pointer required")
  91. }
  92. msgID := msgType.Elem().Name()
  93. i, ok := p.msgInfo[msgID]
  94. if !ok {
  95. log.Fatalf("message %v not registered", msgID)
  96. }
  97. i.msgHandler = msgHandler
  98. }
  99. // SetRawHandler 设置消息处理函数
  100. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  101. func (p *Processor) SetRawHandler(msgID string, msgRawHandler MsgHandler) {
  102. i, ok := p.msgInfo[msgID]
  103. if !ok {
  104. log.Fatalf("message %v not registered", msgID)
  105. }
  106. i.msgRawHandler = msgRawHandler
  107. }
  108. // GetMsgInfo
  109. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  110. func (p *Processor) GetMsgInfo(msgID string) *MsgInfo {
  111. i, _ := p.msgInfo[msgID]
  112. return i
  113. }
  114. // Route 原始消息-》调用对应处理函数;结构体-》先调用处理函数,再发给路由
  115. // goroutine safe
  116. func (p *Processor) Route(msg interface{}, userData interface{}) error {
  117. // raw
  118. if msgRaw, ok := msg.(MsgRaw); ok {
  119. //log.Debugf("route message %s", msgRaw.msgID)
  120. i, ok := p.msgInfo[msgRaw.msgID]
  121. if !ok {
  122. return fmt.Errorf("message %v not registered", msgRaw.msgID)
  123. }
  124. if i.msgRawHandler != nil {
  125. i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
  126. }
  127. return nil
  128. }
  129. // json
  130. msgType := reflect.TypeOf(msg)
  131. if msgType == nil || msgType.Kind() != reflect.Ptr {
  132. return errors.New("json message pointer required")
  133. }
  134. msgID := msgType.Elem().Name()
  135. i, ok := p.msgInfo[msgID]
  136. if !ok {
  137. return fmt.Errorf("message %v not registered", msgID)
  138. }
  139. if i.msgHandler != nil {
  140. i.msgHandler([]interface{}{msg, userData})
  141. }
  142. if i.msgRouter != nil {
  143. i.msgRouter.Go(msgType, msg, userData)
  144. }
  145. for _, v := range i.msgSubscribe {
  146. v.Go(msgType, msg, userData)
  147. }
  148. return nil
  149. }
  150. func (p *Processor) RouteById(msg interface{}, getUserData func() interface{}, nodeId int64) error {
  151. // // raw
  152. if msgRaw, ok := msg.(MsgRaw); ok {
  153. log.Warnf("RouteById message found error. %s", msgRaw.msgID)
  154. return nil
  155. }
  156. // json
  157. msgType := reflect.TypeOf(msg)
  158. if msgType == nil || msgType.Kind() != reflect.Ptr {
  159. return errors.New("json message pointer required")
  160. }
  161. msgID := fmt.Sprintf("ResponseRpc:%v", nodeId)
  162. msgName := msgType.Elem().Name()
  163. i, ok := p.msgInfo[msgID]
  164. if !ok {
  165. return fmt.Errorf("message %v %v not registered", msgID, msgName)
  166. }
  167. if i.msgHandler != nil {
  168. i.msgHandler([]interface{}{msg, getUserData()})
  169. }
  170. if i.msgRouter != nil {
  171. i.msgRouter.Go(msgType, msg, getUserData())
  172. }
  173. for _, v := range i.msgSubscribe {
  174. v.Go(msgType, msg, getUserData())
  175. }
  176. return nil
  177. }
  178. // Unmarshal 字节切片转结构体
  179. // goroutine safe
  180. func (p *Processor) Unmarshal(data []byte) (interface{}, error) {
  181. var m map[string]json.RawMessage
  182. err := json2.Unmarshal(data, &m)
  183. if err != nil {
  184. return nil, err
  185. }
  186. if len(m) != 1 {
  187. return nil, errors.New("invalid json data")
  188. }
  189. for msgID, data := range m {
  190. i, ok := p.msgInfo[msgID]
  191. if !ok {
  192. return nil, fmt.Errorf("message %v not registered", msgID)
  193. }
  194. // msg
  195. if i.msgRawHandler != nil {
  196. return MsgRaw{msgID, data}, nil
  197. } else {
  198. msg := reflect.New(i.msgType.Elem()).Interface()
  199. return msg, json2.Unmarshal(data, msg)
  200. }
  201. }
  202. panic("bug")
  203. }
  204. // gate打包的数据包不带消息头,通过其他方式传输
  205. func (p *Processor) Unmarshal3(msgID string, data []byte) (interface{}, error) {
  206. i, ok := p.msgInfo[msgID]
  207. if !ok {
  208. return nil, fmt.Errorf("message %v not registered", msgID)
  209. }
  210. // msg
  211. if i.msgRawHandler != nil {
  212. return MsgRaw{msgID, data}, nil
  213. } else {
  214. msg := reflect.New(i.msgType.Elem()).Interface()
  215. return msg, json.Unmarshal(data, msg)
  216. }
  217. panic("bug")
  218. }
  219. // Marshal 结构体转字节切片
  220. // goroutine safe
  221. func (p *Processor) Marshal(msg interface{}) ([][]byte, error) {
  222. msgType := reflect.TypeOf(msg)
  223. if msgType == nil || msgType.Kind() != reflect.Ptr {
  224. return nil, errors.New("json message pointer required")
  225. }
  226. msgID := msgType.Elem().Name()
  227. if _, ok := p.msgInfo[msgID]; !ok {
  228. return nil, fmt.Errorf("message %v not registered", msgID)
  229. }
  230. // data
  231. m := map[string]interface{}{msgID: msg}
  232. data, err := json2.Marshal(m)
  233. return [][]byte{data}, err
  234. }
  235. func (p *Processor) MarshalOnly(msg interface{}) (string, []byte, error) {
  236. msgType := reflect.TypeOf(msg)
  237. if msgType == nil || msgType.Kind() != reflect.Ptr {
  238. return "", nil, errors.New("json message pointer required")
  239. }
  240. msgID := msgType.Elem().Name()
  241. if _, ok := p.msgInfo[msgID]; !ok {
  242. return "", nil, fmt.Errorf("message %v not registered", msgID)
  243. }
  244. // data
  245. //m := map[string]interface{}{msgID: msg}
  246. data, err := json.Marshal(msg)
  247. return msgID, data, err
  248. }
  249. func (p *Processor) GetRawMsgId(data []byte) string {
  250. var m map[string]json.RawMessage
  251. err := json.Unmarshal(data, &m)
  252. if err != nil {
  253. return ""
  254. }
  255. if len(m) != 1 {
  256. return ""
  257. }
  258. for msgID, _ := range m {
  259. return msgID
  260. }
  261. return ""
  262. }