server.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package networkserver
  2. import (
  3. "errors"
  4. "leafstalk/conf"
  5. "net"
  6. "reflect"
  7. "sync"
  8. "time"
  9. "leafstalk/log"
  10. "leafstalk/module/handler"
  11. "leafstalk/network"
  12. )
  13. // Gate 网络服务管理
  14. type Gate struct {
  15. MaxConnNum int //最大客户端链接数
  16. PendingWriteNum int //通道内最大可以有几个待发送包
  17. MaxMsgLen uint32 //包最大长度
  18. Processor network.Processor
  19. AgentServer *handler.Server //代理上下线通知到的目标
  20. // websocket
  21. WSAddr string
  22. HTTPTimeout time.Duration //websocket链接、读写超时
  23. CertFile string
  24. KeyFile string
  25. // tcp
  26. TCPAddr string
  27. LenMsgLen int //消息长度几个字节
  28. LittleEndian bool
  29. }
  30. // NewGate 构造
  31. func NewGate(processor network.Processor, agentServer *handler.Server, conf *conf.Config) *Gate {
  32. gate := Gate{
  33. MaxConnNum: conf.GetInt("gate.maxconnnum"),
  34. PendingWriteNum: conf.GetInt("gate.pendingwritenum"),
  35. MaxMsgLen: conf.GetUint32("gate.maxmsglen"),
  36. WSAddr: conf.GetString("gate.wsaddr"),
  37. HTTPTimeout: conf.GetDuration("gate.httptimeout"),
  38. CertFile: conf.GetString("gate.certfile"),
  39. KeyFile: conf.GetString("gate.keyfile"),
  40. TCPAddr: conf.GetString("gate.tcpaddr"),
  41. LenMsgLen: conf.GetInt("gate.lenmsglen"),
  42. LittleEndian: conf.GetBool("gate.littleendian"),
  43. Processor: processor,
  44. AgentServer: agentServer,
  45. }
  46. return &gate
  47. }
  48. func (gate *Gate) OnInit() {
  49. }
  50. // Run 1
  51. func (gate *Gate) Run(closeSig chan bool) {
  52. log.Info("gate server start Run...")
  53. var wsServer *network.WSServer
  54. if gate.WSAddr != "" {
  55. wsServer = new(network.WSServer)
  56. wsServer.Addr = gate.WSAddr
  57. wsServer.MaxConnNum = gate.MaxConnNum
  58. wsServer.PendingWriteNum = gate.PendingWriteNum
  59. wsServer.MaxMsgLen = gate.MaxMsgLen
  60. wsServer.HTTPTimeout = gate.HTTPTimeout
  61. wsServer.CertFile = gate.CertFile
  62. wsServer.KeyFile = gate.KeyFile
  63. wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
  64. a := &clientAgent{conn: conn, gate: gate}
  65. if gate.AgentServer != nil {
  66. gate.AgentServer.Go("NewAgent", a) //客户端链接到服务器
  67. }
  68. return a
  69. }
  70. }
  71. var tcpServer *network.TCPServer
  72. if gate.TCPAddr != "" {
  73. tcpServer = new(network.TCPServer)
  74. tcpServer.Addr = gate.TCPAddr
  75. tcpServer.MaxConnNum = gate.MaxConnNum
  76. tcpServer.PendingWriteNum = gate.PendingWriteNum
  77. tcpServer.LenMsgLen = gate.LenMsgLen
  78. tcpServer.MaxMsgLen = gate.MaxMsgLen
  79. tcpServer.LittleEndian = gate.LittleEndian
  80. tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
  81. a := &clientAgent{conn: conn, gate: gate}
  82. if gate.AgentServer != nil {
  83. gate.AgentServer.Go("NewAgent", a) //客户端链接到服务器
  84. }
  85. return a
  86. }
  87. }
  88. if wsServer != nil {
  89. wsServer.Start()
  90. }
  91. if tcpServer != nil {
  92. tcpServer.Start()
  93. }
  94. <-closeSig
  95. if wsServer != nil {
  96. wsServer.Close()
  97. }
  98. if tcpServer != nil {
  99. tcpServer.Close()
  100. }
  101. }
  102. // OnDestroy 1
  103. func (gate *Gate) OnDestroy() {}
  104. ///*
  105. //*/
  106. type clientAgent struct {
  107. conn network.Conn
  108. gate *Gate
  109. userData interface{}
  110. mutexUserData sync.Mutex
  111. }
  112. func MsgHeader(data []byte) string {
  113. l := len(data)
  114. if l == 0 {
  115. return "nilMsgId"
  116. }
  117. maxLen := 50
  118. if l > maxLen {
  119. l = maxLen
  120. }
  121. // var m map[string]json.RawMessage
  122. // err := json.Unmarshal(data, &m)
  123. // if err == nil {
  124. // for k := range m {
  125. // return k
  126. // }
  127. // }
  128. // if l > 10 {
  129. // l = 10
  130. // }
  131. return string(data[:l])
  132. }
  133. func (a *clientAgent) Run() {
  134. var tsRead time.Duration
  135. var tsRoute time.Duration
  136. var readCount uint
  137. for {
  138. ts := time.Now()
  139. data, err := a.conn.ReadMsg()
  140. if err != nil {
  141. log.Debugf("read message: %v", err.Error())
  142. break
  143. }
  144. tsRead += time.Since(ts)
  145. if a.gate.Processor != nil {
  146. msg, err := a.gate.Processor.Unmarshal(data)
  147. //log.Infof("ReadMsg: %v", string(data))
  148. if err != nil {
  149. header := MsgHeader(data)
  150. log.Warnf("unmarshal message error 2: %v %v", header, err.Error())
  151. break
  152. }
  153. err = a.gate.Processor.Route(msg, a)
  154. if err != nil {
  155. header := MsgHeader(data)
  156. log.Warnf("route message error 2: %v %v", header, err.Error())
  157. break
  158. }
  159. }
  160. tsRoute += time.Since(ts)
  161. readCount++
  162. if readCount >= 200000 {
  163. log.Debugf("from client read count %v, read time: %v, route time: %v", readCount, tsRead, tsRoute)
  164. tsRead = 0
  165. tsRoute = 0
  166. readCount = 0
  167. }
  168. }
  169. }
  170. func (a *clientAgent) OnClose() {
  171. if a.gate.AgentServer != nil {
  172. err := a.gate.AgentServer.Call0("CloseAgent", a)
  173. if err != nil {
  174. log.Errorf("handler error: %v", err)
  175. }
  176. }
  177. }
  178. func (a *clientAgent) WriteMsg(msg interface{}) {
  179. if a.gate.Processor != nil {
  180. data, err := a.gate.Processor.Marshal(msg)
  181. if err != nil {
  182. log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err)
  183. return
  184. }
  185. err = a.conn.WriteMsg(data...)
  186. if err != nil {
  187. log.Errorf("write message %v error: %v", reflect.TypeOf(msg), err)
  188. }
  189. }
  190. }
  191. func (a *clientAgent) MarshalMsg(msg interface{}) ([][]byte, error) {
  192. if a.gate.Processor != nil {
  193. data, err := a.gate.Processor.Marshal(msg)
  194. if err != nil {
  195. log.Errorf("marshal message %v error: %v", reflect.TypeOf(msg), err)
  196. return nil, err
  197. }
  198. return data, err
  199. }
  200. return nil, errors.New("MarshalMsg no gate.Processor")
  201. }
  202. func (a *clientAgent) WriteBytes(data [][]byte) {
  203. err := a.conn.WriteMsg(data...)
  204. if err != nil {
  205. log.Errorf("write message error: %v", err)
  206. }
  207. }
  208. func (a *clientAgent) LocalAddr() net.Addr {
  209. return a.conn.LocalAddr()
  210. }
  211. func (a *clientAgent) RemoteAddr() net.Addr {
  212. return a.conn.RemoteAddr()
  213. }
  214. func (a *clientAgent) Close() {
  215. a.conn.Close()
  216. }
  217. func (a *clientAgent) Destroy() {
  218. a.conn.Destroy()
  219. }
  220. func (a *clientAgent) UserData() interface{} {
  221. a.mutexUserData.Lock()
  222. defer a.mutexUserData.Unlock()
  223. return a.userData
  224. }
  225. func (a *clientAgent) SetUserData(data interface{}) {
  226. a.mutexUserData.Lock()
  227. defer a.mutexUserData.Unlock()
  228. a.userData = data
  229. }