go.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package g
  2. import (
  3. "container/list"
  4. "leafstalk/conf"
  5. "leafstalk/log"
  6. "runtime"
  7. "sync"
  8. )
  9. // one Go per goroutine (goroutine not safe)
  10. type Go struct {
  11. ChanCb chan func() //skeleton用来接收Go(f(),f())的第二个f()的通道
  12. pendingGo int
  13. }
  14. type LinearGo struct {
  15. f func()
  16. cb func()
  17. }
  18. type LinearContext struct {
  19. g *Go
  20. linearGo *list.List
  21. mutexLinearGo sync.Mutex
  22. mutexExecution sync.Mutex
  23. }
  24. func New(l int) *Go {
  25. g := new(Go)
  26. g.ChanCb = make(chan func(), l)
  27. return g
  28. }
  29. func (g *Go) Go(f func(), cb func()) {
  30. g.pendingGo++
  31. if f == nil {
  32. g.ChanCb <- cb
  33. return
  34. }
  35. go func() {
  36. defer func() {
  37. g.ChanCb <- cb
  38. if r := recover(); r != nil {
  39. if conf.LenStackBuf > 0 {
  40. buf := make([]byte, conf.LenStackBuf)
  41. l := runtime.Stack(buf, false)
  42. log.Errorf("%v: %s", r, buf[:l])
  43. } else {
  44. log.Errorf("%v", r)
  45. }
  46. }
  47. }()
  48. f()
  49. }()
  50. }
  51. func (g *Go) Cb(cb func()) {
  52. defer func() {
  53. g.pendingGo--
  54. if r := recover(); r != nil {
  55. if conf.LenStackBuf > 0 {
  56. buf := make([]byte, conf.LenStackBuf)
  57. l := runtime.Stack(buf, false)
  58. log.Errorf("%v: %s", r, buf[:l])
  59. } else {
  60. log.Errorf("%v", r)
  61. }
  62. }
  63. }()
  64. if cb != nil {
  65. cb()
  66. }
  67. }
  68. // 对于Go(f func(), cb func())异步执行结果进行处理
  69. func (g *Go) Close() {
  70. for g.pendingGo > 0 {
  71. g.Cb(<-g.ChanCb)
  72. }
  73. }
  74. func (g *Go) Idle() bool {
  75. return g.pendingGo == 0
  76. }
  77. func (g *Go) NewLinearContext() *LinearContext {
  78. c := new(LinearContext)
  79. c.g = g
  80. c.linearGo = list.New()
  81. return c
  82. }
  83. func (c *LinearContext) Go(f func(), cb func()) {
  84. c.g.pendingGo++
  85. c.mutexLinearGo.Lock()
  86. c.linearGo.PushBack(&LinearGo{f: f, cb: cb})
  87. c.mutexLinearGo.Unlock()
  88. go func() {
  89. c.mutexExecution.Lock()
  90. defer c.mutexExecution.Unlock()
  91. c.mutexLinearGo.Lock()
  92. e := c.linearGo.Remove(c.linearGo.Front()).(*LinearGo)
  93. c.mutexLinearGo.Unlock()
  94. defer func() {
  95. c.g.ChanCb <- e.cb
  96. if r := recover(); r != nil {
  97. if conf.LenStackBuf > 0 {
  98. buf := make([]byte, conf.LenStackBuf)
  99. l := runtime.Stack(buf, false)
  100. log.Errorf("%v: %s", r, buf[:l])
  101. } else {
  102. log.Errorf("%v", r)
  103. }
  104. }
  105. }()
  106. e.f()
  107. }()
  108. }