|
- package redisserver
- import (
- "context"
- "encoding/json"
- "fmt"
- "leafstalk/log"
- slog "log"
- "os"
- "path/filepath"
- "sort"
- "sync"
- "time"
- )
- type SaveQueue struct {
- name string
- delayQueue *SortQueue
- chSaveEvent chan struct{}
- wg sync.WaitGroup
- cancel context.CancelFunc
- logFile *os.File
- // jsonEncoder *json.Encoder
- logger *slog.Logger
- saveFunc func(it SaveItem) error
- }
- // var saveQueues map[string]*DbSaveQueue
- // func NewDbSaveQueue(name string, eng *xorm.Engine) *DbSaveQueue {
- // if saveQueues == nil {
- // saveQueues = make(map[string]*DbSaveQueue)
- // }
- // if v, ok := saveQueues[name]; ok {
- // return v
- // }
- // sq := newDbSaveQueue(name, eng)
- // saveQueues[name] = sq
- // return sq
- // }
- // 搜索匹配文件,删除字典顺序排名靠前的文件
- // fmt.Sprintf("dumpTo%s.*.txt", name)
- func RemoveMatchedFiles(pattern string) error {
- // 使用filepath.Glob匹配文件
- matches, err := filepath.Glob(pattern)
- if err != nil {
- return err
- }
- // 遍历匹配到的文件路径
- var toUnlink []string
- var zeroFiles []string
- for _, path := range matches {
- // fi, err := os.Stat(path)
- // if err != nil {
- // continue
- // }
- fl, err := os.Lstat(path)
- if err != nil {
- continue
- }
- if fl.Mode()&os.ModeSymlink == os.ModeSymlink {
- continue
- }
- if fl.Size() == 0 {
- zeroFiles = append(zeroFiles, path)
- } else {
- toUnlink = append(toUnlink, path)
- }
- }
- sort.Strings(toUnlink)
- for _, path := range zeroFiles {
- os.Remove(path)
- }
- rotationCount := 7
- if rotationCount >= len(toUnlink) {
- return nil
- }
- toUnlink = toUnlink[:len(toUnlink)-int(rotationCount)]
- // 尝试删除文件
- for _, path := range toUnlink {
- os.Remove(path)
- }
- return nil
- }
- // , eng *xorm.Engine
- func NewSaveQueue(name string, saveFunc func(it SaveItem) error) *SaveQueue {
- sq := new(SaveQueue)
- sq.name = name
- sq.delayQueue = NewSortQueue(100)
- sq.saveFunc = saveFunc
- // 开检测协程,保存延迟数据
- sq.chSaveEvent = make(chan struct{}, 1)
- removeName := fmt.Sprintf("dumpToRedis%s.*.txt", name)
- if err := RemoveMatchedFiles(removeName); err != nil {
- log.Fatal("删除日志文件失败", err)
- }
- ct := time.Now()
- fileName := fmt.Sprintf("dumpToRedis%s.%s.txt", name, ct.Format("20060102150405"))
- filePath := "./" + fileName
- logFile, err := os.Create(filePath)
- if err != nil {
- log.Fatal("写日志文件失败", err)
- }
- sq.logFile = logFile
- sq.logger = slog.New(logFile, "", slog.Ldate|slog.Ltime|slog.Lshortfile)
- ctx, cancel := context.WithCancel(context.Background())
- sq.cancel = cancel
- sq.wg.Add(1)
- go func() {
- defer sq.wg.Done()
- sq.checkTimeout(ctx, sq.chSaveEvent)
- }()
- return sq
- }
- func (sq *SaveQueue) Stop() {
- if sq.cancel == nil {
- return
- }
- // 关闭存储协程
- sq.cancel()
- sq.wg.Wait()
- // 未存储数据需要写本地文件
- for _, it := range sq.delayQueue.heap {
- dbItem, ok := it.value.(SaveItem)
- if !ok {
- // 写错误日志
- szItem, err := json.Marshal(it)
- sq.logger.Print(string(szItem), err, "type error")
- if it.chEndEvent != nil {
- close(it.chEndEvent)
- }
- continue
- }
- err := sq.saveFunc(dbItem)
- if err != nil {
- szItem, err := json.Marshal(it)
- sq.logger.Print(string(szItem), err, "type error")
- }
- if it.chEndEvent != nil {
- close(it.chEndEvent)
- }
- }
- sq.cancel = nil
- sq.logFile.Close()
- }
- // needEndEvent 为true时,会建立一个独有的 channel,如果创建太多通道的话,性能应该是受影响的
- // 所以needEndEvent最好是不要创建的太频繁
- func (sq *SaveQueue) Set(obj SaveItem, delay int, needEndEvent bool) chan struct{} {
- if delay < 0 {
- delay = 0
- }
- tick := time.Now().Unix() + int64(delay)
- ch := sq.delayQueue.Set(obj, tick, needEndEvent)
- select {
- case sq.chSaveEvent <- struct{}{}:
- default:
- }
- return ch
- }
- // 同步存库,存储后返回
- func (sq *SaveQueue) Save(obj SaveItem) error {
- ch := sq.Set(obj, 0, true)
- <-ch
- return nil
- }
- // 同步,确保历史记录存库,本条记录不存库
- func (sq *SaveQueue) Flush(obj SaveItem) error {
- tick := time.Now().Unix()
- ch := sq.delayQueue.SetUtil(obj, tick, true)
- if ch == nil {
- return nil
- }
- select {
- case sq.chSaveEvent <- struct{}{}:
- default:
- }
- <-ch
- return nil
- }
- // 协程保存
- // 开多个协程进行保存工作
- // 每个协程只能处理对应种类的数据,不能两个协程保存同类型数据
- // 单个协程检查超时列表,取到数据后,执行存储操作
- // 数据保存失败时写日志
- func (sq *SaveQueue) checkTimeout(ctx context.Context, chSaveEvent chan struct{}) {
- ticker := time.NewTicker(time.Second)
- for {
- var ct time.Time
- select {
- case <-ctx.Done():
- ticker.Stop()
- log.Warnf("checkTimeout exit.")
- return
- // case <-time.After(time.Millisecond * 50):
- case ct = <-ticker.C:
- // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "ticker")
- case <-chSaveEvent:
- ct = time.Now()
- }
- // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "process")
- cst := ct.Unix()
- for {
- // 是否到时间
- ci, saveTick := sq.delayQueue.First()
- if ci == nil || saveTick > cst {
- break
- }
- // 取第一个
- it, chEndEvent := sq.delayQueue.PopItem()
- if it == nil {
- continue
- }
- dbItem, ok := it.(SaveItem)
- if !ok {
- // 写错误日志
- szItem, err := json.Marshal(it)
- sq.logger.Print(string(szItem), err, "type error")
- if chEndEvent != nil {
- close(chEndEvent)
- }
- continue
- }
- err := sq.safeSaveOneItem(ctx, dbItem)
- if err != nil {
- szItem, err := json.Marshal(it)
- sq.logger.Print(string(szItem), err, "type error")
- }
- if chEndEvent != nil {
- close(chEndEvent)
- }
- }
- }
- }
- // 实际的存储操作
- // 当数据库关闭,保存失败后一直尝试;
- // 其他失败则写日志
- // 当服务关闭时,添加到队列头,然后退出
- func (sq *SaveQueue) safeSaveOneItem(ctx context.Context, it SaveItem) error {
- isServerStop := func() bool {
- select {
- case <-ctx.Done():
- return true
- default:
- }
- return false
- }
- var err error
- for i := 0; i < 3; i += 1 {
- err = sq.saveFunc(it)
- if err == nil {
- return nil
- }
- if isServerStop() {
- return err
- }
- time.Sleep(time.Millisecond)
- }
- return err
- }
- // func (sq *SaveQueue) dumpToFile(si SaveItem, reason string) {
- // if si != nil {
- // contents := make(map[string]interface{})
- // contents["item"] = si.TableName() + si.GetUniqueKey()
- // contents["reason"] = reason
- // contents["data"] = si
- // err := sq.jsonEncoder.Encode(contents)
- // if err != nil {
- // sj, err := json.Marshal(contents)
- // fmt.Println(string(sj), err)
- // }
- // return
- // }
- // for _, it := range sq.delayQueue.heap {
- // contents := make(map[string]interface{})
- // contents["item"] = it.value.TableName() + it.value.GetUniqueKey()
- // contents["reason"] = reason
- // contents["data"] = it
- // err := sq.jsonEncoder.Encode(contents)
- // if err != nil {
- // sj, err := json.Marshal(contents)
- // fmt.Println(string(sj), err)
- // }
- // if it.chEndEvent != nil {
- // close(it.chEndEvent)
- // }
- // }
- // sq.delayQueue.Clear()
- // }
- // 保存成功返回nil,
- // 数据库关闭返回error
- // 其他错误写入日志文件
- func saveToDb(it SaveItem) error {
- // fmt.Println("start save to db", it)
- /*defer monitor.MySqlExecTimeoutWarn(it.TableName(), it, time.Now())
- defer func() {
- if r := recover(); r != nil {
- err := r.(error)
- strItem, _ := json.Marshal(it)
- log.Errorf("saveOneItem error. %v %v", err.Error(), string(strItem))
- }
- }()
- has, err := it.QueryExist(sq.Engine)
- if err != nil {
- return err
- }
- if has {
- _, err := it.UpdateDB(sq.Engine)
- if err != nil {
- return err
- }
- } else {
- _, err := sq.Engine.Insert(it)
- if err != nil {
- return err
- }
- }*/
- return nil
- }
- func saveToRedis(it SaveItem) error {
- return nil
- }
- // func
- // 需要保存的数据记录写入日志中
- // func dumpToFile(si SaveItem) {
- //
- // log.Infoln("dumpToFile start ...")
- //
- // ct := time.Now()
- //
- // if si != nil {
- //
- // tab := si.TableName()
- // id := si.GetUniqueKey()
- // fileName := fmt.Sprintf("dumpToFile.%v%v.%s.txt", tab, id, ct.Format("20060102150405"))
- // filePath := "./" + fileName
- // itemFile, err := os.Create(filePath)
- // if err != nil {
- // log.Errorln("写日志文件失败", err)
- // }
- //
- // defer itemFile.Close()
- //
- // encoder := json.NewEncoder(itemFile)
- // err = encoder.Encode(si)
- // if err != nil {
- // sj, err2 := json.Marshal(si)
- // log.Infoln("encoder.Encode failed", err.Error())
- // }
- //
- // return
- // }
- //
- // fileName := fmt.Sprintf("dumpToFile.%s.txt", ct.Format("20060102150405"))
- // filePath := "./" + fileName
- // itemFile, err := os.Create(filePath)
- // if err != nil {
- // log.Errorln("写日志文件失败", err)
- // }
- // defer itemFile.Close()
- // encoder := json.NewEncoder(itemFile)
- //
- // 保存所有未存储数据
- // // for _, sq := range saveQueues {
- // // for _, it := range sq.delayQueue.heap {
- // //
- // write file
- // err = encoder.Encode(it.value)
- // if err != nil {
- // sj, _ := json.Marshal(it.value)
- // log.Infoln("encoder.Encode failed", err.Error(), string(sj))
- // }
- //
- // if it.chEndEvent != nil {
- // close(it.chEndEvent)
- // }
- // }
- // }
- // }
- // 尽可能保存所有项目,
- // 不能保存到数据库时,写文件
- // func flushQueue() {
- // for _, sq := range saveQueues {
- // for {
- // it, chEndEvent := sq.delayQueue.PopItem()
- // if it == nil {
- // break
- // }
- // dbItem, ok := it.(SaveDBItem)
- // if !ok {
- // continue
- // }
- // if chEndEvent != nil {
- // close(chEndEvent)
- // }
- // err := sq.saveOneItem(dbItem)
- // if err != nil {
- // dumpToFile(dbItem)
- // dumpToFile(nil)
- // return
- // }
- // }
- // }
- // }
|