json.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package json
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "leafstalk/module/handler"
  7. "reflect"
  8. jsoniter "github.com/json-iterator/go"
  9. "leafstalk/log"
  10. )
  11. type Processor struct {
  12. msgInfo map[string]*MsgInfo
  13. }
  14. type MsgInfo struct {
  15. msgType reflect.Type
  16. msgRouter *handler.Server
  17. msgHandler MsgHandler
  18. msgRawHandler MsgHandler
  19. msgSubscribe []*handler.Server
  20. }
  // MsgHandler is a message callback. Its arguments arrive packed in a single
  // slice: Route passes {msg, userData} to regular handlers and
  // {msgID, rawData, userData} to raw handlers.
  type MsgHandler func([]interface{})
  // MsgRaw carries a message whose payload has been left undecoded. The
  // unmarshal methods produce it for message IDs that have a raw handler.
  // Field order matters: values are built with unkeyed composite literals.
  type MsgRaw struct {
  	// msgID is the ID the payload was received under.
  	msgID string
  	// msgRawData is the payload exactly as received.
  	msgRawData json.RawMessage
  }
  26. func NewProcessor() *Processor {
  27. p := new(Processor)
  28. p.msgInfo = make(map[string]*MsgInfo)
  29. return p
  30. }
  31. var json2 = jsoniter.ConfigCompatibleWithStandardLibrary
  32. // Register 注册系统中有哪些需要处理的消息
  33. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  34. func (p *Processor) Register(msg interface{}) string {
  35. msgType := reflect.TypeOf(msg)
  36. if msgType == nil || msgType.Kind() != reflect.Ptr {
  37. log.Fatal("json message pointer required")
  38. }
  39. msgID := msgType.Elem().Name()
  40. if msgID == "" {
  41. log.Fatal("unnamed json message")
  42. }
  43. if _, ok := p.msgInfo[msgID]; ok {
  44. log.Fatalf("message %v is already registered", msgID)
  45. }
  46. i := new(MsgInfo)
  47. i.msgType = msgType
  48. p.msgInfo[msgID] = i
  49. return msgID
  50. }
  51. // SetRouter 设置消息发送目的
  52. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  53. func (p *Processor) SetRouter(msg interface{}, msgRouter *handler.Server) {
  54. msgType := reflect.TypeOf(msg)
  55. if msgType == nil || msgType.Kind() != reflect.Ptr {
  56. log.Fatal("json message pointer required")
  57. }
  58. msgID := msgType.Elem().Name()
  59. i, ok := p.msgInfo[msgID]
  60. if !ok {
  61. log.Fatalf("message %v not registered", msgID)
  62. }
  63. i.msgRouter = msgRouter
  64. }
  65. func (p *Processor) SetRegisterAndRoute(msgID string, msg interface{}, msgRouter *handler.Server) string {
  66. msgType := reflect.TypeOf(msg)
  67. if msgType == nil || msgType.Kind() != reflect.Ptr {
  68. log.Fatal("json message pointer required")
  69. }
  70. // msgID := msgType.Elem().Name()
  71. if msgID == "" {
  72. log.Fatal("unnamed json message")
  73. }
  74. if _, ok := p.msgInfo[msgID]; ok {
  75. log.Fatalf("message %v is already registered", msgID)
  76. }
  77. i := new(MsgInfo)
  78. i.msgType = msgType
  79. i.msgRouter = msgRouter
  80. p.msgInfo[msgID] = i
  81. return msgID
  82. }
  83. func (p *Processor) SetSubscribe(msg interface{}, msgRouter *handler.Server) {
  84. msgType := reflect.TypeOf(msg)
  85. if msgType == nil || msgType.Kind() != reflect.Ptr {
  86. log.Fatal("json message pointer required")
  87. }
  88. msgID := msgType.Elem().Name()
  89. i, ok := p.msgInfo[msgID]
  90. if !ok {
  91. log.Fatalf("message %v not registered", msgID)
  92. }
  93. found := false
  94. for _, v := range i.msgSubscribe {
  95. if v == msgRouter {
  96. found = true
  97. }
  98. }
  99. if !found {
  100. i.msgSubscribe = append(i.msgSubscribe, msgRouter)
  101. }
  102. }
  103. // SetHandler 设置消息处理函数
  104. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  105. func (p *Processor) SetHandler(msg interface{}, msgHandler MsgHandler) {
  106. msgType := reflect.TypeOf(msg)
  107. if msgType == nil || msgType.Kind() != reflect.Ptr {
  108. log.Fatal("json message pointer required")
  109. }
  110. msgID := msgType.Elem().Name()
  111. i, ok := p.msgInfo[msgID]
  112. if !ok {
  113. log.Fatalf("message %v not registered", msgID)
  114. }
  115. i.msgHandler = msgHandler
  116. }
  117. // SetRawHandler 设置消息处理函数
  118. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  119. func (p *Processor) SetRawHandler(msgID string, msgRawHandler MsgHandler) {
  120. i, ok := p.msgInfo[msgID]
  121. if !ok {
  122. log.Fatalf("message %v not registered", msgID)
  123. }
  124. i.msgRawHandler = msgRawHandler
  125. }
  126. // GetMsgInfo
  127. // It's dangerous to call the method on routing or marshaling (unmarshaling)
  128. func (p *Processor) GetMsgInfo(msgID string) *MsgInfo {
  129. i, _ := p.msgInfo[msgID]
  130. return i
  131. }
  132. // Route 原始消息-》调用对应处理函数;结构体-》先调用处理函数,再发给路由
  133. // goroutine safe
  134. func (p *Processor) Route(msg interface{}, userData interface{}) error {
  135. // raw
  136. if msgRaw, ok := msg.(MsgRaw); ok {
  137. //log.Debugf("route message %s", msgRaw.msgID)
  138. i, ok := p.msgInfo[msgRaw.msgID]
  139. if !ok {
  140. return fmt.Errorf("message %v not registered", msgRaw.msgID)
  141. }
  142. if i.msgRawHandler != nil {
  143. i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
  144. }
  145. return nil
  146. }
  147. // json
  148. msgType := reflect.TypeOf(msg)
  149. if msgType == nil || msgType.Kind() != reflect.Ptr {
  150. return errors.New("json message pointer required")
  151. }
  152. msgID := msgType.Elem().Name()
  153. i, ok := p.msgInfo[msgID]
  154. if !ok {
  155. return fmt.Errorf("message %v not registered", msgID)
  156. }
  157. if i.msgHandler != nil {
  158. i.msgHandler([]interface{}{msg, userData})
  159. }
  160. if i.msgRouter != nil {
  161. i.msgRouter.Go(msgType, msg, userData)
  162. }
  163. for _, v := range i.msgSubscribe {
  164. v.Go(msgType, msg, userData)
  165. }
  166. return nil
  167. }
  168. func (p *Processor) RouteToModule(msg interface{}, getUserData func() interface{}, moduleName string) error {
  169. // // raw
  170. if msgRaw, ok := msg.(MsgRaw); ok {
  171. log.Warnf("RouteById message found error. %s", msgRaw.msgID)
  172. return nil
  173. }
  174. // json
  175. msgType := reflect.TypeOf(msg)
  176. if msgType == nil || msgType.Kind() != reflect.Ptr {
  177. return errors.New("json message pointer required")
  178. }
  179. msgID := fmt.Sprintf("module:%v", moduleName)
  180. msgName := msgType.Elem().Name()
  181. i, ok := p.msgInfo[msgID]
  182. if !ok {
  183. return fmt.Errorf("message %v %v not registered", msgID, msgName)
  184. }
  185. if i.msgHandler != nil {
  186. i.msgHandler([]interface{}{msg, getUserData()})
  187. }
  188. if i.msgRouter != nil {
  189. i.msgRouter.Go(msgType, msg, getUserData())
  190. }
  191. for _, v := range i.msgSubscribe {
  192. v.Go(msgType, msg, getUserData())
  193. }
  194. return nil
  195. }
  196. // Unmarshal 字节切片转结构体
  197. // goroutine safe
  198. func (p *Processor) Unmarshal(data []byte) (interface{}, error) {
  199. var m map[string]json.RawMessage
  200. err := json2.Unmarshal(data, &m)
  201. if err != nil {
  202. return nil, err
  203. }
  204. if len(m) != 1 {
  205. return nil, errors.New("invalid json data")
  206. }
  207. for msgID, data := range m {
  208. i, ok := p.msgInfo[msgID]
  209. if !ok {
  210. return nil, fmt.Errorf("message %v not registered", msgID)
  211. }
  212. // msg
  213. if i.msgRawHandler != nil {
  214. return MsgRaw{msgID, data}, nil
  215. } else {
  216. msg := reflect.New(i.msgType.Elem()).Interface()
  217. return msg, json2.Unmarshal(data, msg)
  218. }
  219. }
  220. panic("bug")
  221. }
  222. // gate打包的数据包不带消息头,通过其他方式传输
  223. func (p *Processor) Unmarshal3(msgID string, data []byte) (interface{}, error) {
  224. i, ok := p.msgInfo[msgID]
  225. if !ok {
  226. return nil, fmt.Errorf("message %v not registered", msgID)
  227. }
  228. // msg
  229. if i.msgRawHandler != nil {
  230. return MsgRaw{msgID, data}, nil
  231. } else {
  232. msg := reflect.New(i.msgType.Elem()).Interface()
  233. return msg, json.Unmarshal(data, msg)
  234. }
  235. panic("bug")
  236. }
  237. // Marshal 结构体转字节切片
  238. // goroutine safe
  239. func (p *Processor) Marshal(msg interface{}) ([][]byte, error) {
  240. msgType := reflect.TypeOf(msg)
  241. if msgType == nil || msgType.Kind() != reflect.Ptr {
  242. return nil, errors.New("json message pointer required")
  243. }
  244. msgID := msgType.Elem().Name()
  245. if _, ok := p.msgInfo[msgID]; !ok {
  246. return nil, fmt.Errorf("message %v not registered", msgID)
  247. }
  248. // data
  249. m := map[string]interface{}{msgID: msg}
  250. data, err := json2.Marshal(m)
  251. return [][]byte{data}, err
  252. }
  253. func (p *Processor) MarshalOnly(msg interface{}) (string, []byte, error) {
  254. msgType := reflect.TypeOf(msg)
  255. if msgType == nil || msgType.Kind() != reflect.Ptr {
  256. return "", nil, errors.New("json message pointer required")
  257. }
  258. msgID := msgType.Elem().Name()
  259. if _, ok := p.msgInfo[msgID]; !ok {
  260. return "", nil, fmt.Errorf("message %v not registered", msgID)
  261. }
  262. // data
  263. //m := map[string]interface{}{msgID: msg}
  264. data, err := json.Marshal(msg)
  265. return msgID, data, err
  266. }
  267. func (p *Processor) GetRawMsgId(data []byte) string {
  268. var m map[string]json.RawMessage
  269. err := json.Unmarshal(data, &m)
  270. if err != nil {
  271. return ""
  272. }
  273. if len(m) != 1 {
  274. return ""
  275. }
  276. for msgID, _ := range m {
  277. return msgID
  278. }
  279. return ""
  280. }