skeleton.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package module
  2. import (
  3. "leafstalk/conf"
  4. "leafstalk/log"
  5. "leafstalk/module/console"
  6. g "leafstalk/module/go"
  7. "leafstalk/module/handler"
  8. "leafstalk/module/timer"
  9. "runtime"
  10. "time"
  11. )
  12. // Skeleton 1
  13. type Skeleton struct {
  14. GoLen int //go协程结果缓冲数
  15. TimerDispatcherLen int //定时器任务通道缓冲长度
  16. AsynCallLen int //协程调用结果通道缓冲长度
  17. HandlerServer *handler.Server //路由服务
  18. g *g.Go
  19. dispatcher *timer.Dispatcher
  20. client *handler.Client
  21. server *handler.Server
  22. commandServer *handler.Server
  23. ticker *timer.SubTicker //接收定时事件的接收器
  24. }
  25. // NewSkeleton 构造
  26. func NewSkeleton(conf *conf.Config) *Skeleton {
  27. skeleton := &Skeleton{
  28. GoLen: conf.GetInt("skeleton.golen"),
  29. TimerDispatcherLen: conf.GetInt("skeleton.timerdispatcherlen"),
  30. AsynCallLen: conf.GetInt("skeleton.asyncalllen"),
  31. HandlerServer: handler.NewServer(conf.GetInt("skeleton.chanrpclen")),
  32. }
  33. skeleton.Init()
  34. return skeleton
  35. }
  36. // Init 初始结构变量
  37. func (s *Skeleton) Init() {
  38. if s.GoLen <= 0 {
  39. s.GoLen = 0
  40. }
  41. if s.TimerDispatcherLen <= 0 {
  42. s.TimerDispatcherLen = 0
  43. }
  44. if s.AsynCallLen <= 0 {
  45. s.AsynCallLen = 0
  46. }
  47. s.g = g.New(s.GoLen)
  48. s.dispatcher = timer.NewDispatcher(s.TimerDispatcherLen)
  49. s.client = handler.NewClient(s.AsynCallLen)
  50. s.server = s.HandlerServer
  51. s.ticker = timer.NewSubTicker(1)
  52. if s.server == nil {
  53. s.server = handler.NewServer(0)
  54. }
  55. s.commandServer = handler.NewServer(0)
  56. }
  57. // Run 通道数据调度
  58. // 关闭时,未执行的返回错误;对于有结果的,处理结果,退出前清理完通道
  59. func (s *Skeleton) Run(closeSig chan bool) {
  60. for {
  61. select {
  62. case <-closeSig:
  63. s.commandServer.Close()
  64. s.server.Close()
  65. for !s.g.Idle() || !s.client.Idle() {
  66. s.g.Close()
  67. s.client.Close()
  68. }
  69. return
  70. case ri := <-s.client.ChanAsynRet:
  71. s.client.Cb(ri)
  72. case ci := <-s.server.ChanCall:
  73. s.server.Exec(ci)
  74. case ci := <-s.commandServer.ChanCall:
  75. s.commandServer.Exec(ci)
  76. case cb := <-s.g.ChanCb:
  77. s.g.Cb(cb)
  78. case t := <-s.dispatcher.ChanTimer:
  79. t.Cb()
  80. case <-s.ticker.ChanTicker:
  81. if s.ticker.Cb != nil {
  82. s.ticker.Cb()
  83. }
  84. }
  85. }
  86. }
  87. // AfterFunc 1
  88. func (s *Skeleton) AfterFunc(d time.Duration, cb func()) *timer.Timer {
  89. if s.TimerDispatcherLen == 0 {
  90. panic("invalid TimerDispatcherLen")
  91. }
  92. return s.dispatcher.AfterFunc(d, cb)
  93. }
  94. // CronFunc 1
  95. func (s *Skeleton) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron {
  96. if s.TimerDispatcherLen == 0 {
  97. panic("invalid TimerDispatcherLen")
  98. }
  99. return s.dispatcher.CronFunc(cronExpr, cb)
  100. }
  101. // Go 另开协程执行f,主协程执行cb
  102. func (s *Skeleton) Go(f func(), cb func()) {
  103. if s.GoLen == 0 {
  104. panic("invalid GoLen")
  105. }
  106. s.g.Go(f, cb)
  107. }
  108. // NewLinearContext 1
  109. func (s *Skeleton) NewLinearContext() *g.LinearContext {
  110. if s.GoLen == 0 {
  111. panic("invalid GoLen")
  112. }
  113. return s.g.NewLinearContext()
  114. }
  115. // AsynCall 异步调用
  116. func (s *Skeleton) AsynCall(server *handler.Server, id interface{}, args ...interface{}) {
  117. if s.AsynCallLen == 0 {
  118. panic("invalid AsynCallLen")
  119. }
  120. s.client.Attach(server)
  121. s.client.AsynCall(id, args...)
  122. }
  123. // RegisterChanRPC 注册结构体
  124. func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
  125. if s.HandlerServer == nil {
  126. panic("invalid HandlerServer")
  127. }
  128. s.server.Register(id, f)
  129. }
  130. // RegisterCommand 注册命令
  131. func (s *Skeleton) RegisterCommand(name string, help string, f interface{}) {
  132. console.Register(name, help, f, s.commandServer)
  133. }
  134. // TimerChan 1
  135. func (s *Skeleton) triggerTimer() {
  136. select {
  137. case s.ticker.ChanTicker <- struct{}{}:
  138. default:
  139. }
  140. }
  141. func (s *Skeleton) StartTicker(d time.Duration, f func()) {
  142. sf := func() {
  143. defer func() {
  144. if r := recover(); r != nil {
  145. if conf.LenStackBuf > 0 {
  146. buf := make([]byte, conf.LenStackBuf)
  147. l := runtime.Stack(buf, false)
  148. log.Errorf("%v: %s", r, buf[:l])
  149. } else {
  150. log.Errorf("%v", r)
  151. }
  152. }
  153. }()
  154. f()
  155. }
  156. s.ticker.Cb = sf
  157. s.ticker.Ticker = time.NewTicker(d)
  158. s.ticker.Wg.Add(1)
  159. go func() {
  160. defer s.ticker.Wg.Done()
  161. for {
  162. select {
  163. case _, ok := <-s.ticker.Ticker.C:
  164. if !ok {
  165. return
  166. }
  167. s.triggerTimer()
  168. case <-s.ticker.QuitChan:
  169. s.ticker.Ticker.Stop()
  170. return
  171. }
  172. }
  173. }()
  174. }
  175. func (s *Skeleton) StopTicker() {
  176. s.ticker.QuitChan <- struct{}{}
  177. s.ticker.Wg.Wait()
  178. }
  179. func (s *Skeleton) GetMetrics() (int, int, int, int) {
  180. //console.Register(name, help, f, s.commandServer)
  181. d1 := len(s.client.ChanAsynRet)
  182. d2 := len(s.server.ChanCall)
  183. d3 := len(s.commandServer.ChanCall)
  184. d4 := len(s.dispatcher.ChanTimer)
  185. return d1, d2, d3, d4
  186. }