package dbserver import ( "context" "encoding/json" "errors" "fmt" "leafstalk/covenant/monitor" "leafstalk/log" slog "log" "os" "path/filepath" "sort" "sync" "time" "xorm.io/xorm" ) type DbSaveQueue struct { *xorm.Engine name string delayQueue *SortQueue chSaveEvent chan struct{} wg sync.WaitGroup cancel context.CancelFunc logFile *os.File logger *slog.Logger } var saveQueues map[string]*DbSaveQueue func NewDbSaveQueue(name string, eng *xorm.Engine) (*DbSaveQueue, error) { if saveQueues == nil { saveQueues = make(map[string]*DbSaveQueue) } if _, ok := saveQueues[name]; ok { return nil, errors.New("duplicate name") } sq, err := newDbSaveQueue(name, eng) if err != nil { return nil, err } saveQueues[name] = sq return sq, nil } func newDbSaveQueue(name string, eng *xorm.Engine) (*DbSaveQueue, error) { sq := new(DbSaveQueue) sq.name = name sq.delayQueue = NewSortQueue(100) sq.Engine = eng // dump文件 removeName := fmt.Sprintf("dumpToMysql%s.*.txt", name) if err := RemoveMatchedFiles(removeName); err != nil { return nil, err } ct := time.Now() fileName := fmt.Sprintf("dumpToMysql%s.%s.txt", name, ct.Format("20060102150405")) filePath := "./" + fileName logFile, err := os.Create(filePath) if err != nil { return nil, err } sq.logFile = logFile sq.logger = slog.New(logFile, "", slog.Ldate|slog.Ltime|slog.Lshortfile) // 开检测协程,保存延迟数据 sq.chSaveEvent = make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) sq.cancel = cancel sq.wg.Add(1) go func() { defer sq.wg.Done() sq.checkTimeout(ctx, sq.chSaveEvent) }() return sq, nil } // request := new(model.Letter) // // request.Content = "1234" // letterSaveQueue.Set(request, 10, false) // letterSaveQueue.Set(request, 10, false) // dbserver.Stop() func Stop() { // 关闭存储协程 for _, sq := range saveQueues { sq.cancel() sq.wg.Wait() // 未存储数据需要写本地文件 sq.FlushAll() sq.logFile.Close() } saveQueues = nil } // 列表中的记录全部存库,存储失败则写文件 func (sq *DbSaveQueue) FlushAll() error { for _, it := range sq.delayQueue.heap { // 存库 it2, ok := it.value.(SaveDBItem) if !ok { szItem, err := json.Marshal(it.value) sq.logger.Print(string(szItem), err, "it.value.(SaveDBItem) error") if it.chEndEvent != nil { close(it.chEndEvent) } continue } err := sq.saveOneItem(it2) if err != nil { // 存库失败写文件 szItem, err := json.Marshal(it2) sq.logger.Print(string(szItem), err) } if it.chEndEvent != nil { close(it.chEndEvent) } } return nil } // needEndEvent 为true时,会建立一个独有的 channel,如果创建太多通道的话,性能应该是受影响的 // 所以needEndEvent最好是不要创建的太频繁 func (sq *DbSaveQueue) Set(obj SaveDBItem, delay int, needEndEvent bool) chan struct{} { if delay < 0 { delay = 0 } tick := time.Now().Unix() + int64(delay) ch := sq.delayQueue.Set(obj, tick, needEndEvent) select { case sq.chSaveEvent <- struct{}{}: default: } return ch } // 同步存库,存储后返回 func (sq *DbSaveQueue) Save(obj SaveDBItem) error { ch := sq.Set(obj, 0, true) <-ch return nil } // 同步,确保历史记录存库,本条记录不存库,但用到了OBJ的key func (sq *DbSaveQueue) FlushItem(obj SaveDBItem) error { tick := time.Now().Unix() ch := sq.delayQueue.SetUtil(obj, tick, true) if ch == nil { return nil } select { case sq.chSaveEvent <- struct{}{}: default: } <-ch return nil } // 协程保存 // 开多个协程进行保存工作 // 每个协程只能处理对应种类的数据,不能两个协程保存同类型数据 // 单个协程检查超时列表,取到数据后,执行存储操作 // 数据保存失败时写日志 func (sq *DbSaveQueue) checkTimeout(ctx context.Context, chSaveEvent chan struct{}) { ticker := time.NewTicker(time.Second) for { var ct time.Time select { case <-ctx.Done(): ticker.Stop() // log.Warnf("checkTimeout exit.") return // case <-time.After(time.Millisecond * 50): case ct = <-ticker.C: // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "ticker") case <-chSaveEvent: ct = time.Now() } // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "process") cst := ct.Unix() for { // 是否到时间 ci, saveTick := sq.delayQueue.First() if ci == nil || saveTick > cst { break } // 取第一个 it, chEndEvent := sq.delayQueue.PopItem() if it == nil { continue } dbItem, ok := it.(SaveDBItem) if !ok { // 写错误日志 szItem, err := json.Marshal(it) sq.logger.Print(string(szItem), err, "it.(SaveDBItem) error") if chEndEvent != nil { close(chEndEvent) } continue } err := sq.trySaveOneItem(ctx, dbItem) if err != nil { // log.Warnf("SaveDBItem trySaveOneItem error. %v", err) // dumpToFile(dbItem) // 存库失败写文件 szItem, err := json.Marshal(it) sq.logger.Print(string(szItem), err, "trySaveOneItem error") } if chEndEvent != nil { close(chEndEvent) } } } } // 保存成功返回nil, // 数据库关闭返回error // 其他错误写入日志文件 func (sq *DbSaveQueue) saveOneItem(it SaveDBItem) error { // fmt.Println("start save to db", it) defer monitor.MySqlExecTimeoutWarn(it.TableName(), it, time.Now()) defer func() { if r := recover(); r != nil { err := r.(error) strItem, _ := json.Marshal(it) log.Errorf("saveOneItem error. %v %v", err.Error(), string(strItem)) } }() has, err := it.QueryExist(sq.Engine) if err != nil { return err } if has { _, err := it.UpdateDB(sq.Engine) if err != nil { return err } } else { _, err := sq.Engine.Insert(it) if err != nil { return err } } return nil } // 实际的存储操作 // 当数据库关闭,保存失败后一直尝试; // 其他失败则写日志 // 当服务关闭时,添加到队列头,然后退出 func (sq *DbSaveQueue) trySaveOneItem(ctx context.Context, it SaveDBItem) error { isServerStop := func() bool { select { case <-ctx.Done(): return true default: } return false } var err error for i := 0; i < 3; i += 1 { err = sq.saveOneItem(it) if err == nil { return nil } if isServerStop() { return err } time.Sleep(time.Millisecond * 100 * time.Duration(i+1)) } return err } // 搜索匹配文件,删除字典顺序排名靠前的文件 // fmt.Sprintf("dumpTo%s.*.txt", name) func RemoveMatchedFiles(pattern string) error { // 使用filepath.Glob匹配文件 matches, err := filepath.Glob(pattern) if err != nil { return err } // 遍历匹配到的文件路径 var toUnlink []string var zeroFiles []string for _, path := range matches { // fi, err := os.Stat(path) // if err != nil { // continue // } fl, err := os.Lstat(path) if err != nil { continue } if fl.Mode()&os.ModeSymlink == os.ModeSymlink { continue } if fl.Size() == 0 { zeroFiles = append(zeroFiles, path) } else { toUnlink = append(toUnlink, path) } } sort.Strings(toUnlink) for _, path := range zeroFiles { os.Remove(path) } rotationCount := 7 if rotationCount >= len(toUnlink) { return nil } toUnlink = toUnlink[:len(toUnlink)-int(rotationCount)] // 尝试删除文件 for _, path := range toUnlink { os.Remove(path) } return nil }