remotecall2.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package rpc
  2. import (
  3. "errors"
  4. "fmt"
  5. "leafstalk/covenant/msg"
  6. "leafstalk/covenant/servers"
  7. "leafstalk/module"
  8. "leafstalk/network/cluster"
  9. "leafstalk/otherutils/snowflake"
  10. "leafstalk/otherutils/timercache"
  11. "os"
  12. "time"
  13. )
  14. var (
  15. ErrNoConnected = errors.New("remoterpc: no connected agent")
  16. ErrTimeOut = errors.New("remoterpc: response time out")
  17. ErrResponseType = errors.New("remoterpc: response type error")
  18. // ErrNil = errors.New("localrpc: nil returned")
  19. NodeNum = int64(0)
  20. )
  21. type RemoteCallService struct {
  22. Skeleton *module.Skeleton
  23. timerCache *timercache.CacheKeys
  24. NodeSequence *snowflake.Node
  25. }
  26. // 时间毫秒28(74小时),node10(1023),步长25(33554432个)
  27. // 调用RollGenerate产生时间循环ID
  28. func NewTimeRoolNode(node int64) (*snowflake.Node, error) {
  29. startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local)
  30. nodeBits := uint8(10)
  31. stepBits := uint8(35 - nodeBits)
  32. return snowflake.NewNode(node, nodeBits, stepBits, 1, startTs.Unix())
  33. }
  34. func SplitNodeId(id int64) int64 {
  35. nodeBits := uint8(10)
  36. stepBits := uint8(35 - nodeBits)
  37. nodeMax := int64(-1 ^ (-1 << nodeBits))
  38. tmp := nodeMax & (id >> stepBits)
  39. return tmp
  40. }
  41. func NewAndInit(m *module.Skeleton) *RemoteCallService {
  42. s := new(RemoteCallService)
  43. s.Skeleton = m
  44. s.timerCache = timercache.NewCacheKeys(time.Second * 10)
  45. NodeNum += 1
  46. nodeId := NodeNum
  47. var err error
  48. s.NodeSequence, err = NewTimeRoolNode(nodeId)
  49. if err != nil {
  50. fmt.Fprintf(os.Stderr, "NewTimeRoolNode error. %#v", err)
  51. return nil
  52. }
  53. // id := s.NodeSequence.RollGenerate().Int64()
  54. // nodeId2 := SplitNodeId(id)
  55. // fmt.Sprintln(nodeId, nodeId2)
  56. // s.Processor = Processor
  57. // s.getAgent = getAgent
  58. s.Skeleton.RegisterChanRPC("RpcTimeout", handleRpcTimeout)
  59. msgName := fmt.Sprintf("ResponseRpc:%v", nodeId)
  60. s.Skeleton.RegisterChanRPC(msgName, s.HandleRpcResponse)
  61. return s
  62. }
  63. // 主协程处理超时消息
  64. func handleRpcTimeout(args []interface{}) {
  65. f, ok := args[0].(func(resp msg.RpcResponse, err error))
  66. if !ok {
  67. fmt.Fprintf(os.Stderr, "handleRpcTimeout func type error. %#v", args[0])
  68. return
  69. }
  70. f(nil, ErrTimeOut)
  71. }
  72. // 主协程处理返回的响应消息
  73. func (rcs *RemoteCallService) HandleRpcResponse(args []interface{}) {
  74. resp, ok1 := args[0].(msg.RpcResponse)
  75. if !ok1 {
  76. fmt.Fprintf(os.Stderr, "HandleRpcResponse error. %#v", args[0])
  77. return
  78. }
  79. k := resp.GetMsgId()
  80. f := rcs.timerCache.Remove(k)
  81. if f == nil {
  82. fmt.Fprintf(os.Stderr, "HandleRpcResponse no response func. %#v", resp)
  83. return
  84. }
  85. f2, ok := f.(func(msg.RpcResponse, error))
  86. if !ok {
  87. fmt.Fprintf(os.Stderr, "HandleRpcResponse func type error. %#v", f)
  88. return
  89. }
  90. f2(resp, nil)
  91. }
  92. func (rcs *RemoteCallService) Handler(args []interface{}) {
  93. rcs.HandleRpcResponse(args)
  94. }
  95. func (s *RemoteCallService) SendMsgTo(msg msg.RPCer, fSucess func(resp msg.RpcResponse, err error), toServerAgent *cluster.ServerAgent, playerId int64, gateId int64) error {
  96. if toServerAgent == nil {
  97. return ErrNoConnected
  98. }
  99. msgId := s.NodeSequence.RollGenerate().Int64()
  100. //定时缓存
  101. k := s.timerCache.Add(fSucess, func(e *timercache.TimerEntry) {
  102. //另一个协程,发送超时消息给处理协程
  103. s.Skeleton.HandlerServer.Go("RpcTimeout", fSucess)
  104. }, msgId)
  105. msg.SetMsgId(k)
  106. //发送
  107. err := toServerAgent.WriteServerRouteMsg(msg, playerId, gateId, 0)
  108. if err != nil {
  109. s.timerCache.Remove(k)
  110. return err
  111. }
  112. return nil
  113. }
  114. // 和SendAsyncMsg一样,只是形式参数不一样
  115. func SendAsyncMsgByRpcService[T msg.RpcResponse](request msg.RPCer, sucess func(respMsg T),
  116. fail func(errCode int, err error), noexpect func(exceptCode int),
  117. transService *RemoteCallService, toServerAgent *cluster.ServerAgent, opts ...SendOpt) {
  118. processResponse := func(respa msg.RpcResponse, err error) {
  119. if err != nil {
  120. noexpect(22) //超时
  121. return
  122. }
  123. resp3, ok := respa.(T)
  124. if !ok {
  125. noexpect(23) //数据类型错误
  126. return
  127. }
  128. // 添加失败打印错误,并不恢复
  129. errCode := resp3.GetErrCode()
  130. if errCode != 0 {
  131. fail(errCode, fmt.Errorf("%v", errCode))
  132. return
  133. }
  134. // 奖品发送给客户端
  135. sucess(resp3)
  136. }
  137. config := RemoteCallSendParam{}
  138. for _, opt := range opts {
  139. if opt == nil {
  140. continue
  141. }
  142. if err := opt.configureSendParam(&config); err != nil {
  143. fail(25, err)
  144. return
  145. }
  146. }
  147. if transService == nil {
  148. fail(26, fmt.Errorf("%v", 26))
  149. return
  150. }
  151. err := transService.SendMsgTo(request, processResponse, toServerAgent, config.playerId, config.gateId)
  152. if err != nil {
  153. fail(21, err)
  154. }
  155. }
  156. type RemoteCallSendParam struct {
  157. // toServerAgent *cluster.ServerAgent
  158. playerId int64
  159. gateId int64
  160. }
  161. type SendOpt interface {
  162. configureSendParam(opts *RemoteCallSendParam) error
  163. }
  164. type SendOptFn func(opts *RemoteCallSendParam) error
  165. func (opt SendOptFn) configureSendParam(opts *RemoteCallSendParam) error {
  166. return opt(opts)
  167. }
  168. // func ReceiveServer(agent *cluster.ServerAgent) SendOpt {
  169. // return SendOptFn(func(opts *RemoteCallSendParam) error {
  170. // opts.toServerAgent = agent
  171. // return nil
  172. // })
  173. // }
  174. func AboutPlayer(playerId int64, gateId int64) SendOpt {
  175. return SendOptFn(func(opts *RemoteCallSendParam) error {
  176. opts.playerId = playerId
  177. opts.gateId = gateId
  178. return nil
  179. })
  180. }
  181. func AboutGate(gateId int64) SendOpt {
  182. return SendOptFn(func(opts *RemoteCallSendParam) error {
  183. opts.gateId = gateId
  184. return nil
  185. })
  186. }
  187. func AboutPlayerAgent(playerAgent *servers.PlayerAgent) SendOpt {
  188. return SendOptFn(func(opts *RemoteCallSendParam) error {
  189. if playerAgent == nil {
  190. return errors.New("playerAgent is nil")
  191. }
  192. opts.playerId = playerAgent.PlayerId
  193. opts.gateId = playerAgent.GateId
  194. return nil
  195. })
  196. }