report.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package metrics
  2. import (
  3. "context"
  4. "runtime"
  5. "time"
  6. )
  7. var metricsReporters []Reporter
  8. // AddMetricsReporter to be used
  9. func AddMetricsReporter(mr Reporter) {
  10. metricsReporters = append(metricsReporters, mr)
  11. }
  12. // GetMetricsReporters gets registered metrics reporters
  13. func GetMetricsReporters() []Reporter {
  14. return metricsReporters
  15. }
  16. // func GetMetricsReporter() metrics.Reporter {
  17. // if len(app.metricsReporters) > 0 {
  18. // return app.metricsReporters[0]
  19. // }
  20. // return nil
  21. // }
  22. // 上报系统运行信息
  23. func PeriodicSysMetrics() {
  24. period := time.Second * 15 //.config.GetDuration("pitaya.metrics.periodicMetrics.period")
  25. go ReportSysMetrics(metricsReporters, period)
  26. // if app.worker.Started() {
  27. // go worker.Report(app.metricsReporters, period)
  28. // }
  29. }
  30. // ReportTimingFromCtx reports the latency from the context
  31. func ReportTimingFromCtx(ctx context.Context, reporters []Reporter, typ string, err error) {
  32. if ctx == nil {
  33. return
  34. }
  35. code := CodeFromError(err)
  36. status := "ok"
  37. if err != nil {
  38. status = "failed"
  39. }
  40. if len(reporters) > 0 {
  41. startTime := GetFromPropagateCtx(ctx, StartTimeKey)
  42. route := GetFromPropagateCtx(ctx, RouteKey)
  43. elapsed := time.Since(time.Unix(0, startTime.(int64)))
  44. tags := getTags(ctx, map[string]string{
  45. "route": route.(string),
  46. "status": status,
  47. "type": typ,
  48. "code": code,
  49. })
  50. for _, r := range reporters {
  51. r.ReportSummary(ResponseTime, tags, float64(elapsed.Nanoseconds()))
  52. }
  53. }
  54. }
  55. // ReportMessageProcessDelayFromCtx reports the delay to process the messages
  56. // �ϱ����յ���Ϣ����ʼ����֮��ļ��
  57. func ReportMessageProcessDelayFromCtx(ctx context.Context, reporters []Reporter, typ string) {
  58. if len(reporters) > 0 {
  59. startTime := GetFromPropagateCtx(ctx, StartTimeKey)
  60. elapsed := time.Since(time.Unix(0, startTime.(int64)))
  61. route := GetFromPropagateCtx(ctx, RouteKey)
  62. tags := getTags(ctx, map[string]string{
  63. "route": route.(string),
  64. "type": typ,
  65. })
  66. for _, r := range reporters {
  67. r.ReportSummary(ProcessDelay, tags, float64(elapsed.Nanoseconds()))
  68. }
  69. }
  70. }
  71. // ReportNumberOfConnectedClients reports the number of connected clients
  72. func ReportNumberOfConnectedClients(reporters []Reporter, number int64) {
  73. for _, r := range reporters {
  74. r.ReportGauge(ConnectedClients, map[string]string{}, float64(number))
  75. }
  76. }
  77. // ReportSysMetrics reports sys metrics
  78. func ReportSysMetrics(reporters []Reporter, period time.Duration) {
  79. tick := time.NewTicker(period)
  80. for {
  81. select {
  82. case <-tick.C:
  83. for _, r := range reporters {
  84. num := runtime.NumGoroutine()
  85. m := &runtime.MemStats{}
  86. runtime.ReadMemStats(m)
  87. r.ReportGauge(Goroutines, map[string]string{}, float64(num))
  88. r.ReportGauge(HeapSize, map[string]string{}, float64(m.Alloc))
  89. r.ReportGauge(HeapObjects, map[string]string{}, float64(m.HeapObjects))
  90. }
  91. }
  92. }
  93. }
  94. // ReportExceededRateLimiting reports the number of requests made
  95. // after exceeded rate limiting in a connection
  96. func ReportExceededRateLimiting(reporters []Reporter) {
  97. for _, r := range reporters {
  98. r.ReportCount(ExceededRateLimiting, map[string]string{}, 1)
  99. }
  100. }
  101. func tagsFromContext(ctx context.Context) map[string]string {
  102. val := GetFromPropagateCtx(ctx, MetricTagsKey)
  103. if val == nil {
  104. return map[string]string{}
  105. }
  106. tags, ok := val.(map[string]string)
  107. if !ok {
  108. return map[string]string{}
  109. }
  110. return tags
  111. }
  112. func getTags(ctx context.Context, tags map[string]string) map[string]string {
  113. for k, v := range tagsFromContext(ctx) {
  114. tags[k] = v
  115. }
  116. return tags
  117. }
  118. // 上报消息计数
  119. func ReportMessageHandleCount(msg string) {
  120. for _, r := range metricsReporters {
  121. r.ReportCount(MessageHandler, map[string]string{"handler": msg}, float64(1))
  122. }
  123. }
  124. // 上报通道容量 已使用容量
  125. func ReportChannelCapacity(channel string, cap int) {
  126. for _, r := range metricsReporters {
  127. r.ReportGauge(ChannelCapacity, map[string]string{"channel": channel}, float64(cap))
  128. }
  129. }
  130. // 上报已连接客户端数量
  131. func ReportConnectedClient(num int) {
  132. for _, r := range metricsReporters {
  133. r.ReportGauge(ConnectedClients, map[string]string{}, float64(num))
  134. }
  135. }
  136. // 上报连接服务器数量
  137. func ReportConnectedServer(typ string, num int) {
  138. for _, r := range metricsReporters {
  139. r.ReportGauge(CountServers, map[string]string{"type": typ}, float64(num))
  140. }
  141. }
  142. // ReportMessageResponseTime 上报消息处理耗时
  143. func ReportMessageResponseTime(label string, value int64) {
  144. for _, r := range metricsReporters {
  145. r.ReportHistogram(MessageResponseTime, label, float64(value))
  146. }
  147. }