123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- package dbserver
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "leafstalk/covenant/monitor"
- "leafstalk/log"
- slog "log"
- "os"
- "path/filepath"
- "sort"
- "sync"
- "time"
- "xorm.io/xorm"
- )
- type DbSaveQueue struct {
- *xorm.Engine
- name string
- delayQueue *SortQueue
- chSaveEvent chan struct{}
- wg sync.WaitGroup
- cancel context.CancelFunc
- logFile *os.File
- logger *slog.Logger
- }
- var saveQueues map[string]*DbSaveQueue
- func NewDbSaveQueue(name string, eng *xorm.Engine) (*DbSaveQueue, error) {
- if saveQueues == nil {
- saveQueues = make(map[string]*DbSaveQueue)
- }
- if _, ok := saveQueues[name]; ok {
- return nil, errors.New("duplicate name")
- }
- sq, err := newDbSaveQueue(name, eng)
- if err != nil {
- return nil, err
- }
- saveQueues[name] = sq
- return sq, nil
- }
- func newDbSaveQueue(name string, eng *xorm.Engine) (*DbSaveQueue, error) {
- sq := new(DbSaveQueue)
- sq.name = name
- sq.delayQueue = NewSortQueue(100)
- sq.Engine = eng
- // dump文件
- removeName := fmt.Sprintf("dumpToMysql%s.*.txt", name)
- if err := RemoveMatchedFiles(removeName); err != nil {
- return nil, err
- }
- ct := time.Now()
- fileName := fmt.Sprintf("dumpToMysql%s.%s.txt", name, ct.Format("20060102150405"))
- filePath := "./" + fileName
- logFile, err := os.Create(filePath)
- if err != nil {
- return nil, err
- }
- sq.logFile = logFile
- sq.logger = slog.New(logFile, "", slog.Ldate|slog.Ltime|slog.Lshortfile)
- // 开检测协程,保存延迟数据
- sq.chSaveEvent = make(chan struct{}, 1)
- ctx, cancel := context.WithCancel(context.Background())
- sq.cancel = cancel
- sq.wg.Add(1)
- go func() {
- defer sq.wg.Done()
- sq.checkTimeout(ctx, sq.chSaveEvent)
- }()
- return sq, nil
- }
- // request := new(model.Letter)
- //
- // request.Content = "1234"
- // letterSaveQueue.Set(request, 10, false)
- // letterSaveQueue.Set(request, 10, false)
- // dbserver.Stop()
- func Stop() {
- // 关闭存储协程
- for _, sq := range saveQueues {
- sq.cancel()
- sq.wg.Wait()
- // 未存储数据需要写本地文件
- sq.FlushAll()
- sq.logFile.Close()
- }
- saveQueues = nil
- }
- // 列表中的记录全部存库,存储失败则写文件
- func (sq *DbSaveQueue) FlushAll() error {
- for _, it := range sq.delayQueue.heap {
- // 存库
- it2, ok := it.value.(SaveDBItem)
- if !ok {
- szItem, err := json.Marshal(it.value)
- sq.logger.Print(string(szItem), err, "it.value.(SaveDBItem) error")
- if it.chEndEvent != nil {
- close(it.chEndEvent)
- }
- continue
- }
- err := sq.saveOneItem(it2)
- if err != nil {
- // 存库失败写文件
- szItem, err := json.Marshal(it2)
- sq.logger.Print(string(szItem), err)
- }
- if it.chEndEvent != nil {
- close(it.chEndEvent)
- }
- }
- return nil
- }
- // needEndEvent 为true时,会建立一个独有的 channel,如果创建太多通道的话,性能应该是受影响的
- // 所以needEndEvent最好是不要创建的太频繁
- func (sq *DbSaveQueue) Set(obj SaveDBItem, delay int, needEndEvent bool) chan struct{} {
- if delay < 0 {
- delay = 0
- }
- tick := time.Now().Unix() + int64(delay)
- ch := sq.delayQueue.Set(obj, tick, needEndEvent)
- select {
- case sq.chSaveEvent <- struct{}{}:
- default:
- }
- return ch
- }
- // 同步存库,存储后返回
- func (sq *DbSaveQueue) Save(obj SaveDBItem) error {
- ch := sq.Set(obj, 0, true)
- <-ch
- return nil
- }
- // 同步,确保历史记录存库,本条记录不存库,但用到了OBJ的key
- func (sq *DbSaveQueue) FlushItem(obj SaveDBItem) error {
- tick := time.Now().Unix()
- ch := sq.delayQueue.SetUtil(obj, tick, true)
- if ch == nil {
- return nil
- }
- select {
- case sq.chSaveEvent <- struct{}{}:
- default:
- }
- <-ch
- return nil
- }
- // 协程保存
- // 开多个协程进行保存工作
- // 每个协程只能处理对应种类的数据,不能两个协程保存同类型数据
- // 单个协程检查超时列表,取到数据后,执行存储操作
- // 数据保存失败时写日志
- func (sq *DbSaveQueue) checkTimeout(ctx context.Context, chSaveEvent chan struct{}) {
- ticker := time.NewTicker(time.Second)
- for {
- var ct time.Time
- select {
- case <-ctx.Done():
- ticker.Stop()
- // log.Warnf("checkTimeout exit.")
- return
- // case <-time.After(time.Millisecond * 50):
- case ct = <-ticker.C:
- // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "ticker")
- case <-chSaveEvent:
- ct = time.Now()
- }
- // fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "process")
- cst := ct.Unix()
- for {
- // 是否到时间
- ci, saveTick := sq.delayQueue.First()
- if ci == nil || saveTick > cst {
- break
- }
- // 取第一个
- it, chEndEvent := sq.delayQueue.PopItem()
- if it == nil {
- continue
- }
- dbItem, ok := it.(SaveDBItem)
- if !ok {
- // 写错误日志
- szItem, err := json.Marshal(it)
- sq.logger.Print(string(szItem), err, "it.(SaveDBItem) error")
- if chEndEvent != nil {
- close(chEndEvent)
- }
- continue
- }
- err := sq.trySaveOneItem(ctx, dbItem)
- if err != nil {
- // log.Warnf("SaveDBItem trySaveOneItem error. %v", err)
- // dumpToFile(dbItem)
- // 存库失败写文件
- szItem, err := json.Marshal(it)
- sq.logger.Print(string(szItem), err, "trySaveOneItem error")
- }
- if chEndEvent != nil {
- close(chEndEvent)
- }
- }
- }
- }
- // 保存成功返回nil,
- // 数据库关闭返回error
- // 其他错误写入日志文件
- func (sq *DbSaveQueue) saveOneItem(it SaveDBItem) error {
- // fmt.Println("start save to db", it)
- defer monitor.MySqlExecTimeoutWarn(it.TableName(), it, time.Now())
- defer func() {
- if r := recover(); r != nil {
- err := r.(error)
- strItem, _ := json.Marshal(it)
- log.Errorf("saveOneItem error. %v %v", err.Error(), string(strItem))
- }
- }()
- has, err := it.QueryExist(sq.Engine)
- if err != nil {
- return err
- }
- if has {
- _, err := it.UpdateDB(sq.Engine)
- if err != nil {
- return err
- }
- } else {
- _, err := sq.Engine.Insert(it)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // 实际的存储操作
- // 当数据库关闭,保存失败后一直尝试;
- // 其他失败则写日志
- // 当服务关闭时,添加到队列头,然后退出
- func (sq *DbSaveQueue) trySaveOneItem(ctx context.Context, it SaveDBItem) error {
- isServerStop := func() bool {
- select {
- case <-ctx.Done():
- return true
- default:
- }
- return false
- }
- var err error
- for i := 0; i < 3; i += 1 {
- err = sq.saveOneItem(it)
- if err == nil {
- return nil
- }
- if isServerStop() {
- return err
- }
- time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
- }
- return err
- }
// RemoveMatchedFiles prunes the files matching the glob pattern, for example
// fmt.Sprintf("dumpTo%s.*.txt", name).
//
// Among the matches, symbolic links and paths that cannot be stat'ed are
// left alone. Every zero-length file is removed. Of the remaining files only
// the seven that sort last by path are kept; the ones sorting before them
// are removed.
//
// Removal is best effort: a file that cannot be deleted is skipped silently.
// The only error returned is the one reported by filepath.Glob for a
// malformed pattern.
func RemoveMatchedFiles(pattern string) error {
	// Number of non-empty files that survive the rotation.
	const keepNewest = 7

	candidates, err := filepath.Glob(pattern)
	if err != nil {
		return err
	}

	// Sort the matches into empty and non-empty files.
	var empty, nonEmpty []string
	for _, name := range candidates {
		info, statErr := os.Lstat(name)
		switch {
		case statErr != nil:
			// Cannot inspect it: leave it alone.
		case info.Mode()&os.ModeSymlink != 0:
			// Never follow or delete symbolic links.
		case info.Size() == 0:
			empty = append(empty, name)
		default:
			nonEmpty = append(nonEmpty, name)
		}
	}

	for _, name := range empty {
		os.Remove(name)
	}

	// Paths that sort first are dropped; the last keepNewest remain.
	sort.Strings(nonEmpty)
	for i := 0; i < len(nonEmpty)-keepNewest; i++ {
		os.Remove(nonEmpty[i])
	}
	return nil
}
|