cluster.go 8.8 KB


  1. package cluster
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "math"
  6. "net"
  7. "reflect"
  8. "sync"
  9. "time"
  10. "leafstalk/conf"
  11. gconf "leafstalk/conf"
  12. "leafstalk/covenant/pbfmsg"
  13. "leafstalk/log"
  14. "leafstalk/module/handler"
  15. "leafstalk/network"
  16. )
  17. // Cluster 1
  18. type Cluster struct {
  19. server *network.TCPServer
  20. clients []*network.TCPClient
  21. AgentChanRPC *handler.Server
  22. Processor network.Processor
  23. SubMsgProcessor network.Processor
  24. Conf *conf.Config
  25. }
  26. // NewCluster 构造
  27. func NewCluster(processor network.Processor, agentServer *handler.Server, conf *conf.Config) *Cluster {
  28. cluster := new(Cluster)
  29. cluster.AgentChanRPC = agentServer
  30. cluster.Processor = processor
  31. cluster.SubMsgProcessor = processor
  32. cluster.Conf = conf
  33. return cluster
  34. }
  35. // Init 1
  36. func (c *Cluster) Init(conf *gconf.Config) {
  37. log.Info("cluster Init...")
  38. if conf.GetString("cluster.listenaddr") != "" {
  39. c.server = new(network.TCPServer)
  40. c.server.Addr = conf.GetString("cluster.listenaddr")
  41. c.server.MaxConnNum = int(math.MaxInt32)
  42. c.server.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
  43. c.server.LenMsgLen = 4
  44. c.server.MaxMsgLen = math.MaxUint32
  45. c.server.NewAgent = newAgent(c)
  46. c.server.Start()
  47. }
  48. Addrs := []gconf.ConnAddr{}
  49. conf.UnmarshalKey("cluster.connaddrs", &Addrs)
  50. for _, addr := range Addrs {
  51. client := new(network.TCPClient)
  52. client.Addr = addr.Addr
  53. client.ConnNum = 1
  54. client.ConnectCount = 0
  55. client.MaxConnectCount = 0
  56. if conf.GetDuration("cluster.connectinterval") > 0 {
  57. client.AutoReconnect = true
  58. client.ConnectInterval = conf.GetDuration("cluster.connectinterval")
  59. } else {
  60. client.ConnectInterval = 3 * time.Second
  61. }
  62. client.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
  63. client.LenMsgLen = 4
  64. client.MaxMsgLen = math.MaxUint32
  65. tag := addr.Tag
  66. client.NewAgent = func(conn *network.TCPConn) network.Agent {
  67. log.Info("NewServerAgent")
  68. a := new(ServerAgent)
  69. a.conn = conn
  70. a.SetUserData(tag)
  71. a.cluster = c
  72. //log.Infoln("NewAgent..")
  73. if c.AgentChanRPC != nil {
  74. c.AgentChanRPC.Go("NewClusterAgent", a, true)
  75. } else {
  76. log.Warn("not set client AgentChanRPC")
  77. }
  78. //log.Infoln("NewAgent end")
  79. return a
  80. }
  81. //fmt.Println(client)
  82. //log.Infoln(client)
  83. client.Start()
  84. c.clients = append(c.clients, client)
  85. }
  86. }
  87. func (c *Cluster) ConnNewServer(addrAndPort string) {
  88. log.Info("ConnNewServer ", addrAndPort)
  89. conf := c.Conf
  90. addr := gconf.ConnAddr{
  91. Addr: addrAndPort,
  92. Tag: 1,
  93. }
  94. client := new(network.TCPClient)
  95. client.Addr = addr.Addr
  96. client.ConnNum = 1
  97. client.ConnectCount = 0
  98. client.MaxConnectCount = 1200
  99. client.AutoReconnect = true
  100. if conf.GetDuration("cluster.connectinterval") > 0 {
  101. client.ConnectInterval = conf.GetDuration("cluster.connectinterval")
  102. } else {
  103. client.ConnectInterval = 3 * time.Second
  104. }
  105. client.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
  106. client.LenMsgLen = 4
  107. client.MaxMsgLen = math.MaxUint32
  108. tag := addr.Tag
  109. client.NewAgent = func(conn *network.TCPConn) network.Agent {
  110. log.Info("NewAgent")
  111. a := new(ServerAgent)
  112. a.conn = conn
  113. a.SetUserData(tag)
  114. a.cluster = c
  115. if c.AgentChanRPC != nil {
  116. c.AgentChanRPC.Go("NewClusterAgent", a, true)
  117. } else {
  118. log.Warn("not set client AgentChanRPC")
  119. }
  120. return a
  121. }
  122. client.Start()
  123. c.clients = append(c.clients, client)
  124. }
  125. // Destroy 1
  126. func (c *Cluster) Destroy() {
  127. if c.server != nil {
  128. c.server.Close()
  129. }
  130. for _, client := range c.clients {
  131. client.Close()
  132. }
  133. }
  134. func (c *Cluster) OnInit() {
  135. c.Init(c.Conf)
  136. }
  137. func (c *Cluster) OnDestroy() {
  138. c.Destroy()
  139. }
  140. func (c *Cluster) Run(closeSig chan bool) {
  141. // for {
  142. // select {
  143. // case <-closeSig:
  144. // return
  145. // }
  146. }
  147. // ServerAgent 1
  148. type ServerAgent struct {
  149. conn *network.TCPConn
  150. userData interface{}
  151. cluster *Cluster
  152. mutexUserData sync.Mutex
  153. }
  154. func newAgent(c *Cluster) func(conn *network.TCPConn) network.Agent {
  155. return func(conn *network.TCPConn) network.Agent {
  156. a := new(ServerAgent)
  157. a.conn = conn
  158. a.cluster = c
  159. if a.cluster.AgentChanRPC != nil {
  160. a.cluster.AgentChanRPC.Go("NewClusterAgent", a, false)
  161. } else {
  162. log.Warn("newAgent not set client AgentChanRPC")
  163. }
  164. return a
  165. }
  166. }
  167. // Run 1
  168. func (a *ServerAgent) Run() {
  169. var tsRead time.Duration
  170. var tsRoute time.Duration
  171. var readCount uint
  172. for {
  173. ts := time.Now()
  174. data, err := a.conn.ReadMsg()
  175. if err != nil {
  176. log.Infof("read message error: %v", err)
  177. break
  178. }
  179. tsRead += time.Since(ts)
  180. if a.cluster.Processor != nil {
  181. msg, err := a.cluster.Processor.Unmarshal(data)
  182. if err != nil {
  183. log.Infof("unmarshal message error: %v", err)
  184. break
  185. }
  186. err = a.cluster.Processor.Route(msg, a)
  187. if err != nil {
  188. log.Infof("route message error: %v", err)
  189. break
  190. }
  191. }
  192. tsRoute += time.Since(ts)
  193. readCount++
  194. if readCount >= 1000000 {
  195. log.Infof("from server read count %v, read time: %v, route time: %v", readCount, tsRead, tsRoute)
  196. tsRoute = 0
  197. readCount = 0
  198. }
  199. }
  200. }
  201. // OnClose 1
  202. func (a *ServerAgent) OnClose() {
  203. if a.cluster.AgentChanRPC != nil {
  204. err := a.cluster.AgentChanRPC.Call0("CloseClusterAgent", a)
  205. if err != nil {
  206. log.Errorf("handler error: %v", err)
  207. }
  208. }
  209. }
  210. // WriteMsg 1
  211. func (a *ServerAgent) WriteMsg(msg interface{}) {
  212. if a.cluster.Processor != nil {
  213. data, err := a.cluster.Processor.Marshal(msg)
  214. if err != nil {
  215. log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err)
  216. return
  217. }
  218. err = a.conn.WriteMsg(data...)
  219. if err != nil {
  220. log.Errorf("write message %v error: %v", reflect.TypeOf(msg), err)
  221. }
  222. }
  223. }
  224. // WriteRawMsg 服务器之间直接发送protobuf包
  225. func (a *ServerAgent) WriteRawMsg(args ...[]byte) {
  226. err := a.conn.WriteMsg(args...)
  227. if err != nil {
  228. log.Errorf("write message %v error: %v", args, err)
  229. }
  230. }
  231. // 服务器之间消息路由
  232. func (s *ServerAgent) WriteServerNotifyMsg(subMsg interface{}) error {
  233. return s.WriteServerRouteMsg(subMsg, 0, 0, 0)
  234. }
  235. // 服务器之间消息路由
  236. // rpc回应消息,需要rpcMsgId>0; 其他消息rpcMsgId=0
  237. func (a *ServerAgent) WriteServerRouteMsg(subMsg interface{}, playerId int64, gateId int64, rpcMsgId int64) error {
  238. if a.cluster.Processor != nil && a.cluster.SubMsgProcessor != nil {
  239. data, err := a.cluster.SubMsgProcessor.Marshal(subMsg)
  240. if err != nil {
  241. log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
  242. return err
  243. }
  244. if len(data) > 1 {
  245. log.Errorf("marshal message %v error: data array too long", reflect.TypeOf(subMsg))
  246. return errors.New("contain too much data")
  247. }
  248. msg1 := new(pbfmsg.RouteMsg)
  249. msg1.Data = data[0]
  250. msg1.PlayerId = playerId
  251. msg1.GateId = gateId
  252. //msg1.MsgName = msgID //提醒禁止传msgName
  253. msg1.RpcId = rpcMsgId
  254. data2, err := a.cluster.Processor.Marshal(msg1)
  255. if err != nil {
  256. log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
  257. return err
  258. }
  259. err = a.conn.WriteMsg(data2...)
  260. if err != nil {
  261. log.Errorf("write message %v error: %v", reflect.TypeOf(subMsg), err)
  262. return err
  263. }
  264. return nil
  265. }
  266. return errors.New("Processor is nil")
  267. }
  268. // 经网关发送给客户端
  269. func (a *ServerAgent) WriteClientRouteMsg(subMsg interface{}, ids ...int64) ([][]byte, error) {
  270. if a.cluster.Processor != nil && a.cluster.SubMsgProcessor != nil {
  271. data, err := a.cluster.SubMsgProcessor.Marshal(subMsg)
  272. if err != nil {
  273. log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
  274. return nil, err
  275. }
  276. msg1 := new(pbfmsg.ResponseRouteMsg)
  277. msg1.Data = data[0]
  278. cnt := len(ids)
  279. if cnt == 1 {
  280. msg1.PlayerId = ids[0]
  281. } else if cnt > 1 {
  282. data3, err3 := json.Marshal(ids)
  283. if err3 != nil {
  284. log.Errorf("WriteRouteMsgToClients marshal ids %v error: %v", reflect.TypeOf(ids), err3)
  285. return nil, err3
  286. }
  287. msg1.PlayerId = -13
  288. msg1.Param = data3
  289. } else {
  290. return nil, errors.New("WriteClientRouteMsg no playerid")
  291. }
  292. data2, err := a.cluster.Processor.Marshal(msg1)
  293. if err != nil {
  294. log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg1), err)
  295. return nil, err
  296. }
  297. err = a.conn.WriteMsg(data2...)
  298. if err != nil {
  299. log.Errorf("write message %v error: %v", reflect.TypeOf(subMsg), err)
  300. return nil, err
  301. }
  302. return data2, nil
  303. }
  304. return nil, errors.New("Processor or MarshalProcessor of server is nil")
  305. }
  306. // LocalAddr 1
  307. func (a *ServerAgent) LocalAddr() net.Addr {
  308. return a.conn.LocalAddr()
  309. }
  310. // RemoteAddr 1
  311. func (a *ServerAgent) RemoteAddr() net.Addr {
  312. return a.conn.RemoteAddr()
  313. }
  314. // Close 1
  315. func (a *ServerAgent) Close() {
  316. a.conn.Close()
  317. }
  318. // Destroy 1
  319. func (a *ServerAgent) Destroy() {
  320. a.conn.Destroy()
  321. }
  322. // UserData 1
  323. func (a *ServerAgent) UserData() interface{} {
  324. a.mutexUserData.Lock()
  325. defer a.mutexUserData.Unlock()
  326. return a.userData
  327. }
  328. // SetUserData 1
  329. func (a *ServerAgent) SetUserData(data interface{}) {
  330. a.mutexUserData.Lock()
  331. defer a.mutexUserData.Unlock()
  332. a.userData = data
  333. }