|
- package rpc
- import (
- "errors"
- "fmt"
- "leafstalk/covenant/msg"
- "leafstalk/covenant/servers"
- "leafstalk/module"
- "leafstalk/network/cluster"
- "leafstalk/otherutils/snowflake"
- "leafstalk/otherutils/timercache"
- "os"
- "time"
- )
- var (
- ErrNoConnected = errors.New("remoterpc: no connected agent")
- ErrTimeOut = errors.New("remoterpc: response time out")
- ErrResponseType = errors.New("remoterpc: response type error")
- // ErrNil = errors.New("localrpc: nil returned")
- NodeNum = int64(0)
- )
- type RemoteCallService struct {
- Skeleton *module.Skeleton
- timerCache *timercache.CacheKeys
- NodeSequence *snowflake.Node
- }
- // 时间毫秒28(74小时),node10(1023),步长25(33554432个)
- // 调用RollGenerate产生时间循环ID
- func NewTimeRoolNode(node int64) (*snowflake.Node, error) {
- startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local)
- nodeBits := uint8(10)
- stepBits := uint8(35 - nodeBits)
- return snowflake.NewNode(node, nodeBits, stepBits, 1, startTs.Unix())
- }
- func SplitNodeId(id int64) int64 {
- nodeBits := uint8(10)
- stepBits := uint8(35 - nodeBits)
- nodeMax := int64(-1 ^ (-1 << nodeBits))
- tmp := nodeMax & (id >> stepBits)
- return tmp
- }
- func NewAndInit(m *module.Skeleton) *RemoteCallService {
- s := new(RemoteCallService)
- s.Skeleton = m
- s.timerCache = timercache.NewCacheKeys(time.Second * 10)
- NodeNum += 1
- nodeId := NodeNum
- var err error
- s.NodeSequence, err = NewTimeRoolNode(nodeId)
- if err != nil {
- fmt.Fprintf(os.Stderr, "NewTimeRoolNode error. %#v", err)
- return nil
- }
- // id := s.NodeSequence.RollGenerate().Int64()
- // nodeId2 := SplitNodeId(id)
- // fmt.Sprintln(nodeId, nodeId2)
- // s.Processor = Processor
- // s.getAgent = getAgent
- s.Skeleton.RegisterChanRPC("RpcTimeout", handleRpcTimeout)
- msgName := fmt.Sprintf("ResponseRpc:%v", nodeId)
- s.Skeleton.RegisterChanRPC(msgName, s.HandleRpcResponse)
- return s
- }
- // 主协程处理超时消息
- func handleRpcTimeout(args []interface{}) {
- f, ok := args[0].(func(resp msg.RpcResponse, err error))
- if !ok {
- fmt.Fprintf(os.Stderr, "handleRpcTimeout func type error. %#v", args[0])
- return
- }
- f(nil, ErrTimeOut)
- }
- // 主协程处理返回的响应消息
- func (rcs *RemoteCallService) HandleRpcResponse(args []interface{}) {
- resp, ok1 := args[0].(msg.RpcResponse)
- if !ok1 {
- fmt.Fprintf(os.Stderr, "HandleRpcResponse error. %#v", args[0])
- return
- }
- k := resp.GetMsgId()
- f := rcs.timerCache.Remove(k)
- if f == nil {
- fmt.Fprintf(os.Stderr, "HandleRpcResponse no response func. %#v", resp)
- return
- }
- f2, ok := f.(func(msg.RpcResponse, error))
- if !ok {
- fmt.Fprintf(os.Stderr, "HandleRpcResponse func type error. %#v", f)
- return
- }
- f2(resp, nil)
- }
- func (rcs *RemoteCallService) Handler(args []interface{}) {
- rcs.HandleRpcResponse(args)
- }
- func (s *RemoteCallService) SendMsgTo(msg msg.RPCer, fSucess func(resp msg.RpcResponse, err error), toServerAgent *cluster.ServerAgent, playerId int64, gateId int64) error {
- if toServerAgent == nil {
- return ErrNoConnected
- }
- msgId := s.NodeSequence.RollGenerate().Int64()
- //定时缓存
- k := s.timerCache.Add(fSucess, func(e *timercache.TimerEntry) {
- //另一个协程,发送超时消息给处理协程
- s.Skeleton.HandlerServer.Go("RpcTimeout", fSucess)
- }, msgId)
- msg.SetMsgId(k)
- //发送
- err := toServerAgent.WriteServerRouteMsg(msg, playerId, gateId, 0)
- if err != nil {
- s.timerCache.Remove(k)
- return err
- }
- return nil
- }
- // 和SendAsyncMsg一样,只是形式参数不一样
- func SendAsyncMsgByRpcService[T msg.RpcResponse](request msg.RPCer, sucess func(respMsg T),
- fail func(errCode int, err error), noexpect func(exceptCode int),
- transService *RemoteCallService, toServerAgent *cluster.ServerAgent, opts ...SendOpt) {
- processResponse := func(respa msg.RpcResponse, err error) {
- if err != nil {
- noexpect(22) //超时
- return
- }
- resp3, ok := respa.(T)
- if !ok {
- noexpect(23) //数据类型错误
- return
- }
- // 添加失败打印错误,并不恢复
- errCode := resp3.GetErrCode()
- if errCode != 0 {
- fail(errCode, fmt.Errorf("%v", errCode))
- return
- }
- // 奖品发送给客户端
- sucess(resp3)
- }
- config := RemoteCallSendParam{}
- for _, opt := range opts {
- if opt == nil {
- continue
- }
- if err := opt.configureSendParam(&config); err != nil {
- fail(25, err)
- return
- }
- }
- if transService == nil {
- fail(26, fmt.Errorf("%v", 26))
- return
- }
- err := transService.SendMsgTo(request, processResponse, toServerAgent, config.playerId, config.gateId)
- if err != nil {
- fail(21, err)
- }
- }
- type RemoteCallSendParam struct {
- // toServerAgent *cluster.ServerAgent
- playerId int64
- gateId int64
- }
- type SendOpt interface {
- configureSendParam(opts *RemoteCallSendParam) error
- }
- type SendOptFn func(opts *RemoteCallSendParam) error
- func (opt SendOptFn) configureSendParam(opts *RemoteCallSendParam) error {
- return opt(opts)
- }
- // func ReceiveServer(agent *cluster.ServerAgent) SendOpt {
- // return SendOptFn(func(opts *RemoteCallSendParam) error {
- // opts.toServerAgent = agent
- // return nil
- // })
- // }
- func AboutPlayer(playerId int64, gateId int64) SendOpt {
- return SendOptFn(func(opts *RemoteCallSendParam) error {
- opts.playerId = playerId
- opts.gateId = gateId
- return nil
- })
- }
- func AboutGate(gateId int64) SendOpt {
- return SendOptFn(func(opts *RemoteCallSendParam) error {
- opts.gateId = gateId
- return nil
- })
- }
- func AboutPlayerAgent(playerAgent *servers.PlayerAgent) SendOpt {
- return SendOptFn(func(opts *RemoteCallSendParam) error {
- if playerAgent == nil {
- return errors.New("playerAgent is nil")
- }
- opts.playerId = playerAgent.PlayerId
- opts.gateId = playerAgent.GateId
- return nil
- })
- }
|