// Package redisdo wraps a redigo connection pool with helpers for single
// commands, hash storage, cursor scans, and pub/sub subscriptions.
package redisdo

import (
	"errors"
	"leafstalk/covenant/monitor"
	"strconv"
	"sync"
	"time"

	"github.com/gomodule/redigo/redis"
)

// RedisPool is a redigo connection pool plus the bookkeeping needed to run
// and shut down background subscriptions.
type RedisPool struct {
	pool      *redis.Pool
	closeFlag bool           // set once by Close; guarded by mu
	wg        sync.WaitGroup // tracks running subscribe goroutines

	mu      sync.Mutex    // guards closeFlag and closeCh
	closeCh chan struct{} // closed by Close to wake up subscribers
}

// NewRedisPool builds a pool whose connections dial addr over TCP, send AUTH
// when password is non-empty, and then SELECT db.
func NewRedisPool(addr string, password string, db int) *RedisPool {
	pool := &redis.Pool{
		MaxIdle:     10,
		IdleTimeout: 240 * time.Second,
		// Dial or DialContext must be set. When both are set, DialContext
		// takes precedence over Dial.
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr)
			if err != nil {
				return nil, err
			}
			if len(password) != 0 {
				if _, err := c.Do("AUTH", password); err != nil {
					c.Close()
					return nil, err
				}
			}
			if _, err := c.Do("SELECT", db); err != nil {
				c.Close()
				return nil, err
			}
			return c, nil
		},
		// Connections returned to the pool less than a minute ago are
		// trusted; older ones are checked with PING before reuse.
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}

	return &RedisPool{
		pool:    pool,
		closeCh: make(chan struct{}),
	}
}

// closeSignal returns the channel that Close closes. It is created lazily so
// that a zero-value RedisPool also works.
func (r *RedisPool) closeSignal() <-chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closeCh == nil {
		r.closeCh = make(chan struct{})
	}
	return r.closeCh
}

// Close closes the redis pool, tells every running subscription to stop, and
// waits for the subscription goroutines to exit. It may be called more than
// once.
func (r *RedisPool) Close() {
	r.mu.Lock()
	if r.closeCh == nil {
		r.closeCh = make(chan struct{})
	}
	if !r.closeFlag {
		r.closeFlag = true
		// Closing the pool does not close connections that are in use, so
		// subscribers have to be notified explicitly or wg.Wait below would
		// block for as long as their connections stay healthy.
		close(r.closeCh)
	}
	r.mu.Unlock()

	if r.pool != nil {
		r.pool.Close()
	}
	r.wg.Wait()
}

// Do runs a single command on a pooled connection and returns its reply.
// The call is reported to monitor.RedisDoTimeoutWarn together with its start
// time (presumably to warn about slow commands; see the monitor package).
func (r *RedisPool) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
	defer monitor.RedisDoTimeoutWarn("Do", map[string]interface{}{
		"commandName": commandName,
		"args":        args,
	}, time.Now())

	conn := r.pool.Get()
	defer conn.Close()
	return conn.Do(commandName, args...)
}

// DoMulti hands one pooled connection to f so that several commands can be
// issued on the same connection, and returns f's error.
func (r *RedisPool) DoMulti(f func(conn redis.Conn) error) error {
	defer monitor.RedisDoTimeoutWarn("DoMulti", "", time.Now())

	conn := r.pool.Get()
	defer conn.Close()
	return f(conn)
}

// DoFlat runs commandName with key followed by val flattened into individual
// arguments (see redis.Args.AddFlat), e.g. to save a collection.
func (r *RedisPool) DoFlat(commandName string, key string, val interface{}) (reply interface{}, err error) {
	conn := r.pool.Get()
	defer conn.Close()

	// Flatten val into the argument list.
	return conn.Do(commandName, redis.Args{key}.AddFlat(val)...)
}

// IsNilError reports whether err is redis.ErrNil.
func (r *RedisPool) IsNilError(err error) bool {
	return err == redis.ErrNil
}

