package redisserver import ( "context" "encoding/json" "fmt" "leafstalk/log" slog "log" "os" "path/filepath" "sort" "sync" "time" ) type SaveQueue struct { name string delayQueue *SortQueue chSaveEvent chan struct{} wg sync.WaitGroup cancel context.CancelFunc logFile *os.File // jsonEncoder *json.Encoder logger *slog.Logger saveFunc func(it SaveItem) error } // var saveQueues map[string]*DbSaveQueue // func NewDbSaveQueue(name string, eng *xorm.Engine) *DbSaveQueue { // if saveQueues == nil { // saveQueues = make(map[string]*DbSaveQueue) // } // if v, ok := saveQueues[name]; ok { // return v // } // sq := newDbSaveQueue(name, eng) // saveQueues[name] = sq // return sq // } // 搜索匹配文件,删除字典顺序排名靠前的文件 // fmt.Sprintf("dumpTo%s.*.txt", name) func RemoveMatchedFiles(pattern string) error { // 使用filepath.Glob匹配文件 matches, err := filepath.Glob(pattern) if err != nil { return err } // 遍历匹配到的文件路径 var toUnlink []string var zeroFiles []string for _, path := range matches { // fi, err := os.Stat(path) // if err != nil { // continue // } fl, err := os.Lstat(path) if err != nil { continue } if fl.Mode()&os.ModeSymlink == os.ModeSymlink { continue } if fl.Size() == 0 { zeroFiles = append(zeroFiles, path) } else { toUnlink = append(toUnlink, path) } } sort.Strings(toUnlink) for _, path := range zeroFiles { os.Remove(path) } rotationCount := 7 if rotationCount >= len(toUnlink) { return nil } toUnlink = toUnlink[:len(toUnlink)-int(rotationCount)] // 尝试删除文件 for _, path := range toUnlink { os.Remove(path) } return nil } // , eng *xorm.Engine func NewSaveQueue(name string, saveFunc func(it SaveItem) error) *SaveQueue { sq := new(SaveQueue) sq.name = name sq.delayQueue = NewSortQueue(100) sq.saveFunc = saveFunc // 开检测协程,保存延迟数据 sq.chSaveEvent = make(chan struct{}, 1) removeName := fmt.Sprintf("dumpToRedis%s.*.txt", name) if err := RemoveMatchedFiles(removeName); err != nil { log.Fatal("删除日志文件失败", err) } ct := time.Now() fileName := fmt.Sprintf("dumpToRedis%s.%s.txt", name, ct.Format("20060102150405")) filePath := "./" + fileName logFile, err := os.Create(filePath) if err != nil { log.Fatal("写日志文件失败", err) } sq.logFile = logFile sq.logger = slog.New(logFile, "", slog.Ldate|slog.Ltime|slog.Lshortfile) ctx, cancel := context.WithCancel(context.Background()) sq.cancel = cancel sq.wg.Add(1) go func() { defer sq.wg.Done() sq.checkTimeout(ctx, sq.chSaveEvent) }() return sq } func (sq *SaveQueue) Stop() { if sq.cancel == nil { return } // 关闭存储协程 sq.cancel() sq.wg.Wait() // 未存储数据需要写本地文件 for _, it := range sq.delayQueue.heap { dbItem, ok := it.value.(SaveItem) if !ok { // 写错误日志 szItem, err := json.Marshal(it) sq.logger.Print(string(szItem), err, "type error") if it.chEndEvent != nil { close(it.chEndEvent) } continue } err := sq.saveFunc(dbItem) if err != nil { szItem, err := json.Marshal(it) sq.logger.Print(string(szItem), err, "type error") } if it.chEndEvent != nil { close(it.chEndEvent) } } sq.cancel = nil sq.logFile.Close() } // needEndEvent 为true时,会建立一个独有的 channel,如果创建太多通道的话,性能应该是受影响的 // 所以needEndEvent最好是不要创建的太频繁 func (sq *SaveQueue) Set(obj SaveItem, delay int, needEndEvent bool) chan struct{} { if delay < 0 { delay = 0 } tick := time.Now().Unix() + int64(delay) ch := sq.delayQueue.Set(obj, tick, needEndEvent) select { case sq.chSaveEvent <- struct{}{}: default: } return ch } // 同步存库,存储后返回 func (sq *SaveQueue) Save(obj SaveItem) error { ch := sq.Set(obj, 0, true) <-ch return nil } // 同步,确保历史记录存库,本条记录不存库 func (sq *SaveQueue) Flush(obj SaveItem) error { tick := time.Now().Unix() ch := sq.delayQueue.SetUtil(obj, tick, true) if ch == nil { return nil } select { case sq.chSaveEvent <- struct{}{}: default: } <-ch return nil } // 协程保存 // 开多个协程进行保存工作 // 每个协程只能处理对应种类的数据,不能两个协程保存同类型数据 // 单个协程检查超时列表,取到数据后,执行存储操作 // 数据保存失败时写日志 func (sq *SaveQueue) checkTimeout(ctx context.Context, chSaveEvent chan struct{}) { ticker := time.NewTicker(time.Second) for { var ct time.Time select { case <-ctx.Done(): ticker.Stop() log.Warnf("checkTimeout exit.") return // case <-time.After(time.Millisecond * 50): case ct = <-ticker.C: // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "ticker") case <-chSaveEvent: ct = time.Now() } // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "process") cst := ct.Unix() for { // 是否到时间 ci, saveTick := sq.delayQueue.First() if ci == nil || saveTick > cst { break } // 取第一个 it, chEndEvent := sq.delayQueue.PopItem() if it == nil { continue } dbItem, ok := it.(SaveItem) if !ok { // 写错误日志 szItem, err := json.Marshal(it) sq.logger.Print(string(szItem), err, "type error") if chEndEvent != nil { close(chEndEvent) } continue } err := sq.safeSaveOneItem(ctx, dbItem) if err != nil { szItem, err := json.Marshal(it) sq.logger.Print(string(szItem), err, "type error") } if chEndEvent != nil { close(chEndEvent) } } } } // 实际的存储操作 // 当数据库关闭,保存失败后一直尝试; // 其他失败则写日志 // 当服务关闭时,添加到队列头,然后退出 func (sq *SaveQueue) safeSaveOneItem(ctx context.Context, it SaveItem) error { isServerStop := func() bool { select { case <-ctx.Done(): return true default: } return false } var err error for i := 0; i < 3; i += 1 { err = sq.saveFunc(it) if err == nil { return nil } if isServerStop() { return err } time.Sleep(time.Millisecond) } return err } // func (sq *SaveQueue) dumpToFile(si SaveItem, reason string) { // if si != nil { // contents := make(map[string]interface{}) // contents["item"] = si.TableName() + si.GetUniqueKey() // contents["reason"] = reason // contents["data"] = si // err := sq.jsonEncoder.Encode(contents) // if err != nil { // sj, err := json.Marshal(contents) // fmt.Println(string(sj), err) // } // return // } // for _, it := range sq.delayQueue.heap { // contents := make(map[string]interface{}) // contents["item"] = it.value.TableName() + it.value.GetUniqueKey() // contents["reason"] = reason // contents["data"] = it // err := sq.jsonEncoder.Encode(contents) // if err != nil { // sj, err := json.Marshal(contents) // fmt.Println(string(sj), err) // } // if it.chEndEvent != nil { // close(it.chEndEvent) // } // } // sq.delayQueue.Clear() // } // 保存成功返回nil, // 数据库关闭返回error // 其他错误写入日志文件 func saveToDb(it SaveItem) error { // fmt.Println("start save to db", it) /*defer monitor.MySqlExecTimeoutWarn(it.TableName(), it, time.Now()) defer func() { if r := recover(); r != nil { err := r.(error) strItem, _ := json.Marshal(it) log.Errorf("saveOneItem error. %v %v", err.Error(), string(strItem)) } }() has, err := it.QueryExist(sq.Engine) if err != nil { return err } if has { _, err := it.UpdateDB(sq.Engine) if err != nil { return err } } else { _, err := sq.Engine.Insert(it) if err != nil { return err } }*/ return nil } func saveToRedis(it SaveItem) error { return nil } // func // 需要保存的数据记录写入日志中 // func dumpToFile(si SaveItem) { // // log.Infoln("dumpToFile start ...") // // ct := time.Now() // // if si != nil { // // tab := si.TableName() // id := si.GetUniqueKey() // fileName := fmt.Sprintf("dumpToFile.%v%v.%s.txt", tab, id, ct.Format("20060102150405")) // filePath := "./" + fileName // itemFile, err := os.Create(filePath) // if err != nil { // log.Errorln("写日志文件失败", err) // } // // defer itemFile.Close() // // encoder := json.NewEncoder(itemFile) // err = encoder.Encode(si) // if err != nil { // sj, err2 := json.Marshal(si) // log.Infoln("encoder.Encode failed", err.Error()) // } // // return // } // // fileName := fmt.Sprintf("dumpToFile.%s.txt", ct.Format("20060102150405")) // filePath := "./" + fileName // itemFile, err := os.Create(filePath) // if err != nil { // log.Errorln("写日志文件失败", err) // } // defer itemFile.Close() // encoder := json.NewEncoder(itemFile) // // 保存所有未存储数据 // // for _, sq := range saveQueues { // // for _, it := range sq.delayQueue.heap { // // // write file // err = encoder.Encode(it.value) // if err != nil { // sj, _ := json.Marshal(it.value) // log.Infoln("encoder.Encode failed", err.Error(), string(sj)) // } // // if it.chEndEvent != nil { // close(it.chEndEvent) // } // } // } // } // 尽可能保存所有项目, // 不能保存到数据库时,写文件 // func flushQueue() { // for _, sq := range saveQueues { // for { // it, chEndEvent := sq.delayQueue.PopItem() // if it == nil { // break // } // dbItem, ok := it.(SaveDBItem) // if !ok { // continue // } // if chEndEvent != nil { // close(chEndEvent) // } // err := sq.saveOneItem(dbItem) // if err != nil { // dumpToFile(dbItem) // dumpToFile(nil) // return // } // } // } // }