package networkserver import ( "errors" "leafstalk/conf" "net" "reflect" "sync" "time" "leafstalk/log" "leafstalk/module/handler" "leafstalk/network" ) // Gate 网络服务管理 type Gate struct { MaxConnNum int //最大客户端链接数 PendingWriteNum int //通道内最大可以有几个待发送包 MaxMsgLen uint32 //包最大长度 Processor network.Processor AgentServer *handler.Server //代理上下线通知到的目标 // websocket WSAddr string HTTPTimeout time.Duration //websocket链接、读写超时 CertFile string KeyFile string // tcp TCPAddr string LenMsgLen int //消息长度几个字节 LittleEndian bool } // NewGate 构造 func NewGate(processor network.Processor, agentServer *handler.Server, conf *conf.Config) *Gate { gate := Gate{ MaxConnNum: conf.GetInt("gate.maxconnnum"), PendingWriteNum: conf.GetInt("gate.pendingwritenum"), MaxMsgLen: conf.GetUint32("gate.maxmsglen"), WSAddr: conf.GetString("gate.wsaddr"), HTTPTimeout: conf.GetDuration("gate.httptimeout"), CertFile: conf.GetString("gate.certfile"), KeyFile: conf.GetString("gate.keyfile"), TCPAddr: conf.GetString("gate.tcpaddr"), LenMsgLen: conf.GetInt("gate.lenmsglen"), LittleEndian: conf.GetBool("gate.littleendian"), Processor: processor, AgentServer: agentServer, } return &gate } func (gate *Gate) OnInit() { } // Run 1 func (gate *Gate) Run(closeSig chan bool) { log.Info("gate server start Run...") var wsServer *network.WSServer if gate.WSAddr != "" { wsServer = new(network.WSServer) wsServer.Addr = gate.WSAddr wsServer.MaxConnNum = gate.MaxConnNum wsServer.PendingWriteNum = gate.PendingWriteNum wsServer.MaxMsgLen = gate.MaxMsgLen wsServer.HTTPTimeout = gate.HTTPTimeout wsServer.CertFile = gate.CertFile wsServer.KeyFile = gate.KeyFile wsServer.NewAgent = func(conn *network.WSConn) network.Agent { a := &clientAgent{conn: conn, gate: gate} if gate.AgentServer != nil { gate.AgentServer.Go("NewAgent", a) //客户端链接到服务器 } return a } } var tcpServer *network.TCPServer if gate.TCPAddr != "" { tcpServer = new(network.TCPServer) tcpServer.Addr = gate.TCPAddr tcpServer.MaxConnNum = gate.MaxConnNum tcpServer.PendingWriteNum = gate.PendingWriteNum tcpServer.LenMsgLen = gate.LenMsgLen tcpServer.MaxMsgLen = gate.MaxMsgLen tcpServer.LittleEndian = gate.LittleEndian tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent { a := &clientAgent{conn: conn, gate: gate} if gate.AgentServer != nil { gate.AgentServer.Go("NewAgent", a) //客户端链接到服务器 } return a } } if wsServer != nil { wsServer.Start() } if tcpServer != nil { tcpServer.Start() } <-closeSig if wsServer != nil { wsServer.Close() } if tcpServer != nil { tcpServer.Close() } } // OnDestroy 1 func (gate *Gate) OnDestroy() {} ///* //*/ type clientAgent struct { conn network.Conn gate *Gate userData interface{} mutexUserData sync.Mutex } func MsgHeader(data []byte) string { l := len(data) if l == 0 { return "nilMsgId" } maxLen := 50 if l > maxLen { l = maxLen } // var m map[string]json.RawMessage // err := json.Unmarshal(data, &m) // if err == nil { // for k := range m { // return k // } // } // if l > 10 { // l = 10 // } return string(data[:l]) } func (a *clientAgent) Run() { var tsRead time.Duration var tsRoute time.Duration var readCount uint for { ts := time.Now() data, err := a.conn.ReadMsg() if err != nil { log.Debugf("read message: %v", err.Error()) break } tsRead += time.Since(ts) if a.gate.Processor != nil { msg, err := a.gate.Processor.Unmarshal(data) //log.Infof("ReadMsg: %v", string(data)) if err != nil { header := MsgHeader(data) log.Warnf("unmarshal message error 2: %v %v", header, err.Error()) break } err = a.gate.Processor.Route(msg, a) if err != nil { header := MsgHeader(data) log.Warnf("route message error 2: %v %v", header, err.Error()) break } } tsRoute += time.Since(ts) readCount++ if readCount >= 200000 { log.Debugf("from client read count %v, read time: %v, route time: %v", readCount, tsRead, tsRoute) tsRead = 0 tsRoute = 0 readCount = 0 } } } func (a *clientAgent) OnClose() { if a.gate.AgentServer != nil { err := a.gate.AgentServer.Call0("CloseAgent", a) if err != nil { log.Errorf("handler error: %v", err) } } } func (a *clientAgent) WriteMsg(msg interface{}) { if a.gate.Processor != nil { data, err := a.gate.Processor.Marshal(msg) if err != nil { log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err) return } err = a.conn.WriteMsg(data...) if err != nil { log.Errorf("write message %v error: %v", reflect.TypeOf(msg), err) } } } func (a *clientAgent) MarshalMsg(msg interface{}) ([][]byte, error) { if a.gate.Processor != nil { data, err := a.gate.Processor.Marshal(msg) if err != nil { log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err) return nil, err } return data, err } return nil, errors.New("MarshalMsg no gate.Processor") } func (a *clientAgent) WriteBytes(data [][]byte) { err := a.conn.WriteMsg(data...) if err != nil { log.Errorf("write message error: %v", err) } } func (a *clientAgent) LocalAddr() net.Addr { return a.conn.LocalAddr() } func (a *clientAgent) RemoteAddr() net.Addr { return a.conn.RemoteAddr() } func (a *clientAgent) Close() { a.conn.Close() } func (a *clientAgent) Destroy() { a.conn.Destroy() } func (a *clientAgent) UserData() interface{} { a.mutexUserData.Lock() defer a.mutexUserData.Unlock() return a.userData } func (a *clientAgent) SetUserData(data interface{}) { a.mutexUserData.Lock() defer a.mutexUserData.Unlock() a.userData = data }