package network import ( "bufio" "errors" "net" "sync" "time" "leafstalk/log" ) // ConnSet 1 type ConnSet map[net.Conn]struct{} // TCPConn 12 type TCPConn struct { sync.Mutex conn net.Conn reader *bufio.Reader writeChan chan []byte closeFlag bool msgParser *MsgParser } func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn { tcpConn := new(TCPConn) tcpConn.conn = conn tcpConn.writeChan = make(chan []byte, pendingWriteNum) tcpConn.msgParser = msgParser tcpConn.reader = bufio.NewReaderSize(tcpConn, 1024) // byte // var a = 'c' go func() { var tsWrite time.Duration var writeCount uint for b := range tcpConn.writeChan { if b == nil { break } ts := time.Now() _, err := conn.Write(b) if err != nil { break } // log.Warnln(b) tsWrite += time.Since(ts) writeCount++ if writeCount >= 20000 { lwc := len(tcpConn.writeChan) if lwc > 10000 { log.Warnf("to client write count %v, write time: %v, len(chan): %v", writeCount, tsWrite, lwc) } tsWrite = 0 writeCount = 0 } } conn.Close() tcpConn.Lock() tcpConn.closeFlag = true tcpConn.Unlock() }() return tcpConn } func (tcpConn *TCPConn) doDestroy() { tcpConn.conn.(*net.TCPConn).SetLinger(0) tcpConn.conn.Close() if !tcpConn.closeFlag { close(tcpConn.writeChan) tcpConn.closeFlag = true } } // Destroy 销毁链接对象 func (tcpConn *TCPConn) Destroy() { tcpConn.Lock() defer tcpConn.Unlock() tcpConn.doDestroy() } // Close 关闭链接 func (tcpConn *TCPConn) Close() { tcpConn.Lock() defer tcpConn.Unlock() if tcpConn.closeFlag { return } tcpConn.doWrite(nil) tcpConn.closeFlag = true } func (tcpConn *TCPConn) doWrite(b []byte) error { l := len(tcpConn.writeChan) if l == cap(tcpConn.writeChan) { log.Debugf("close conn: channel full, count: %v", l) // tcpConn.doDestroy() // return errors.New("channel full") } tcpConn.writeChan <- b return nil } // Write b must not be modified by the others goroutines // 实现writer接口,编码后调用 func (tcpConn *TCPConn) Write(b []byte) (int, error) { tcpConn.Lock() defer tcpConn.Unlock() if tcpConn.closeFlag || b == nil { return 0, errors.New("connect closed or b is nil") } if err := tcpConn.doWrite(b); err != nil { return 0, err } return len(b), nil } // Read 实现reader接口, func (tcpConn *TCPConn) Read(b []byte) (int, error) { return tcpConn.conn.Read(b) } // LocalAddr 取链接的本地地址 func (tcpConn *TCPConn) LocalAddr() net.Addr { return tcpConn.conn.LocalAddr() } // RemoteAddr 取链接的对端地址 func (tcpConn *TCPConn) RemoteAddr() net.Addr { return tcpConn.conn.RemoteAddr() } // ReadMsg 读整个消息报 func (tcpConn *TCPConn) ReadMsg() ([]byte, error) { return tcpConn.msgParser.Read(tcpConn.reader) } // WriteMsg 写整条消息 func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { return tcpConn.msgParser.Write(tcpConn, args...) }