123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- package module
- import (
- "leafstalk/conf"
- "leafstalk/log"
- "leafstalk/module/console"
- g "leafstalk/module/go"
- "leafstalk/module/handler"
- "leafstalk/module/timer"
- "runtime"
- "time"
- )
- // Skeleton 1
- type Skeleton struct {
- GoLen int //go协程结果缓冲数
- TimerDispatcherLen int //定时器任务通道缓冲长度
- AsynCallLen int //协程调用结果通道缓冲长度
- HandlerServer *handler.Server //路由服务
- g *g.Go
- dispatcher *timer.Dispatcher
- client *handler.Client
- server *handler.Server
- commandServer *handler.Server
- ticker *timer.SubTicker //接收定时事件的接收器
- }
- // NewSkeleton 构造
- func NewSkeleton(conf *conf.Config) *Skeleton {
- skeleton := &Skeleton{
- GoLen: conf.GetInt("skeleton.golen"),
- TimerDispatcherLen: conf.GetInt("skeleton.timerdispatcherlen"),
- AsynCallLen: conf.GetInt("skeleton.asyncalllen"),
- HandlerServer: handler.NewServer(conf.GetInt("skeleton.chanrpclen")),
- }
- skeleton.Init()
- return skeleton
- }
- // Init 初始结构变量
- func (s *Skeleton) Init() {
- if s.GoLen <= 0 {
- s.GoLen = 0
- }
- if s.TimerDispatcherLen <= 0 {
- s.TimerDispatcherLen = 0
- }
- if s.AsynCallLen <= 0 {
- s.AsynCallLen = 0
- }
- s.g = g.New(s.GoLen)
- s.dispatcher = timer.NewDispatcher(s.TimerDispatcherLen)
- s.client = handler.NewClient(s.AsynCallLen)
- s.server = s.HandlerServer
- s.ticker = timer.NewSubTicker(1)
- if s.server == nil {
- s.server = handler.NewServer(0)
- }
- s.commandServer = handler.NewServer(0)
- }
- // Run 通道数据调度
- // 关闭时,未执行的返回错误;对于有结果的,处理结果,退出前清理完通道
- func (s *Skeleton) Run(closeSig chan bool) {
- for {
- select {
- case <-closeSig:
- s.commandServer.Close()
- s.server.Close()
- for !s.g.Idle() || !s.client.Idle() {
- s.g.Close()
- s.client.Close()
- }
- return
- case ri := <-s.client.ChanAsynRet:
- s.client.Cb(ri)
- case ci := <-s.server.ChanCall:
- s.server.Exec(ci)
- case ci := <-s.commandServer.ChanCall:
- s.commandServer.Exec(ci)
- case cb := <-s.g.ChanCb:
- s.g.Cb(cb)
- case t := <-s.dispatcher.ChanTimer:
- t.Cb()
- case <-s.ticker.ChanTicker:
- if s.ticker.Cb != nil {
- s.ticker.Cb()
- }
- }
- }
- }
- // AfterFunc 1
- func (s *Skeleton) AfterFunc(d time.Duration, cb func()) *timer.Timer {
- if s.TimerDispatcherLen == 0 {
- panic("invalid TimerDispatcherLen")
- }
- return s.dispatcher.AfterFunc(d, cb)
- }
- // CronFunc 1
- func (s *Skeleton) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron {
- if s.TimerDispatcherLen == 0 {
- panic("invalid TimerDispatcherLen")
- }
- return s.dispatcher.CronFunc(cronExpr, cb)
- }
- // Go 另开协程执行f,主协程执行cb
- func (s *Skeleton) Go(f func(), cb func()) {
- if s.GoLen == 0 {
- panic("invalid GoLen")
- }
- s.g.Go(f, cb)
- }
- // NewLinearContext 1
- func (s *Skeleton) NewLinearContext() *g.LinearContext {
- if s.GoLen == 0 {
- panic("invalid GoLen")
- }
- return s.g.NewLinearContext()
- }
- // AsynCall 异步调用
- func (s *Skeleton) AsynCall(server *handler.Server, id interface{}, args ...interface{}) {
- if s.AsynCallLen == 0 {
- panic("invalid AsynCallLen")
- }
- s.client.Attach(server)
- s.client.AsynCall(id, args...)
- }
- // RegisterChanRPC 注册结构体
- func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
- if s.HandlerServer == nil {
- panic("invalid HandlerServer")
- }
- s.server.Register(id, f)
- }
- // RegisterCommand 注册命令
- func (s *Skeleton) RegisterCommand(name string, help string, f interface{}) {
- console.Register(name, help, f, s.commandServer)
- }
- // TimerChan 1
- func (s *Skeleton) triggerTimer() {
- select {
- case s.ticker.ChanTicker <- struct{}{}:
- default:
- }
- }
- func (s *Skeleton) StartTicker(d time.Duration, f func()) {
- sf := func() {
- defer func() {
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- log.Errorf("%v: %s", r, buf[:l])
- } else {
- log.Errorf("%v", r)
- }
- }
- }()
- f()
- }
- s.ticker.Cb = sf
- s.ticker.Ticker = time.NewTicker(d)
- s.ticker.Wg.Add(1)
- go func() {
- defer s.ticker.Wg.Done()
- for {
- select {
- case _, ok := <-s.ticker.Ticker.C:
- if !ok {
- return
- }
- s.triggerTimer()
- case <-s.ticker.QuitChan:
- s.ticker.Ticker.Stop()
- return
- }
- }
- }()
- }
- func (s *Skeleton) StopTicker() {
- s.ticker.QuitChan <- struct{}{}
- s.ticker.Wg.Wait()
- }
- func (s *Skeleton) GetMetrics() (int, int, int, int) {
- //console.Register(name, help, f, s.commandServer)
- d1 := len(s.client.ChanAsynRet)
- d2 := len(s.server.ChanCall)
- d3 := len(s.commandServer.ChanCall)
- d4 := len(s.dispatcher.ChanTimer)
- return d1, d2, d3, d4
- }
|