package cluster

import (
	"encoding/json"
	"errors"
	"math"
	"net"
	"reflect"
	"sync"
	"time"

	"leafstalk/conf"
	gconf "leafstalk/conf"
	"leafstalk/covenant/pbfmsg"
	"leafstalk/log"
	"leafstalk/module/handler"
	"leafstalk/network"
)

// Cluster owns the server-to-server TCP endpoints of a node: an optional
// listening server plus any number of outbound clients.
type Cluster struct {
	// server is the listening endpoint; it is only created by Init when
	// "cluster.listenaddr" is configured.
	server *network.TCPServer
	// clients holds every outbound client started by Init or ConnNewServer.
	clients []*network.TCPClient

	// AgentChanRPC, when non-nil, is notified with "NewClusterAgent" when an
	// agent is created and "CloseClusterAgent" when it closes.
	AgentChanRPC *handler.Server
	// Processor marshals, unmarshals and routes messages on the connections.
	Processor network.Processor
	// SubMsgProcessor marshals the inner payload of route messages.
	// NewCluster sets it to the same value as Processor.
	SubMsgProcessor network.Processor
	// Conf is the configuration used by OnInit and ConnNewServer.
	Conf *conf.Config
}

// NewCluster builds a Cluster that uses processor for both regular messages
// and route sub-messages, reports agent events to agentServer, and reads its
// settings from conf. No connection is opened until Init/OnInit is called.
func NewCluster(processor network.Processor, agentServer *handler.Server, conf *conf.Config) *Cluster {
	return &Cluster{
		AgentChanRPC:    agentServer,
		Processor:       processor,
		SubMsgProcessor: processor,
		Conf:            conf,
	}
}

// Init starts the listening server (if "cluster.listenaddr" is non-empty) and
// one outbound client per entry of "cluster.connaddrs".
//
// Outbound clients only auto-reconnect when "cluster.connectinterval" is
// positive; otherwise the interval falls back to 3 seconds and AutoReconnect
// is left at its zero value.
func (c *Cluster) Init(conf *gconf.Config) {
	log.Info("cluster Init...")

	if listenAddr := conf.GetString("cluster.listenaddr"); listenAddr != "" {
		c.server = new(network.TCPServer)
		c.server.Addr = listenAddr
		c.server.MaxConnNum = int(math.MaxInt32)
		c.server.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
		c.server.LenMsgLen = 4
		c.server.MaxMsgLen = math.MaxUint32
		c.server.NewAgent = newAgent(c)
		c.server.Start()
	}

	addrs := []gconf.ConnAddr{}
	// NOTE(review): any result of UnmarshalKey is not inspected here; if it
	// reports an error, a malformed "cluster.connaddrs" is silently ignored.
	conf.UnmarshalKey("cluster.connaddrs", &addrs)

	for _, addr := range addrs {
		client := new(network.TCPClient)
		client.Addr = addr.Addr
		client.ConnNum = 1
		client.ConnectCount = 0
		client.MaxConnectCount = 0
		if interval := conf.GetDuration("cluster.connectinterval"); interval > 0 {
			client.AutoReconnect = true
			client.ConnectInterval = interval
		} else {
			client.ConnectInterval = 3 * time.Second
		}
		client.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
		client.LenMsgLen = 4
		client.MaxMsgLen = math.MaxUint32
		client.NewAgent = c.dialAgentFactory(addr.Tag, "NewServerAgent")

		client.Start()
		c.clients = append(c.clients, client)
	}
}

// ConnNewServer starts one additional outbound client towards addrAndPort
// using the settings in c.Conf. The client always auto-reconnects, with
// MaxConnectCount set to 1200, and its agents carry the tag 1 as user data.
func (c *Cluster) ConnNewServer(addrAndPort string) {
	log.Info("ConnNewServer ", addrAndPort)

	conf := c.Conf
	addr := gconf.ConnAddr{
		Addr: addrAndPort,
		Tag:  1,
	}

	client := new(network.TCPClient)
	client.Addr = addr.Addr
	client.ConnNum = 1
	client.ConnectCount = 0
	client.MaxConnectCount = 1200
	client.AutoReconnect = true
	if interval := conf.GetDuration("cluster.connectinterval"); interval > 0 {
		client.ConnectInterval = interval
	} else {
		client.ConnectInterval = 3 * time.Second
	}
	client.PendingWriteNum = conf.GetInt("cluster.pendingwritenum")
	client.LenMsgLen = 4
	client.MaxMsgLen = math.MaxUint32
	client.NewAgent = c.dialAgentFactory(addr.Tag, "NewAgent")

	client.Start()
	c.clients = append(c.clients, client)
}

