package dbserver import ( "context" "encoding/json" "fmt" "leafstalk/covenant/monitor" "leafstalk/log" "reflect" "sync" "time" "xorm.io/xorm" ) type DbServer struct { saveCacheList *CacheSaveList dbEngine *xorm.Engine cancel context.CancelFunc wg sync.WaitGroup } type SaveDBItem interface { TableName() string QueryExist(eng *xorm.Engine) (bool, error) UpdateDB(eng *xorm.Engine) (int64, error) GetUniqueKey() string //表中唯一标识 } func (s *DbServer) Start(routinueNum int, eng *xorm.Engine) { s.dbEngine = eng s.saveCacheList = NewCacheList() if routinueNum < 0 { routinueNum = 1 } if routinueNum > 10 { routinueNum = 10 } ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel for i := 0; i < routinueNum; i++ { s.wg.Add(1) go func() { defer s.wg.Done() s.processOneItem(ctx) }() } } func (s *DbServer) Stop() { s.cancel() s.wg.Wait() for { v := s.saveCacheList.Pop() if v == nil { return } v1, ok := v.(SaveDBItem) if !ok { log.Warnln("dbserver.v.(SaveDBItem) error.") continue } err := s.saveCacheItem(v1, false) if err == nil { continue } sj, err2 := json.Marshal(v1) if err2 != nil { log.Warnf("dbserver.Stop error. %v %v", err, err2) continue } log.Warnf("dbserver.saveCacheItem error. %v %v", err, sj) } } // 从列表中取数据保存,可设缓存时间,保存失败需要暂停一秒,否则一直取到无数据; // 无数据后,休息一会再看是否有新数据 func (s *DbServer) processOneItem(ctx context.Context) { //ticker := time.NewTicker(time.Millisecond * 50) for { t := s.saveCacheList.GetHeadTick() if t > 0 && t+3 < time.Now().Unix() { v := s.saveCacheList.Pop() if v != nil { v1, ok := v.(SaveDBItem) if !ok { log.Warnf("processOneItem to SaveDBItem error %#v", v) continue } err := s.saveCacheItem(v1, true) if err == nil { continue } time.Sleep(time.Second) log.Warn(err) } } select { case <-ctx.Done(): fmt.Println("processOneItem exit.") return case <-time.After(time.Millisecond * 50): } } } func getStructName(request SaveDBItem) (string, error) { msgType := reflect.TypeOf(request) if msgType == nil || msgType.Kind() != reflect.Ptr { return "", fmt.Errorf("getStructName found error struct type") } type1 := msgType.Elem().Name() return type1, nil } func GetKey(request SaveDBItem) (string, error) { playerID := request.GetUniqueKey() key, err := getStructName(request) if err != nil { return "", err } key = key + "_" + playerID return key, nil } func (s *DbServer) Add(it SaveDBItem) int { k, err := GetKey(it) if err != nil { log.Errorf("GetKey error %#v", it) return -1 } return s.saveCacheList.Add(k, it) } func (s *DbServer) AddnxByType(it SaveDBItem) { k, err := GetKey(it) if err != nil { log.Errorf("GetKey error %#v", it) return } s.saveCacheList.Addnx(k, it) } func (s *DbServer) saveCacheItem(v SaveDBItem, AddIfFail bool) error { defer monitor.MySqlExecTimeoutWarn(v.TableName(), v, time.Now()) defer func() { if r := recover(); r != nil { err := r.(error) log.Errorln(err.Error()) } }() has, err := v.QueryExist(s.dbEngine) if err != nil { if AddIfFail { s.AddnxByType(v) } log.Warnln(err) return err } if has { _, err := v.UpdateDB(s.dbEngine) if err != nil { if AddIfFail { s.AddnxByType(v) } log.Warnln(err) return err } } else { _, err := s.dbEngine.Insert(v) if err != nil { if AddIfFail { s.AddnxByType(v) } log.Warnln(err) return err } } return nil }