// 实现连接服务器的管理 // 因为所连服务器的数目一般都比较少,不超过80个,所以用切片实现 // 服务器供多个协程使用,通过加锁实现互斥访问 // 实现接口,可以设置服务器为预关闭状态 // 部分服务器有分组Group,部分服务器没分组 // 分组是分组更新的意思,执行灰度更新,有分组的服务器需要同组相连 //无分组的服务器是无状态的服务器,可以开多个,数据处理时: // 1、根据玩家ID%服务器数量选择 // 2、一致性选择服务器 package servers import ( sjson "encoding/json" "errors" "leafstalk/covenant/model" "leafstalk/log" "leafstalk/network/cluster" "leafstalk/otherutils" "reflect" "sync" "sync/atomic" ) type Server struct { ID int64 Type string Addr string RunState int32 Load int32 Agent *cluster.ServerAgent Lines []int // Group int } var ( Servers []*Server serverMutex sync.RWMutex //sync.Mutex // serverGroup int serverLines []int ) const ( ServerType_Login = "login" ServerType_Gate = "gate" ServerTypeWorld = "world" ServerType_Archive = "archive" ServerTypeStore = "store" ServerType_WxPay = "wepay" ServerType_GM = "gm" ServerType_Mproom = "mproom" ServerType_ChpaterSync = "chaptersync" ServerType_ChpaterMatch = "chaptermatch" RunState_Normal = 0 RunState_Stop = 1 RunState_PrepareStop = 2 ) func SetServerLines(lines []int) { serverLines = lines } func GetServerLines() []int { return serverLines } func (s *Server) IsRunNormal() bool { return s.RunState == RunState_Normal } func NewServer(agent *cluster.ServerAgent) *Server { server := new(Server) server.Agent = agent return server } func AddServer(server *Server) int { serverMutex.Lock() defer serverMutex.Unlock() for _, v := range Servers { if v == server { return len(Servers) } if v.ID == server.ID { v.Agent.Close() } } Servers = append(Servers, server) return len(Servers) } func RemoveServer(server *Server) int { serverMutex.Lock() defer serverMutex.Unlock() i := 0 for _, v := range Servers { if v != server { Servers[i] = v i++ } } Servers = Servers[:i] //RemoveAuthServer(server) return len(Servers) } // GetTypeServer 查询指定类型的服务器 // 只有一个某种类型服务的情况 func GetTypeServer(serverType string) *Server { serverMutex.RLock() defer serverMutex.RUnlock() for _, v := range Servers { if v.Type != serverType { continue } return v } return nil } // GetChpaterSyncServer 返回房间数最少的 func GetChpaterSyncServer(roomid int64) *Server { serverMutex.RLock() defer serverMutex.RUnlock() min := int32(99999) // var server *Server for _, v := range Servers { if v.Type != ServerType_ChpaterSync { continue } // 寻找最小 if v.Load < min { min = v.Load server = v } } return server } // 查询指定线路 类型的服务器 // 多条线路 多个同种服务 func GetServerByTypeAndLine(serverType string, line int) *Server { serverMutex.RLock() defer serverMutex.RUnlock() for _, v := range Servers { if v.Type != serverType { continue } for _, l := range v.Lines { if l == line { return v } } } return nil } func GetServerById(id int64) *Server { serverMutex.RLock() defer serverMutex.RUnlock() for _, v := range Servers { if v.ID == id { return v } } return nil } // func GetGateServer() *Server { // return GetTypeServer(ServerType_Gate) // } // GetLoginServer 登陆服务器 // 连接一个Login的情况 func GetLoginServer() *Server { return GetTypeServer(ServerType_Login) } // GetArchiveServer 世界服务器 // 连接一个Archive的情况 func GetArchiveServer() *Server { return GetTypeServer(ServerType_Archive) } // // GetWorldServerByGroup 世界服务器 // func GetWorldServerByGroup(group int) *Server { // return GetGroupServer(ServerType_World, group) // } // 连接一个world服务的情况 func GetWorldServer() *Server { return GetTypeServer(ServerTypeWorld) } func GetWorldServerByPlayerId(playerId int64) *Server { _, l := model.SplitDocId(playerId) s := GetServerByTypeAndLine(ServerTypeWorld, l) return s } func GetWorldAgent() *cluster.ServerAgent { s := GetTypeServer(ServerTypeWorld) if s != nil { return s.Agent } return nil } // 杂货商店 // func GetStoreServerByGroup(group int) *Server { // return GetGroupServer(ServerType_Store, group) // } // 连接一个Store服务的情况 func GetStoreServer() *Server { return GetTypeServer(ServerTypeStore) } // store服务 func GetStoreAgent() *cluster.ServerAgent { s := GetTypeServer(ServerTypeStore) if s != nil { return s.Agent } return nil } // 连接一个chapter服务的情况 // func GetChapterServerByGroup(group int) *Server { // return GetGroupServer("chapter", group) // } func GetChapterServer() *Server { return GetTypeServer("chapter") } func GetChapterAgent() *cluster.ServerAgent { s := GetTypeServer("chapter") if s != nil { return s.Agent } return nil } func GetChapterServerByPlayerId(playerId int64) *Server { _, l := model.SplitDocId(playerId) s := GetServerByTypeAndLine("chapter", l) return s } func GetChapterMatchServer() *Server { return GetTypeServer(ServerType_ChpaterMatch) } func GetChapterMatchAgent() *cluster.ServerAgent { s := GetTypeServer(ServerType_ChpaterMatch) if s != nil { return s.Agent } return nil } // 只有一个room服务的情况 // func GetRoomServerByGroup(group int) *Server { // return GetGroupServer(ServerType_Mproom, group) // } func GetMpRoomServer() *Server { return GetTypeServer(ServerType_Mproom) } func GetGMServer() *Server { return GetTypeServer(ServerType_GM) } func GetChapterSyncServer(roomId int64) *Server { return GetChpaterSyncServer(roomId) // GetTypeServer(ServerType_ChpaterSync) } // 是否是自己的线路 func IsSelfLine(line int) bool { for _, v := range serverLines { if v == line { return true } } return false } // 改变负载 func UpdateLoad(id int64, load int) *Server { server := GetServerById(id) if server != nil { atomic.StoreInt32(&server.Load, int32(load)) } return server } func (s *Server) IncreaseLoad() { atomic.AddInt32(&s.Load, 1) } func (s *Server) SendToServer(Msg interface{}, playerId int64, gateId int64, rpcMsgId int64) error { if s.Agent == nil { return errors.New("agent is nil, server is disconnect") } msgType := reflect.TypeOf(Msg) var msgName string if msgType != nil && msgType.Kind() == reflect.Ptr { msgName = msgType.Elem().Name() } log.Infof("SendToServer %v %d, playerId:%v", msgName, gateId, playerId) return s.Agent.WriteServerRouteMsg(Msg, playerId, gateId, rpcMsgId) } func (s *Server) SendNotifyToServer(Msg interface{}) error { if s.Agent == nil { return errors.New("agent is nil, server is disconnect") } return s.Agent.WriteServerRouteMsg(Msg, 0, 0, 0) } // RemoteProcessMsg 发送给其他服务处理,PlayerAgent必须有值,并且ServerAgent指向gate // func (s *Server) RemoteProcessMsg(Msg interface{}, player *PlayerAgent) error { // if s.Agent == nil { // return errors.New("agent is nil, server is disconnected") // } // return s.Agent.WriteServerRouteMsg(Msg, player.PlayerId, player.GateId) // } func SendToPlayer(playerId int64, Msg interface{}, gateId int64) error { server := GetServerById(gateId) if server == nil { return errors.New("SendToPlayer no found gate server") } if server.Agent == nil { return errors.New("SendToPlayer agent is nil, server is disconnect") } msgType := reflect.TypeOf(Msg) var msgName string if msgType != nil && msgType.Kind() == reflect.Ptr { msgName = msgType.Elem().Name() } data := []byte("...") switch msgName { case "ResponseSyncRivalInfo": default: data, _ = sjson.Marshal(Msg) if len(data) > 2048 { data = data[:2048] } } log.Info(msgName, gateId, "playerId:", playerId, string(data)) _, err := server.Agent.WriteClientRouteMsg(Msg, playerId) return err } func SendToPlayerNoLog(playerId int64, Msg interface{}, gateId int64) error { server := GetServerById(gateId) if server == nil { return errors.New("SendToPlayer no found gate server") } if server.Agent == nil { return errors.New("SendToPlayer agent is nil, server is disconnect") } _, err := server.Agent.WriteClientRouteMsg(Msg, playerId) return err } // 广播发送, flag=-12:发送给数据包指定的多个玩家;=-11:广播发送给所有在线玩家 func SendToPlayersByGates(flag int64, Msg interface{}) error { return SendToSomePlayersByGates(Msg, flag) // log.Infof("SendToPlayersByGates flag:%v, Msg:%+v", flag, otherutils.DumpToJSON(Msg)) // var data [][]byte // var err error // RangeServers(func(server *Server) { // if server == nil { // return // } // if server.Agent == nil { // return // } // if err != nil { // return // } // if data == nil { // data, err = server.Agent.WriteClientRouteMsg(Msg, flag) // } else { // server.Agent.WriteRawMsg(data...) // } // }, ServerType_Gate) // return err } func SendToSomePlayersByGates(Msg interface{}, playerIds ...int64) error { log.Infof("SendToPlayersByGates playerIds num:%v, Msg:%+v", len(playerIds), otherutils.DumpToJSON(Msg)) var data [][]byte var err error RangeServers(func(server *Server) { if server == nil { return } if server.Agent == nil { return } if err != nil { return } if data == nil { data, err = server.Agent.WriteClientRouteMsg(Msg, playerIds...) } else { server.Agent.WriteRawMsg(data...) } }, ServerType_Gate) return err } // 改变运行状态 func UpdateRunState(id int64, state int) *Server { server := GetServerById(id) if server != nil { atomic.StoreInt32(&server.RunState, int32(state)) } return server } func Count() int { serverMutex.RLock() defer serverMutex.RUnlock() return len(Servers) } func RangeServers(f func(*Server), svrType string) { serverMutex.RLock() defer serverMutex.RUnlock() for _, v := range Servers { if v.Type == svrType { f(v) } } } // // 支付服务 // func GetWxPayServer(group int) *Server { // return GetServer(ServerType_WxPay) // } // func GetToolboxServer() *Server { // return GetServer("toolbox") // } // func GetGroupServer(serverType string, group int) *Server { // serverMutex.RLock() // defer serverMutex.RUnlock() // var minLoadServer *Server = nil // for _, v := range Servers { // if v.Group != group { // continue // } // if v.Type != serverType { // continue // } // minLoadServer = v // break // } // return minLoadServer // } // func GetServersId(serverType string) []int { // serverMutex.RLock() // defer serverMutex.RUnlock() // var lst []int // for _, v := range Servers { // if v.Type != serverType { // continue // } // lst = append(lst, v.ID) // } // return lst // }