123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package network
- import (
- "bufio"
- "errors"
- "net"
- "sync"
- "time"
- "leafstalk/log"
- )
- // ConnSet is a set of network connections: a map keyed by net.Conn whose values are empty structs.
- type ConnSet map[net.Conn]struct{}
- // TCPConn wraps a net.Conn with a buffered reader, a queue of pending outgoing buffers and a message parser.
- type TCPConn struct {
- sync.Mutex // embedded; the methods in this file hold it while checking closeFlag and sending on writeChan
- conn net.Conn // underlying connection that Read, LocalAddr and RemoteAddr delegate to
- reader *bufio.Reader // buffered reader passed to msgParser.Read by ReadMsg
- writeChan chan []byte // outgoing buffers queued by Write; a nil entry tells the writer goroutine to stop
- closeFlag bool // set once the connection has been closed or destroyed; Write returns an error afterwards
- msgParser *MsgParser // parser that ReadMsg and WriteMsg delegate to
- }
- func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn {
- tcpConn := new(TCPConn)
- tcpConn.conn = conn
- tcpConn.writeChan = make(chan []byte, pendingWriteNum)
- tcpConn.msgParser = msgParser
- tcpConn.reader = bufio.NewReaderSize(tcpConn, 1024)
- // byte
- // var a = 'c'
- go func() {
- var tsWrite time.Duration
- var writeCount uint
- for b := range tcpConn.writeChan {
- if b == nil {
- break
- }
- ts := time.Now()
- _, err := conn.Write(b)
- if err != nil {
- break
- }
- // log.Warnln(b)
- tsWrite += time.Since(ts)
- writeCount++
- if writeCount >= 20000 {
- lwc := len(tcpConn.writeChan)
- if lwc > 10000 {
- log.Warnf("to client write count %v, write time: %v, len(chan): %v", writeCount, tsWrite, lwc)
- }
- tsWrite = 0
- writeCount = 0
- }
- }
- conn.Close()
- tcpConn.Lock()
- tcpConn.closeFlag = true
- tcpConn.Unlock()
- }()
- return tcpConn
- }
- func (tcpConn *TCPConn) doDestroy() {
- tcpConn.conn.(*net.TCPConn).SetLinger(0)
- tcpConn.conn.Close()
- if !tcpConn.closeFlag {
- close(tcpConn.writeChan)
- tcpConn.closeFlag = true
- }
- }
- // Destroy 销毁链接对象
- func (tcpConn *TCPConn) Destroy() {
- tcpConn.Lock()
- defer tcpConn.Unlock()
- tcpConn.doDestroy()
- }
- // Close 关闭链接
- func (tcpConn *TCPConn) Close() {
- tcpConn.Lock()
- defer tcpConn.Unlock()
- if tcpConn.closeFlag {
- return
- }
- tcpConn.doWrite(nil)
- tcpConn.closeFlag = true
- }
- func (tcpConn *TCPConn) doWrite(b []byte) error {
- l := len(tcpConn.writeChan)
- if l == cap(tcpConn.writeChan) {
- log.Debugf("close conn: channel full, count: %v", l)
- // tcpConn.doDestroy()
- // return errors.New("channel full")
- }
- tcpConn.writeChan <- b
- return nil
- }
- // Write b must not be modified by the others goroutines
- // 实现writer接口,编码后调用
- func (tcpConn *TCPConn) Write(b []byte) (int, error) {
- tcpConn.Lock()
- defer tcpConn.Unlock()
- if tcpConn.closeFlag || b == nil {
- return 0, errors.New("connect closed or b is nil")
- }
- if err := tcpConn.doWrite(b); err != nil {
- return 0, err
- }
- return len(b), nil
- }
- // Read 实现reader接口,
- func (tcpConn *TCPConn) Read(b []byte) (int, error) {
- return tcpConn.conn.Read(b)
- }
- // LocalAddr 取链接的本地地址
- func (tcpConn *TCPConn) LocalAddr() net.Addr {
- return tcpConn.conn.LocalAddr()
- }
- // RemoteAddr 取链接的对端地址
- func (tcpConn *TCPConn) RemoteAddr() net.Addr {
- return tcpConn.conn.RemoteAddr()
- }
- // ReadMsg 读整个消息报
- func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
- return tcpConn.msgParser.Read(tcpConn.reader)
- }
- // WriteMsg 写整条消息
- func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
- return tcpConn.msgParser.Write(tcpConn, args...)
- }
|