123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package network
- import (
- "leafstalk/log"
- "net"
- "sync"
- "time"
- )
- type TCPServer struct {
- Addr string
- MaxConnNum int
- PendingWriteNum int
- NewAgent func(*TCPConn) Agent
- ln net.Listener
- conns ConnSet
- mutexConns sync.Mutex
- wgLn sync.WaitGroup
- wgConns sync.WaitGroup
- // msg parser
- LenMsgLen int
- MinMsgLen uint32
- MaxMsgLen uint32
- LittleEndian bool
- msgParser *MsgParser
- }
- func (server *TCPServer) Start() {
- server.init()
- go server.run()
- }
- func (server *TCPServer) init() {
- ln, err := net.Listen("tcp", server.Addr)
- if err != nil {
- log.Fatalf("%v", err)
- }
- if server.MaxConnNum <= 0 {
- server.MaxConnNum = 100
- log.Infof("invalid MaxConnNum, reset to %v", server.MaxConnNum)
- }
- if server.PendingWriteNum <= 0 {
- server.PendingWriteNum = 100
- log.Infof("invalid PendingWriteNum, reset to %v", server.PendingWriteNum)
- }
- if server.NewAgent == nil {
- log.Fatal("NewAgent must not be nil")
- }
- server.ln = ln
- server.conns = make(ConnSet)
- // msg parser
- msgParser := NewMsgParser()
- msgParser.SetMsgLen(server.LenMsgLen, server.MinMsgLen, server.MaxMsgLen)
- msgParser.SetByteOrder(server.LittleEndian)
- server.msgParser = msgParser
- }
- func (server *TCPServer) run() {
- server.wgLn.Add(1)
- defer server.wgLn.Done()
- log.Info("TCPServer start accept")
- var tempDelay time.Duration
- for {
- conn, err := server.ln.Accept()
- if err != nil {
- if ne, ok := err.(net.Error); ok && ne.Temporary() {
- if tempDelay == 0 {
- tempDelay = 5 * time.Millisecond
- } else {
- tempDelay *= 2
- }
- if max := 1 * time.Second; tempDelay > max {
- tempDelay = max
- }
- log.Infof("accept error: %v; retrying in %v", err, tempDelay)
- time.Sleep(tempDelay)
- continue
- }
- return
- }
- tempDelay = 0
- server.mutexConns.Lock()
- if len(server.conns) >= server.MaxConnNum {
- server.mutexConns.Unlock()
- conn.Close()
- log.Debug("too many connections")
- continue
- }
- server.conns[conn] = struct{}{}
- server.mutexConns.Unlock()
- server.wgConns.Add(1)
- tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
- agent := server.NewAgent(tcpConn)
- go func() {
- agent.Run()
- // cleanup
- tcpConn.Close()
- server.mutexConns.Lock()
- delete(server.conns, conn)
- server.mutexConns.Unlock()
- agent.OnClose()
- server.wgConns.Done()
- }()
- }
- }
- // Close 1
- func (server *TCPServer) Close() {
- server.ln.Close()
- server.wgLn.Wait()
- server.mutexConns.Lock()
- for conn := range server.conns {
- conn.Close()
- }
- server.conns = nil
- server.mutexConns.Unlock()
- server.wgConns.Wait()
- }
|