package network import ( "net" "sync" "time" "leafstalk/log" ) // 构造好TCPClient对象后,可以设置参数,然后调用Run()进行连接服务器,读取数据,发送数据 //TCPClient 1 type TCPClient struct { sync.Mutex Addr string ConnNum int ConnectInterval time.Duration MaxConnectCount int ConnectCount int PendingWriteNum int AutoReconnect bool NewAgent func(*TCPConn) Agent conns ConnSet wg sync.WaitGroup closeFlag bool // msg parser LenMsgLen int MinMsgLen uint32 MaxMsgLen uint32 LittleEndian bool msgParser *MsgParser } // func Test1() { // client := NewTCPClient("127.0.0.1:8888", "123") // client.Start() // } // //NewTCPClient 1 // func NewTCPClient(addr string, data interface{}) *TCPClient { // client := new(TCPClient) // client.Addr = addr // client.ConnNum = 1 // client.AutoReconnect = true // client.ConnectInterval = 3 * time.Second // client.PendingWriteNum = 100 // client.LenMsgLen = 4 // client.MaxMsgLen = math.MaxUint32 // // tag := addr.Tag // client.CreateAgentF = func(conn *TCPConn) *ConnAgent { // a := new(ConnAgent) // a.conn = conn // a.SetUserData(data) // // if HandlerServer != nil { // // HandlerServer.Go("NewClusterAgent", a) //本机充当客户机去链接服务器 // // } // return a // } // return client // } //Start 启动 func (client *TCPClient) Start() { client.init() for i := 0; i < client.ConnNum; i++ { client.wg.Add(1) go client.connect(client.wg.Done) } } func (client *TCPClient) init() { client.Lock() defer client.Unlock() if client.ConnNum <= 0 { client.ConnNum = 1 log.Infof("invalid ConnNum, reset to %v", client.ConnNum) } client.ConnectCount = 0 client.MaxConnectCount = 0 if client.ConnectInterval <= 0 { client.ConnectInterval = 3 * time.Second log.Infof("invalid ConnectInterval, reset to %v", client.ConnectInterval) } if client.PendingWriteNum <= 0 { client.PendingWriteNum = 100 log.Infof("invalid PendingWriteNum, reset to %v", client.PendingWriteNum) } if client.NewAgent == nil { log.Fatal("NewAgent must not be nil") } if client.conns != nil { log.Fatal("client is running") } client.conns = make(ConnSet) client.closeFlag = false // msg parser msgParser := NewMsgParser() msgParser.SetMsgLen(client.LenMsgLen, client.MinMsgLen, client.MaxMsgLen) msgParser.SetByteOrder(client.LittleEndian) client.msgParser = msgParser } func (client *TCPClient) dial() net.Conn { for { conn, err := net.Dial("tcp", client.Addr) if err == nil || client.closeFlag { return conn } if client.MaxConnectCount > 0 { if client.ConnectCount < client.MaxConnectCount { client.ConnectCount += 1 } else { return nil } } log.Infof("connect to %v error: %v", client.Addr, err) time.Sleep(client.ConnectInterval) continue } } func (client *TCPClient) connect(funDone func()) { defer funDone() //client.wg.Done() reconnect: //log.Infoln("client start dial...") conn := client.dial() if conn == nil { log.Warn("client dial failed") return } client.Lock() if client.closeFlag { client.Unlock() conn.Close() log.Warn("client connect but closeFlag is true") return } client.conns[conn] = struct{}{} client.Unlock() //log.Infoln("client new connect run...") tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser) agent := client.NewAgent(tcpConn) agent.Run() client.ConnectCount = 0 // cleanup tcpConn.Close() client.Lock() delete(client.conns, conn) client.Unlock() agent.OnClose() if client.AutoReconnect { time.Sleep(client.ConnectInterval) goto reconnect } } //Close 关闭同服务器的链接 func (client *TCPClient) Close() { client.Lock() client.closeFlag = true for conn := range client.conns { conn.Close() } client.conns = nil client.Unlock() client.wg.Wait() }