servers.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
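// Package servers manages the set of servers this process is connected to.
//
// The number of connected servers is normally small (no more than about 80),
// so they are kept in a plain slice.
// The servers are shared by multiple goroutines; access is made mutually
// exclusive with a lock.
// The API allows a server to be put into a pre-shutdown state.
// Some servers belong to a group and some do not.
// A group is the unit of grouped (gray/canary) updates; grouped servers must
// connect to servers of the same group.
// Ungrouped servers are stateless and may run as several instances; when
// routing data to them, pick one by either:
//  1. player ID modulo the number of servers, or
//  2. consistent selection of a server.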
  10. package servers
  11. import (
  12. sjson "encoding/json"
  13. "errors"
  14. "leafstalk/covenant/model"
  15. "leafstalk/log"
  16. "leafstalk/network/cluster"
  17. "leafstalk/otherutils"
  18. "reflect"
  19. "sync"
  20. "sync/atomic"
  21. )
// Server describes one connected server tracked in the package-level Servers
// list.
type Server struct {
	// ID identifies the server; lookups such as GetServerById match on it.
	ID int64
	// Type is the server kind, compared against the ServerType* constants.
	Type string
	// Addr is the server address (not read anywhere in this file).
	Addr string
	// RunState holds one of the RunState_* values; UpdateRunState writes it
	// with atomic.StoreInt32.
	RunState int32
	// Load is the current load figure; UpdateLoad and IncreaseLoad modify it
	// with sync/atomic operations.
	Load int32
	// Agent is the connection used to send messages to this server. The send
	// helpers in this file treat a nil Agent as "disconnected".
	Agent *cluster.ServerAgent
	// Lines lists the lines this server handles; GetServerByTypeAndLine
	// matches against it.
	Lines []int
	// Group int
}
var (
	// Servers is the list of registered servers. The functions in this file
	// access it while holding serverMutex.
	Servers []*Server
	// serverMutex guards Servers (read lock for lookups, write lock for
	// AddServer and RemoveServer).
	serverMutex sync.RWMutex //sync.Mutex
	// serverGroup int
	// serverLines holds the lines stored by SetServerLines and checked by
	// IsSelfLine. It is accessed without taking serverMutex.
	serverLines []int
)
// Server type identifiers. They are compared against Server.Type by the
// lookup helpers in this file (for example GetTypeServer).
// NOTE(review): "Chpater" in ServerTypeChpaterMatch looks like a misspelling
// of "Chapter"; the identifier is kept as-is because callers may reference it.
const (
	ServerType_Login = "login"
	ServerType_Gate = "gate"
	ServerTypeWorld = "world"
	// ServerType_Archive = "archive"
	// ServerTypeStore = "store"
	// ServerType_WxPay = "wepay"
	ServerType_GM = "gm"
	// ServerType_Mproom = "mproom"
	// ServerType_Chpater = "chapter"
	// ServerTypeChpaterSync = "chaptersync"
	ServerTypeChpaterMatch = "chaptermatch"
	ServerTypeFrameSync = "framesync"
)
// Values stored in Server.RunState.
const (
	// RunState_Normal is the zero value; Server.IsRunNormal checks for it.
	RunState_Normal = 0
	// RunState_Stop — presumably marks a stopped server; not referenced in
	// this file.
	RunState_Stop = 1
	// RunState_PrepareStop — presumably the "pre-shutdown" state mentioned in
	// the file header; not referenced in this file.
	RunState_PrepareStop = 2
)
  57. func SetServerLines(lines []int) {
  58. serverLines = lines
  59. }
  60. func GetServerLines() []int {
  61. return serverLines
  62. }
  63. func (s *Server) IsRunNormal() bool {
  64. return s.RunState == RunState_Normal
  65. }
  66. func NewServer(agent *cluster.ServerAgent) *Server {
  67. server := new(Server)
  68. server.Agent = agent
  69. return server
  70. }
  71. func AddServer(server *Server) int {
  72. serverMutex.Lock()
  73. defer serverMutex.Unlock()
  74. for _, v := range Servers {
  75. if v == server {
  76. return len(Servers)
  77. }
  78. if v.ID == server.ID {
  79. v.Agent.Close()
  80. }
  81. }
  82. Servers = append(Servers, server)
  83. return len(Servers)
  84. }
  85. func RemoveServer(server *Server) int {
  86. serverMutex.Lock()
  87. defer serverMutex.Unlock()
  88. i := 0
  89. for _, v := range Servers {
  90. if v != server {
  91. Servers[i] = v
  92. i++
  93. }
  94. }
  95. Servers = Servers[:i]
  96. //RemoveAuthServer(server)
  97. return len(Servers)
  98. }
  99. // GetTypeServer 查询指定类型的服务器
  100. // 只有一个某种类型服务的情况
  101. func GetTypeServer(serverType string) *Server {
  102. serverMutex.RLock()
  103. defer serverMutex.RUnlock()
  104. for _, v := range Servers {
  105. if v.Type != serverType {
  106. continue
  107. }
  108. return v
  109. }
  110. return nil
  111. }
  112. // GetFrameSyncServer 返回房间数最少的
  113. func GetFrameSyncServerByCheckLoad() *Server {
  114. serverMutex.RLock()
  115. defer serverMutex.RUnlock()
  116. min := int32(99999) //
  117. var server *Server
  118. for _, v := range Servers {
  119. if v.Type != ServerTypeFrameSync {
  120. continue
  121. }
  122. // 寻找最小
  123. if v.Load < min {
  124. min = v.Load
  125. server = v
  126. }
  127. }
  128. return server
  129. }
  130. func GetFrameSyncServerByServerId(id int64) *Server {
  131. serverMutex.RLock()
  132. defer serverMutex.RUnlock()
  133. for _, v := range Servers {
  134. if v.Type != ServerTypeFrameSync {
  135. continue
  136. }
  137. // 寻找最小
  138. if v.ID == id {
  139. return v
  140. }
  141. }
  142. return nil
  143. }
  144. // 查询指定线路 类型的服务器
  145. // 多条线路 多个同种服务
  146. func GetServerByTypeAndLine(serverType string, line int) *Server {
  147. serverMutex.RLock()
  148. defer serverMutex.RUnlock()
  149. for _, v := range Servers {
  150. if v.Type != serverType {
  151. continue
  152. }
  153. for _, l := range v.Lines {
  154. if l == line {
  155. return v
  156. }
  157. }
  158. }
  159. return nil
  160. }
  161. func GetServerById(id int64) *Server {
  162. serverMutex.RLock()
  163. defer serverMutex.RUnlock()
  164. for _, v := range Servers {
  165. if v.ID == id {
  166. return v
  167. }
  168. }
  169. return nil
  170. }
  171. // func GetGateServer() *Server {
  172. // return GetTypeServer(ServerType_Gate)
  173. // }
  174. // GetLoginServer 登陆服务器
  175. // 连接一个Login的情况
  176. func GetLoginServer() *Server {
  177. return GetTypeServer(ServerType_Login)
  178. }
  179. // GetArchiveServer 世界服务器
  180. // 连接一个Archive的情况
  181. // func GetArchiveServer() *Server {
  182. // return GetTypeServer(ServerType_Archive)
  183. // }
  184. // 连接一个world服务的情况
  185. func GetWorldServer() *Server {
  186. return GetTypeServer(ServerTypeWorld)
  187. }
  188. func GetWorldServerByPlayerId(playerId int64) *Server {
  189. _, l := model.SplitDocId(playerId)
  190. s := GetServerByTypeAndLine(ServerTypeWorld, l)
  191. return s
  192. }
  193. func GetWorldAgent() *cluster.ServerAgent {
  194. s := GetTypeServer(ServerTypeWorld)
  195. if s != nil {
  196. return s.Agent
  197. }
  198. return nil
  199. }
  200. // 杂货商店
  201. // func GetStoreServerByGroup(group int) *Server {
  202. // return GetGroupServer(ServerType_Store, group)
  203. // }
  204. // 连接一个framesync服务的情况
  205. func GetFrameSyncServer() *Server {
  206. return GetTypeServer(ServerTypeFrameSync)
  207. }
  208. // framesync服务
  209. func GetFrameSyncAgent() *cluster.ServerAgent {
  210. s := GetTypeServer(ServerTypeFrameSync)
  211. if s != nil {
  212. return s.Agent
  213. }
  214. return nil
  215. }
  216. // 连接一个chapter服务的情况
  217. // func GetChapterServerByGroup(group int) *Server {
  218. // return GetGroupServer("chapter", group)
  219. // }
  220. // func GetChapterServer() *Server {
  221. // return GetTypeServer(ServerType_Chpater)
  222. // }
  223. // func GetChapterAgent() *cluster.ServerAgent {
  224. // s := GetTypeServer(ServerType_Chpater)
  225. // if s != nil {
  226. // return s.Agent
  227. // }
  228. // return nil
  229. // }
  230. // func GetFrameSyncServer() *Server {
  231. // return GetTypeServer(ServerTypeFrameSync)
  232. // }
  233. // func GetFrameSyncAgent() *cluster.ServerAgent {
  234. // s := GetTypeServer(ServerTypeFrameSync)
  235. // if s != nil {
  236. // return s.Agent
  237. // }
  238. // return nil
  239. // }
  240. // func GetChapterServerByPlayerId(playerId int64) *Server {
  241. // _, l := model.SplitDocId(playerId)
  242. // s := GetServerByTypeAndLine(ServerType_Chpater, l)
  243. // return s
  244. // }
  245. func GetChapterMatchServer() *Server {
  246. return GetTypeServer(ServerTypeChpaterMatch)
  247. }
  248. func GetChapterMatchAgent() *cluster.ServerAgent {
  249. s := GetTypeServer(ServerTypeChpaterMatch)
  250. if s != nil {
  251. return s.Agent
  252. }
  253. return nil
  254. }
  255. // 只有一个room服务的情况
  256. // func GetRoomServerByGroup(group int) *Server {
  257. // return GetGroupServer(ServerType_Mproom, group)
  258. // }
  259. // func GetMpRoomServer() *Server {
  260. // return GetTypeServer(ServerType_Mproom)
  261. // }
  262. func GetGMServer() *Server {
  263. return GetTypeServer(ServerType_GM)
  264. }
  265. // func GetChapterSyncServer(roomId int64) *Server {
  266. // return GetChpaterSyncServer(roomId) // GetTypeServer(ServerType_ChpaterSync)
  267. // }
  268. func GetSocialServer() *Server {
  269. return GetTypeServer("social")
  270. }
  271. func GetSocialServerAgent() *cluster.ServerAgent {
  272. s := GetTypeServer("social")
  273. if s != nil {
  274. return s.Agent
  275. }
  276. return nil
  277. }
  278. // 是否是自己的线路
  279. func IsSelfLine(line int) bool {
  280. for _, v := range serverLines {
  281. if v == line {
  282. return true
  283. }
  284. }
  285. return false
  286. }
  287. // 改变负载
  288. func UpdateLoad(id int64, load int) *Server {
  289. server := GetServerById(id)
  290. if server != nil {
  291. atomic.StoreInt32(&server.Load, int32(load))
  292. }
  293. return server
  294. }
  295. func (s *Server) IncreaseLoad() {
  296. atomic.AddInt32(&s.Load, 1)
  297. }
  298. func (s *Server) SendToServer(Msg interface{}, playerId int64, gateId int64) error {
  299. if s.Agent == nil {
  300. return errors.New("agent is nil, server is disconnect")
  301. }
  302. msgType := reflect.TypeOf(Msg)
  303. var msgName string
  304. if msgType != nil && msgType.Kind() == reflect.Ptr {
  305. msgName = msgType.Elem().Name()
  306. }
  307. log.Infof("SendToServer %v %d, playerId:%v", msgName, gateId, playerId)
  308. return s.Agent.WriteServerRouteMsg(Msg, playerId, gateId, "")
  309. }
  310. func (s *Server) SendNotifyToServer(Msg interface{}) error {
  311. if s.Agent == nil {
  312. return errors.New("agent is nil, server is disconnect")
  313. }
  314. return s.Agent.WriteServerRouteMsg(Msg, 0, 0, "")
  315. }
  316. // RemoteProcessMsg 发送给其他服务处理,PlayerAgent必须有值,并且ServerAgent指向gate
  317. // func (s *Server) RemoteProcessMsg(Msg interface{}, player *PlayerAgent) error {
  318. // if s.Agent == nil {
  319. // return errors.New("agent is nil, server is disconnected")
  320. // }
  321. // return s.Agent.WriteServerRouteMsg(Msg, player.PlayerId, player.GateId)
  322. // }
  323. func SendToPlayer(playerId int64, Msg interface{}, gateId int64) error {
  324. server := GetServerById(gateId)
  325. if server == nil {
  326. return errors.New("SendToPlayer no found gate server")
  327. }
  328. if server.Agent == nil {
  329. return errors.New("SendToPlayer agent is nil, server is disconnect")
  330. }
  331. msgType := reflect.TypeOf(Msg)
  332. var msgName string
  333. if msgType != nil && msgType.Kind() == reflect.Ptr {
  334. msgName = msgType.Elem().Name()
  335. }
  336. data := []byte("...")
  337. switch msgName {
  338. case "ResponseSyncRivalInfo":
  339. default:
  340. data, _ = sjson.Marshal(Msg)
  341. if len(data) > 2048 {
  342. data = data[:2048]
  343. }
  344. }
  345. log.Info(msgName, gateId, "playerId:", playerId, string(data))
  346. _, err := server.Agent.WriteClientRouteMsg(Msg, playerId)
  347. return err
  348. }
  349. func SendToPlayerNoLog(playerId int64, Msg interface{}, gateId int64) error {
  350. server := GetServerById(gateId)
  351. if server == nil {
  352. return errors.New("SendToPlayer no found gate server")
  353. }
  354. if server.Agent == nil {
  355. return errors.New("SendToPlayer agent is nil, server is disconnect")
  356. }
  357. _, err := server.Agent.WriteClientRouteMsg(Msg, playerId)
  358. return err
  359. }
  360. // // 广播发送, flag=-13:发送给数据包指定的多个玩家;=-11:广播发送给所有在线玩家
  361. // func SendToPlayersByGates(flag int64, Msg interface{}) error {
  362. // return SendToSomePlayersByGates(Msg, flag)
  363. // }
  364. // -11:广播发送给所有在线玩家
  365. func SendToSomePlayersByGates(Msg interface{}, playerIds ...int64) error {
  366. log.Infof("SendToPlayersByGates playerIds num:%v, Msg:%+v", len(playerIds), otherutils.DumpToJSON(Msg))
  367. var data [][]byte
  368. var err error
  369. RangeServers(func(server *Server) {
  370. if server == nil {
  371. return
  372. }
  373. if server.Agent == nil {
  374. return
  375. }
  376. if err != nil {
  377. return
  378. }
  379. if data == nil {
  380. data, err = server.Agent.WriteClientRouteMsg(Msg, playerIds...)
  381. } else {
  382. server.Agent.WriteRawMsg(data...)
  383. }
  384. }, ServerType_Gate)
  385. return err
  386. }
  387. // 改变运行状态
  388. func UpdateRunState(id int64, state int) *Server {
  389. server := GetServerById(id)
  390. if server != nil {
  391. atomic.StoreInt32(&server.RunState, int32(state))
  392. }
  393. return server
  394. }
  395. func Count() int {
  396. serverMutex.RLock()
  397. defer serverMutex.RUnlock()
  398. return len(Servers)
  399. }
  400. func RangeServers(f func(*Server), svrType string) {
  401. serverMutex.RLock()
  402. defer serverMutex.RUnlock()
  403. for _, v := range Servers {
  404. if v.Type == svrType {
  405. f(v)
  406. }
  407. }
  408. }