123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package metrics
- import (
- "context"
- "runtime"
- "time"
- )
- var metricsReporters []Reporter
- // AddMetricsReporter to be used
- func AddMetricsReporter(mr Reporter) {
- metricsReporters = append(metricsReporters, mr)
- }
- // GetMetricsReporters gets registered metrics reporters
- func GetMetricsReporters() []Reporter {
- return metricsReporters
- }
- // func GetMetricsReporter() metrics.Reporter {
- // if len(app.metricsReporters) > 0 {
- // return app.metricsReporters[0]
- // }
- // return nil
- // }
- // 上报系统运行信息
- func PeriodicSysMetrics() {
- period := time.Second * 15 //.config.GetDuration("pitaya.metrics.periodicMetrics.period")
- go ReportSysMetrics(metricsReporters, period)
- // if app.worker.Started() {
- // go worker.Report(app.metricsReporters, period)
- // }
- }
- // ReportTimingFromCtx reports the latency from the context
- func ReportTimingFromCtx(ctx context.Context, reporters []Reporter, typ string, err error) {
- if ctx == nil {
- return
- }
- code := CodeFromError(err)
- status := "ok"
- if err != nil {
- status = "failed"
- }
- if len(reporters) > 0 {
- startTime := GetFromPropagateCtx(ctx, StartTimeKey)
- route := GetFromPropagateCtx(ctx, RouteKey)
- elapsed := time.Since(time.Unix(0, startTime.(int64)))
- tags := getTags(ctx, map[string]string{
- "route": route.(string),
- "status": status,
- "type": typ,
- "code": code,
- })
- for _, r := range reporters {
- r.ReportSummary(ResponseTime, tags, float64(elapsed.Nanoseconds()))
- }
- }
- }
- // ReportMessageProcessDelayFromCtx reports the delay to process the messages
- // �ϱ����յ���Ϣ����ʼ����֮��ļ��
- func ReportMessageProcessDelayFromCtx(ctx context.Context, reporters []Reporter, typ string) {
- if len(reporters) > 0 {
- startTime := GetFromPropagateCtx(ctx, StartTimeKey)
- elapsed := time.Since(time.Unix(0, startTime.(int64)))
- route := GetFromPropagateCtx(ctx, RouteKey)
- tags := getTags(ctx, map[string]string{
- "route": route.(string),
- "type": typ,
- })
- for _, r := range reporters {
- r.ReportSummary(ProcessDelay, tags, float64(elapsed.Nanoseconds()))
- }
- }
- }
- // ReportNumberOfConnectedClients reports the number of connected clients
- func ReportNumberOfConnectedClients(reporters []Reporter, number int64) {
- for _, r := range reporters {
- r.ReportGauge(ConnectedClients, map[string]string{}, float64(number))
- }
- }
- // ReportSysMetrics reports sys metrics
- func ReportSysMetrics(reporters []Reporter, period time.Duration) {
- tick := time.NewTicker(period)
- for {
- select {
- case <-tick.C:
- for _, r := range reporters {
- num := runtime.NumGoroutine()
- m := &runtime.MemStats{}
- runtime.ReadMemStats(m)
- r.ReportGauge(Goroutines, map[string]string{}, float64(num))
- r.ReportGauge(HeapSize, map[string]string{}, float64(m.Alloc))
- r.ReportGauge(HeapObjects, map[string]string{}, float64(m.HeapObjects))
- }
- }
- }
- }
- // ReportExceededRateLimiting reports the number of requests made
- // after exceeded rate limiting in a connection
- func ReportExceededRateLimiting(reporters []Reporter) {
- for _, r := range reporters {
- r.ReportCount(ExceededRateLimiting, map[string]string{}, 1)
- }
- }
- func tagsFromContext(ctx context.Context) map[string]string {
- val := GetFromPropagateCtx(ctx, MetricTagsKey)
- if val == nil {
- return map[string]string{}
- }
- tags, ok := val.(map[string]string)
- if !ok {
- return map[string]string{}
- }
- return tags
- }
- func getTags(ctx context.Context, tags map[string]string) map[string]string {
- for k, v := range tagsFromContext(ctx) {
- tags[k] = v
- }
- return tags
- }
- // 上报消息计数
- func ReportMessageHandleCount(msg string) {
- for _, r := range metricsReporters {
- r.ReportCount(MessageHandler, map[string]string{"handler": msg}, float64(1))
- }
- }
- // 上报通道容量 已使用容量
- func ReportChannelCapacity(channel string, cap int) {
- for _, r := range metricsReporters {
- r.ReportGauge(ChannelCapacity, map[string]string{"channel": channel}, float64(cap))
- }
- }
- // 上报已连接客户端数量
- func ReportConnectedClient(num int) {
- for _, r := range metricsReporters {
- r.ReportGauge(ConnectedClients, map[string]string{}, float64(num))
- }
- }
- // 上报连接服务器数量
- func ReportConnectedServer(typ string, num int) {
- for _, r := range metricsReporters {
- r.ReportGauge(CountServers, map[string]string{"type": typ}, float64(num))
- }
- }
- // ReportMessageResponseTime 上报消息处理耗时
- func ReportMessageResponseTime(label string, value int64) {
- for _, r := range metricsReporters {
- r.ReportHistogram(MessageResponseTime, label, float64(value))
- }
- }
|