123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- package redisdo
- import (
- "errors"
- "leafstalk/covenant/monitor"
- "strconv"
- "sync"
- "time"
- "github.com/gomodule/redigo/redis"
- )
// RedisPool wraps a redigo connection pool and keeps track of the background
// subscription goroutines started through Subscribe.
type RedisPool struct {
	// pool is the underlying redigo pool every command borrows a connection from.
	pool *redis.Pool
	// closeFlag is set by Close and read by the subscribe goroutine to stop
	// re-subscribing.
	// NOTE(review): it is a plain bool written and read from different
	// goroutines without synchronization — confirm whether this needs to be atomic.
	closeFlag bool
	// wg counts running subscribe goroutines so Close can wait for them to exit.
	wg sync.WaitGroup
}
- func NewRedisPool(addr string, password string, db int) *RedisPool {
- pool := redis.Pool{
- MaxIdle: 10,
- IdleTimeout: 240 * time.Second,
- // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
- Dial: func() (redis.Conn, error) {
- c, err := redis.Dial("tcp", addr)
- if err != nil {
- return nil, err
- }
- if len(password) != 0 {
- if _, err := c.Do("AUTH", password); err != nil {
- c.Close()
- return nil, err
- }
- }
- if _, err := c.Do("SELECT", db); err != nil {
- c.Close()
- return nil, err
- }
- return c, nil
- },
- TestOnBorrow: func(c redis.Conn, t time.Time) error {
- if time.Since(t) < time.Minute {
- return nil
- }
- _, err := c.Do("PING")
- return err
- },
- }
- rp := new(RedisPool)
- rp.pool = &pool
- return rp
- }
- // CloseRedisPool 关闭reids池
- func (r *RedisPool) Close() {
- r.closeFlag = true
- if r.pool != nil {
- r.pool.Close()
- }
- r.wg.Wait()
- }
- func (r *RedisPool) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
- defer monitor.RedisDoTimeoutWarn("Do", map[string]interface{}{
- "commandName": commandName,
- "args": args,
- }, time.Now())
- conn := r.pool.Get()
- defer conn.Close()
- return conn.Do(commandName, args...)
- }
- func (r *RedisPool) DoMulti(f func(conn redis.Conn) error) error {
- defer monitor.RedisDoTimeoutWarn("DoMulti", "", time.Now())
- conn := r.pool.Get()
- defer conn.Close()
- return f(conn)
- }
- // 保存一个集合
- func (r *RedisPool) DoFlat(commandName string, key string, val interface{}) (reply interface{}, err error) {
- conn := r.pool.Get()
- defer conn.Close()
- //组织为串
- return conn.Do(commandName, redis.Args{key}.AddFlat(val)...)
- }
- func (r *RedisPool) IsNilError(err error) bool {
- return err == redis.ErrNil
- }
- func (r *RedisPool) DoHashSet(key string, val interface{}) (reply interface{}, err error) {
- conn := r.pool.Get()
- defer conn.Close()
- //以hash类型保存
- return conn.Do("hmset", redis.Args{key}.AddFlat(val)...)
- }
- func (r *RedisPool) DoHashGet(key string, strc interface{}) (interface{}, error) {
- conn := r.pool.Get()
- defer conn.Close()
- //获取缓存
- value, err := redis.Values(conn.Do("hgetall", key))
- if err != nil {
- if r.IsNilError(err) {
- return nil, nil
- }
- return nil, err
- }
- if len(value) == 0 {
- return nil, errors.New("value is nil")
- }
- //将values转成结构体 只能使用简单
- err = redis.ScanStruct(value, strc)
- if err != nil {
- return nil, err
- }
- return strc, nil
- }
- // 模糊匹配搜索
- func (r *RedisPool) ScanKeys(start int, pattern string, count int) (int, []string, error) {
- rep, err := r.Do("scan", start, "match", pattern, "count", count)
- if err != nil {
- return 0, nil, err
- }
- rep2 := rep.([]interface{})
- if len(rep2) != 2 {
- return 0, nil, errors.New("replay length is error")
- }
- nextSt := ""
- if strSt, ok := rep2[0].([]byte); !ok {
- return 0, nil, errors.New("replay type is error")
- } else {
- nextSt = string(strSt)
- }
- var lst []string
- if ks, ok := rep2[1].([]interface{}); !ok {
- return 0, nil, errors.New("replay type is error")
- } else {
- for _, v := range ks {
- if nv, ok := v.([]byte); ok {
- k := string(nv)
- lst = append(lst, k)
- }
- }
- }
- st, err := strconv.Atoi(nextSt)
- if err != nil {
- return 0, lst, nil
- }
- return st, lst, nil
- }
- func (r *RedisPool) ColScan(scanCmd string, key string, start int, pattern string, count int) (int, interface{}, error) {
- rep, err := redis.Values(r.Do(scanCmd, key, start, "match", pattern, "count", count))
- if err != nil {
- return 0, nil, err
- }
- next, err := redis.Int(rep[0], err)
- if err != nil {
- return 0, nil, err
- }
- // kvs, err := redis.StringMap(rep[1], err)
- // if err != nil {
- // return 0, nil, err
- // }
- // st, err := strconv.Atoi(next)
- // if err != nil {
- // return 0, kvs, nil
- // }
- return next, rep[1], err
- }
- func (r *RedisPool) Subscribe(subject string, callBack func(string, []byte)) chan struct{} {
- done := make(chan struct{}, 1)
- r.wg.Add(1)
- go r.subscribe(subject, callBack, done)
- return done
- }
- func (r *RedisPool) StopSubscribe(done chan struct{}) {
- done <- struct{}{}
- }
- func (r *RedisPool) subscribe(subject string, callBack func(string, []byte), done chan struct{}) {
- defer r.wg.Done()
- defer close(done)
- doneInternal := make(chan struct{}, 1)
- defer close(doneInternal)
- ticker := time.NewTicker(time.Minute)
- defer ticker.Stop()
- stopSubscribe := false
- var wg sync.WaitGroup
- for {
- conn := r.pool.Get()
- psc := redis.PubSubConn{Conn: conn}
- if err := psc.Subscribe(subject); err != nil {
- // fmt.Printf("failed to subscribe to channel %s: %v", subject, err)
- goto sleep
- }
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- switch msg := psc.Receive().(type) {
- case redis.Message:
- // 接收到消息
- // fmt.Printf("Received message: %s\n", msg.Data)
- callBack(msg.Channel, msg.Data)
- case redis.Subscription:
- // 订阅、取消订阅的事件处理
- // fmt.Printf("Subscription: %s %d\n", msg.Channel, msg.Count)
- if msg.Count == 0 {
- doneInternal <- struct{}{}
- return
- }
- case error:
- // 处理错误
- doneInternal <- struct{}{}
- // 打印错误日志?
- return
- case redis.Pong:
- continue
- }
- }
- }()
- timer:
- for {
- select {
- case <-ticker.C:
- // 发送 ping 测试连接是否正常,如果不正常,接收函数会出错退出.
- if err := psc.Ping(""); err != nil {
- break timer
- }
- case <-doneInternal:
- break timer
- case <-done:
- stopSubscribe = true
- psc.Unsubscribe()
- }
- }
- sleep:
- wg.Wait()
- conn.Close()
- for i := 0; i < 60; i++ {
- if r.closeFlag || stopSubscribe {
- return
- }
- time.Sleep(time.Millisecond * 50)
- }
- }
- }
// test2 is an empty placeholder: its body contains only commented-out scratch
// snippets and executes nothing. It is not called anywhere in the visible part
// of this file.
//
// The snippets sketch three timing experiments that issue 100000 INCR commands
// through DoMulti (one Do per command; Send for all commands followed by a
// single Flush and Receive; Send plus Flush per command followed by a single
// Receive), and a Subscribe / StopSubscribe usage sample.
//
// NOTE(review): the snippets reference identifiers (onlinestate,
// shareRedisPool, err) that are not defined in the visible part of this file.
func test2() {
	//onlinestate.Do("SET", "foo", "bar")
	// currentTimeStart := time.Now()
	// onlinestate.DoMulti(func(conn redis.Conn) error {
	// for i := 0; i < 100000; i++ {
	// conn.Do("INCR", "TESTKEY")
	// }
	// return err
	// })
	// fmt.Println(time.Since(currentTimeStart))
	// currentTimeStart = time.Now()
	// onlinestate.DoMulti(func(conn redis.Conn) error {
	// for i := 0; i < 100000; i++ {
	// conn.Send("INCR", "TESTKEY")
	// }
	// conn.Flush()
	// conn.Receive()
	// return err
	// })
	// fmt.Println(time.Since(currentTimeStart))
	// currentTimeStart = time.Now()
	// onlinestate.DoMulti(func(conn redis.Conn) error {
	// for i := 0; i < 100000; i++ {
	// conn.Send("INCR", "TESTKEY")
	// conn.Flush()
	// }
	// conn.Receive()
	// return err
	// })
	// fmt.Println(time.Since(currentTimeStart))
	// done := shareRedisPool.Subscribe("playerlogin", func(s string, b []byte) {
	// fmt.Printf("Received message1: %s %s \n", s, b)
	// })
	// time.Sleep(time.Second * 20)
	// shareRedisPool.StopSubscribe(done)
}
|