// DoHashSet stores val under key as a redis hash using HMSET.
func (r *RedisPool) DoHashSet(key string, val interface{}) (reply interface{}, err error) {
	conn := r.pool.Get()
	defer conn.Close()

	// Store as a hash.
	return conn.Do("hmset", redis.Args{key}.AddFlat(val)...)
}

// DoHashGet loads the hash stored at key with HGETALL and scans it into strc,
// which must be a pointer to a struct. On success it returns strc.
//
// A redis.ErrNil reply yields (nil, nil); an empty reply yields the error
// "value is nil".
func (r *RedisPool) DoHashGet(key string, strc interface{}) (interface{}, error) {
	conn := r.pool.Get()
	defer conn.Close()

	// Fetch the cached hash.
	value, err := redis.Values(conn.Do("hgetall", key))
	if err != nil {
		if r.IsNilError(err) {
			return nil, nil
		}
		return nil, err
	}
	if len(value) == 0 {
		return nil, errors.New("value is nil")
	}

	// Convert the values into the struct; only simple field types can be used.
	if err := redis.ScanStruct(value, strc); err != nil {
		return nil, err
	}
	return strc, nil
}

// ScanKeys performs one SCAN step (pattern-matching key search) starting at
// cursor start. It returns the next cursor and the keys found in this step.
//
// A malformed reply, including a cursor that does not parse as an int, is
// reported as an error rather than as cursor 0, because cursor 0 tells the
// caller that the iteration is complete.
func (r *RedisPool) ScanKeys(start int, pattern string, count int) (int, []string, error) {
	rep, err := r.Do("scan", start, "match", pattern, "count", count)
	if err != nil {
		return 0, nil, err
	}

	parts, ok := rep.([]interface{})
	if !ok {
		return 0, nil, errors.New("replay type is error")
	}
	if len(parts) != 2 {
		return 0, nil, errors.New("replay length is error")
	}

	rawCursor, ok := parts[0].([]byte)
	if !ok {
		return 0, nil, errors.New("replay type is error")
	}
	rawKeys, ok := parts[1].([]interface{})
	if !ok {
		return 0, nil, errors.New("replay type is error")
	}

	next, err := strconv.Atoi(string(rawCursor))
	if err != nil {
		return 0, nil, err
	}

	var keys []string
	for _, v := range rawKeys {
		// Elements that are not bulk strings are skipped.
		if b, ok := v.([]byte); ok {
			keys = append(keys, string(b))
		}
	}
	return next, keys, nil
}

// ColScan performs one step of a collection scan command (scanCmd, e.g.
// HSCAN/SSCAN/ZSCAN) on key starting at cursor start. It returns the next
// cursor and the raw, undecoded second element of the reply.
func (r *RedisPool) ColScan(scanCmd string, key string, start int, pattern string, count int) (int, interface{}, error) {
	rep, err := redis.Values(r.Do(scanCmd, key, start, "match", pattern, "count", count))
	if err != nil {
		return 0, nil, err
	}
	// Guard the indexing below against a short reply.
	if len(rep) < 2 {
		return 0, nil, errors.New("replay length is error")
	}

	next, err := redis.Int(rep[0], nil)
	if err != nil {
		return 0, nil, err
	}
	return next, rep[1], nil
}

// Subscribe starts a background subscription to subject. callBack is invoked
// with the channel name and payload of every message received. The returned
// channel is the handle to pass to StopSubscribe; it is closed when the
// subscription goroutine exits.
func (r *RedisPool) Subscribe(subject string, callBack func(string, []byte)) chan struct{} {
	done := make(chan struct{}, 1)
	r.wg.Add(1)
	go r.subscribe(subject, callBack, done)
	return done
}

// StopSubscribe asks the subscription identified by done to stop. It never
// blocks and is safe to call repeatedly or after the subscription has ended.
func (r *RedisPool) StopSubscribe(done chan struct{}) {
	defer func() {
		// The subscription goroutine closes done when it exits, and sending
		// on a closed channel panics. That only means the subscription is
		// already gone, so the panic is deliberately swallowed.
		_ = recover()
	}()

	select {
	case done <- struct{}{}:
	default:
		// A stop request is already pending.
	}
}

