process.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package servers
  2. import (
  3. "leafstalk/conf"
  4. msg "leafstalk/covenant/pbfmsg"
  5. "sync"
  6. "time"
  7. "leafstalk/log"
  8. "leafstalk/network/cluster"
  9. )
  10. type OfflineServer struct {
  11. Id int64
  12. Tick int64
  13. }
  14. var DisconnectMutex sync.Mutex
  15. var DisconnectServers map[int64]*OfflineServer
  16. // rpcNewClusterAgent 有新的服务器链接,添加服务器
  17. func OnRpcNewClusterAgent(args []interface{}, conf *conf.Config) {
  18. a := args[0].(*cluster.ServerAgent)
  19. isListenServer := args[1].(bool)
  20. log.Infof("rpcNewClusterAgent %v", a.RemoteAddr())
  21. //加入认证列表,等待认证
  22. server := NewServer(a)
  23. a.SetUserData(server)
  24. AddWaitAuthServer(server, isListenServer)
  25. if isListenServer {
  26. IntroduceServerSelf(server, conf)
  27. }
  28. //认证过程可能比较慢,需要等待对方响应
  29. //主动连接方需要主动发认证消息
  30. //连接断开时,就不需要认证了
  31. }
  32. // rpcCloseClusterAgent 服务器断开链接
  33. func OnRpcCloseClusterAgent(args []interface{}) {
  34. a := args[0].(*cluster.ServerAgent)
  35. log.Infof("rpcCloseClusterAgent %v", a.RemoteAddr())
  36. if server := ResolveServerAgent(a); server != nil {
  37. if server.ID > 0 {
  38. AddDisconnectServer(server.ID)
  39. }
  40. RemoveServer(server)
  41. RemoveAuthServer(server)
  42. } else {
  43. log.Errorf("RemoveServer but userData is nil")
  44. }
  45. //与服务器断开连接,
  46. //如果对方是游戏服务器的话,是否删除所有在线玩家,需要看是否无状态
  47. }
  48. // IntroduceServerSelf 服务器之间进行认证
  49. func IntroduceServerSelf(server *Server, conf *conf.Config) {
  50. log.Infof("IntroduceServerSelf %v", server)
  51. //发送登录消息
  52. id := conf.GetInt64("cluster.id")
  53. serverType := conf.GetString("cluster.servertype")
  54. lines := conf.GetIntSlice("cluster.lines")
  55. addr := conf.GetString("gate.toclient.wsaddr")
  56. msg1 := msg.ServerLogin{
  57. ID: id,
  58. Type: serverType,
  59. GateAddr: addr,
  60. }
  61. for _, v := range lines {
  62. msg1.Lines = append(msg1.Lines, int32(v))
  63. }
  64. server.Agent.WriteMsg(&msg1)
  65. }
  66. func AuthAllServer(conf *conf.Config) {
  67. RangeListenServer(func(server *Server) {
  68. IntroduceServerSelf(server, conf)
  69. })
  70. }
  71. func HandleServerLogin(args []interface{}, conf *conf.Config) {
  72. a := args[1].(*cluster.ServerAgent)
  73. request := args[0].(*msg.ServerLogin)
  74. if request.ID == 0 {
  75. log.Warnf("handleServerLogin faild, id can not be 0, %v %v", request.Type, request.GateAddr)
  76. return
  77. }
  78. log.Infof("handleServerLogin, id=%v, type=%v", request.ID, request.Type)
  79. if server := ResolveServerAgent(a); server != nil {
  80. server.ID = request.ID
  81. server.Type = request.Type
  82. server.Addr = request.GateAddr
  83. for _, v := range request.Lines {
  84. server.Lines = append(server.Lines, int(v))
  85. }
  86. RemoveDisconnectServer(server.ID)
  87. RemoveAuthServer(server)
  88. AddServer(server)
  89. //回应
  90. id := conf.GetInt64("cluster.id")
  91. serverType := conf.GetString("cluster.servertype")
  92. lines := conf.GetIntSlice("cluster.lines")
  93. addr := conf.GetString("gate.toclient.wsaddr")
  94. msg1 := msg.ResponseServerLogin{
  95. ID: id,
  96. Type: serverType,
  97. GateAddr: addr,
  98. // Lines: int32(group),
  99. }
  100. for _, v := range lines {
  101. msg1.Lines = append(msg1.Lines, int32(v))
  102. }
  103. a.WriteMsg(&msg1)
  104. }
  105. }
  106. func HandleResponseServerLogin(args []interface{}) {
  107. a := args[1].(*cluster.ServerAgent)
  108. request := args[0].(*msg.ResponseServerLogin)
  109. if request.ID == 0 {
  110. log.Warnf("handleResponseServerLogin faild, id can not be 0, %v %v", request.Type, request.GateAddr)
  111. return
  112. }
  113. log.Infof("handleResponseServerLogin, id=%v, type=%v", request.ID, request.Type)
  114. if a.UserData() != nil {
  115. if server, ok := a.UserData().(*Server); ok {
  116. server.ID = request.ID
  117. server.Type = request.Type
  118. server.Addr = request.GateAddr
  119. // server.Group = int(request.Group)
  120. for _, v := range request.Lines {
  121. server.Lines = append(server.Lines, int(v))
  122. }
  123. RemoveDisconnectServer(server.ID)
  124. RemoveAuthServer(server)
  125. AddServer(server)
  126. }
  127. }
  128. }
  129. func ResolveServerAgent(agent *cluster.ServerAgent) *Server {
  130. if agent == nil {
  131. return nil
  132. }
  133. userData := agent.UserData()
  134. if userData == nil {
  135. return nil
  136. }
  137. switch v := userData.(type) {
  138. case int64:
  139. return nil
  140. case *Server:
  141. return v
  142. }
  143. return nil
  144. }
  145. // 添加离线服务器
  146. func AddDisconnectServer(id int64) {
  147. if id <= 0 {
  148. return
  149. }
  150. DisconnectMutex.Lock()
  151. defer DisconnectMutex.Unlock()
  152. if DisconnectServers == nil {
  153. DisconnectServers = make(map[int64]*OfflineServer)
  154. }
  155. for _, v := range DisconnectServers {
  156. if v.Id == id {
  157. return
  158. }
  159. }
  160. s := new(OfflineServer)
  161. s.Id = id
  162. s.Tick = time.Now().Unix()
  163. DisconnectServers[id] = s
  164. }
  165. func RemoveDisconnectServer(id int64) {
  166. DisconnectMutex.Lock()
  167. defer DisconnectMutex.Unlock()
  168. delete(DisconnectServers, id)
  169. }
  170. // 检查离线时长
  171. func CheckOfflineTimeLong(id int64) int64 {
  172. DisconnectMutex.Lock()
  173. defer DisconnectMutex.Unlock()
  174. var tl int64 = -1
  175. for _, v := range DisconnectServers {
  176. if v.Id == id {
  177. c := time.Now().Unix()
  178. tl = c - v.Tick
  179. if tl < 0 {
  180. v.Tick = c
  181. tl = 0
  182. }
  183. break
  184. }
  185. }
  186. return tl
  187. }
  188. // 获取离线超时服务器 S
  189. func GetOfflineServer(timeOut int64) []int64 {
  190. DisconnectMutex.Lock()
  191. defer DisconnectMutex.Unlock()
  192. var lst []int64
  193. c := time.Now().Unix()
  194. for _, v := range DisconnectServers {
  195. if c-v.Tick > timeOut {
  196. lst = append(lst, v.Id)
  197. }
  198. }
  199. return lst
  200. }
  201. // func IsDisconnectServer(id int) bool {
  202. // for _, v := range DisconnectServers {
  203. // if v.Id == id {
  204. // return true
  205. // }
  206. // }
  207. // return false
  208. // }