package rpc import ( "errors" "fmt" "leafstalk/covenant/msg" "leafstalk/covenant/servers" "leafstalk/module" "leafstalk/network/cluster" "leafstalk/otherutils/snowflake" "leafstalk/otherutils/timercache" "os" "time" ) var ( ErrNoConnected = errors.New("remoterpc: no connected agent") ErrTimeOut = errors.New("remoterpc: response time out") ErrResponseType = errors.New("remoterpc: response type error") // ErrNil = errors.New("localrpc: nil returned") NodeNum = int64(0) ) type RemoteCallService struct { Skeleton *module.Skeleton timerCache *timercache.CacheKeys NodeSequence *snowflake.Node } // 时间毫秒28(74小时),node10(1023),步长25(33554432个) // 调用RollGenerate产生时间循环ID func NewTimeRoolNode(node int64) (*snowflake.Node, error) { startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local) nodeBits := uint8(10) stepBits := uint8(35 - nodeBits) return snowflake.NewNode(node, nodeBits, stepBits, 1, startTs.Unix()) } func SplitNodeId(id int64) int64 { nodeBits := uint8(10) stepBits := uint8(35 - nodeBits) nodeMax := int64(-1 ^ (-1 << nodeBits)) tmp := nodeMax & (id >> stepBits) return tmp } func NewAndInit(m *module.Skeleton) *RemoteCallService { s := new(RemoteCallService) s.Skeleton = m s.timerCache = timercache.NewCacheKeys(time.Second * 10) NodeNum += 1 nodeId := NodeNum var err error s.NodeSequence, err = NewTimeRoolNode(nodeId) if err != nil { fmt.Fprintf(os.Stderr, "NewTimeRoolNode error. %#v", err) return nil } // id := s.NodeSequence.RollGenerate().Int64() // nodeId2 := SplitNodeId(id) // fmt.Sprintln(nodeId, nodeId2) // s.Processor = Processor // s.getAgent = getAgent s.Skeleton.RegisterChanRPC("RpcTimeout", handleRpcTimeout) msgName := fmt.Sprintf("ResponseRpc:%v", nodeId) s.Skeleton.RegisterChanRPC(msgName, s.HandleRpcResponse) return s } // 主协程处理超时消息 func handleRpcTimeout(args []interface{}) { f, ok := args[0].(func(resp msg.RpcResponse, err error)) if !ok { fmt.Fprintf(os.Stderr, "handleRpcTimeout func type error. %#v", args[0]) return } f(nil, ErrTimeOut) } // 主协程处理返回的响应消息 func (rcs *RemoteCallService) HandleRpcResponse(args []interface{}) { resp, ok1 := args[0].(msg.RpcResponse) if !ok1 { fmt.Fprintf(os.Stderr, "HandleRpcResponse error. %#v", args[0]) return } k := resp.GetMsgId() f := rcs.timerCache.Remove(k) if f == nil { fmt.Fprintf(os.Stderr, "HandleRpcResponse no response func. %#v", resp) return } f2, ok := f.(func(msg.RpcResponse, error)) if !ok { fmt.Fprintf(os.Stderr, "HandleRpcResponse func type error. %#v", f) return } f2(resp, nil) } func (rcs *RemoteCallService) Handler(args []interface{}) { rcs.HandleRpcResponse(args) } func (s *RemoteCallService) SendMsgTo(msg msg.RPCer, fSucess func(resp msg.RpcResponse, err error), toServerAgent *cluster.ServerAgent, playerId int64, gateId int64) error { if toServerAgent == nil { return ErrNoConnected } msgId := s.NodeSequence.RollGenerate().Int64() //定时缓存 k := s.timerCache.Add(fSucess, func(e *timercache.TimerEntry) { //另一个协程,发送超时消息给处理协程 s.Skeleton.HandlerServer.Go("RpcTimeout", fSucess) }, msgId) msg.SetMsgId(k) //发送 err := toServerAgent.WriteServerRouteMsg(msg, playerId, gateId, 0) if err != nil { s.timerCache.Remove(k) return err } return nil } // 和SendAsyncMsg一样,只是形式参数不一样 func SendAsyncMsgByRpcService[T msg.RpcResponse](request msg.RPCer, sucess func(respMsg T), fail func(errCode int, err error), noexpect func(exceptCode int), transService *RemoteCallService, toServerAgent *cluster.ServerAgent, opts ...SendOpt) { processResponse := func(respa msg.RpcResponse, err error) { if err != nil { noexpect(22) //超时 return } resp3, ok := respa.(T) if !ok { noexpect(23) //数据类型错误 return } // 添加失败打印错误,并不恢复 errCode := resp3.GetErrCode() if errCode != 0 { fail(errCode, fmt.Errorf("%v", errCode)) return } // 奖品发送给客户端 sucess(resp3) } config := RemoteCallSendParam{} for _, opt := range opts { if opt == nil { continue } if err := opt.configureSendParam(&config); err != nil { fail(25, err) return } } if transService == nil { fail(26, fmt.Errorf("%v", 26)) return } err := transService.SendMsgTo(request, processResponse, toServerAgent, config.playerId, config.gateId) if err != nil { fail(21, err) } } type RemoteCallSendParam struct { // toServerAgent *cluster.ServerAgent playerId int64 gateId int64 } type SendOpt interface { configureSendParam(opts *RemoteCallSendParam) error } type SendOptFn func(opts *RemoteCallSendParam) error func (opt SendOptFn) configureSendParam(opts *RemoteCallSendParam) error { return opt(opts) } // func ReceiveServer(agent *cluster.ServerAgent) SendOpt { // return SendOptFn(func(opts *RemoteCallSendParam) error { // opts.toServerAgent = agent // return nil // }) // } func AboutPlayer(playerId int64, gateId int64) SendOpt { return SendOptFn(func(opts *RemoteCallSendParam) error { opts.playerId = playerId opts.gateId = gateId return nil }) } func AboutGate(gateId int64) SendOpt { return SendOptFn(func(opts *RemoteCallSendParam) error { opts.gateId = gateId return nil }) } func AboutPlayerAgent(playerAgent *servers.PlayerAgent) SendOpt { return SendOptFn(func(opts *RemoteCallSendParam) error { if playerAgent == nil { return errors.New("playerAgent is nil") } opts.playerId = playerAgent.PlayerId opts.gateId = playerAgent.GateId return nil }) }