// dialAgentFactory returns the NewAgent callback used by outbound clients.
// Each created ServerAgent stores tag as its user data, and logMsg is logged
// every time the callback runs.
func (c *Cluster) dialAgentFactory(tag interface{}, logMsg string) func(conn *network.TCPConn) network.Agent {
	return func(conn *network.TCPConn) network.Agent {
		log.Info(logMsg)

		a := new(ServerAgent)
		a.conn = conn
		a.SetUserData(tag)
		a.cluster = c

		if c.AgentChanRPC != nil {
			// The trailing true distinguishes this from newAgent, which passes
			// false — presumably "connection was dialed by us"; confirm
			// against the "NewClusterAgent" handler.
			c.AgentChanRPC.Go("NewClusterAgent", a, true)
		} else {
			log.Warn("not set client AgentChanRPC")
		}
		return a
	}
}

// Destroy closes the listening server, if any, and every outbound client.
func (c *Cluster) Destroy() {
	if c.server != nil {
		c.server.Close()
	}
	for _, client := range c.clients {
		client.Close()
	}
}

// OnInit initializes the cluster from c.Conf.
func (c *Cluster) OnInit() {
	c.Init(c.Conf)
}

// OnDestroy tears the cluster down; it is equivalent to Destroy.
func (c *Cluster) OnDestroy() {
	c.Destroy()
}

// Run is currently a no-op: it returns immediately and does not wait on
// closeSig (the blocking receive loop is disabled).
func (c *Cluster) Run(closeSig chan bool) {
}

// ServerAgent is the per-connection agent for a server-to-server link.
type ServerAgent struct {
	conn     *network.TCPConn
	userData interface{}
	cluster  *Cluster
	// mutexUserData guards userData.
	mutexUserData sync.Mutex
}

// newAgent returns the NewAgent callback used by the listening server. Agents
// created here have no user data set.
func newAgent(c *Cluster) func(conn *network.TCPConn) network.Agent {
	return func(conn *network.TCPConn) network.Agent {
		a := new(ServerAgent)
		a.conn = conn
		a.cluster = c

		if a.cluster.AgentChanRPC != nil {
			a.cluster.AgentChanRPC.Go("NewClusterAgent", a, false)
		} else {
			log.Warn("newAgent not set client AgentChanRPC")
		}
		return a
	}
}

// Run reads messages from the connection until a read, unmarshal or route
// error occurs. If the cluster has no Processor, messages are read and
// discarded. Timing statistics are logged every 1,000,000 messages.
func (a *ServerAgent) Run() {
	const statsInterval = 1000000

	var (
		tsRead    time.Duration
		tsRoute   time.Duration
		readCount uint
	)

	for {
		ts := time.Now()
		data, err := a.conn.ReadMsg()
		if err != nil {
			log.Infof("read message error: %v", err)
			return
		}
		tsRead += time.Since(ts)

		if a.cluster.Processor != nil {
			msg, err := a.cluster.Processor.Unmarshal(data)
			if err != nil {
				log.Infof("unmarshal message error: %v", err)
				return
			}
			if err = a.cluster.Processor.Route(msg, a); err != nil {
				log.Infof("route message error: %v", err)
				return
			}
		}
		// Measured from ts, so this accumulates the whole iteration
		// (read + unmarshal + route), not routing alone.
		tsRoute += time.Since(ts)

		readCount++
		if readCount >= statsInterval {
			log.Infof("from server read count %v, read time: %v, route time: %v", readCount, tsRead, tsRoute)
			// Reset all three counters together so each report covers the
			// same window of messages.
			tsRead = 0
			tsRoute = 0
			readCount = 0
		}
	}
}

// OnClose notifies AgentChanRPC, if set, with "CloseClusterAgent" and logs
// any error the call returns.
func (a *ServerAgent) OnClose() {
	if a.cluster.AgentChanRPC == nil {
		return
	}
	if err := a.cluster.AgentChanRPC.Call0("CloseClusterAgent", a); err != nil {
		log.Errorf("handler error: %v", err)
	}
}

// WriteMsg marshals msg with the cluster Processor and writes it to the
// connection. Errors are logged, not returned. It does nothing when the
// Processor is nil.
func (a *ServerAgent) WriteMsg(msg interface{}) {
	if a.cluster.Processor == nil {
		return
	}

	data, err := a.cluster.Processor.Marshal(msg)
	if err != nil {
		log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err)
		return
	}
	if err = a.conn.WriteMsg(data...); err != nil {
		log.Errorf("write message %v error: %v", reflect.TypeOf(msg), err)
	}
}

// WriteRawMsg writes already-encoded packets straight to the connection,
// bypassing the Processor (used to send protobuf packets directly between
// servers). Errors are logged, not returned.
func (a *ServerAgent) WriteRawMsg(args ...[]byte) {
	if err := a.conn.WriteMsg(args...); err != nil {
		log.Errorf("write message %v error: %v", args, err)
	}
}

// WriteServerNotifyMsg routes subMsg to another server with player id, gate
// id and RPC id all set to 0.
func (a *ServerAgent) WriteServerNotifyMsg(subMsg interface{}) error {
	return a.WriteServerRouteMsg(subMsg, 0, 0, 0)
}

