dbserver.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package dbserver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "leafstalk/covenant/monitor"
  7. "leafstalk/log"
  8. "reflect"
  9. "sync"
  10. "time"
  11. "xorm.io/xorm"
  12. )
  13. type DbServer struct {
  14. saveCacheList *CacheSaveList
  15. dbEngine *xorm.Engine
  16. cancel context.CancelFunc
  17. wg sync.WaitGroup
  18. }
  19. type SaveDBItem interface {
  20. TableName() string
  21. QueryExist(eng *xorm.Engine) (bool, error)
  22. UpdateDB(eng *xorm.Engine) (int64, error)
  23. GetUniqueKey() string //表中唯一标识
  24. }
  25. func (s *DbServer) Start(routinueNum int, eng *xorm.Engine) {
  26. s.dbEngine = eng
  27. s.saveCacheList = NewCacheList()
  28. if routinueNum < 0 {
  29. routinueNum = 1
  30. }
  31. if routinueNum > 10 {
  32. routinueNum = 10
  33. }
  34. ctx, cancel := context.WithCancel(context.Background())
  35. s.cancel = cancel
  36. for i := 0; i < routinueNum; i++ {
  37. s.wg.Add(1)
  38. go func() {
  39. defer s.wg.Done()
  40. s.processOneItem(ctx)
  41. }()
  42. }
  43. }
  44. func (s *DbServer) Stop() {
  45. s.cancel()
  46. s.wg.Wait()
  47. for {
  48. v := s.saveCacheList.Pop()
  49. if v == nil {
  50. return
  51. }
  52. v1, ok := v.(SaveDBItem)
  53. if !ok {
  54. log.Warnln("dbserver.v.(SaveDBItem) error.")
  55. continue
  56. }
  57. err := s.saveCacheItem(v1, false)
  58. if err == nil {
  59. continue
  60. }
  61. sj, err2 := json.Marshal(v1)
  62. if err2 != nil {
  63. log.Warnf("dbserver.Stop error. %v %v", err, err2)
  64. continue
  65. }
  66. log.Warnf("dbserver.saveCacheItem error. %v %v", err, sj)
  67. }
  68. }
  69. // 从列表中取数据保存,可设缓存时间,保存失败需要暂停一秒,否则一直取到无数据;
  70. // 无数据后,休息一会再看是否有新数据
  71. func (s *DbServer) processOneItem(ctx context.Context) {
  72. //ticker := time.NewTicker(time.Millisecond * 50)
  73. for {
  74. t := s.saveCacheList.GetHeadTick()
  75. if t > 0 && t+3 < time.Now().Unix() {
  76. v := s.saveCacheList.Pop()
  77. if v != nil {
  78. v1, ok := v.(SaveDBItem)
  79. if !ok {
  80. log.Warnf("processOneItem to SaveDBItem error %#v", v)
  81. continue
  82. }
  83. err := s.saveCacheItem(v1, true)
  84. if err == nil {
  85. continue
  86. }
  87. time.Sleep(time.Second)
  88. log.Warn(err)
  89. }
  90. }
  91. select {
  92. case <-ctx.Done():
  93. fmt.Println("processOneItem exit.")
  94. return
  95. case <-time.After(time.Millisecond * 50):
  96. }
  97. }
  98. }
  99. func getStructName(request SaveDBItem) (string, error) {
  100. msgType := reflect.TypeOf(request)
  101. if msgType == nil || msgType.Kind() != reflect.Ptr {
  102. return "", fmt.Errorf("getStructName found error struct type")
  103. }
  104. type1 := msgType.Elem().Name()
  105. return type1, nil
  106. }
  107. func GetKey(request SaveDBItem) (string, error) {
  108. playerID := request.GetUniqueKey()
  109. key, err := getStructName(request)
  110. if err != nil {
  111. return "", err
  112. }
  113. key = key + "_" + playerID
  114. return key, nil
  115. }
  116. func (s *DbServer) Add(it SaveDBItem) int {
  117. k, err := GetKey(it)
  118. if err != nil {
  119. log.Errorf("GetKey error %#v", it)
  120. return -1
  121. }
  122. return s.saveCacheList.Add(k, it)
  123. }
  124. func (s *DbServer) AddnxByType(it SaveDBItem) {
  125. k, err := GetKey(it)
  126. if err != nil {
  127. log.Errorf("GetKey error %#v", it)
  128. return
  129. }
  130. s.saveCacheList.Addnx(k, it)
  131. }
  132. func (s *DbServer) saveCacheItem(v SaveDBItem, AddIfFail bool) error {
  133. defer monitor.MySqlExecTimeoutWarn(v.TableName(), v, time.Now())
  134. defer func() {
  135. if r := recover(); r != nil {
  136. err := r.(error)
  137. log.Errorln(err.Error())
  138. }
  139. }()
  140. has, err := v.QueryExist(s.dbEngine)
  141. if err != nil {
  142. if AddIfFail {
  143. s.AddnxByType(v)
  144. }
  145. log.Warnln(err)
  146. return err
  147. }
  148. if has {
  149. _, err := v.UpdateDB(s.dbEngine)
  150. if err != nil {
  151. if AddIfFail {
  152. s.AddnxByType(v)
  153. }
  154. log.Warnln(err)
  155. return err
  156. }
  157. } else {
  158. _, err := s.dbEngine.Insert(v)
  159. if err != nil {
  160. if AddIfFail {
  161. s.AddnxByType(v)
  162. }
  163. log.Warnln(err)
  164. return err
  165. }
  166. }
  167. return nil
  168. }