exec.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
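  // Package handler registers message-handling functions for a call channel:
  // calls received from the channel are handed to the registered function and
  // the result is sent to a result channel.
  //
  // One server serves multiple clients. Each client exposes a function-call
  // interface, e.g. ra, err := c.Call1("add", 1, 2): it packs the call into a
  // message, sends it to the server for processing, and receives the function
  // result on the result channel.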
  5. package handler
  6. import (
  7. "errors"
  8. "fmt"
  9. "leafstalk/conf"
  10. "leafstalk/covenant/monitor"
  11. "leafstalk/log"
  12. "runtime"
  13. "time"
  14. )
  15. //协程开客户端后执行函数调用是线程安全的 c := s.Open(10) err := c.Call0("f0")
  16. // one server per goroutine (goroutine not safe)
  17. // one client per goroutine (goroutine not safe)
  18. type Server struct {
  19. // id -> function
  20. //
  21. // function:
  22. // func(args []interface{})
  23. // func(args []interface{}) interface{}
  24. // func(args []interface{}) []interface{}
  25. functions map[interface{}]interface{}
  26. ChanCall chan *CallInfo
  27. WarnSpanCount uint
  28. }
  29. // CallInfo 函数、参数、结果接收通道 包装
  30. type CallInfo struct {
  31. f interface{} // 函数
  32. args []interface{} // 函数参数
  33. chanRet chan *RetInfo //函数结果写入通道
  34. cb interface{}
  35. }
  // RetInfo is the outcome of executing a function.
  type RetInfo struct {
  	// ret is the value produced by the function; one of:
  	//
  	//	nil
  	//	interface{}
  	//	[]interface{}
  	ret interface{}

  	// err is the error reported for the call, if any.
  	err error

  	// cb is the callback to run with the outcome; one of:
  	//
  	//	func(err error)
  	//	func(ret interface{}, err error)
  	//	func(ret []interface{}, err error)
  	cb interface{}
  }
  49. type Client struct {
  50. s *Server
  51. chanSyncRet chan *RetInfo //接收函数结果的同步通道
  52. ChanAsynRet chan *RetInfo //接收函数异步执行结果的通道
  53. pendingAsynCall int
  54. }
  55. // NewServer 可以缓冲多个函数调用
  56. func NewServer(l int) *Server {
  57. s := new(Server)
  58. s.functions = make(map[interface{}]interface{})
  59. s.ChanCall = make(chan *CallInfo, l)
  60. return s
  61. }
  // assert converts i to []interface{}. A nil interface yields a nil slice; any
  // other dynamic type that is not []interface{} makes the type assertion panic.
  func assert(i interface{}) []interface{} {
  	if i != nil {
  		return i.([]interface{})
  	}
  	return nil
  }
  69. // Register 注册结构体的处理函数
  70. // you must call the function before calling Open and Go
  71. func (s *Server) Register(id interface{}, f interface{}) {
  72. switch f.(type) {
  73. case func([]interface{}):
  74. case func([]interface{}) interface{}:
  75. case func([]interface{}) []interface{}:
  76. default:
  77. panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
  78. }
  79. if _, ok := s.functions[id]; ok {
  80. panic(fmt.Sprintf("function id %v: already registered", id))
  81. }
  82. s.functions[id] = f
  83. }
  84. // ret 函数执行结果写入发给接收通道
  85. func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) {
  86. if ci.chanRet == nil {
  87. return
  88. }
  89. defer func() {
  90. if r := recover(); r != nil {
  91. err = r.(error)
  92. }
  93. }()
  94. ri.cb = ci.cb
  95. ci.chanRet <- ri
  96. return
  97. }
  98. func (s *Server) exec(ci *CallInfo) (err error) {
  99. defer monitor.GoExecTimeoutWarn(ci.f, ci.args, time.Now())
  100. defer func() {
  101. if r := recover(); r != nil {
  102. if conf.LenStackBuf > 0 {
  103. buf := make([]byte, conf.LenStackBuf)
  104. l := runtime.Stack(buf, false)
  105. err = fmt.Errorf("%v: %s", r, buf[:l])
  106. } else {
  107. err = fmt.Errorf("%v", r)
  108. }
  109. s.ret(ci, &RetInfo{err: fmt.Errorf("%v", r)})
  110. }
  111. }()
  112. // execute
  113. switch ci.f.(type) {
  114. case func([]interface{}):
  115. ci.f.(func([]interface{}))(ci.args)
  116. return s.ret(ci, &RetInfo{})
  117. case func([]interface{}) interface{}:
  118. ret := ci.f.(func([]interface{}) interface{})(ci.args)
  119. return s.ret(ci, &RetInfo{ret: ret})
  120. case func([]interface{}) []interface{}:
  121. ret := ci.f.(func([]interface{}) []interface{})(ci.args)
  122. return s.ret(ci, &RetInfo{ret: ret})
  123. }
  124. panic("bug")
  125. }
  126. // Exec 执行函数,返回结果
  127. func (s *Server) Exec(ci *CallInfo) {
  128. err := s.exec(ci)
  129. if err != nil {
  130. log.Errorf("%v", err)
  131. }
  132. }
  133. // Go 发送消息标示及参数,让系统去执行,不要结果
  134. // goroutine safe
  135. func (s *Server) Go(id interface{}, args ...interface{}) {
  136. f := s.functions[id]
  137. if f == nil {
  138. log.Warnf("Go no found %v", id)
  139. // for k, v := range s.functions {
  140. // log.Info(k, v)
  141. // }
  142. return
  143. }
  144. defer func() {
  145. if err := recover(); err != nil {
  146. log.Warnf("Go exception %v", id)
  147. }
  148. }()
  149. s.ChanCall <- &CallInfo{
  150. f: f,
  151. args: args,
  152. }
  153. s.WarnSpanCount++
  154. if s.WarnSpanCount > 5000 {
  155. s.WarnSpanCount = 0
  156. l := len(s.ChanCall)
  157. cl := cap(s.ChanCall)
  158. if l > cl/2 {
  159. log.Warnf("Go ChanCall len %v / %v", l, cl)
  160. }
  161. }
  162. }
  163. // goroutine safe
  164. func (s *Server) Call0(id interface{}, args ...interface{}) error {
  165. return s.Open(0).Call0(id, args...)
  166. }
  167. // goroutine safe
  168. func (s *Server) Call1(id interface{}, args ...interface{}) (interface{}, error) {
  169. return s.Open(0).Call1(id, args...)
  170. }
  171. // goroutine safe
  172. func (s *Server) CallN(id interface{}, args ...interface{}) ([]interface{}, error) {
  173. return s.Open(0).CallN(id, args...)
  174. }
  175. // 通道中未处理的消息,直接返回错误
  176. func (s *Server) Close() {
  177. close(s.ChanCall)
  178. for ci := range s.ChanCall {
  179. s.ret(ci, &RetInfo{
  180. err: errors.New("chanrpc server closed"),
  181. })
  182. }
  183. }
  184. // goroutine safe
  185. func (s *Server) Open(l int) *Client {
  186. c := NewClient(l)
  187. c.Attach(s)
  188. return c
  189. }
  190. func NewClient(l int) *Client {
  191. c := new(Client)
  192. c.chanSyncRet = make(chan *RetInfo, 1)
  193. c.ChanAsynRet = make(chan *RetInfo, l)
  194. return c
  195. }
  196. func (c *Client) Attach(s *Server) {
  197. c.s = s
  198. }
  199. // call 调用函数信息 发往执行通道
  200. func (c *Client) call(ci *CallInfo, block bool) (err error) {
  201. defer func() {
  202. if r := recover(); r != nil {
  203. err = r.(error)
  204. }
  205. }()
  206. if block {
  207. c.s.ChanCall <- ci
  208. } else {
  209. select {
  210. case c.s.ChanCall <- ci:
  211. default:
  212. err = errors.New("chanrpc channel full")
  213. }
  214. }
  215. return
  216. }
  217. // f 查找函数
  218. func (c *Client) f(id interface{}, n int) (f interface{}, err error) {
  219. if c.s == nil {
  220. err = errors.New("server not attached")
  221. return
  222. }
  223. f = c.s.functions[id]
  224. if f == nil {
  225. err = fmt.Errorf("function id %v: function not registered", id)
  226. return
  227. }
  228. var ok bool
  229. switch n {
  230. case 0:
  231. _, ok = f.(func([]interface{}))
  232. case 1:
  233. _, ok = f.(func([]interface{}) interface{})
  234. case 2:
  235. _, ok = f.(func([]interface{}) []interface{})
  236. default:
  237. panic("bug")
  238. }
  239. if !ok {
  240. err = fmt.Errorf("function id %v: return type mismatch", id)
  241. }
  242. return
  243. }
  244. // Call0 让其他协程执行函数,并等待执行结果
  245. func (c *Client) Call0(id interface{}, args ...interface{}) error {
  246. f, err := c.f(id, 0)
  247. if err != nil {
  248. return err
  249. }
  250. err = c.call(&CallInfo{
  251. f: f,
  252. args: args,
  253. chanRet: c.chanSyncRet,
  254. }, true)
  255. if err != nil {
  256. return err
  257. }
  258. ri := <-c.chanSyncRet
  259. return ri.err
  260. }
  261. // Call1 让其他协程执行函数,并等待执行结果
  262. func (c *Client) Call1(id interface{}, args ...interface{}) (interface{}, error) {
  263. f, err := c.f(id, 1)
  264. if err != nil {
  265. return nil, err
  266. }
  267. err = c.call(&CallInfo{
  268. f: f,
  269. args: args,
  270. chanRet: c.chanSyncRet,
  271. }, true)
  272. if err != nil {
  273. return nil, err
  274. }
  275. ri := <-c.chanSyncRet
  276. return ri.ret, ri.err
  277. }
  278. // CallN 让其他协程执行函数,并等待执行结果
  279. func (c *Client) CallN(id interface{}, args ...interface{}) ([]interface{}, error) {
  280. f, err := c.f(id, 2)
  281. if err != nil {
  282. return nil, err
  283. }
  284. err = c.call(&CallInfo{
  285. f: f,
  286. args: args,
  287. chanRet: c.chanSyncRet,
  288. }, true)
  289. if err != nil {
  290. return nil, err
  291. }
  292. ri := <-c.chanSyncRet
  293. return assert(ri.ret), ri.err
  294. }
  295. // asynCall 让其他协程执行函数,返回参数是否正确
  296. func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
  297. f, err := c.f(id, n)
  298. if err != nil {
  299. c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
  300. return
  301. }
  302. err = c.call(&CallInfo{
  303. f: f,
  304. args: args,
  305. chanRet: c.ChanAsynRet,
  306. cb: cb,
  307. }, false)
  308. if err != nil {
  309. c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
  310. return
  311. }
  312. }
  313. // AsynCall 让其他协程执行函数,返回参数是否正确????
  314. func (c *Client) AsynCall(id interface{}, _args ...interface{}) {
  315. if len(_args) < 1 {
  316. panic("callback function not found")
  317. }
  318. args := _args[:len(_args)-1]
  319. cb := _args[len(_args)-1]
  320. var n int
  321. switch cb.(type) {
  322. case func(error):
  323. n = 0
  324. case func(interface{}, error):
  325. n = 1
  326. case func([]interface{}, error):
  327. n = 2
  328. default:
  329. panic("definition of callback function is invalid")
  330. }
  331. // too many calls
  332. if c.pendingAsynCall >= cap(c.ChanAsynRet) {
  333. execCb(&RetInfo{err: errors.New("too many calls"), cb: cb})
  334. return
  335. }
  336. c.asynCall(id, args, cb, n)
  337. c.pendingAsynCall++
  338. }
  339. func execCb(ri *RetInfo) {
  340. defer func() {
  341. if r := recover(); r != nil {
  342. if conf.LenStackBuf > 0 {
  343. buf := make([]byte, conf.LenStackBuf)
  344. l := runtime.Stack(buf, false)
  345. log.Errorf("%v: %s", r, buf[:l])
  346. } else {
  347. log.Errorf("%v", r)
  348. }
  349. }
  350. }()
  351. // execute
  352. switch ri.cb.(type) {
  353. case func(error):
  354. ri.cb.(func(error))(ri.err)
  355. case func(interface{}, error):
  356. ri.cb.(func(interface{}, error))(ri.ret, ri.err)
  357. case func([]interface{}, error):
  358. ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err)
  359. default:
  360. panic("bug")
  361. }
  362. return
  363. }
  364. // Cb 异步调用函数时,函数结束,结果发过来了,这里执行绑定的处理函数
  365. func (c *Client) Cb(ri *RetInfo) {
  366. c.pendingAsynCall--
  367. execCb(ri)
  368. }
  369. // 对于有结果的异步调用进行最后的处理
  370. func (c *Client) Close() {
  371. for c.pendingAsynCall > 0 {
  372. c.Cb(<-c.ChanAsynRet)
  373. }
  374. }
  375. func (c *Client) Idle() bool {
  376. return c.pendingAsynCall == 0
  377. }