tcp_conn.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package network
  2. import (
  3. "bufio"
  4. "errors"
  5. "net"
  6. "sync"
  7. "time"
  8. "leafstalk/log"
  9. )
  // ConnSet is a set of network connections: the keys are the members and the
  // empty-struct values carry no data.
  type ConnSet map[net.Conn]struct{}
  12. // TCPConn 12
  13. type TCPConn struct {
  14. sync.Mutex
  15. conn net.Conn
  16. reader *bufio.Reader
  17. writeChan chan []byte
  18. closeFlag bool
  19. msgParser *MsgParser
  20. }
  21. func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn {
  22. tcpConn := new(TCPConn)
  23. tcpConn.conn = conn
  24. tcpConn.writeChan = make(chan []byte, pendingWriteNum)
  25. tcpConn.msgParser = msgParser
  26. tcpConn.reader = bufio.NewReaderSize(tcpConn, 1024)
  27. // byte
  28. // var a = 'c'
  29. go func() {
  30. var tsWrite time.Duration
  31. var writeCount uint
  32. for b := range tcpConn.writeChan {
  33. if b == nil {
  34. break
  35. }
  36. ts := time.Now()
  37. _, err := conn.Write(b)
  38. if err != nil {
  39. break
  40. }
  41. // log.Warnln(b)
  42. tsWrite += time.Since(ts)
  43. writeCount++
  44. if writeCount >= 20000 {
  45. lwc := len(tcpConn.writeChan)
  46. if lwc > 10000 {
  47. log.Warnf("to client write count %v, write time: %v, len(chan): %v", writeCount, tsWrite, lwc)
  48. }
  49. tsWrite = 0
  50. writeCount = 0
  51. }
  52. }
  53. conn.Close()
  54. tcpConn.Lock()
  55. tcpConn.closeFlag = true
  56. tcpConn.Unlock()
  57. }()
  58. return tcpConn
  59. }
  60. func (tcpConn *TCPConn) doDestroy() {
  61. tcpConn.conn.(*net.TCPConn).SetLinger(0)
  62. tcpConn.conn.Close()
  63. if !tcpConn.closeFlag {
  64. close(tcpConn.writeChan)
  65. tcpConn.closeFlag = true
  66. }
  67. }
  68. // Destroy 销毁链接对象
  69. func (tcpConn *TCPConn) Destroy() {
  70. tcpConn.Lock()
  71. defer tcpConn.Unlock()
  72. tcpConn.doDestroy()
  73. }
  74. // Close 关闭链接
  75. func (tcpConn *TCPConn) Close() {
  76. tcpConn.Lock()
  77. defer tcpConn.Unlock()
  78. if tcpConn.closeFlag {
  79. return
  80. }
  81. tcpConn.doWrite(nil)
  82. tcpConn.closeFlag = true
  83. }
  84. func (tcpConn *TCPConn) doWrite(b []byte) error {
  85. l := len(tcpConn.writeChan)
  86. if l == cap(tcpConn.writeChan) {
  87. log.Debugf("close conn: channel full, count: %v", l)
  88. // tcpConn.doDestroy()
  89. // return errors.New("channel full")
  90. }
  91. tcpConn.writeChan <- b
  92. return nil
  93. }
  94. // Write b must not be modified by the others goroutines
  95. // 实现writer接口,编码后调用
  96. func (tcpConn *TCPConn) Write(b []byte) (int, error) {
  97. tcpConn.Lock()
  98. defer tcpConn.Unlock()
  99. if tcpConn.closeFlag || b == nil {
  100. return 0, errors.New("connect closed or b is nil")
  101. }
  102. if err := tcpConn.doWrite(b); err != nil {
  103. return 0, err
  104. }
  105. return len(b), nil
  106. }
  107. // Read 实现reader接口,
  108. func (tcpConn *TCPConn) Read(b []byte) (int, error) {
  109. return tcpConn.conn.Read(b)
  110. }
  111. // LocalAddr 取链接的本地地址
  112. func (tcpConn *TCPConn) LocalAddr() net.Addr {
  113. return tcpConn.conn.LocalAddr()
  114. }
  115. // RemoteAddr 取链接的对端地址
  116. func (tcpConn *TCPConn) RemoteAddr() net.Addr {
  117. return tcpConn.conn.RemoteAddr()
  118. }
  119. // ReadMsg 读整个消息报
  120. func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
  121. return tcpConn.msgParser.Read(tcpConn.reader)
  122. }
  123. // WriteMsg 写整条消息
  124. func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
  125. return tcpConn.msgParser.Write(tcpConn, args...)
  126. }