redispool.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package redisdo
  2. import (
  3. "errors"
  4. "leafstalk/covenant/monitor"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/gomodule/redigo/redis"
  9. )
  10. type RedisPool struct {
  11. pool *redis.Pool
  12. closeFlag bool
  13. wg sync.WaitGroup
  14. }
  15. func NewRedisPool(addr string, password string, db int) *RedisPool {
  16. pool := redis.Pool{
  17. MaxIdle: 10,
  18. IdleTimeout: 240 * time.Second,
  19. // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
  20. Dial: func() (redis.Conn, error) {
  21. c, err := redis.Dial("tcp", addr)
  22. if err != nil {
  23. return nil, err
  24. }
  25. if len(password) != 0 {
  26. if _, err := c.Do("AUTH", password); err != nil {
  27. c.Close()
  28. return nil, err
  29. }
  30. }
  31. if _, err := c.Do("SELECT", db); err != nil {
  32. c.Close()
  33. return nil, err
  34. }
  35. return c, nil
  36. },
  37. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  38. if time.Since(t) < time.Minute {
  39. return nil
  40. }
  41. _, err := c.Do("PING")
  42. return err
  43. },
  44. }
  45. rp := new(RedisPool)
  46. rp.pool = &pool
  47. return rp
  48. }
  49. // CloseRedisPool 关闭reids池
  50. func (r *RedisPool) Close() {
  51. r.closeFlag = true
  52. if r.pool != nil {
  53. r.pool.Close()
  54. }
  55. r.wg.Wait()
  56. }
  57. func (r *RedisPool) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
  58. defer monitor.RedisDoTimeoutWarn("Do", map[string]interface{}{
  59. "commandName": commandName,
  60. "args": args,
  61. }, time.Now())
  62. conn := r.pool.Get()
  63. defer conn.Close()
  64. return conn.Do(commandName, args...)
  65. }
  66. func (r *RedisPool) DoMulti(f func(conn redis.Conn) error) error {
  67. defer monitor.RedisDoTimeoutWarn("DoMulti", "", time.Now())
  68. conn := r.pool.Get()
  69. defer conn.Close()
  70. return f(conn)
  71. }
  72. // 保存一个集合
  73. func (r *RedisPool) DoFlat(commandName string, key string, val interface{}) (reply interface{}, err error) {
  74. conn := r.pool.Get()
  75. defer conn.Close()
  76. //组织为串
  77. return conn.Do(commandName, redis.Args{key}.AddFlat(val)...)
  78. }
  79. func (r *RedisPool) IsNilError(err error) bool {
  80. return err == redis.ErrNil
  81. }
  82. func (r *RedisPool) DoHashSet(key string, val interface{}) (reply interface{}, err error) {
  83. conn := r.pool.Get()
  84. defer conn.Close()
  85. //以hash类型保存
  86. return conn.Do("hmset", redis.Args{key}.AddFlat(val)...)
  87. }
  88. func (r *RedisPool) DoHashGet(key string, strc interface{}) (interface{}, error) {
  89. conn := r.pool.Get()
  90. defer conn.Close()
  91. //获取缓存
  92. value, err := redis.Values(conn.Do("hgetall", key))
  93. if err != nil {
  94. if r.IsNilError(err) {
  95. return nil, nil
  96. }
  97. return nil, err
  98. }
  99. if len(value) == 0 {
  100. return nil, errors.New("value is nil")
  101. }
  102. //将values转成结构体 只能使用简单
  103. err = redis.ScanStruct(value, strc)
  104. if err != nil {
  105. return nil, err
  106. }
  107. return strc, nil
  108. }
  109. // 模糊匹配搜索
  110. func (r *RedisPool) ScanKeys(start int, pattern string, count int) (int, []string, error) {
  111. rep, err := r.Do("scan", start, "match", pattern, "count", count)
  112. if err != nil {
  113. return 0, nil, err
  114. }
  115. rep2 := rep.([]interface{})
  116. if len(rep2) != 2 {
  117. return 0, nil, errors.New("replay length is error")
  118. }
  119. nextSt := ""
  120. if strSt, ok := rep2[0].([]byte); !ok {
  121. return 0, nil, errors.New("replay type is error")
  122. } else {
  123. nextSt = string(strSt)
  124. }
  125. var lst []string
  126. if ks, ok := rep2[1].([]interface{}); !ok {
  127. return 0, nil, errors.New("replay type is error")
  128. } else {
  129. for _, v := range ks {
  130. if nv, ok := v.([]byte); ok {
  131. k := string(nv)
  132. lst = append(lst, k)
  133. }
  134. }
  135. }
  136. st, err := strconv.Atoi(nextSt)
  137. if err != nil {
  138. return 0, lst, nil
  139. }
  140. return st, lst, nil
  141. }
  142. func (r *RedisPool) ColScan(scanCmd string, key string, start int, pattern string, count int) (int, interface{}, error) {
  143. rep, err := redis.Values(r.Do(scanCmd, key, start, "match", pattern, "count", count))
  144. if err != nil {
  145. return 0, nil, err
  146. }
  147. next, err := redis.Int(rep[0], err)
  148. if err != nil {
  149. return 0, nil, err
  150. }
  151. // kvs, err := redis.StringMap(rep[1], err)
  152. // if err != nil {
  153. // return 0, nil, err
  154. // }
  155. // st, err := strconv.Atoi(next)
  156. // if err != nil {
  157. // return 0, kvs, nil
  158. // }
  159. return next, rep[1], err
  160. }
  161. func (r *RedisPool) Subscribe(subject string, callBack func(string, []byte)) chan struct{} {
  162. done := make(chan struct{}, 1)
  163. r.wg.Add(1)
  164. go r.subscribe(subject, callBack, done)
  165. return done
  166. }
  167. func (r *RedisPool) StopSubscribe(done chan struct{}) {
  168. done <- struct{}{}
  169. }
  170. func (r *RedisPool) subscribe(subject string, callBack func(string, []byte), done chan struct{}) {
  171. defer r.wg.Done()
  172. defer close(done)
  173. doneInternal := make(chan struct{}, 1)
  174. defer close(doneInternal)
  175. ticker := time.NewTicker(time.Minute)
  176. defer ticker.Stop()
  177. stopSubscribe := false
  178. var wg sync.WaitGroup
  179. for {
  180. conn := r.pool.Get()
  181. psc := redis.PubSubConn{Conn: conn}
  182. if err := psc.Subscribe(subject); err != nil {
  183. // fmt.Printf("failed to subscribe to channel %s: %v", subject, err)
  184. goto sleep
  185. }
  186. wg.Add(1)
  187. go func() {
  188. defer wg.Done()
  189. for {
  190. switch msg := psc.Receive().(type) {
  191. case redis.Message:
  192. // 接收到消息
  193. // fmt.Printf("Received message: %s\n", msg.Data)
  194. callBack(msg.Channel, msg.Data)
  195. case redis.Subscription:
  196. // 订阅、取消订阅的事件处理
  197. // fmt.Printf("Subscription: %s %d\n", msg.Channel, msg.Count)
  198. if msg.Count == 0 {
  199. doneInternal <- struct{}{}
  200. return
  201. }
  202. case error:
  203. // 处理错误
  204. doneInternal <- struct{}{}
  205. // 打印错误日志?
  206. return
  207. case redis.Pong:
  208. continue
  209. }
  210. }
  211. }()
  212. timer:
  213. for {
  214. select {
  215. case <-ticker.C:
  216. // 发送 ping 测试连接是否正常,如果不正常,接收函数会出错退出.
  217. if err := psc.Ping(""); err != nil {
  218. break timer
  219. }
  220. case <-doneInternal:
  221. break timer
  222. case <-done:
  223. stopSubscribe = true
  224. psc.Unsubscribe()
  225. }
  226. }
  227. sleep:
  228. wg.Wait()
  229. conn.Close()
  230. for i := 0; i < 60; i++ {
  231. if r.closeFlag || stopSubscribe {
  232. return
  233. }
  234. time.Sleep(time.Millisecond * 50)
  235. }
  236. }
  237. }
  238. func test2() {
  239. //onlinestate.Do("SET", "foo", "bar")
  240. // currentTimeStart := time.Now()
  241. // onlinestate.DoMulti(func(conn redis.Conn) error {
  242. // for i := 0; i < 100000; i++ {
  243. // conn.Do("INCR", "TESTKEY")
  244. // }
  245. // return err
  246. // })
  247. // fmt.Println(time.Since(currentTimeStart))
  248. // currentTimeStart = time.Now()
  249. // onlinestate.DoMulti(func(conn redis.Conn) error {
  250. // for i := 0; i < 100000; i++ {
  251. // conn.Send("INCR", "TESTKEY")
  252. // }
  253. // conn.Flush()
  254. // conn.Receive()
  255. // return err
  256. // })
  257. // fmt.Println(time.Since(currentTimeStart))
  258. // currentTimeStart = time.Now()
  259. // onlinestate.DoMulti(func(conn redis.Conn) error {
  260. // for i := 0; i < 100000; i++ {
  261. // conn.Send("INCR", "TESTKEY")
  262. // conn.Flush()
  263. // }
  264. // conn.Receive()
  265. // return err
  266. // })
  267. // fmt.Println(time.Since(currentTimeStart))
  268. // done := shareRedisPool.Subscribe("playerlogin", func(s string, b []byte) {
  269. // fmt.Printf("Received message1: %s %s \n", s, b)
  270. // })
  271. // time.Sleep(time.Second * 20)
  272. // shareRedisPool.StopSubscribe(done)
  273. }