tcp_server.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package network
  2. import (
  3. "leafstalk/log"
  4. "net"
  5. "sync"
  6. "time"
  7. )
  8. type TCPServer struct {
  9. Addr string
  10. MaxConnNum int
  11. PendingWriteNum int
  12. NewAgent func(*TCPConn) Agent
  13. ln net.Listener
  14. conns ConnSet
  15. mutexConns sync.Mutex
  16. wgLn sync.WaitGroup
  17. wgConns sync.WaitGroup
  18. // msg parser
  19. LenMsgLen int
  20. MinMsgLen uint32
  21. MaxMsgLen uint32
  22. LittleEndian bool
  23. msgParser *MsgParser
  24. }
  25. func (server *TCPServer) Start() {
  26. server.init()
  27. go server.run()
  28. }
  29. func (server *TCPServer) init() {
  30. ln, err := net.Listen("tcp", server.Addr)
  31. if err != nil {
  32. log.Fatalf("%v", err)
  33. }
  34. if server.MaxConnNum <= 0 {
  35. server.MaxConnNum = 100
  36. log.Infof("invalid MaxConnNum, reset to %v", server.MaxConnNum)
  37. }
  38. if server.PendingWriteNum <= 0 {
  39. server.PendingWriteNum = 100
  40. log.Infof("invalid PendingWriteNum, reset to %v", server.PendingWriteNum)
  41. }
  42. if server.NewAgent == nil {
  43. log.Fatal("NewAgent must not be nil")
  44. }
  45. server.ln = ln
  46. server.conns = make(ConnSet)
  47. // msg parser
  48. msgParser := NewMsgParser()
  49. msgParser.SetMsgLen(server.LenMsgLen, server.MinMsgLen, server.MaxMsgLen)
  50. msgParser.SetByteOrder(server.LittleEndian)
  51. server.msgParser = msgParser
  52. }
  53. func (server *TCPServer) run() {
  54. server.wgLn.Add(1)
  55. defer server.wgLn.Done()
  56. log.Info("TCPServer start accept")
  57. var tempDelay time.Duration
  58. for {
  59. conn, err := server.ln.Accept()
  60. if err != nil {
  61. if ne, ok := err.(net.Error); ok && ne.Temporary() {
  62. if tempDelay == 0 {
  63. tempDelay = 5 * time.Millisecond
  64. } else {
  65. tempDelay *= 2
  66. }
  67. if max := 1 * time.Second; tempDelay > max {
  68. tempDelay = max
  69. }
  70. log.Infof("accept error: %v; retrying in %v", err, tempDelay)
  71. time.Sleep(tempDelay)
  72. continue
  73. }
  74. return
  75. }
  76. tempDelay = 0
  77. server.mutexConns.Lock()
  78. if len(server.conns) >= server.MaxConnNum {
  79. server.mutexConns.Unlock()
  80. conn.Close()
  81. log.Debug("too many connections")
  82. continue
  83. }
  84. server.conns[conn] = struct{}{}
  85. server.mutexConns.Unlock()
  86. server.wgConns.Add(1)
  87. tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
  88. agent := server.NewAgent(tcpConn)
  89. go func() {
  90. agent.Run()
  91. // cleanup
  92. tcpConn.Close()
  93. server.mutexConns.Lock()
  94. delete(server.conns, conn)
  95. server.mutexConns.Unlock()
  96. agent.OnClose()
  97. server.wgConns.Done()
  98. }()
  99. }
  100. }
  101. // Close 1
  102. func (server *TCPServer) Close() {
  103. server.ln.Close()
  104. server.wgLn.Wait()
  105. server.mutexConns.Lock()
  106. for conn := range server.conns {
  107. conn.Close()
  108. }
  109. server.conns = nil
  110. server.mutexConns.Unlock()
  111. server.wgConns.Wait()
  112. }