123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package g
- import (
- "container/list"
- "leafstalk/conf"
- "leafstalk/log"
- "runtime"
- "sync"
- )
- // one Go per goroutine (goroutine not safe)
- type Go struct {
- ChanCb chan func() //skeleton用来接收Go(f(),f())的第二个f()的通道
- pendingGo int
- }
- type LinearGo struct {
- f func()
- cb func()
- }
- type LinearContext struct {
- g *Go
- linearGo *list.List
- mutexLinearGo sync.Mutex
- mutexExecution sync.Mutex
- }
- func New(l int) *Go {
- g := new(Go)
- g.ChanCb = make(chan func(), l)
- return g
- }
- func (g *Go) Go(f func(), cb func()) {
- g.pendingGo++
- if f == nil {
- g.ChanCb <- cb
- return
- }
- go func() {
- defer func() {
- g.ChanCb <- cb
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- log.Errorf("%v: %s", r, buf[:l])
- } else {
- log.Errorf("%v", r)
- }
- }
- }()
- f()
- }()
- }
- func (g *Go) Cb(cb func()) {
- defer func() {
- g.pendingGo--
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- log.Errorf("%v: %s", r, buf[:l])
- } else {
- log.Errorf("%v", r)
- }
- }
- }()
- if cb != nil {
- cb()
- }
- }
- // 对于Go(f func(), cb func())异步执行结果进行处理
- func (g *Go) Close() {
- for g.pendingGo > 0 {
- g.Cb(<-g.ChanCb)
- }
- }
- func (g *Go) Idle() bool {
- return g.pendingGo == 0
- }
- func (g *Go) NewLinearContext() *LinearContext {
- c := new(LinearContext)
- c.g = g
- c.linearGo = list.New()
- return c
- }
- func (c *LinearContext) Go(f func(), cb func()) {
- c.g.pendingGo++
- c.mutexLinearGo.Lock()
- c.linearGo.PushBack(&LinearGo{f: f, cb: cb})
- c.mutexLinearGo.Unlock()
- go func() {
- c.mutexExecution.Lock()
- defer c.mutexExecution.Unlock()
- c.mutexLinearGo.Lock()
- e := c.linearGo.Remove(c.linearGo.Front()).(*LinearGo)
- c.mutexLinearGo.Unlock()
- defer func() {
- c.g.ChanCb <- e.cb
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- log.Errorf("%v: %s", r, buf[:l])
- } else {
- log.Errorf("%v", r)
- }
- }
- }()
- e.f()
- }()
- }
|