servers.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. // 实现连接服务器的管理
  2. // 因为所连服务器的数目一般都比较少,不超过80个,所以用切片实现
  3. // 服务器供多个协程使用,通过加锁实现互斥访问
  4. // 实现接口,可以设置服务器为预关闭状态
  5. // 部分服务器有分组Group,部分服务器没分组
  6. // 分组是分组更新的意思,执行灰度更新,有分组的服务器需要同组相连
  7. //无分组的服务器是无状态的服务器,可以开多个,数据处理时:
  8. // 1、根据玩家ID%服务器数量选择
  9. // 2、一致性选择服务器
  10. package servers
  11. import (
  12. sjson "encoding/json"
  13. "errors"
  14. "leafstalk/covenant/model"
  15. "leafstalk/log"
  16. "leafstalk/network/cluster"
  17. "leafstalk/otherutils"
  18. "reflect"
  19. "sync"
  20. "sync/atomic"
  21. )
  22. type Server struct {
  23. ID int64
  24. Type string
  25. Addr string
  26. RunState int32
  27. Load int32
  28. Agent *cluster.ServerAgent
  29. Lines []int
  30. // Group int
  31. }
  32. var (
  33. Servers []*Server
  34. serverMutex sync.RWMutex //sync.Mutex
  35. // serverGroup int
  36. serverLines []int
  37. )
  38. const (
  39. ServerType_Login = "login"
  40. ServerType_Gate = "gate"
  41. ServerTypeWorld = "world"
  42. ServerType_Archive = "archive"
  43. ServerTypeStore = "store"
  44. ServerType_WxPay = "wepay"
  45. ServerType_GM = "gm"
  46. ServerType_Mproom = "mproom"
  47. ServerType_ChpaterSync = "chaptersync"
  48. ServerType_ChpaterMatch = "chaptermatch"
  49. RunState_Normal = 0
  50. RunState_Stop = 1
  51. RunState_PrepareStop = 2
  52. )
  53. func SetServerLines(lines []int) {
  54. serverLines = lines
  55. }
  56. func GetServerLines() []int {
  57. return serverLines
  58. }
  59. func (s *Server) IsRunNormal() bool {
  60. return s.RunState == RunState_Normal
  61. }
  62. func NewServer(agent *cluster.ServerAgent) *Server {
  63. server := new(Server)
  64. server.Agent = agent
  65. return server
  66. }
  67. func AddServer(server *Server) int {
  68. serverMutex.Lock()
  69. defer serverMutex.Unlock()
  70. for _, v := range Servers {
  71. if v == server {
  72. return len(Servers)
  73. }
  74. if v.ID == server.ID {
  75. v.Agent.Close()
  76. }
  77. }
  78. Servers = append(Servers, server)
  79. return len(Servers)
  80. }
  81. func RemoveServer(server *Server) int {
  82. serverMutex.Lock()
  83. defer serverMutex.Unlock()
  84. i := 0
  85. for _, v := range Servers {
  86. if v != server {
  87. Servers[i] = v
  88. i++
  89. }
  90. }
  91. Servers = Servers[:i]
  92. //RemoveAuthServer(server)
  93. return len(Servers)
  94. }
  95. // GetTypeServer 查询指定类型的服务器
  96. // 只有一个某种类型服务的情况
  97. func GetTypeServer(serverType string) *Server {
  98. serverMutex.RLock()
  99. defer serverMutex.RUnlock()
  100. for _, v := range Servers {
  101. if v.Type != serverType {
  102. continue
  103. }
  104. return v
  105. }
  106. return nil
  107. }
  108. // GetChpaterSyncServer 返回房间数最少的
  109. func GetChpaterSyncServer(roomid int64) *Server {
  110. serverMutex.RLock()
  111. defer serverMutex.RUnlock()
  112. min := int32(99999) //
  113. var server *Server
  114. for _, v := range Servers {
  115. if v.Type != ServerType_ChpaterSync {
  116. continue
  117. }
  118. // 寻找最小
  119. if v.Load < min {
  120. min = v.Load
  121. server = v
  122. }
  123. }
  124. return server
  125. }
  126. // 查询指定线路 类型的服务器
  127. // 多条线路 多个同种服务
  128. func GetServerByTypeAndLine(serverType string, line int) *Server {
  129. serverMutex.RLock()
  130. defer serverMutex.RUnlock()
  131. for _, v := range Servers {
  132. if v.Type != serverType {
  133. continue
  134. }
  135. for _, l := range v.Lines {
  136. if l == line {
  137. return v
  138. }
  139. }
  140. }
  141. return nil
  142. }
  143. func GetServerById(id int64) *Server {
  144. serverMutex.RLock()
  145. defer serverMutex.RUnlock()
  146. for _, v := range Servers {
  147. if v.ID == id {
  148. return v
  149. }
  150. }
  151. return nil
  152. }
  153. // func GetGateServer() *Server {
  154. // return GetTypeServer(ServerType_Gate)
  155. // }
  156. // GetLoginServer 登陆服务器
  157. // 连接一个Login的情况
  158. func GetLoginServer() *Server {
  159. return GetTypeServer(ServerType_Login)
  160. }
  161. // GetArchiveServer 世界服务器
  162. // 连接一个Archive的情况
  163. func GetArchiveServer() *Server {
  164. return GetTypeServer(ServerType_Archive)
  165. }
  166. // // GetWorldServerByGroup 世界服务器
  167. // func GetWorldServerByGroup(group int) *Server {
  168. // return GetGroupServer(ServerType_World, group)
  169. // }
  170. // 连接一个world服务的情况
  171. func GetWorldServer() *Server {
  172. return GetTypeServer(ServerTypeWorld)
  173. }
  174. func GetWorldServerByPlayerId(playerId int64) *Server {
  175. _, l := model.SplitDocId(playerId)
  176. s := GetServerByTypeAndLine(ServerTypeWorld, l)
  177. return s
  178. }
  179. func GetWorldAgent() *cluster.ServerAgent {
  180. s := GetTypeServer(ServerTypeWorld)
  181. if s != nil {
  182. return s.Agent
  183. }
  184. return nil
  185. }
  186. // 杂货商店
  187. // func GetStoreServerByGroup(group int) *Server {
  188. // return GetGroupServer(ServerType_Store, group)
  189. // }
  190. // 连接一个Store服务的情况
  191. func GetStoreServer() *Server {
  192. return GetTypeServer(ServerTypeStore)
  193. }
  194. // store服务
  195. func GetStoreAgent() *cluster.ServerAgent {
  196. s := GetTypeServer(ServerTypeStore)
  197. if s != nil {
  198. return s.Agent
  199. }
  200. return nil
  201. }
  202. // 连接一个chapter服务的情况
  203. // func GetChapterServerByGroup(group int) *Server {
  204. // return GetGroupServer("chapter", group)
  205. // }
  206. func GetChapterServer() *Server {
  207. return GetTypeServer("chapter")
  208. }
  209. func GetChapterAgent() *cluster.ServerAgent {
  210. s := GetTypeServer("chapter")
  211. if s != nil {
  212. return s.Agent
  213. }
  214. return nil
  215. }
  216. func GetChapterServerByPlayerId(playerId int64) *Server {
  217. _, l := model.SplitDocId(playerId)
  218. s := GetServerByTypeAndLine("chapter", l)
  219. return s
  220. }
  221. func GetChapterMatchServer() *Server {
  222. return GetTypeServer(ServerType_ChpaterMatch)
  223. }
  224. func GetChapterMatchAgent() *cluster.ServerAgent {
  225. s := GetTypeServer(ServerType_ChpaterMatch)
  226. if s != nil {
  227. return s.Agent
  228. }
  229. return nil
  230. }
  231. // 只有一个room服务的情况
  232. // func GetRoomServerByGroup(group int) *Server {
  233. // return GetGroupServer(ServerType_Mproom, group)
  234. // }
  235. func GetMpRoomServer() *Server {
  236. return GetTypeServer(ServerType_Mproom)
  237. }
  238. func GetGMServer() *Server {
  239. return GetTypeServer(ServerType_GM)
  240. }
  241. func GetChapterSyncServer(roomId int64) *Server {
  242. return GetChpaterSyncServer(roomId) // GetTypeServer(ServerType_ChpaterSync)
  243. }
  244. // 是否是自己的线路
  245. func IsSelfLine(line int) bool {
  246. for _, v := range serverLines {
  247. if v == line {
  248. return true
  249. }
  250. }
  251. return false
  252. }
  253. // 改变负载
  254. func UpdateLoad(id int64, load int) *Server {
  255. server := GetServerById(id)
  256. if server != nil {
  257. atomic.StoreInt32(&server.Load, int32(load))
  258. }
  259. return server
  260. }
  261. func (s *Server) IncreaseLoad() {
  262. atomic.AddInt32(&s.Load, 1)
  263. }
  264. func (s *Server) SendToServer(Msg interface{}, playerId int64, gateId int64, rpcMsgId int64) error {
  265. if s.Agent == nil {
  266. return errors.New("agent is nil, server is disconnect")
  267. }
  268. msgType := reflect.TypeOf(Msg)
  269. var msgName string
  270. if msgType != nil && msgType.Kind() == reflect.Ptr {
  271. msgName = msgType.Elem().Name()
  272. }
  273. log.Infof("SendToServer %v %d, playerId:%v", msgName, gateId, playerId)
  274. return s.Agent.WriteServerRouteMsg(Msg, playerId, gateId, rpcMsgId)
  275. }
  276. func (s *Server) SendNotifyToServer(Msg interface{}) error {
  277. if s.Agent == nil {
  278. return errors.New("agent is nil, server is disconnect")
  279. }
  280. return s.Agent.WriteServerRouteMsg(Msg, 0, 0, 0)
  281. }
  282. // RemoteProcessMsg 发送给其他服务处理,PlayerAgent必须有值,并且ServerAgent指向gate
  283. // func (s *Server) RemoteProcessMsg(Msg interface{}, player *PlayerAgent) error {
  284. // if s.Agent == nil {
  285. // return errors.New("agent is nil, server is disconnected")
  286. // }
  287. // return s.Agent.WriteServerRouteMsg(Msg, player.PlayerId, player.GateId)
  288. // }
  289. func SendToPlayer(playerId int64, Msg interface{}, gateId int64) error {
  290. server := GetServerById(gateId)
  291. if server == nil {
  292. return errors.New("SendToPlayer no found gate server")
  293. }
  294. if server.Agent == nil {
  295. return errors.New("SendToPlayer agent is nil, server is disconnect")
  296. }
  297. msgType := reflect.TypeOf(Msg)
  298. var msgName string
  299. if msgType != nil && msgType.Kind() == reflect.Ptr {
  300. msgName = msgType.Elem().Name()
  301. }
  302. data := []byte("...")
  303. switch msgName {
  304. case "ResponseSyncRivalInfo":
  305. default:
  306. data, _ = sjson.Marshal(Msg)
  307. if len(data) > 2048 {
  308. data = data[:2048]
  309. }
  310. }
  311. log.Info(msgName, gateId, "playerId:", playerId, string(data))
  312. _, err := server.Agent.WriteClientRouteMsg(Msg, playerId)
  313. return err
  314. }
  315. func SendToPlayerNoLog(playerId int64, Msg interface{}, gateId int64) error {
  316. server := GetServerById(gateId)
  317. if server == nil {
  318. return errors.New("SendToPlayer no found gate server")
  319. }
  320. if server.Agent == nil {
  321. return errors.New("SendToPlayer agent is nil, server is disconnect")
  322. }
  323. _, err := server.Agent.WriteClientRouteMsg(Msg, playerId)
  324. return err
  325. }
  326. // 广播发送, flag=-12:发送给数据包指定的多个玩家;=-11:广播发送给所有在线玩家
  327. func SendToPlayersByGates(flag int64, Msg interface{}) error {
  328. return SendToSomePlayersByGates(Msg, flag)
  329. // log.Infof("SendToPlayersByGates flag:%v, Msg:%+v", flag, otherutils.DumpToJSON(Msg))
  330. // var data [][]byte
  331. // var err error
  332. // RangeServers(func(server *Server) {
  333. // if server == nil {
  334. // return
  335. // }
  336. // if server.Agent == nil {
  337. // return
  338. // }
  339. // if err != nil {
  340. // return
  341. // }
  342. // if data == nil {
  343. // data, err = server.Agent.WriteClientRouteMsg(Msg, flag)
  344. // } else {
  345. // server.Agent.WriteRawMsg(data...)
  346. // }
  347. // }, ServerType_Gate)
  348. // return err
  349. }
  350. func SendToSomePlayersByGates(Msg interface{}, playerIds ...int64) error {
  351. log.Infof("SendToPlayersByGates playerIds num:%v, Msg:%+v", len(playerIds), otherutils.DumpToJSON(Msg))
  352. var data [][]byte
  353. var err error
  354. RangeServers(func(server *Server) {
  355. if server == nil {
  356. return
  357. }
  358. if server.Agent == nil {
  359. return
  360. }
  361. if err != nil {
  362. return
  363. }
  364. if data == nil {
  365. data, err = server.Agent.WriteClientRouteMsg(Msg, playerIds...)
  366. } else {
  367. server.Agent.WriteRawMsg(data...)
  368. }
  369. }, ServerType_Gate)
  370. return err
  371. }
  372. // 改变运行状态
  373. func UpdateRunState(id int64, state int) *Server {
  374. server := GetServerById(id)
  375. if server != nil {
  376. atomic.StoreInt32(&server.RunState, int32(state))
  377. }
  378. return server
  379. }
  380. func Count() int {
  381. serverMutex.RLock()
  382. defer serverMutex.RUnlock()
  383. return len(Servers)
  384. }
  385. func RangeServers(f func(*Server), svrType string) {
  386. serverMutex.RLock()
  387. defer serverMutex.RUnlock()
  388. for _, v := range Servers {
  389. if v.Type == svrType {
  390. f(v)
  391. }
  392. }
  393. }
  394. // // 支付服务
  395. // func GetWxPayServer(group int) *Server {
  396. // return GetServer(ServerType_WxPay)
  397. // }
  398. // func GetToolboxServer() *Server {
  399. // return GetServer("toolbox")
  400. // }
  401. // func GetGroupServer(serverType string, group int) *Server {
  402. // serverMutex.RLock()
  403. // defer serverMutex.RUnlock()
  404. // var minLoadServer *Server = nil
  405. // for _, v := range Servers {
  406. // if v.Group != group {
  407. // continue
  408. // }
  409. // if v.Type != serverType {
  410. // continue
  411. // }
  412. // minLoadServer = v
  413. // break
  414. // }
  415. // return minLoadServer
  416. // }
  417. // func GetServersId(serverType string) []int {
  418. // serverMutex.RLock()
  419. // defer serverMutex.RUnlock()
  420. // var lst []int
  421. // for _, v := range Servers {
  422. // if v.Type != serverType {
  423. // continue
  424. // }
  425. // lst = append(lst, v.ID)
  426. // }
  427. // return lst
  428. // }