123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- package dbserver
- import (
- "context"
- "encoding/json"
- "fmt"
- "leafstalk/covenant/monitor"
- "leafstalk/log"
- "reflect"
- "sync"
- "time"
- "xorm.io/xorm"
- )
- type DbServer struct {
- saveCacheList *CacheSaveList
- dbEngine *xorm.Engine
- cancel context.CancelFunc
- wg sync.WaitGroup
- }
- type SaveDBItem interface {
- TableName() string
- QueryExist(eng *xorm.Engine) (bool, error)
- UpdateDB(eng *xorm.Engine) (int64, error)
- GetUniqueKey() string //表中唯一标识
- }
- func (s *DbServer) Start(routinueNum int, eng *xorm.Engine) {
- s.dbEngine = eng
- s.saveCacheList = NewCacheList()
- if routinueNum < 0 {
- routinueNum = 1
- }
- if routinueNum > 10 {
- routinueNum = 10
- }
- ctx, cancel := context.WithCancel(context.Background())
- s.cancel = cancel
- for i := 0; i < routinueNum; i++ {
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- s.processOneItem(ctx)
- }()
- }
- }
- func (s *DbServer) Stop() {
- s.cancel()
- s.wg.Wait()
- for {
- v := s.saveCacheList.Pop()
- if v == nil {
- return
- }
- v1, ok := v.(SaveDBItem)
- if !ok {
- log.Warnln("dbserver.v.(SaveDBItem) error.")
- continue
- }
- err := s.saveCacheItem(v1, false)
- if err == nil {
- continue
- }
- sj, err2 := json.Marshal(v1)
- if err2 != nil {
- log.Warnf("dbserver.Stop error. %v %v", err, err2)
- continue
- }
- log.Warnf("dbserver.saveCacheItem error. %v %v", err, sj)
- }
- }
- // 从列表中取数据保存,可设缓存时间,保存失败需要暂停一秒,否则一直取到无数据;
- // 无数据后,休息一会再看是否有新数据
- func (s *DbServer) processOneItem(ctx context.Context) {
- //ticker := time.NewTicker(time.Millisecond * 50)
- for {
- t := s.saveCacheList.GetHeadTick()
- if t > 0 && t+3 < time.Now().Unix() {
- v := s.saveCacheList.Pop()
- if v != nil {
- v1, ok := v.(SaveDBItem)
- if !ok {
- log.Warnf("processOneItem to SaveDBItem error %#v", v)
- continue
- }
- err := s.saveCacheItem(v1, true)
- if err == nil {
- continue
- }
- time.Sleep(time.Second)
- log.Warn(err)
- }
- }
- select {
- case <-ctx.Done():
- fmt.Println("processOneItem exit.")
- return
- case <-time.After(time.Millisecond * 50):
- }
- }
- }
- func getStructName(request SaveDBItem) (string, error) {
- msgType := reflect.TypeOf(request)
- if msgType == nil || msgType.Kind() != reflect.Ptr {
- return "", fmt.Errorf("getStructName found error struct type")
- }
- type1 := msgType.Elem().Name()
- return type1, nil
- }
- func GetKey(request SaveDBItem) (string, error) {
- playerID := request.GetUniqueKey()
- key, err := getStructName(request)
- if err != nil {
- return "", err
- }
- key = key + "_" + playerID
- return key, nil
- }
- func (s *DbServer) Add(it SaveDBItem) int {
- k, err := GetKey(it)
- if err != nil {
- log.Errorf("GetKey error %#v", it)
- return -1
- }
- return s.saveCacheList.Add(k, it)
- }
- func (s *DbServer) AddnxByType(it SaveDBItem) {
- k, err := GetKey(it)
- if err != nil {
- log.Errorf("GetKey error %#v", it)
- return
- }
- s.saveCacheList.Addnx(k, it)
- }
- func (s *DbServer) saveCacheItem(v SaveDBItem, AddIfFail bool) error {
- defer monitor.MySqlExecTimeoutWarn(v.TableName(), v, time.Now())
- defer func() {
- if r := recover(); r != nil {
- err := r.(error)
- log.Errorln(err.Error())
- }
- }()
- has, err := v.QueryExist(s.dbEngine)
- if err != nil {
- if AddIfFail {
- s.AddnxByType(v)
- }
- log.Warnln(err)
- return err
- }
- if has {
- _, err := v.UpdateDB(s.dbEngine)
- if err != nil {
- if AddIfFail {
- s.AddnxByType(v)
- }
- log.Warnln(err)
- return err
- }
- } else {
- _, err := s.dbEngine.Insert(v)
- if err != nil {
- if AddIfFail {
- s.AddnxByType(v)
- }
- log.Warnln(err)
- return err
- }
- }
- return nil
- }
|