123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
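// Package handler registers message-handling functions for a call channel:
// data received from the channel is handed to the registered function and the
// result is sent to a result channel.
//
// One server corresponds to multiple clients. Each client exposes a
// function-call interface, for example:
//
//	ra, err := c.Call1("add", 1, 2)
//
// The call is packaged and sent to the server for processing, and the
// function result is received on the result channel.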
- package handler
- import (
- "errors"
- "fmt"
- "leafstalk/conf"
- "leafstalk/covenant/monitor"
- "leafstalk/log"
- "runtime"
- "time"
- )
- //协程开客户端后执行函数调用是线程安全的 c := s.Open(10) err := c.Call0("f0")
- // one server per goroutine (goroutine not safe)
- // one client per goroutine (goroutine not safe)
- type Server struct {
- // id -> function
- //
- // function:
- // func(args []interface{})
- // func(args []interface{}) interface{}
- // func(args []interface{}) []interface{}
- functions map[interface{}]interface{}
- ChanCall chan *CallInfo
- WarnSpanCount uint
- }
- // CallInfo 函数、参数、结果接收通道 包装
- type CallInfo struct {
- f interface{} // 函数
- args []interface{} // 函数参数
- chanRet chan *RetInfo //函数结果写入通道
- cb interface{}
- }
// RetInfo carries the outcome of one executed call.
type RetInfo struct {
	// ret is the value returned by the executed function. Depending on the
	// function shape it is one of:
	//
	//	nil
	//	interface{}
	//	[]interface{}
	ret interface{}

	// err is the error produced while executing or dispatching the call.
	err error

	// cb is the callback to run with the result. Supported shapes:
	//
	//	func(err error)
	//	func(ret interface{}, err error)
	//	func(ret []interface{}, err error)
	cb interface{}
}
- type Client struct {
- s *Server
- chanSyncRet chan *RetInfo //接收函数结果的同步通道
- ChanAsynRet chan *RetInfo //接收函数异步执行结果的通道
- pendingAsynCall int
- }
- // NewServer 可以缓冲多个函数调用
- func NewServer(l int) *Server {
- s := new(Server)
- s.functions = make(map[interface{}]interface{})
- s.ChanCall = make(chan *CallInfo, l)
- return s
- }
// assert converts i to []interface{}. A nil input yields a nil slice; any
// other value that is not a []interface{} makes the type assertion panic.
func assert(i interface{}) []interface{} {
	if i != nil {
		return i.([]interface{})
	}
	return nil
}
- // Register 注册结构体的处理函数
- // you must call the function before calling Open and Go
- func (s *Server) Register(id interface{}, f interface{}) {
- switch f.(type) {
- case func([]interface{}):
- case func([]interface{}) interface{}:
- case func([]interface{}) []interface{}:
- default:
- panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
- }
- if _, ok := s.functions[id]; ok {
- panic(fmt.Sprintf("function id %v: already registered", id))
- }
- s.functions[id] = f
- }
- // ret 函数执行结果写入发给接收通道
- func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) {
- if ci.chanRet == nil {
- return
- }
- defer func() {
- if r := recover(); r != nil {
- err = r.(error)
- }
- }()
- ri.cb = ci.cb
- ci.chanRet <- ri
- return
- }
- func (s *Server) exec(ci *CallInfo) (err error) {
- defer monitor.GoExecTimeoutWarn(ci.f, ci.args, time.Now())
- defer func() {
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- err = fmt.Errorf("%v: %s", r, buf[:l])
- } else {
- err = fmt.Errorf("%v", r)
- }
- s.ret(ci, &RetInfo{err: fmt.Errorf("%v", r)})
- }
- }()
- // execute
- switch ci.f.(type) {
- case func([]interface{}):
- ci.f.(func([]interface{}))(ci.args)
- return s.ret(ci, &RetInfo{})
- case func([]interface{}) interface{}:
- ret := ci.f.(func([]interface{}) interface{})(ci.args)
- return s.ret(ci, &RetInfo{ret: ret})
- case func([]interface{}) []interface{}:
- ret := ci.f.(func([]interface{}) []interface{})(ci.args)
- return s.ret(ci, &RetInfo{ret: ret})
- }
- panic("bug")
- }
- // Exec 执行函数,返回结果
- func (s *Server) Exec(ci *CallInfo) {
- err := s.exec(ci)
- if err != nil {
- log.Errorf("%v", err)
- }
- }
- // Go 发送消息标示及参数,让系统去执行,不要结果
- // goroutine safe
- func (s *Server) Go(id interface{}, args ...interface{}) {
- f := s.functions[id]
- if f == nil {
- log.Warnf("Go no found %v", id)
- // for k, v := range s.functions {
- // log.Info(k, v)
- // }
- return
- }
- defer func() {
- if err := recover(); err != nil {
- log.Warnf("Go exception %v", id)
- }
- }()
- s.ChanCall <- &CallInfo{
- f: f,
- args: args,
- }
- s.WarnSpanCount++
- if s.WarnSpanCount > 5000 {
- s.WarnSpanCount = 0
- l := len(s.ChanCall)
- cl := cap(s.ChanCall)
- if l > cl/2 {
- log.Warnf("Go ChanCall len %v / %v", l, cl)
- }
- }
- }
- // goroutine safe
- func (s *Server) Call0(id interface{}, args ...interface{}) error {
- return s.Open(0).Call0(id, args...)
- }
- // goroutine safe
- func (s *Server) Call1(id interface{}, args ...interface{}) (interface{}, error) {
- return s.Open(0).Call1(id, args...)
- }
- // goroutine safe
- func (s *Server) CallN(id interface{}, args ...interface{}) ([]interface{}, error) {
- return s.Open(0).CallN(id, args...)
- }
- // 通道中未处理的消息,直接返回错误
- func (s *Server) Close() {
- close(s.ChanCall)
- for ci := range s.ChanCall {
- s.ret(ci, &RetInfo{
- err: errors.New("chanrpc server closed"),
- })
- }
- }
- // goroutine safe
- func (s *Server) Open(l int) *Client {
- c := NewClient(l)
- c.Attach(s)
- return c
- }
- func NewClient(l int) *Client {
- c := new(Client)
- c.chanSyncRet = make(chan *RetInfo, 1)
- c.ChanAsynRet = make(chan *RetInfo, l)
- return c
- }
- func (c *Client) Attach(s *Server) {
- c.s = s
- }
- // call 调用函数信息 发往执行通道
- func (c *Client) call(ci *CallInfo, block bool) (err error) {
- defer func() {
- if r := recover(); r != nil {
- err = r.(error)
- }
- }()
- if block {
- c.s.ChanCall <- ci
- } else {
- select {
- case c.s.ChanCall <- ci:
- default:
- err = errors.New("chanrpc channel full")
- }
- }
- return
- }
- // f 查找函数
- func (c *Client) f(id interface{}, n int) (f interface{}, err error) {
- if c.s == nil {
- err = errors.New("server not attached")
- return
- }
- f = c.s.functions[id]
- if f == nil {
- err = fmt.Errorf("function id %v: function not registered", id)
- return
- }
- var ok bool
- switch n {
- case 0:
- _, ok = f.(func([]interface{}))
- case 1:
- _, ok = f.(func([]interface{}) interface{})
- case 2:
- _, ok = f.(func([]interface{}) []interface{})
- default:
- panic("bug")
- }
- if !ok {
- err = fmt.Errorf("function id %v: return type mismatch", id)
- }
- return
- }
- // Call0 让其他协程执行函数,并等待执行结果
- func (c *Client) Call0(id interface{}, args ...interface{}) error {
- f, err := c.f(id, 0)
- if err != nil {
- return err
- }
- err = c.call(&CallInfo{
- f: f,
- args: args,
- chanRet: c.chanSyncRet,
- }, true)
- if err != nil {
- return err
- }
- ri := <-c.chanSyncRet
- return ri.err
- }
- // Call1 让其他协程执行函数,并等待执行结果
- func (c *Client) Call1(id interface{}, args ...interface{}) (interface{}, error) {
- f, err := c.f(id, 1)
- if err != nil {
- return nil, err
- }
- err = c.call(&CallInfo{
- f: f,
- args: args,
- chanRet: c.chanSyncRet,
- }, true)
- if err != nil {
- return nil, err
- }
- ri := <-c.chanSyncRet
- return ri.ret, ri.err
- }
- // CallN 让其他协程执行函数,并等待执行结果
- func (c *Client) CallN(id interface{}, args ...interface{}) ([]interface{}, error) {
- f, err := c.f(id, 2)
- if err != nil {
- return nil, err
- }
- err = c.call(&CallInfo{
- f: f,
- args: args,
- chanRet: c.chanSyncRet,
- }, true)
- if err != nil {
- return nil, err
- }
- ri := <-c.chanSyncRet
- return assert(ri.ret), ri.err
- }
- // asynCall 让其他协程执行函数,返回参数是否正确
- func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
- f, err := c.f(id, n)
- if err != nil {
- c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
- return
- }
- err = c.call(&CallInfo{
- f: f,
- args: args,
- chanRet: c.ChanAsynRet,
- cb: cb,
- }, false)
- if err != nil {
- c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
- return
- }
- }
- // AsynCall 让其他协程执行函数,返回参数是否正确????
- func (c *Client) AsynCall(id interface{}, _args ...interface{}) {
- if len(_args) < 1 {
- panic("callback function not found")
- }
- args := _args[:len(_args)-1]
- cb := _args[len(_args)-1]
- var n int
- switch cb.(type) {
- case func(error):
- n = 0
- case func(interface{}, error):
- n = 1
- case func([]interface{}, error):
- n = 2
- default:
- panic("definition of callback function is invalid")
- }
- // too many calls
- if c.pendingAsynCall >= cap(c.ChanAsynRet) {
- execCb(&RetInfo{err: errors.New("too many calls"), cb: cb})
- return
- }
- c.asynCall(id, args, cb, n)
- c.pendingAsynCall++
- }
- func execCb(ri *RetInfo) {
- defer func() {
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- log.Errorf("%v: %s", r, buf[:l])
- } else {
- log.Errorf("%v", r)
- }
- }
- }()
- // execute
- switch ri.cb.(type) {
- case func(error):
- ri.cb.(func(error))(ri.err)
- case func(interface{}, error):
- ri.cb.(func(interface{}, error))(ri.ret, ri.err)
- case func([]interface{}, error):
- ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err)
- default:
- panic("bug")
- }
- return
- }
- // Cb 异步调用函数时,函数结束,结果发过来了,这里执行绑定的处理函数
- func (c *Client) Cb(ri *RetInfo) {
- c.pendingAsynCall--
- execCb(ri)
- }
- // 对于有结果的异步调用进行最后的处理
- func (c *Client) Close() {
- for c.pendingAsynCall > 0 {
- c.Cb(<-c.ChanAsynRet)
- }
- }
- func (c *Client) Idle() bool {
- return c.pendingAsynCall == 0
- }
|