123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- package redisserver
- import (
- "context"
- "encoding/json"
- "fmt"
- "leafstalk/log"
- slog "log"
- "os"
- "path/filepath"
- "sort"
- "sync"
- "time"
- )
- type SaveQueue struct {
- name string
- delayQueue *SortQueue
- chSaveEvent chan struct{}
- wg sync.WaitGroup
- cancel context.CancelFunc
- logFile *os.File
- logger *slog.Logger
- saveFunc func(it SaveItem) error
- }
- // , eng *xorm.Engine
- func NewSaveQueue(name string, saveFunc func(it SaveItem) error) *SaveQueue {
- sq := new(SaveQueue)
- sq.name = name
- sq.delayQueue = NewSortQueue(100)
- sq.saveFunc = saveFunc
- // 开检测协程,保存延迟数据
- sq.chSaveEvent = make(chan struct{}, 1)
- removeName := fmt.Sprintf("dumpToRedis%s.*.txt", name)
- if err := RemoveMatchedFiles(removeName); err != nil {
- log.Fatal("删除日志文件失败", err)
- }
- ct := time.Now()
- fileName := fmt.Sprintf("dumpToRedis%s.%s.txt", name, ct.Format("20060102150405"))
- filePath := "./" + fileName
- logFile, err := os.Create(filePath)
- if err != nil {
- log.Fatal("写日志文件失败", err)
- }
- sq.logFile = logFile
- sq.logger = slog.New(logFile, "", slog.Ldate|slog.Ltime|slog.Lshortfile)
- ctx, cancel := context.WithCancel(context.Background())
- sq.cancel = cancel
- sq.wg.Add(1)
- go func() {
- defer sq.wg.Done()
- sq.checkTimeout(ctx, sq.chSaveEvent)
- }()
- return sq
- }
- func (sq *SaveQueue) Stop() {
- if sq.cancel == nil {
- return
- }
- // 关闭存储协程
- sq.cancel()
- sq.wg.Wait()
- // 未存储数据需要写本地文件
- for _, it := range sq.delayQueue.heap {
- dbItem := it.value
- err := sq.saveFunc(dbItem)
- if err != nil {
- szItem, err := json.Marshal(dbItem)
- sq.logger.Print(string(szItem), err, "type error")
- }
- if it.chEndEvent != nil {
- close(it.chEndEvent)
- }
- }
- sq.cancel = nil
- sq.logFile.Close()
- }
- // needEndEvent 为true时,会建立一个独有的 channel,如果创建太多通道的话,性能应该是受影响的
- // 所以needEndEvent最好是不要创建的太频繁
- func (sq *SaveQueue) Set(obj SaveItem, delay int, needEndEvent bool) chan struct{} {
- if delay < 0 {
- delay = 0
- }
- tick := time.Now().Unix() + int64(delay)
- ch := sq.delayQueue.Set(obj, tick, needEndEvent)
- select {
- case sq.chSaveEvent <- struct{}{}:
- default:
- }
- return ch
- }
- // 同步存库,存储后返回
- func (sq *SaveQueue) Save(obj SaveItem) error {
- ch := sq.Set(obj, 0, true)
- <-ch
- return nil
- }
- // 同步,确保历史记录存库,本条记录不存库
- func (sq *SaveQueue) Flush(obj SaveItem) error {
- tick := time.Now().Unix()
- ch := sq.delayQueue.SetUtil(obj, tick, true)
- if ch == nil {
- return nil
- }
- select {
- case sq.chSaveEvent <- struct{}{}:
- default:
- }
- <-ch
- return nil
- }
- // 协程保存
- // 开多个协程进行保存工作
- // 每个协程只能处理对应种类的数据,不能两个协程保存同类型数据
- // 单个协程检查超时列表,取到数据后,执行存储操作
- // 数据保存失败时写日志
- func (sq *SaveQueue) checkTimeout(ctx context.Context, chSaveEvent chan struct{}) {
- ticker := time.NewTicker(time.Second)
- for {
- var ct time.Time
- select {
- case <-ctx.Done():
- ticker.Stop()
- log.Warnf("checkTimeout exit.")
- return
- // case <-time.After(time.Millisecond * 50):
- case ct = <-ticker.C:
- // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "ticker")
- case <-chSaveEvent:
- ct = time.Now()
- }
- // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "process")
- cst := ct.Unix()
- for {
- // 是否到时间
- ci, saveTick := sq.delayQueue.First()
- if ci == nil || saveTick > cst {
- break
- }
- // 取第一个
- dbItem, chEndEvent := sq.delayQueue.PopItem()
- if dbItem == nil {
- continue
- }
- err := sq.safeSaveOneItem(ctx, dbItem)
- if err != nil {
- szItem, err := json.Marshal(dbItem)
- sq.logger.Print(string(szItem), err, "type error")
- }
- if chEndEvent != nil {
- close(chEndEvent)
- }
- }
- }
- }
- // 实际的存储操作
- // 当数据库关闭,保存失败后一直尝试;
- // 其他失败则写日志
- // 当服务关闭时,添加到队列头,然后退出
- func (sq *SaveQueue) safeSaveOneItem(ctx context.Context, it SaveItem) error {
- isServerStop := func() bool {
- select {
- case <-ctx.Done():
- return true
- default:
- }
- return false
- }
- var err error
- for i := 0; i < 3; i += 1 {
- err = sq.saveFunc(it)
- if err == nil {
- return nil
- }
- if isServerStop() {
- return err
- }
- time.Sleep(time.Millisecond)
- }
- return err
- }
- func saveToRedis(it SaveItem) error {
- return nil
- }
- // 搜索匹配文件,删除字典顺序排名靠前的文件
- // fmt.Sprintf("dumpTo%s.*.txt", name)
- func RemoveMatchedFiles(pattern string) error {
- // 使用filepath.Glob匹配文件
- matches, err := filepath.Glob(pattern)
- if err != nil {
- return err
- }
- // 遍历匹配到的文件路径
- var toUnlink []string
- var zeroFiles []string
- for _, path := range matches {
- // fi, err := os.Stat(path)
- // if err != nil {
- // continue
- // }
- fl, err := os.Lstat(path)
- if err != nil {
- continue
- }
- if fl.Mode()&os.ModeSymlink == os.ModeSymlink {
- continue
- }
- if fl.Size() == 0 {
- zeroFiles = append(zeroFiles, path)
- } else {
- toUnlink = append(toUnlink, path)
- }
- }
- sort.Strings(toUnlink)
- for _, path := range zeroFiles {
- os.Remove(path)
- }
- rotationCount := 7
- if rotationCount >= len(toUnlink) {
- return nil
- }
- toUnlink = toUnlink[:len(toUnlink)-int(rotationCount)]
- // 尝试删除文件
- for _, path := range toUnlink {
- os.Remove(path)
- }
- return nil
- }
|