package redisserver import ( "context" "encoding/json" "fmt" "leafstalk/log" slog "log" "os" "path/filepath" "sort" "sync" "time" ) type SaveQueue struct { name string delayQueue *SortQueue chSaveEvent chan struct{} wg sync.WaitGroup cancel context.CancelFunc logFile *os.File logger *slog.Logger saveFunc func(it SaveItem) error } // , eng *xorm.Engine func NewSaveQueue(name string, saveFunc func(it SaveItem) error) *SaveQueue { sq := new(SaveQueue) sq.name = name sq.delayQueue = NewSortQueue(100) sq.saveFunc = saveFunc // 开检测协程,保存延迟数据 sq.chSaveEvent = make(chan struct{}, 1) removeName := fmt.Sprintf("dumpToRedis%s.*.txt", name) if err := RemoveMatchedFiles(removeName); err != nil { log.Fatal("删除日志文件失败", err) } ct := time.Now() fileName := fmt.Sprintf("dumpToRedis%s.%s.txt", name, ct.Format("20060102150405")) filePath := "./" + fileName logFile, err := os.Create(filePath) if err != nil { log.Fatal("写日志文件失败", err) } sq.logFile = logFile sq.logger = slog.New(logFile, "", slog.Ldate|slog.Ltime|slog.Lshortfile) ctx, cancel := context.WithCancel(context.Background()) sq.cancel = cancel sq.wg.Add(1) go func() { defer sq.wg.Done() sq.checkTimeout(ctx, sq.chSaveEvent) }() return sq } func (sq *SaveQueue) Stop() { if sq.cancel == nil { return } // 关闭存储协程 sq.cancel() sq.wg.Wait() // 未存储数据需要写本地文件 for _, it := range sq.delayQueue.heap { dbItem := it.value err := sq.saveFunc(dbItem) if err != nil { szItem, err := json.Marshal(dbItem) sq.logger.Print(string(szItem), err, "type error") } if it.chEndEvent != nil { close(it.chEndEvent) } } sq.cancel = nil sq.logFile.Close() } // needEndEvent 为true时,会建立一个独有的 channel,如果创建太多通道的话,性能应该是受影响的 // 所以needEndEvent最好是不要创建的太频繁 func (sq *SaveQueue) Set(obj SaveItem, delay int, needEndEvent bool) chan struct{} { if delay < 0 { delay = 0 } tick := time.Now().Unix() + int64(delay) ch := sq.delayQueue.Set(obj, tick, needEndEvent) select { case sq.chSaveEvent <- struct{}{}: default: } return ch } // 同步存库,存储后返回 func (sq *SaveQueue) Save(obj SaveItem) error { ch := sq.Set(obj, 0, true) <-ch return nil } // 同步,确保历史记录存库,本条记录不存库 func (sq *SaveQueue) Flush(obj SaveItem) error { tick := time.Now().Unix() ch := sq.delayQueue.SetUtil(obj, tick, true) if ch == nil { return nil } select { case sq.chSaveEvent <- struct{}{}: default: } <-ch return nil } // 协程保存 // 开多个协程进行保存工作 // 每个协程只能处理对应种类的数据,不能两个协程保存同类型数据 // 单个协程检查超时列表,取到数据后,执行存储操作 // 数据保存失败时写日志 func (sq *SaveQueue) checkTimeout(ctx context.Context, chSaveEvent chan struct{}) { ticker := time.NewTicker(time.Second) for { var ct time.Time select { case <-ctx.Done(): ticker.Stop() log.Warnf("checkTimeout exit.") return // case <-time.After(time.Millisecond * 50): case ct = <-ticker.C: // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "ticker") case <-chSaveEvent: ct = time.Now() } // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "process") cst := ct.Unix() for { // 是否到时间 ci, saveTick := sq.delayQueue.First() if ci == nil || saveTick > cst { break } // 取第一个 dbItem, chEndEvent := sq.delayQueue.PopItem() if dbItem == nil { continue } err := sq.safeSaveOneItem(ctx, dbItem) if err != nil { szItem, err := json.Marshal(dbItem) sq.logger.Print(string(szItem), err, "type error") } if chEndEvent != nil { close(chEndEvent) } } } } // 实际的存储操作 // 当数据库关闭,保存失败后一直尝试; // 其他失败则写日志 // 当服务关闭时,添加到队列头,然后退出 func (sq *SaveQueue) safeSaveOneItem(ctx context.Context, it SaveItem) error { isServerStop := func() bool { select { case <-ctx.Done(): return true default: } return false } var err error for i := 0; i < 3; i += 1 { err = sq.saveFunc(it) if err == nil { return nil } if isServerStop() { return err } time.Sleep(time.Millisecond) } return err } func saveToRedis(it SaveItem) error { return nil } // 搜索匹配文件,删除字典顺序排名靠前的文件 // fmt.Sprintf("dumpTo%s.*.txt", name) func RemoveMatchedFiles(pattern string) error { // 使用filepath.Glob匹配文件 matches, err := filepath.Glob(pattern) if err != nil { return err } // 遍历匹配到的文件路径 var toUnlink []string var zeroFiles []string for _, path := range matches { // fi, err := os.Stat(path) // if err != nil { // continue // } fl, err := os.Lstat(path) if err != nil { continue } if fl.Mode()&os.ModeSymlink == os.ModeSymlink { continue } if fl.Size() == 0 { zeroFiles = append(zeroFiles, path) } else { toUnlink = append(toUnlink, path) } } sort.Strings(toUnlink) for _, path := range zeroFiles { os.Remove(path) } rotationCount := 7 if rotationCount >= len(toUnlink) { return nil } toUnlink = toUnlink[:len(toUnlink)-int(rotationCount)] // 尝试删除文件 for _, path := range toUnlink { os.Remove(path) } return nil }