123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- // 实现连接服务器的管理
- // 因为所连服务器的数目一般都比较少,不超过80个,所以用切片实现
- // 服务器供多个协程使用,通过加锁实现互斥访问
- // 实现接口,可以设置服务器为预关闭状态
- // 部分服务器有分组Group,部分服务器没分组
- // 分组是分组更新的意思,执行灰度更新,有分组的服务器需要同组相连
- //无分组的服务器是无状态的服务器,可以开多个,数据处理时:
- // 1、根据玩家ID%服务器数量选择
- // 2、一致性选择服务器
- package servers
- import (
- sjson "encoding/json"
- "errors"
- "leafstalk/covenant/model"
- "leafstalk/log"
- "leafstalk/network/cluster"
- "leafstalk/otherutils"
- "reflect"
- "sync"
- "sync/atomic"
- )
- type Server struct {
- ID int64
- Type string
- Addr string
- RunState int32
- Load int32
- Agent *cluster.ServerAgent
- Lines []int
- // Group int
- }
- var (
- Servers []*Server
- serverMutex sync.RWMutex //sync.Mutex
- // serverGroup int
- serverLines []int
- )
- const (
- ServerType_Login = "login"
- ServerType_Gate = "gate"
- ServerTypeWorld = "world"
- ServerType_Archive = "archive"
- ServerTypeStore = "store"
- ServerType_WxPay = "wepay"
- ServerType_GM = "gm"
- ServerType_Mproom = "mproom"
- ServerType_ChpaterSync = "chaptersync"
- ServerType_ChpaterMatch = "chaptermatch"
- RunState_Normal = 0
- RunState_Stop = 1
- RunState_PrepareStop = 2
- )
- func SetServerLines(lines []int) {
- serverLines = lines
- }
- func GetServerLines() []int {
- return serverLines
- }
- func (s *Server) IsRunNormal() bool {
- return s.RunState == RunState_Normal
- }
- func NewServer(agent *cluster.ServerAgent) *Server {
- server := new(Server)
- server.Agent = agent
- return server
- }
- func AddServer(server *Server) int {
- serverMutex.Lock()
- defer serverMutex.Unlock()
- for _, v := range Servers {
- if v == server {
- return len(Servers)
- }
- if v.ID == server.ID {
- v.Agent.Close()
- }
- }
- Servers = append(Servers, server)
- return len(Servers)
- }
- func RemoveServer(server *Server) int {
- serverMutex.Lock()
- defer serverMutex.Unlock()
- i := 0
- for _, v := range Servers {
- if v != server {
- Servers[i] = v
- i++
- }
- }
- Servers = Servers[:i]
- //RemoveAuthServer(server)
- return len(Servers)
- }
- // GetTypeServer 查询指定类型的服务器
- // 只有一个某种类型服务的情况
- func GetTypeServer(serverType string) *Server {
- serverMutex.RLock()
- defer serverMutex.RUnlock()
- for _, v := range Servers {
- if v.Type != serverType {
- continue
- }
- return v
- }
- return nil
- }
- // GetChpaterSyncServer 返回房间数最少的
- func GetChpaterSyncServer(roomid int64) *Server {
- serverMutex.RLock()
- defer serverMutex.RUnlock()
- min := int32(99999) //
- var server *Server
- for _, v := range Servers {
- if v.Type != ServerType_ChpaterSync {
- continue
- }
- // 寻找最小
- if v.Load < min {
- min = v.Load
- server = v
- }
- }
- return server
- }
- // 查询指定线路 类型的服务器
- // 多条线路 多个同种服务
- func GetServerByTypeAndLine(serverType string, line int) *Server {
- serverMutex.RLock()
- defer serverMutex.RUnlock()
- for _, v := range Servers {
- if v.Type != serverType {
- continue
- }
- for _, l := range v.Lines {
- if l == line {
- return v
- }
- }
- }
- return nil
- }
- func GetServerById(id int64) *Server {
- serverMutex.RLock()
- defer serverMutex.RUnlock()
- for _, v := range Servers {
- if v.ID == id {
- return v
- }
- }
- return nil
- }
- // func GetGateServer() *Server {
- // return GetTypeServer(ServerType_Gate)
- // }
- // GetLoginServer 登陆服务器
- // 连接一个Login的情况
- func GetLoginServer() *Server {
- return GetTypeServer(ServerType_Login)
- }
- // GetArchiveServer 世界服务器
- // 连接一个Archive的情况
- func GetArchiveServer() *Server {
- return GetTypeServer(ServerType_Archive)
- }
- // // GetWorldServerByGroup 世界服务器
- // func GetWorldServerByGroup(group int) *Server {
- // return GetGroupServer(ServerType_World, group)
- // }
- // 连接一个world服务的情况
- func GetWorldServer() *Server {
- return GetTypeServer(ServerTypeWorld)
- }
- func GetWorldServerByPlayerId(playerId int64) *Server {
- _, l := model.SplitDocId(playerId)
- s := GetServerByTypeAndLine(ServerTypeWorld, l)
- return s
- }
- func GetWorldAgent() *cluster.ServerAgent {
- s := GetTypeServer(ServerTypeWorld)
- if s != nil {
- return s.Agent
- }
- return nil
- }
- // 杂货商店
- // func GetStoreServerByGroup(group int) *Server {
- // return GetGroupServer(ServerType_Store, group)
- // }
- // 连接一个Store服务的情况
- func GetStoreServer() *Server {
- return GetTypeServer(ServerTypeStore)
- }
- // store服务
- func GetStoreAgent() *cluster.ServerAgent {
- s := GetTypeServer(ServerTypeStore)
- if s != nil {
- return s.Agent
- }
- return nil
- }
- // 连接一个chapter服务的情况
- // func GetChapterServerByGroup(group int) *Server {
- // return GetGroupServer("chapter", group)
- // }
- func GetChapterServer() *Server {
- return GetTypeServer("chapter")
- }
- func GetChapterAgent() *cluster.ServerAgent {
- s := GetTypeServer("chapter")
- if s != nil {
- return s.Agent
- }
- return nil
- }
- func GetChapterServerByPlayerId(playerId int64) *Server {
- _, l := model.SplitDocId(playerId)
- s := GetServerByTypeAndLine("chapter", l)
- return s
- }
- func GetChapterMatchServer() *Server {
- return GetTypeServer(ServerType_ChpaterMatch)
- }
- func GetChapterMatchAgent() *cluster.ServerAgent {
- s := GetTypeServer(ServerType_ChpaterMatch)
- if s != nil {
- return s.Agent
- }
- return nil
- }
- // 只有一个room服务的情况
- // func GetRoomServerByGroup(group int) *Server {
- // return GetGroupServer(ServerType_Mproom, group)
- // }
- func GetMpRoomServer() *Server {
- return GetTypeServer(ServerType_Mproom)
- }
- func GetGMServer() *Server {
- return GetTypeServer(ServerType_GM)
- }
- func GetChapterSyncServer(roomId int64) *Server {
- return GetChpaterSyncServer(roomId) // GetTypeServer(ServerType_ChpaterSync)
- }
- // 是否是自己的线路
- func IsSelfLine(line int) bool {
- for _, v := range serverLines {
- if v == line {
- return true
- }
- }
- return false
- }
- // 改变负载
- func UpdateLoad(id int64, load int) *Server {
- server := GetServerById(id)
- if server != nil {
- atomic.StoreInt32(&server.Load, int32(load))
- }
- return server
- }
- func (s *Server) IncreaseLoad() {
- atomic.AddInt32(&s.Load, 1)
- }
- func (s *Server) SendToServer(Msg interface{}, playerId int64, gateId int64, rpcMsgId int64) error {
- if s.Agent == nil {
- return errors.New("agent is nil, server is disconnect")
- }
- msgType := reflect.TypeOf(Msg)
- var msgName string
- if msgType != nil && msgType.Kind() == reflect.Ptr {
- msgName = msgType.Elem().Name()
- }
- log.Infof("SendToServer %v %d, playerId:%v", msgName, gateId, playerId)
- return s.Agent.WriteServerRouteMsg(Msg, playerId, gateId, rpcMsgId)
- }
- func (s *Server) SendNotifyToServer(Msg interface{}) error {
- if s.Agent == nil {
- return errors.New("agent is nil, server is disconnect")
- }
- return s.Agent.WriteServerRouteMsg(Msg, 0, 0, 0)
- }
- // RemoteProcessMsg 发送给其他服务处理,PlayerAgent必须有值,并且ServerAgent指向gate
- // func (s *Server) RemoteProcessMsg(Msg interface{}, player *PlayerAgent) error {
- // if s.Agent == nil {
- // return errors.New("agent is nil, server is disconnected")
- // }
- // return s.Agent.WriteServerRouteMsg(Msg, player.PlayerId, player.GateId)
- // }
- func SendToPlayer(playerId int64, Msg interface{}, gateId int64) error {
- server := GetServerById(gateId)
- if server == nil {
- return errors.New("SendToPlayer no found gate server")
- }
- if server.Agent == nil {
- return errors.New("SendToPlayer agent is nil, server is disconnect")
- }
- msgType := reflect.TypeOf(Msg)
- var msgName string
- if msgType != nil && msgType.Kind() == reflect.Ptr {
- msgName = msgType.Elem().Name()
- }
- data := []byte("...")
- switch msgName {
- case "ResponseSyncRivalInfo":
- default:
- data, _ = sjson.Marshal(Msg)
- if len(data) > 2048 {
- data = data[:2048]
- }
- }
- log.Info(msgName, gateId, "playerId:", playerId, string(data))
- _, err := server.Agent.WriteClientRouteMsg(Msg, playerId)
- return err
- }
- func SendToPlayerNoLog(playerId int64, Msg interface{}, gateId int64) error {
- server := GetServerById(gateId)
- if server == nil {
- return errors.New("SendToPlayer no found gate server")
- }
- if server.Agent == nil {
- return errors.New("SendToPlayer agent is nil, server is disconnect")
- }
- _, err := server.Agent.WriteClientRouteMsg(Msg, playerId)
- return err
- }
- // 广播发送, flag=-12:发送给数据包指定的多个玩家;=-11:广播发送给所有在线玩家
- func SendToPlayersByGates(flag int64, Msg interface{}) error {
- return SendToSomePlayersByGates(Msg, flag)
- // log.Infof("SendToPlayersByGates flag:%v, Msg:%+v", flag, otherutils.DumpToJSON(Msg))
- // var data [][]byte
- // var err error
- // RangeServers(func(server *Server) {
- // if server == nil {
- // return
- // }
- // if server.Agent == nil {
- // return
- // }
- // if err != nil {
- // return
- // }
- // if data == nil {
- // data, err = server.Agent.WriteClientRouteMsg(Msg, flag)
- // } else {
- // server.Agent.WriteRawMsg(data...)
- // }
- // }, ServerType_Gate)
- // return err
- }
- func SendToSomePlayersByGates(Msg interface{}, playerIds ...int64) error {
- log.Infof("SendToPlayersByGates playerIds num:%v, Msg:%+v", len(playerIds), otherutils.DumpToJSON(Msg))
- var data [][]byte
- var err error
- RangeServers(func(server *Server) {
- if server == nil {
- return
- }
- if server.Agent == nil {
- return
- }
- if err != nil {
- return
- }
- if data == nil {
- data, err = server.Agent.WriteClientRouteMsg(Msg, playerIds...)
- } else {
- server.Agent.WriteRawMsg(data...)
- }
- }, ServerType_Gate)
- return err
- }
- // 改变运行状态
- func UpdateRunState(id int64, state int) *Server {
- server := GetServerById(id)
- if server != nil {
- atomic.StoreInt32(&server.RunState, int32(state))
- }
- return server
- }
- func Count() int {
- serverMutex.RLock()
- defer serverMutex.RUnlock()
- return len(Servers)
- }
- func RangeServers(f func(*Server), svrType string) {
- serverMutex.RLock()
- defer serverMutex.RUnlock()
- for _, v := range Servers {
- if v.Type == svrType {
- f(v)
- }
- }
- }
- // // 支付服务
- // func GetWxPayServer(group int) *Server {
- // return GetServer(ServerType_WxPay)
- // }
- // func GetToolboxServer() *Server {
- // return GetServer("toolbox")
- // }
- // func GetGroupServer(serverType string, group int) *Server {
- // serverMutex.RLock()
- // defer serverMutex.RUnlock()
- // var minLoadServer *Server = nil
- // for _, v := range Servers {
- // if v.Group != group {
- // continue
- // }
- // if v.Type != serverType {
- // continue
- // }
- // minLoadServer = v
- // break
- // }
- // return minLoadServer
- // }
- // func GetServersId(serverType string) []int {
- // serverMutex.RLock()
- // defer serverMutex.RUnlock()
- // var lst []int
- // for _, v := range Servers {
- // if v.Type != serverType {
- // continue
- // }
- // lst = append(lst, v.ID)
- // }
- // return lst
- // }
|