package g import ( "container/list" "leafstalk/conf" "leafstalk/log" "runtime" "sync" ) // one Go per goroutine (goroutine not safe) type Go struct { ChanCb chan func() //skeleton用来接收Go(f(),f())的第二个f()的通道 pendingGo int } type LinearGo struct { f func() cb func() } type LinearContext struct { g *Go linearGo *list.List mutexLinearGo sync.Mutex mutexExecution sync.Mutex } func New(l int) *Go { g := new(Go) g.ChanCb = make(chan func(), l) return g } func (g *Go) Go(f func(), cb func()) { g.pendingGo++ if f == nil { g.ChanCb <- cb return } go func() { defer func() { g.ChanCb <- cb if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) log.Errorf("%v: %s", r, buf[:l]) } else { log.Errorf("%v", r) } } }() f() }() } func (g *Go) Cb(cb func()) { defer func() { g.pendingGo-- if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) log.Errorf("%v: %s", r, buf[:l]) } else { log.Errorf("%v", r) } } }() if cb != nil { cb() } } // 对于Go(f func(), cb func())异步执行结果进行处理 func (g *Go) Close() { for g.pendingGo > 0 { g.Cb(<-g.ChanCb) } } func (g *Go) Idle() bool { return g.pendingGo == 0 } func (g *Go) NewLinearContext() *LinearContext { c := new(LinearContext) c.g = g c.linearGo = list.New() return c } func (c *LinearContext) Go(f func(), cb func()) { c.g.pendingGo++ c.mutexLinearGo.Lock() c.linearGo.PushBack(&LinearGo{f: f, cb: cb}) c.mutexLinearGo.Unlock() go func() { c.mutexExecution.Lock() defer c.mutexExecution.Unlock() c.mutexLinearGo.Lock() e := c.linearGo.Remove(c.linearGo.Front()).(*LinearGo) c.mutexLinearGo.Unlock() defer func() { c.g.ChanCb <- e.cb if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) log.Errorf("%v: %s", r, buf[:l]) } else { log.Errorf("%v", r) } } }() e.f() }() }