// subscribe keeps a subscription to subject alive, reconnecting after
// failures, until a stop is requested through done or the pool is closed.
// It closes done on exit.
func (r *RedisPool) subscribe(subject string, callBack func(string, []byte), done chan struct{}) {
	// Pause between reconnect attempts.
	const retryDelay = 3 * time.Second

	defer r.wg.Done()
	defer close(done)

	closed := r.closeSignal()
	for {
		if r.subscribeOnce(subject, callBack, done, closed) {
			return
		}

		// Back off before reconnecting, but stay responsive to stop and
		// close requests while waiting.
		retry := time.NewTimer(retryDelay)
		select {
		case <-done:
			retry.Stop()
			return
		case <-closed:
			retry.Stop()
			return
		case <-retry.C:
		}
	}
}

// subscribeOnce runs one subscription session on one pooled connection. It
// returns true when the session ended because a stop (done) or pool close
// (closed) was requested, and false when the caller should reconnect.
func (r *RedisPool) subscribeOnce(subject string, callBack func(string, []byte), done, closed <-chan struct{}) (stop bool) {
	// Interval between keep-alive pings.
	const pingInterval = time.Minute

	conn := r.pool.Get()
	// Every return path below waits for the receive goroutine first: the
	// connection must not be closed while Receive is still running on it.
	defer conn.Close()

	psc := redis.PubSubConn{Conn: conn}
	if err := psc.Subscribe(subject); err != nil {
		return false
	}

	// recvDone is closed when the receive goroutine exits. It is created per
	// session so a signal from an old session can never leak into a new one.
	recvDone := make(chan struct{})
	go func() {
		defer close(recvDone)
		for {
			switch msg := psc.Receive().(type) {
			case redis.Message:
				// A message was received.
				callBack(msg.Channel, msg.Data)
			case redis.Subscription:
				// Subscribe / unsubscribe notification; a count of zero
				// means nothing is subscribed any more.
				if msg.Count == 0 {
					return
				}
			case error:
				return
			case redis.Pong:
				// Reply to the keep-alive ping; nothing to do.
			}
		}
	}()

	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Send a ping to test whether the connection is healthy. If it
			// is not, the receive goroutine fails and exits as well.
			if err := psc.Ping(""); err != nil {
				<-recvDone
				return stop
			}
		case <-recvDone:
			return stop
		case <-done:
			// Stop listening for further requests; the session now ends via
			// recvDone once the server confirms the unsubscribe (or the
			// connection fails).
			stop, done, closed = true, nil, nil
			_ = psc.Unsubscribe() // a failure surfaces through Receive
		case <-closed:
			stop, done, closed = true, nil, nil
			_ = psc.Unsubscribe() // a failure surfaces through Receive
		}
	}
}

// test2 is a scratch function that only holds commented-out usage and timing
// examples; it executes nothing.
func test2() {
	//onlinestate.Do("SET", "foo", "bar")

	// currentTimeStart := time.Now()
	// onlinestate.DoMulti(func(conn redis.Conn) error {
	// 	for i := 0; i < 100000; i++ {
	// 		conn.Do("INCR", "TESTKEY")
	// 	}
	// 	return err
	// })
	// fmt.Println(time.Since(currentTimeStart))

	// currentTimeStart = time.Now()
	// onlinestate.DoMulti(func(conn redis.Conn) error {
	// 	for i := 0; i < 100000; i++ {
	// 		conn.Send("INCR", "TESTKEY")
	// 	}
	// 	conn.Flush()
	// 	conn.Receive()
	// 	return err
	// })
	// fmt.Println(time.Since(currentTimeStart))

	// currentTimeStart = time.Now()
	// onlinestate.DoMulti(func(conn redis.Conn) error {
	// 	for i := 0; i < 100000; i++ {
	// 		conn.Send("INCR", "TESTKEY")
	// 		conn.Flush()
	// 	}
	// 	conn.Receive()
	// 	return err
	// })
	// fmt.Println(time.Since(currentTimeStart))

	// done := shareRedisPool.Subscribe("playerlogin", func(s string, b []byte) {
	// 	fmt.Printf("Received message1: %s %s \n", s, b)
	// })
	// time.Sleep(time.Second * 20)
	// shareRedisPool.StopSubscribe(done)
}