remotecall2.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package rpc
  2. import (
  3. "errors"
  4. "fmt"
  5. "leafstalk/covenant/msg"
  6. "leafstalk/covenant/servers"
  7. "leafstalk/module"
  8. "leafstalk/network/cluster"
  9. "leafstalk/otherutils/snowflake"
  10. "leafstalk/otherutils/timercache"
  11. "leafstalk/router/json"
  12. "os"
  13. "reflect"
  14. "time"
  15. )
  16. var (
  17. ErrNoConnected = errors.New("remoterpc: no connected agent")
  18. ErrTimeOut = errors.New("remoterpc: response time out")
  19. ErrResponseType = errors.New("remoterpc: response type error")
  20. // ErrNil = errors.New("localrpc: nil returned")
  21. NodeNum = int64(0)
  22. )
  23. type RemoteCallService struct {
  24. Skeleton *module.Skeleton
  25. timerCache *timercache.CacheKeys
  26. NodeSequence *snowflake.Node
  27. moduleName string
  28. process *json.Processor
  29. }
  30. // 时间毫秒28(74小时),node10(1023),步长25(33554432个)
  31. // 调用RollGenerate产生时间循环ID
  32. func NewTimeRoolNode(node int64) (*snowflake.Node, error) {
  33. startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local)
  34. nodeBits := uint8(10)
  35. stepBits := uint8(35 - nodeBits)
  36. return snowflake.NewNode(node, nodeBits, stepBits, 1, startTs.Unix())
  37. }
  38. func SplitNodeId(id int64) int64 {
  39. nodeBits := uint8(10)
  40. stepBits := uint8(35 - nodeBits)
  41. nodeMax := int64(-1 ^ (-1 << nodeBits))
  42. tmp := nodeMax & (id >> stepBits)
  43. return tmp
  44. }
  45. func NewAndInit(m *module.Skeleton, process *json.Processor, moduleName string) *RemoteCallService {
  46. s := new(RemoteCallService)
  47. s.Skeleton = m
  48. s.process = process
  49. s.timerCache = timercache.NewCacheKeys(time.Second * 10)
  50. NodeNum += 1
  51. nodeId := NodeNum
  52. var err error
  53. s.NodeSequence, err = NewTimeRoolNode(nodeId)
  54. if err != nil {
  55. fmt.Fprintf(os.Stderr, "NewTimeRoolNode error. %#v", err)
  56. return nil
  57. }
  58. s.moduleName = moduleName
  59. // id := s.NodeSequence.RollGenerate().Int64()
  60. // nodeId2 := SplitNodeId(id)
  61. // fmt.Sprintln(nodeId, nodeId2)
  62. // s.Processor = Processor
  63. // s.getAgent = getAgent
  64. s.Skeleton.RegisterChanRPC("RpcTimeout", handleRpcTimeout)
  65. // msgName := fmt.Sprintf("module:%v", moduleName)
  66. // process.SetRegisterAndRoute(msgName)
  67. // s.Skeleton.RegisterChanRPC(msgName, s.HandleRpcResponse)
  68. return s
  69. }
  70. func (rcs *RemoteCallService) HandleMsg(msg interface{}) *RemoteCallService {
  71. //设置处理函数
  72. rcs.Skeleton.RegisterChanRPC(reflect.TypeOf(msg), rcs.HandleRpcResponse)
  73. //消息路由目标是本模块
  74. rcs.process.Register(msg)
  75. rcs.process.SetRouter(msg, rcs.Skeleton.HandlerServer)
  76. return rcs
  77. }
  78. func (rcs *RemoteCallService) RegisterMsg(msgs ...interface{}) *RemoteCallService {
  79. for _, msg := range msgs {
  80. rcs.process.Register(msg)
  81. }
  82. return rcs
  83. }
  84. // 主协程处理超时消息
  85. func handleRpcTimeout(args []interface{}) {
  86. f, ok := args[0].(func(resp msg.RpcResponse, err error))
  87. if !ok {
  88. fmt.Fprintf(os.Stderr, "handleRpcTimeout func type error. %#v", args[0])
  89. return
  90. }
  91. f(nil, ErrTimeOut)
  92. }
  93. // 主协程处理返回的响应消息
  94. func (rcs *RemoteCallService) HandleRpcResponse(args []interface{}) {
  95. resp, ok1 := args[0].(msg.RpcResponse)
  96. if !ok1 {
  97. fmt.Fprintf(os.Stderr, "HandleRpcResponse error. %#v", args[0])
  98. return
  99. }
  100. k := resp.GetMsgId()
  101. f := rcs.timerCache.Remove(k)
  102. if f == nil {
  103. fmt.Fprintf(os.Stderr, "HandleRpcResponse no response func. %#v", resp)
  104. return
  105. }
  106. f2, ok := f.(func(msg.RpcResponse, error))
  107. if !ok {
  108. fmt.Fprintf(os.Stderr, "HandleRpcResponse func type error. %#v", f)
  109. return
  110. }
  111. f2(resp, nil)
  112. }
  113. func (rcs *RemoteCallService) Handler(args []interface{}) {
  114. rcs.HandleRpcResponse(args)
  115. }
  116. func (s *RemoteCallService) SendMsgTo(msg msg.RpcRequest, fSucess func(resp msg.RpcResponse, err error), toServerAgent *cluster.ServerAgent, playerId int64, gateId int64) error {
  117. if toServerAgent == nil {
  118. return ErrNoConnected
  119. }
  120. msgId := s.NodeSequence.RollGenerate().Int64()
  121. //定时缓存
  122. k := s.timerCache.Add(fSucess, func(e *timercache.TimerEntry) {
  123. //另一个协程,发送超时消息给处理协程
  124. s.Skeleton.HandlerServer.Go("RpcTimeout", fSucess)
  125. }, msgId)
  126. msg.SetMsgId(k)
  127. msg.SetSender(s.moduleName)
  128. //发送
  129. err := toServerAgent.WriteServerRouteMsg(msg, playerId, gateId, "")
  130. if err != nil {
  131. s.timerCache.Remove(k)
  132. return err
  133. }
  134. return nil
  135. }
  136. // 和SendAsyncMsg一样,只是形式参数不一样
  137. func SendAsyncMsgByRpcService[T msg.RpcResponse](request msg.RpcRequest, sucess func(respMsg T),
  138. fail func(errCode int, err error), noexpect func(exceptCode int),
  139. transService *RemoteCallService, toServerAgent *cluster.ServerAgent, opts ...SendOpt) {
  140. processResponse := func(respa msg.RpcResponse, err error) {
  141. if err != nil {
  142. noexpect(22) //超时
  143. return
  144. }
  145. resp3, ok := respa.(T)
  146. if !ok {
  147. noexpect(23) //数据类型错误
  148. return
  149. }
  150. // 添加失败打印错误,并不恢复
  151. errCode := resp3.GetErrCode()
  152. if errCode != 0 {
  153. fail(errCode, fmt.Errorf("%v", errCode))
  154. return
  155. }
  156. // 奖品发送给客户端
  157. sucess(resp3)
  158. }
  159. config := RemoteCallSendParam{}
  160. for _, opt := range opts {
  161. if opt == nil {
  162. continue
  163. }
  164. if err := opt.configureSendParam(&config); err != nil {
  165. fail(25, err)
  166. return
  167. }
  168. }
  169. if transService == nil {
  170. fail(26, fmt.Errorf("%v", 26))
  171. return
  172. }
  173. err := transService.SendMsgTo(request, processResponse, toServerAgent, config.playerId, config.gateId)
  174. if err != nil {
  175. fail(21, err)
  176. }
  177. }
  178. type RemoteCallSendParam struct {
  179. // toServerAgent *cluster.ServerAgent
  180. playerId int64
  181. gateId int64
  182. }
  183. type SendOpt interface {
  184. configureSendParam(opts *RemoteCallSendParam) error
  185. }
  186. type SendOptFn func(opts *RemoteCallSendParam) error
  187. func (opt SendOptFn) configureSendParam(opts *RemoteCallSendParam) error {
  188. return opt(opts)
  189. }
  190. // func ReceiveServer(agent *cluster.ServerAgent) SendOpt {
  191. // return SendOptFn(func(opts *RemoteCallSendParam) error {
  192. // opts.toServerAgent = agent
  193. // return nil
  194. // })
  195. // }
  196. func AboutPlayer(playerId int64, gateId int64) SendOpt {
  197. return SendOptFn(func(opts *RemoteCallSendParam) error {
  198. opts.playerId = playerId
  199. opts.gateId = gateId
  200. return nil
  201. })
  202. }
  203. func AboutGate(gateId int64) SendOpt {
  204. return SendOptFn(func(opts *RemoteCallSendParam) error {
  205. opts.gateId = gateId
  206. return nil
  207. })
  208. }
  209. func AboutPlayerAgent(playerAgent *servers.PlayerAgent) SendOpt {
  210. return SendOptFn(func(opts *RemoteCallSendParam) error {
  211. if playerAgent == nil {
  212. return errors.New("playerAgent is nil")
  213. }
  214. opts.playerId = playerAgent.PlayerId
  215. opts.gateId = playerAgent.GateId
  216. return nil
  217. })
  218. }