123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- package cluster
- import (
- "encoding/json"
- "errors"
- "math"
- "net"
- "reflect"
- "sync"
- "time"
- "leafstalk/conf"
- gconf "leafstalk/conf"
- "leafstalk/covenant/pbfmsg"
- "leafstalk/log"
- "leafstalk/module/handler"
- "leafstalk/network"
- )
- // Cluster 1
- type Cluster struct {
- server *network.TCPServer
- clients []*network.TCPClient
- AgentChanRPC *handler.Server
- Processor network.Processor
- SubMsgProcessor network.Processor
- Conf *conf.Config
- }
- // NewCluster 构造
- func NewCluster(processor network.Processor, agentServer *handler.Server, conf *conf.Config) *Cluster {
- cluster := new(Cluster)
- cluster.AgentChanRPC = agentServer
- cluster.Processor = processor
- cluster.SubMsgProcessor = processor
- cluster.Conf = conf
- return cluster
- }
- // Init 1
- func (c *Cluster) Init(conf *gconf.Config) {
- log.Info("cluster Init...")
- if conf.GetString("cluster.listenaddr") != "" {
- c.server = new(network.TCPServer)
- c.server.Addr = conf.GetString("cluster.listenaddr")
- c.server.MaxConnNum = int(math.MaxInt32)
- c.server.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
- c.server.LenMsgLen = 4
- c.server.MaxMsgLen = math.MaxUint32
- c.server.NewAgent = newAgent(c)
- c.server.Start()
- }
- Addrs := []gconf.ConnAddr{}
- conf.UnmarshalKey("cluster.connaddrs", &Addrs)
- for _, addr := range Addrs {
- client := new(network.TCPClient)
- client.Addr = addr.Addr
- client.ConnNum = 1
- client.ConnectCount = 0
- client.MaxConnectCount = 0
- if conf.GetDuration("cluster.connectinterval") > 0 {
- client.AutoReconnect = true
- client.ConnectInterval = conf.GetDuration("cluster.connectinterval")
- } else {
- client.ConnectInterval = 3 * time.Second
- }
- client.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
- client.LenMsgLen = 4
- client.MaxMsgLen = math.MaxUint32
- tag := addr.Tag
- client.NewAgent = func(conn *network.TCPConn) network.Agent {
- log.Info("NewServerAgent")
- a := new(ServerAgent)
- a.conn = conn
- a.SetUserData(tag)
- a.cluster = c
- //log.Infoln("NewAgent..")
- if c.AgentChanRPC != nil {
- c.AgentChanRPC.Go("NewClusterAgent", a, true)
- } else {
- log.Warn("not set client AgentChanRPC")
- }
- //log.Infoln("NewAgent end")
- return a
- }
- //fmt.Println(client)
- //log.Infoln(client)
- client.Start()
- c.clients = append(c.clients, client)
- }
- }
- func (c *Cluster) ConnNewServer(addrAndPort string) {
- log.Info("ConnNewServer ", addrAndPort)
- conf := c.Conf
- addr := gconf.ConnAddr{
- Addr: addrAndPort,
- Tag: 1,
- }
- client := new(network.TCPClient)
- client.Addr = addr.Addr
- client.ConnNum = 1
- client.ConnectCount = 0
- client.MaxConnectCount = 1200
- client.AutoReconnect = true
- if conf.GetDuration("cluster.connectinterval") > 0 {
- client.ConnectInterval = conf.GetDuration("cluster.connectinterval")
- } else {
- client.ConnectInterval = 3 * time.Second
- }
- client.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
- client.LenMsgLen = 4
- client.MaxMsgLen = math.MaxUint32
- tag := addr.Tag
- client.NewAgent = func(conn *network.TCPConn) network.Agent {
- log.Info("NewAgent")
- a := new(ServerAgent)
- a.conn = conn
- a.SetUserData(tag)
- a.cluster = c
- if c.AgentChanRPC != nil {
- c.AgentChanRPC.Go("NewClusterAgent", a, true)
- } else {
- log.Warn("not set client AgentChanRPC")
- }
- return a
- }
- client.Start()
- c.clients = append(c.clients, client)
- }
- // Destroy 1
- func (c *Cluster) Destroy() {
- if c.server != nil {
- c.server.Close()
- }
- for _, client := range c.clients {
- client.Close()
- }
- }
- func (c *Cluster) OnInit() {
- c.Init(c.Conf)
- }
- func (c *Cluster) OnDestroy() {
- c.Destroy()
- }
- func (c *Cluster) Run(closeSig chan bool) {
- // for {
- // select {
- // case <-closeSig:
- // return
- // }
- }
- // ServerAgent 1
- type ServerAgent struct {
- conn *network.TCPConn
- userData interface{}
- cluster *Cluster
- mutexUserData sync.Mutex
- }
- func newAgent(c *Cluster) func(conn *network.TCPConn) network.Agent {
- return func(conn *network.TCPConn) network.Agent {
- a := new(ServerAgent)
- a.conn = conn
- a.cluster = c
- if a.cluster.AgentChanRPC != nil {
- a.cluster.AgentChanRPC.Go("NewClusterAgent", a, false)
- } else {
- log.Warn("newAgent not set client AgentChanRPC")
- }
- return a
- }
- }
- // Run 1
- func (a *ServerAgent) Run() {
- var tsRead time.Duration
- var tsRoute time.Duration
- var readCount uint
- for {
- ts := time.Now()
- data, err := a.conn.ReadMsg()
- if err != nil {
- log.Infof("read message error: %v", err)
- break
- }
- tsRead += time.Since(ts)
- if a.cluster.Processor != nil {
- msg, err := a.cluster.Processor.Unmarshal(data)
- if err != nil {
- log.Infof("unmarshal message error: %v", err)
- break
- }
- err = a.cluster.Processor.Route(msg, a)
- if err != nil {
- log.Infof("route message error: %v", err)
- break
- }
- }
- tsRoute += time.Since(ts)
- readCount++
- if readCount >= 1000000 {
- log.Infof("from server read count %v, read time: %v, route time: %v", readCount, tsRead, tsRoute)
- tsRoute = 0
- readCount = 0
- }
- }
- }
- // OnClose 1
- func (a *ServerAgent) OnClose() {
- if a.cluster.AgentChanRPC != nil {
- err := a.cluster.AgentChanRPC.Call0("CloseClusterAgent", a)
- if err != nil {
- log.Errorf("handler error: %v", err)
- }
- }
- }
- // WriteMsg 1
- func (a *ServerAgent) WriteMsg(msg interface{}) {
- if a.cluster.Processor != nil {
- data, err := a.cluster.Processor.Marshal(msg)
- if err != nil {
- log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err)
- return
- }
- err = a.conn.WriteMsg(data...)
- if err != nil {
- log.Errorf("write message %v error: %v", reflect.TypeOf(msg), err)
- }
- }
- }
- // WriteRawMsg 服务器之间直接发送protobuf包
- func (a *ServerAgent) WriteRawMsg(args ...[]byte) {
- err := a.conn.WriteMsg(args...)
- if err != nil {
- log.Errorf("write message %v error: %v", args, err)
- }
- }
- // 服务器之间消息路由
- func (s *ServerAgent) WriteServerNotifyMsg(subMsg interface{}) error {
- return s.WriteServerRouteMsg(subMsg, 0, 0, 0)
- }
- // 服务器之间消息路由
- // rpc回应消息,需要rpcMsgId>0; 其他消息rpcMsgId=0
- func (a *ServerAgent) WriteServerRouteMsg(subMsg interface{}, playerId int64, gateId int64, rpcMsgId int64) error {
- if a.cluster.Processor != nil && a.cluster.SubMsgProcessor != nil {
- data, err := a.cluster.SubMsgProcessor.Marshal(subMsg)
- if err != nil {
- log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
- return err
- }
- if len(data) > 1 {
- log.Errorf("marshal message %v error: data array too long", reflect.TypeOf(subMsg))
- return errors.New("contain too much data")
- }
- msg1 := new(pbfmsg.RouteMsg)
- msg1.Data = data[0]
- msg1.PlayerId = playerId
- msg1.GateId = gateId
- //msg1.MsgName = msgID //提醒禁止传msgName
- msg1.RpcId = rpcMsgId
- data2, err := a.cluster.Processor.Marshal(msg1)
- if err != nil {
- log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
- return err
- }
- err = a.conn.WriteMsg(data2...)
- if err != nil {
- log.Errorf("write message %v error: %v", reflect.TypeOf(subMsg), err)
- return err
- }
- return nil
- }
- return errors.New("Processor is nil")
- }
- // 经网关发送给客户端
- func (a *ServerAgent) WriteClientRouteMsg(subMsg interface{}, ids ...int64) ([][]byte, error) {
- if a.cluster.Processor != nil && a.cluster.SubMsgProcessor != nil {
- data, err := a.cluster.SubMsgProcessor.Marshal(subMsg)
- if err != nil {
- log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
- return nil, err
- }
- msg1 := new(pbfmsg.ResponseRouteMsg)
- msg1.Data = data[0]
- cnt := len(ids)
- if cnt == 1 {
- msg1.PlayerId = ids[0]
- } else if cnt > 1 {
- data3, err3 := json.Marshal(ids)
- if err3 != nil {
- log.Errorf("WriteRouteMsgToClients marshal ids %v error: %v", reflect.TypeOf(ids), err3)
- return nil, err3
- }
- msg1.PlayerId = -13
- msg1.Param = data3
- } else {
- return nil, errors.New("WriteClientRouteMsg no playerid")
- }
- data2, err := a.cluster.Processor.Marshal(msg1)
- if err != nil {
- log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg1), err)
- return nil, err
- }
- err = a.conn.WriteMsg(data2...)
- if err != nil {
- log.Errorf("write message %v error: %v", reflect.TypeOf(subMsg), err)
- return nil, err
- }
- return data2, nil
- }
- return nil, errors.New("Processor or MarshalProcessor of server is nil")
- }
- // LocalAddr 1
- func (a *ServerAgent) LocalAddr() net.Addr {
- return a.conn.LocalAddr()
- }
- // RemoteAddr 1
- func (a *ServerAgent) RemoteAddr() net.Addr {
- return a.conn.RemoteAddr()
- }
- // Close 1
- func (a *ServerAgent) Close() {
- a.conn.Close()
- }
- // Destroy 1
- func (a *ServerAgent) Destroy() {
- a.conn.Destroy()
- }
- // UserData 1
- func (a *ServerAgent) UserData() interface{} {
- a.mutexUserData.Lock()
- defer a.mutexUserData.Unlock()
- return a.userData
- }
- // SetUserData 1
- func (a *ServerAgent) SetUserData(data interface{}) {
- a.mutexUserData.Lock()
- defer a.mutexUserData.Unlock()
- a.userData = data
- }
|