// WriteServerRouteMsg wraps subMsg in a pbfmsg.RouteMsg and writes it to the
// connection.
//
// RPC response messages need rpcMsgId > 0; all other messages use
// rpcMsgId = 0.
//
// It returns an error when either processor is nil, when marshalling fails,
// when the marshalled sub-message does not consist of exactly one chunk, or
// when the write fails.
func (a *ServerAgent) WriteServerRouteMsg(subMsg interface{}, playerId int64, gateId int64, rpcMsgId int64) error {
	if a.cluster.Processor == nil || a.cluster.SubMsgProcessor == nil {
		return errors.New("Processor is nil")
	}

	data, err := a.cluster.SubMsgProcessor.Marshal(subMsg)
	if err != nil {
		log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
		return err
	}
	// Guard the data[0] access below: an empty result would otherwise panic.
	if len(data) == 0 {
		log.Errorf("marshal message %v error: no data produced", reflect.TypeOf(subMsg))
		return errors.New("contain no data")
	}
	if len(data) > 1 {
		log.Errorf("marshal message %v error: data array too long", reflect.TypeOf(subMsg))
		return errors.New("contain too much data")
	}

	routeMsg := new(pbfmsg.RouteMsg)
	routeMsg.Data = data[0]
	routeMsg.PlayerId = playerId
	routeMsg.GateId = gateId
	// Reminder: MsgName must not be set here; passing msgName is forbidden.
	routeMsg.RpcId = rpcMsgId

	packet, err := a.cluster.Processor.Marshal(routeMsg)
	if err != nil {
		log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
		return err
	}
	if err = a.conn.WriteMsg(packet...); err != nil {
		log.Errorf("write message %v error: %v", reflect.TypeOf(subMsg), err)
		return err
	}
	return nil
}

// WriteClientRouteMsg wraps subMsg in a pbfmsg.ResponseRouteMsg addressed to
// the given player ids and writes it to the connection (sent to clients via
// the gateway). On success it returns the encoded packet that was written.
//
// With exactly one id, PlayerId carries that id. With several ids, PlayerId
// is set to -13 and Param carries the JSON-encoded id list. With no ids an
// error is returned.
func (a *ServerAgent) WriteClientRouteMsg(subMsg interface{}, ids ...int64) ([][]byte, error) {
	if a.cluster.Processor == nil || a.cluster.SubMsgProcessor == nil {
		return nil, errors.New("Processor or MarshalProcessor of server is nil")
	}

	data, err := a.cluster.SubMsgProcessor.Marshal(subMsg)
	if err != nil {
		log.Errorf("marshal message %v error: %v", reflect.TypeOf(subMsg), err)
		return nil, err
	}
	// Guard the data[0] access below: an empty result would otherwise panic.
	if len(data) == 0 {
		log.Errorf("marshal message %v error: no data produced", reflect.TypeOf(subMsg))
		return nil, errors.New("contain no data")
	}

	routeMsg := new(pbfmsg.ResponseRouteMsg)
	routeMsg.Data = data[0]

	switch cnt := len(ids); {
	case cnt == 1:
		routeMsg.PlayerId = ids[0]
	case cnt > 1:
		encodedIDs, err := json.Marshal(ids)
		if err != nil {
			log.Errorf("WriteRouteMsgToClients marshal ids %v error: %v", reflect.TypeOf(ids), err)
			return nil, err
		}
		// -13 presumably tells the gateway to read the recipients from
		// Param — confirm against the gateway implementation.
		routeMsg.PlayerId = -13
		routeMsg.Param = encodedIDs
	default:
		return nil, errors.New("WriteClientRouteMsg no playerid")
	}

	packet, err := a.cluster.Processor.Marshal(routeMsg)
	if err != nil {
		log.Errorf("marshal message %v error: %v", reflect.TypeOf(routeMsg), err)
		return nil, err
	}
	if err = a.conn.WriteMsg(packet...); err != nil {
		log.Errorf("write message %v error: %v", reflect.TypeOf(subMsg), err)
		return nil, err
	}
	return packet, nil
}

// LocalAddr returns the local address of the underlying connection.
func (a *ServerAgent) LocalAddr() net.Addr {
	return a.conn.LocalAddr()
}

// RemoteAddr returns the remote address of the underlying connection.
func (a *ServerAgent) RemoteAddr() net.Addr {
	return a.conn.RemoteAddr()
}

// Close calls Close on the underlying connection.
func (a *ServerAgent) Close() {
	a.conn.Close()
}

// Destroy calls Destroy on the underlying connection.
func (a *ServerAgent) Destroy() {
	a.conn.Destroy()
}

// UserData returns the value last stored with SetUserData. Access is
// serialized by mutexUserData.
func (a *ServerAgent) UserData() interface{} {
	a.mutexUserData.Lock()
	defer a.mutexUserData.Unlock()
	return a.userData
}

// SetUserData stores data on the agent. Access is serialized by
// mutexUserData.
func (a *ServerAgent) SetUserData(data interface{}) {
	a.mutexUserData.Lock()
	defer a.mutexUserData.Unlock()
	a.userData = data
}