// 注册通道的消息处理函数,从通道接收数据后交给处理函数,处理结果发送给结果通道 //一个server对应多个client, //每个client对外提供函数调用接口,ra, err := c.Call1("add", 1, 2) //把消息打包后发给服务端来处理,在结果通道接收函数结果 package handler import ( "errors" "fmt" "leafstalk/conf" "leafstalk/covenant/monitor" "leafstalk/log" "runtime" "time" ) //协程开客户端后执行函数调用是线程安全的 c := s.Open(10) err := c.Call0("f0") // one server per goroutine (goroutine not safe) // one client per goroutine (goroutine not safe) type Server struct { // id -> function // // function: // func(args []interface{}) // func(args []interface{}) interface{} // func(args []interface{}) []interface{} functions map[interface{}]interface{} ChanCall chan *CallInfo WarnSpanCount uint } // CallInfo 函数、参数、结果接收通道 包装 type CallInfo struct { f interface{} // 函数 args []interface{} // 函数参数 chanRet chan *RetInfo //函数结果写入通道 cb interface{} } // RetInfo 函数执行结果 type RetInfo struct { // nil // interface{} // []interface{} ret interface{} //函数执行返回结果 err error // callback: // func(err error) // func(ret interface{}, err error) // func(ret []interface{}, err error) cb interface{} } type Client struct { s *Server chanSyncRet chan *RetInfo //接收函数结果的同步通道 ChanAsynRet chan *RetInfo //接收函数异步执行结果的通道 pendingAsynCall int } // NewServer 可以缓冲多个函数调用 func NewServer(l int) *Server { s := new(Server) s.functions = make(map[interface{}]interface{}) s.ChanCall = make(chan *CallInfo, l) return s } func assert(i interface{}) []interface{} { if i == nil { return nil } else { return i.([]interface{}) } } // Register 注册结构体的处理函数 // you must call the function before calling Open and Go func (s *Server) Register(id interface{}, f interface{}) { switch f.(type) { case func([]interface{}): case func([]interface{}) interface{}: case func([]interface{}) []interface{}: default: panic(fmt.Sprintf("function id %v: definition of function is invalid", id)) } if _, ok := s.functions[id]; ok { panic(fmt.Sprintf("function id %v: already registered", id)) } s.functions[id] = f } // ret 函数执行结果写入发给接收通道 func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) { if ci.chanRet == nil { return } defer func() { if r := recover(); r != nil { err = r.(error) } }() ri.cb = ci.cb ci.chanRet <- ri return } func (s *Server) exec(ci *CallInfo) (err error) { defer monitor.GoExecTimeoutWarn(ci.f, ci.args, time.Now()) defer func() { if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) err = fmt.Errorf("%v: %s", r, buf[:l]) } else { err = fmt.Errorf("%v", r) } s.ret(ci, &RetInfo{err: fmt.Errorf("%v", r)}) } }() // execute switch ci.f.(type) { case func([]interface{}): ci.f.(func([]interface{}))(ci.args) return s.ret(ci, &RetInfo{}) case func([]interface{}) interface{}: ret := ci.f.(func([]interface{}) interface{})(ci.args) return s.ret(ci, &RetInfo{ret: ret}) case func([]interface{}) []interface{}: ret := ci.f.(func([]interface{}) []interface{})(ci.args) return s.ret(ci, &RetInfo{ret: ret}) } panic("bug") } // Exec 执行函数,返回结果 func (s *Server) Exec(ci *CallInfo) { err := s.exec(ci) if err != nil { log.Errorf("%v", err) } } // Go 发送消息标示及参数,让系统去执行,不要结果 // goroutine safe func (s *Server) Go(id interface{}, args ...interface{}) { f := s.functions[id] if f == nil { log.Warnf("Go no found %v", id) // for k, v := range s.functions { // log.Info(k, v) // } return } defer func() { if err := recover(); err != nil { log.Warnf("Go exception %v", id) } }() s.ChanCall <- &CallInfo{ f: f, args: args, } s.WarnSpanCount++ if s.WarnSpanCount > 5000 { s.WarnSpanCount = 0 l := len(s.ChanCall) cl := cap(s.ChanCall) if l > cl/2 { log.Warnf("Go ChanCall len %v / %v", l, cl) } } } // goroutine safe func (s *Server) Call0(id interface{}, args ...interface{}) error { return s.Open(0).Call0(id, args...) } // goroutine safe func (s *Server) Call1(id interface{}, args ...interface{}) (interface{}, error) { return s.Open(0).Call1(id, args...) } // goroutine safe func (s *Server) CallN(id interface{}, args ...interface{}) ([]interface{}, error) { return s.Open(0).CallN(id, args...) } // 通道中未处理的消息,直接返回错误 func (s *Server) Close() { close(s.ChanCall) for ci := range s.ChanCall { s.ret(ci, &RetInfo{ err: errors.New("chanrpc server closed"), }) } } // goroutine safe func (s *Server) Open(l int) *Client { c := NewClient(l) c.Attach(s) return c } func NewClient(l int) *Client { c := new(Client) c.chanSyncRet = make(chan *RetInfo, 1) c.ChanAsynRet = make(chan *RetInfo, l) return c } func (c *Client) Attach(s *Server) { c.s = s } // call 调用函数信息 发往执行通道 func (c *Client) call(ci *CallInfo, block bool) (err error) { defer func() { if r := recover(); r != nil { err = r.(error) } }() if block { c.s.ChanCall <- ci } else { select { case c.s.ChanCall <- ci: default: err = errors.New("chanrpc channel full") } } return } // f 查找函数 func (c *Client) f(id interface{}, n int) (f interface{}, err error) { if c.s == nil { err = errors.New("server not attached") return } f = c.s.functions[id] if f == nil { err = fmt.Errorf("function id %v: function not registered", id) return } var ok bool switch n { case 0: _, ok = f.(func([]interface{})) case 1: _, ok = f.(func([]interface{}) interface{}) case 2: _, ok = f.(func([]interface{}) []interface{}) default: panic("bug") } if !ok { err = fmt.Errorf("function id %v: return type mismatch", id) } return } // Call0 让其他协程执行函数,并等待执行结果 func (c *Client) Call0(id interface{}, args ...interface{}) error { f, err := c.f(id, 0) if err != nil { return err } err = c.call(&CallInfo{ f: f, args: args, chanRet: c.chanSyncRet, }, true) if err != nil { return err } ri := <-c.chanSyncRet return ri.err } // Call1 让其他协程执行函数,并等待执行结果 func (c *Client) Call1(id interface{}, args ...interface{}) (interface{}, error) { f, err := c.f(id, 1) if err != nil { return nil, err } err = c.call(&CallInfo{ f: f, args: args, chanRet: c.chanSyncRet, }, true) if err != nil { return nil, err } ri := <-c.chanSyncRet return ri.ret, ri.err } // CallN 让其他协程执行函数,并等待执行结果 func (c *Client) CallN(id interface{}, args ...interface{}) ([]interface{}, error) { f, err := c.f(id, 2) if err != nil { return nil, err } err = c.call(&CallInfo{ f: f, args: args, chanRet: c.chanSyncRet, }, true) if err != nil { return nil, err } ri := <-c.chanSyncRet return assert(ri.ret), ri.err } // asynCall 让其他协程执行函数,返回参数是否正确 func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) { f, err := c.f(id, n) if err != nil { c.ChanAsynRet <- &RetInfo{err: err, cb: cb} return } err = c.call(&CallInfo{ f: f, args: args, chanRet: c.ChanAsynRet, cb: cb, }, false) if err != nil { c.ChanAsynRet <- &RetInfo{err: err, cb: cb} return } } // AsynCall 让其他协程执行函数,返回参数是否正确???? func (c *Client) AsynCall(id interface{}, _args ...interface{}) { if len(_args) < 1 { panic("callback function not found") } args := _args[:len(_args)-1] cb := _args[len(_args)-1] var n int switch cb.(type) { case func(error): n = 0 case func(interface{}, error): n = 1 case func([]interface{}, error): n = 2 default: panic("definition of callback function is invalid") } // too many calls if c.pendingAsynCall >= cap(c.ChanAsynRet) { execCb(&RetInfo{err: errors.New("too many calls"), cb: cb}) return } c.asynCall(id, args, cb, n) c.pendingAsynCall++ } func execCb(ri *RetInfo) { defer func() { if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) log.Errorf("%v: %s", r, buf[:l]) } else { log.Errorf("%v", r) } } }() // execute switch ri.cb.(type) { case func(error): ri.cb.(func(error))(ri.err) case func(interface{}, error): ri.cb.(func(interface{}, error))(ri.ret, ri.err) case func([]interface{}, error): ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err) default: panic("bug") } return } // Cb 异步调用函数时,函数结束,结果发过来了,这里执行绑定的处理函数 func (c *Client) Cb(ri *RetInfo) { c.pendingAsynCall-- execCb(ri) } // 对于有结果的异步调用进行最后的处理 func (c *Client) Close() { for c.pendingAsynCall > 0 { c.Cb(<-c.ChanAsynRet) } } func (c *Client) Idle() bool { return c.pendingAsynCall == 0 }