package module import ( "leafstalk/conf" "leafstalk/log" "leafstalk/module/console" g "leafstalk/module/go" "leafstalk/module/handler" "leafstalk/module/timer" "runtime" "time" ) // Skeleton 1 type Skeleton struct { GoLen int //go协程结果缓冲数 TimerDispatcherLen int //定时器任务通道缓冲长度 AsynCallLen int //协程调用结果通道缓冲长度 HandlerServer *handler.Server //路由服务 g *g.Go dispatcher *timer.Dispatcher client *handler.Client server *handler.Server commandServer *handler.Server ticker *timer.SubTicker //接收定时事件的接收器 } // NewSkeleton 构造 func NewSkeleton(conf *conf.Config) *Skeleton { skeleton := &Skeleton{ GoLen: conf.GetInt("skeleton.golen"), TimerDispatcherLen: conf.GetInt("skeleton.timerdispatcherlen"), AsynCallLen: conf.GetInt("skeleton.asyncalllen"), HandlerServer: handler.NewServer(conf.GetInt("skeleton.chanrpclen")), } skeleton.Init() return skeleton } // Init 初始结构变量 func (s *Skeleton) Init() { if s.GoLen <= 0 { s.GoLen = 0 } if s.TimerDispatcherLen <= 0 { s.TimerDispatcherLen = 0 } if s.AsynCallLen <= 0 { s.AsynCallLen = 0 } s.g = g.New(s.GoLen) s.dispatcher = timer.NewDispatcher(s.TimerDispatcherLen) s.client = handler.NewClient(s.AsynCallLen) s.server = s.HandlerServer s.ticker = timer.NewSubTicker(1) if s.server == nil { s.server = handler.NewServer(0) } s.commandServer = handler.NewServer(0) } // Run 通道数据调度 // 关闭时,未执行的返回错误;对于有结果的,处理结果,退出前清理完通道 func (s *Skeleton) Run(closeSig chan bool) { for { select { case <-closeSig: s.commandServer.Close() s.server.Close() for !s.g.Idle() || !s.client.Idle() { s.g.Close() s.client.Close() } return case ri := <-s.client.ChanAsynRet: s.client.Cb(ri) case ci := <-s.server.ChanCall: s.server.Exec(ci) case ci := <-s.commandServer.ChanCall: s.commandServer.Exec(ci) case cb := <-s.g.ChanCb: s.g.Cb(cb) case t := <-s.dispatcher.ChanTimer: t.Cb() case <-s.ticker.ChanTicker: if s.ticker.Cb != nil { s.ticker.Cb() } } } } // AfterFunc 1 func (s *Skeleton) AfterFunc(d time.Duration, cb func()) *timer.Timer { if s.TimerDispatcherLen == 0 { panic("invalid TimerDispatcherLen") } return s.dispatcher.AfterFunc(d, cb) } // CronFunc 1 func (s *Skeleton) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron { if s.TimerDispatcherLen == 0 { panic("invalid TimerDispatcherLen") } return s.dispatcher.CronFunc(cronExpr, cb) } // Go 另开协程执行f,主协程执行cb func (s *Skeleton) Go(f func(), cb func()) { if s.GoLen == 0 { panic("invalid GoLen") } s.g.Go(f, cb) } // NewLinearContext 1 func (s *Skeleton) NewLinearContext() *g.LinearContext { if s.GoLen == 0 { panic("invalid GoLen") } return s.g.NewLinearContext() } // AsynCall 异步调用 func (s *Skeleton) AsynCall(server *handler.Server, id interface{}, args ...interface{}) { if s.AsynCallLen == 0 { panic("invalid AsynCallLen") } s.client.Attach(server) s.client.AsynCall(id, args...) } // RegisterChanRPC 注册结构体 func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) { if s.HandlerServer == nil { panic("invalid HandlerServer") } s.server.Register(id, f) } // RegisterCommand 注册命令 func (s *Skeleton) RegisterCommand(name string, help string, f interface{}) { console.Register(name, help, f, s.commandServer) } // TimerChan 1 func (s *Skeleton) triggerTimer() { select { case s.ticker.ChanTicker <- struct{}{}: default: } } func (s *Skeleton) StartTicker(d time.Duration, f func()) { sf := func() { defer func() { if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) log.Errorf("%v: %s", r, buf[:l]) } else { log.Errorf("%v", r) } } }() f() } s.ticker.Cb = sf s.ticker.Ticker = time.NewTicker(d) s.ticker.Wg.Add(1) go func() { defer s.ticker.Wg.Done() for { select { case _, ok := <-s.ticker.Ticker.C: if !ok { return } s.triggerTimer() case <-s.ticker.QuitChan: s.ticker.Ticker.Stop() return } } }() } func (s *Skeleton) StopTicker() { s.ticker.QuitChan <- struct{}{} s.ticker.Wg.Wait() } func (s *Skeleton) GetMetrics() (int, int, int, int) { //console.Register(name, help, f, s.commandServer) d1 := len(s.client.ChanAsynRet) d2 := len(s.server.ChanCall) d3 := len(s.commandServer.ChanCall) d4 := len(s.dispatcher.ChanTimer) return d1, d2, d3, d4 }