123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- package network
- import (
- "net"
- "sync"
- "time"
- "leafstalk/log"
- )
- // 构造好TCPClient对象后,可以设置参数,然后调用Run()进行连接服务器,读取数据,发送数据
- //TCPClient 1
- type TCPClient struct {
- sync.Mutex
- Addr string
- ConnNum int
- ConnectInterval time.Duration
- MaxConnectCount int
- ConnectCount int
- PendingWriteNum int
- AutoReconnect bool
- NewAgent func(*TCPConn) Agent
- conns ConnSet
- wg sync.WaitGroup
- closeFlag bool
- // msg parser
- LenMsgLen int
- MinMsgLen uint32
- MaxMsgLen uint32
- LittleEndian bool
- msgParser *MsgParser
- }
- // func Test1() {
- // client := NewTCPClient("127.0.0.1:8888", "123")
- // client.Start()
- // }
- // //NewTCPClient 1
- // func NewTCPClient(addr string, data interface{}) *TCPClient {
- // client := new(TCPClient)
- // client.Addr = addr
- // client.ConnNum = 1
- // client.AutoReconnect = true
- // client.ConnectInterval = 3 * time.Second
- // client.PendingWriteNum = 100
- // client.LenMsgLen = 4
- // client.MaxMsgLen = math.MaxUint32
- // // tag := addr.Tag
- // client.CreateAgentF = func(conn *TCPConn) *ConnAgent {
- // a := new(ConnAgent)
- // a.conn = conn
- // a.SetUserData(data)
- // // if HandlerServer != nil {
- // // HandlerServer.Go("NewClusterAgent", a) //本机充当客户机去链接服务器
- // // }
- // return a
- // }
- // return client
- // }
- //Start 启动
- func (client *TCPClient) Start() {
- client.init()
- for i := 0; i < client.ConnNum; i++ {
- client.wg.Add(1)
- go client.connect(client.wg.Done)
- }
- }
- func (client *TCPClient) init() {
- client.Lock()
- defer client.Unlock()
- if client.ConnNum <= 0 {
- client.ConnNum = 1
- log.Infof("invalid ConnNum, reset to %v", client.ConnNum)
- }
- client.ConnectCount = 0
- client.MaxConnectCount = 0
- if client.ConnectInterval <= 0 {
- client.ConnectInterval = 3 * time.Second
- log.Infof("invalid ConnectInterval, reset to %v", client.ConnectInterval)
- }
- if client.PendingWriteNum <= 0 {
- client.PendingWriteNum = 100
- log.Infof("invalid PendingWriteNum, reset to %v", client.PendingWriteNum)
- }
- if client.NewAgent == nil {
- log.Fatal("NewAgent must not be nil")
- }
- if client.conns != nil {
- log.Fatal("client is running")
- }
- client.conns = make(ConnSet)
- client.closeFlag = false
- // msg parser
- msgParser := NewMsgParser()
- msgParser.SetMsgLen(client.LenMsgLen, client.MinMsgLen, client.MaxMsgLen)
- msgParser.SetByteOrder(client.LittleEndian)
- client.msgParser = msgParser
- }
- func (client *TCPClient) dial() net.Conn {
- for {
- conn, err := net.Dial("tcp", client.Addr)
- if err == nil || client.closeFlag {
- return conn
- }
- if client.MaxConnectCount > 0 {
- if client.ConnectCount < client.MaxConnectCount {
- client.ConnectCount += 1
- } else {
- return nil
- }
- }
- log.Infof("connect to %v error: %v", client.Addr, err)
- time.Sleep(client.ConnectInterval)
- continue
- }
- }
- func (client *TCPClient) connect(funDone func()) {
- defer funDone()
- //client.wg.Done()
- reconnect:
- //log.Infoln("client start dial...")
- conn := client.dial()
- if conn == nil {
- log.Warn("client dial failed")
- return
- }
- client.Lock()
- if client.closeFlag {
- client.Unlock()
- conn.Close()
- log.Warn("client connect but closeFlag is true")
- return
- }
- client.conns[conn] = struct{}{}
- client.Unlock()
- //log.Infoln("client new connect run...")
- tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser)
- agent := client.NewAgent(tcpConn)
- agent.Run()
- client.ConnectCount = 0
- // cleanup
- tcpConn.Close()
- client.Lock()
- delete(client.conns, conn)
- client.Unlock()
- agent.OnClose()
- if client.AutoReconnect {
- time.Sleep(client.ConnectInterval)
- goto reconnect
- }
- }
- //Close 关闭同服务器的链接
- func (client *TCPClient) Close() {
- client.Lock()
- client.closeFlag = true
- for conn := range client.conns {
- conn.Close()
- }
- client.conns = nil
- client.Unlock()
- client.wg.Wait()
- }
|