tcp_client.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package network
  2. import (
  3. "net"
  4. "sync"
  5. "time"
  6. "leafstalk/log"
  7. )
  8. // 构造好TCPClient对象后,可以设置参数,然后调用Run()进行连接服务器,读取数据,发送数据
  9. //TCPClient 1
  10. type TCPClient struct {
  11. sync.Mutex
  12. Addr string
  13. ConnNum int
  14. ConnectInterval time.Duration
  15. MaxConnectCount int
  16. ConnectCount int
  17. PendingWriteNum int
  18. AutoReconnect bool
  19. NewAgent func(*TCPConn) Agent
  20. conns ConnSet
  21. wg sync.WaitGroup
  22. closeFlag bool
  23. // msg parser
  24. LenMsgLen int
  25. MinMsgLen uint32
  26. MaxMsgLen uint32
  27. LittleEndian bool
  28. msgParser *MsgParser
  29. }
  30. // func Test1() {
  31. // client := NewTCPClient("127.0.0.1:8888", "123")
  32. // client.Start()
  33. // }
  34. // //NewTCPClient 1
  35. // func NewTCPClient(addr string, data interface{}) *TCPClient {
  36. // client := new(TCPClient)
  37. // client.Addr = addr
  38. // client.ConnNum = 1
  39. // client.AutoReconnect = true
  40. // client.ConnectInterval = 3 * time.Second
  41. // client.PendingWriteNum = 100
  42. // client.LenMsgLen = 4
  43. // client.MaxMsgLen = math.MaxUint32
  44. // // tag := addr.Tag
  45. // client.CreateAgentF = func(conn *TCPConn) *ConnAgent {
  46. // a := new(ConnAgent)
  47. // a.conn = conn
  48. // a.SetUserData(data)
  49. // // if HandlerServer != nil {
  50. // // HandlerServer.Go("NewClusterAgent", a) //本机充当客户机去链接服务器
  51. // // }
  52. // return a
  53. // }
  54. // return client
  55. // }
  56. //Start 启动
  57. func (client *TCPClient) Start() {
  58. client.init()
  59. for i := 0; i < client.ConnNum; i++ {
  60. client.wg.Add(1)
  61. go client.connect(client.wg.Done)
  62. }
  63. }
  64. func (client *TCPClient) init() {
  65. client.Lock()
  66. defer client.Unlock()
  67. if client.ConnNum <= 0 {
  68. client.ConnNum = 1
  69. log.Infof("invalid ConnNum, reset to %v", client.ConnNum)
  70. }
  71. client.ConnectCount = 0
  72. client.MaxConnectCount = 0
  73. if client.ConnectInterval <= 0 {
  74. client.ConnectInterval = 3 * time.Second
  75. log.Infof("invalid ConnectInterval, reset to %v", client.ConnectInterval)
  76. }
  77. if client.PendingWriteNum <= 0 {
  78. client.PendingWriteNum = 100
  79. log.Infof("invalid PendingWriteNum, reset to %v", client.PendingWriteNum)
  80. }
  81. if client.NewAgent == nil {
  82. log.Fatal("NewAgent must not be nil")
  83. }
  84. if client.conns != nil {
  85. log.Fatal("client is running")
  86. }
  87. client.conns = make(ConnSet)
  88. client.closeFlag = false
  89. // msg parser
  90. msgParser := NewMsgParser()
  91. msgParser.SetMsgLen(client.LenMsgLen, client.MinMsgLen, client.MaxMsgLen)
  92. msgParser.SetByteOrder(client.LittleEndian)
  93. client.msgParser = msgParser
  94. }
  95. func (client *TCPClient) dial() net.Conn {
  96. for {
  97. conn, err := net.Dial("tcp", client.Addr)
  98. if err == nil || client.closeFlag {
  99. return conn
  100. }
  101. if client.MaxConnectCount > 0 {
  102. if client.ConnectCount < client.MaxConnectCount {
  103. client.ConnectCount += 1
  104. } else {
  105. return nil
  106. }
  107. }
  108. log.Infof("connect to %v error: %v", client.Addr, err)
  109. time.Sleep(client.ConnectInterval)
  110. continue
  111. }
  112. }
  113. func (client *TCPClient) connect(funDone func()) {
  114. defer funDone()
  115. //client.wg.Done()
  116. reconnect:
  117. //log.Infoln("client start dial...")
  118. conn := client.dial()
  119. if conn == nil {
  120. log.Warn("client dial failed")
  121. return
  122. }
  123. client.Lock()
  124. if client.closeFlag {
  125. client.Unlock()
  126. conn.Close()
  127. log.Warn("client connect but closeFlag is true")
  128. return
  129. }
  130. client.conns[conn] = struct{}{}
  131. client.Unlock()
  132. //log.Infoln("client new connect run...")
  133. tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser)
  134. agent := client.NewAgent(tcpConn)
  135. agent.Run()
  136. client.ConnectCount = 0
  137. // cleanup
  138. tcpConn.Close()
  139. client.Lock()
  140. delete(client.conns, conn)
  141. client.Unlock()
  142. agent.OnClose()
  143. if client.AutoReconnect {
  144. time.Sleep(client.ConnectInterval)
  145. goto reconnect
  146. }
  147. }
  148. //Close 关闭同服务器的链接
  149. func (client *TCPClient) Close() {
  150. client.Lock()
  151. client.closeFlag = true
  152. for conn := range client.conns {
  153. conn.Close()
  154. }
  155. client.conns = nil
  156. client.Unlock()
  157. client.wg.Wait()
  158. }