Commit 591d8aaa authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Refactor metrics registration and reporting.

parent 3246f380
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -130,7 +130,7 @@ func main() {

	// Start up server components.
	cookie := newOrLoadCookie(config)
	metrics := server.NewMetrics(logger, startupLogger, db, config)
	metrics := server.NewLocalMetrics(logger, startupLogger, db, config)
	sessionRegistry := server.NewLocalSessionRegistry(metrics)
	sessionCache := server.NewLocalSessionCache(config)
	statusRegistry := server.NewStatusRegistry(logger, config, sessionRegistry, jsonpbMarshaler)
@@ -217,13 +217,13 @@ func main() {
	// Gracefully stop remaining server components.
	apiServer.Stop()
	consoleServer.Stop()
	metrics.Stop(logger)
	matchmaker.Stop()
	leaderboardScheduler.Stop()
	tracker.Stop()
	statusRegistry.Stop()
	sessionCache.Stop()
	sessionRegistry.Stop()
	metrics.Stop(logger)

	if gaenabled {
		_ = ga.SendSessionStop(telemetryClient, gacode, cookie)
+5 −5
Original line number Diff line number Diff line
@@ -73,13 +73,13 @@ type ApiServer struct {
	matchRegistry        MatchRegistry
	tracker              Tracker
	router               MessageRouter
	metrics              *Metrics
	metrics              Metrics
	runtime              *Runtime
	grpcServer           *grpc.Server
	grpcGatewayServer    *http.Server
}

func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer {
func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, metrics Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer {
	var gatewayContextTimeoutMs string
	if config.GetSocket().IdleTimeoutMs > 500 {
		// Ensure the GRPC Gateway timeout is just under the idle timeout (if possible) to ensure it has priority.
@@ -91,7 +91,7 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, p
	}

	serverOpts := []grpc.ServerOption{
		grpc.StatsHandler(&MetricsGrpcHandler{metrics: metrics}),
		grpc.StatsHandler(&MetricsGrpcHandler{MetricsFn: metrics.Api}),
		grpc.MaxRecvMsgSize(int(config.GetSocket().MaxRequestSizeBytes)),
		grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
			ctx, err := securityInterceptorFunc(logger, config, sessionCache, ctx, req, info)
@@ -538,7 +538,7 @@ func extractClientAddress(logger *zap.Logger, clientAddr string) (string, string
	return clientIP, clientPort
}

func traceApiBefore(ctx context.Context, logger *zap.Logger, metrics *Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) error {
func traceApiBefore(ctx context.Context, logger *zap.Logger, metrics Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) error {
	clientIP, clientPort := extractClientAddressFromContext(logger, ctx)
	start := time.Now()

@@ -550,7 +550,7 @@ func traceApiBefore(ctx context.Context, logger *zap.Logger, metrics *Metrics, f
	return err
}

func traceApiAfter(ctx context.Context, logger *zap.Logger, metrics *Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) {
func traceApiAfter(ctx context.Context, logger *zap.Logger, metrics Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) {
	clientIP, clientPort := extractClientAddressFromContext(logger, ctx)
	start := time.Now()

+2 −2
Original line number Diff line number Diff line
@@ -131,7 +131,7 @@ type LocalMatchRegistry struct {
	sessionRegistry SessionRegistry
	tracker         Tracker
	router          MessageRouter
	metrics         *Metrics
	metrics         Metrics
	node            string

	ctx         context.Context
@@ -148,7 +148,7 @@ type LocalMatchRegistry struct {
	stoppedCh chan struct{}
}

func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, sessionRegistry SessionRegistry, tracker Tracker, router MessageRouter, metrics *Metrics, node string) MatchRegistry {
func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, sessionRegistry SessionRegistry, tracker Tracker, router MessageRouter, metrics Metrics, node string) MatchRegistry {
	mapping := bleve.NewIndexMapping()
	mapping.DefaultAnalyzer = keyword.Name

+145 −97
Original line number Diff line number Diff line
@@ -30,43 +30,75 @@ import (
	"go.uber.org/zap"
)

type Metrics struct {
type Metrics interface {
	Stop(logger *zap.Logger)

	SnapshotLatencyMs() float64
	SnapshotRateSec() float64
	SnapshotRecvKbSec() float64
	SnapshotSentKbSec() float64

	Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool)
	ApiBefore(name string, elapsed time.Duration, isErr bool)
	ApiAfter(name string, elapsed time.Duration, isErr bool)

	Message(recvBytes int64, isErr bool)
	MessageBytesSent(sentBytes int64)

	GaugeRuntimes(value float64)
	GaugeLuaRuntimes(value float64)
	GaugeJsRuntimes(value float64)
	GaugeAuthoritativeMatches(value float64)
	CountDroppedEvents(delta int64)
	CountWebsocketOpened(delta int64)
	CountWebsocketClosed(delta int64)
	GaugeSessions(value float64)
	GaugePresences(value float64)

	CustomCounter(name string, tags map[string]string, delta int64)
	CustomGauge(name string, tags map[string]string, value float64)
	CustomTimer(name string, tags map[string]string, value time.Duration)
}

var _ Metrics = &LocalMetrics{}

type LocalMetrics struct {
	logger *zap.Logger
	config Config
	db     *sql.DB

	cancelFn context.CancelFunc

	SnapshotLatencyMs *atomic.Float64
	SnapshotRateSec   *atomic.Float64
	SnapshotRecvKbSec *atomic.Float64
	SnapshotSentKbSec *atomic.Float64
	snapshotLatencyMs *atomic.Float64
	snapshotRateSec   *atomic.Float64
	snapshotRecvKbSec *atomic.Float64
	snapshotSentKbSec *atomic.Float64

	currentReqCount  *atomic.Int64
	currentMsTotal   *atomic.Int64
	currentRecvBytes *atomic.Int64
	currentSentBytes *atomic.Int64

	prometheusScope       tally.Scope
	PrometheusScope       tally.Scope
	prometheusCustomScope tally.Scope
	prometheusCloser      io.Closer
	prometheusHTTPServer  *http.Server
}

func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *Metrics {
func NewLocalMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *LocalMetrics {
	ctx, cancelFn := context.WithCancel(context.Background())

	m := &Metrics{
	m := &LocalMetrics{
		logger: logger,
		config: config,
		db:     db,

		cancelFn: cancelFn,

		SnapshotLatencyMs: atomic.NewFloat64(0),
		SnapshotRateSec:   atomic.NewFloat64(0),
		SnapshotRecvKbSec: atomic.NewFloat64(0),
		SnapshotSentKbSec: atomic.NewFloat64(0),
		snapshotLatencyMs: atomic.NewFloat64(0),
		snapshotRateSec:   atomic.NewFloat64(0),
		snapshotRecvKbSec: atomic.NewFloat64(0),
		snapshotSentKbSec: atomic.NewFloat64(0),

		currentMsTotal:   atomic.NewInt64(0),
		currentReqCount:  atomic.NewInt64(0),
@@ -87,13 +119,13 @@ func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *M
				sentBytes := float64(m.currentSentBytes.Swap(0))

				if reqCount > 0 {
					m.SnapshotLatencyMs.Store(totalMs / reqCount)
					m.snapshotLatencyMs.Store(totalMs / reqCount)
				} else {
					m.SnapshotLatencyMs.Store(0)
					m.snapshotLatencyMs.Store(0)
				}
				m.SnapshotRateSec.Store(reqCount / snapshotFrequencySec)
				m.SnapshotRecvKbSec.Store((recvBytes / 1024) / snapshotFrequencySec)
				m.SnapshotSentKbSec.Store((sentBytes / 1024) / snapshotFrequencySec)
				m.snapshotRateSec.Store(reqCount / snapshotFrequencySec)
				m.snapshotRecvKbSec.Store((recvBytes / 1024) / snapshotFrequencySec)
				m.snapshotSentKbSec.Store((sentBytes / 1024) / snapshotFrequencySec)
			}
		}
	}()
@@ -108,14 +140,14 @@ func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *M
	if namespace := config.GetMetrics().Namespace; namespace != "" {
		tags["namespace"] = namespace
	}
	m.prometheusScope, m.prometheusCloser = tally.NewRootScope(tally.ScopeOptions{
	m.PrometheusScope, m.prometheusCloser = tally.NewRootScope(tally.ScopeOptions{
		Prefix:          config.GetMetrics().Prefix,
		Tags:            tags,
		CachedReporter:  reporter,
		Separator:       prometheus.DefaultSeparator,
		SanitizeOptions: &prometheus.DefaultSanitizerOpts,
	}, time.Duration(config.GetMetrics().ReportingFreqSec)*time.Second)
	m.prometheusCustomScope = m.prometheusScope.SubScope(config.GetMetrics().CustomPrefix)
	m.prometheusCustomScope = m.PrometheusScope.SubScope(config.GetMetrics().CustomPrefix)

	// Check if exposing Prometheus metrics directly is enabled.
	if config.GetMetrics().PrometheusPort > 0 {
@@ -144,25 +176,25 @@ func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *M
	return m
}

func (m *Metrics) refreshDBStats(next http.Handler) http.Handler {
func (m *LocalMetrics) refreshDBStats(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		dbStats := m.db.Stats()

		m.prometheusScope.Gauge("db_max_open_conns").Update(float64(dbStats.MaxOpenConnections))
		m.prometheusScope.Gauge("db_total_open_conns").Update(float64(dbStats.OpenConnections))
		m.prometheusScope.Gauge("db_in_use_conns").Update(float64(dbStats.InUse))
		m.prometheusScope.Gauge("db_idle_conns").Update(float64(dbStats.Idle))
		m.prometheusScope.Gauge("db_total_wait_count").Update(float64(dbStats.WaitCount))
		m.prometheusScope.Gauge("db_total_wait_time_nanos").Update(float64(dbStats.WaitDuration))
		m.prometheusScope.Gauge("db_total_max_idle_closed").Update(float64(dbStats.MaxIdleClosed))
		m.prometheusScope.Gauge("db_total_max_idle_time_closed").Update(float64(dbStats.MaxIdleTimeClosed))
		m.prometheusScope.Gauge("db_total_max_lifetime_closed").Update(float64(dbStats.MaxLifetimeClosed))
		m.PrometheusScope.Gauge("db_max_open_conns").Update(float64(dbStats.MaxOpenConnections))
		m.PrometheusScope.Gauge("db_total_open_conns").Update(float64(dbStats.OpenConnections))
		m.PrometheusScope.Gauge("db_in_use_conns").Update(float64(dbStats.InUse))
		m.PrometheusScope.Gauge("db_idle_conns").Update(float64(dbStats.Idle))
		m.PrometheusScope.Gauge("db_total_wait_count").Update(float64(dbStats.WaitCount))
		m.PrometheusScope.Gauge("db_total_wait_time_nanos").Update(float64(dbStats.WaitDuration))
		m.PrometheusScope.Gauge("db_total_max_idle_closed").Update(float64(dbStats.MaxIdleClosed))
		m.PrometheusScope.Gauge("db_total_max_idle_time_closed").Update(float64(dbStats.MaxIdleTimeClosed))
		m.PrometheusScope.Gauge("db_total_max_lifetime_closed").Update(float64(dbStats.MaxLifetimeClosed))

		next.ServeHTTP(w, r)
	})
}

func (m *Metrics) Stop(logger *zap.Logger) {
func (m *LocalMetrics) Stop(logger *zap.Logger) {
	if m.prometheusHTTPServer != nil {
		// Stop Prometheus server if one is running.
		if err := m.prometheusHTTPServer.Shutdown(context.Background()); err != nil {
@@ -177,7 +209,23 @@ func (m *Metrics) Stop(logger *zap.Logger) {
	m.cancelFn()
}

func (m *Metrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) {
func (m *LocalMetrics) SnapshotLatencyMs() float64 {
	return m.snapshotLatencyMs.Load()
}

func (m *LocalMetrics) SnapshotRateSec() float64 {
	return m.snapshotRateSec.Load()
}

func (m *LocalMetrics) SnapshotRecvKbSec() float64 {
	return m.snapshotRecvKbSec.Load()
}

func (m *LocalMetrics) SnapshotSentKbSec() float64 {
	return m.snapshotSentKbSec.Load()
}

func (m *LocalMetrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) {
	name = strings.TrimPrefix(name, API_PREFIX)

	// Increment ongoing statistics for current measurement window.
@@ -187,65 +235,65 @@ func (m *Metrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes i
	m.currentSentBytes.Add(sentBytes)

	// Global stats.
	m.prometheusScope.Counter("overall_count").Inc(1)
	m.prometheusScope.Counter("overall_request_count").Inc(1)
	m.prometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter("overall_count").Inc(1)
	m.PrometheusScope.Counter("overall_request_count").Inc(1)
	m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond)

	// Per-endpoint stats.
	m.prometheusScope.Counter(name + "_count").Inc(1)
	m.prometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter(name + "_count").Inc(1)
	m.PrometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)

	// Error stats if applicable.
	if isErr {
		m.prometheusScope.Counter("overall_errors").Inc(1)
		m.prometheusScope.Counter("overall_request_errors").Inc(1)
		m.prometheusScope.Counter(name + "_errors").Inc(1)
		m.PrometheusScope.Counter("overall_errors").Inc(1)
		m.PrometheusScope.Counter("overall_request_errors").Inc(1)
		m.PrometheusScope.Counter(name + "_errors").Inc(1)
	}
}

func (m *Metrics) ApiBefore(name string, elapsed time.Duration, isErr bool) {
func (m *LocalMetrics) ApiBefore(name string, elapsed time.Duration, isErr bool) {
	name = "before_" + strings.TrimPrefix(name, API_PREFIX)

	// Global stats.
	m.prometheusScope.Counter("overall_before_count").Inc(1)
	m.prometheusScope.Timer("overall_before_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter("overall_before_count").Inc(1)
	m.PrometheusScope.Timer("overall_before_latency_ms").Record(elapsed / time.Millisecond)

	// Per-endpoint stats.
	m.prometheusScope.Counter(name + "_count").Inc(1)
	m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter(name + "_count").Inc(1)
	m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)

	// Error stats if applicable.
	if isErr {
		m.prometheusScope.Counter("overall_before_errors").Inc(1)
		m.prometheusScope.Counter(name + "_errors").Inc(1)
		m.PrometheusScope.Counter("overall_before_errors").Inc(1)
		m.PrometheusScope.Counter(name + "_errors").Inc(1)
	}
}

func (m *Metrics) ApiAfter(name string, elapsed time.Duration, isErr bool) {
func (m *LocalMetrics) ApiAfter(name string, elapsed time.Duration, isErr bool) {
	name = "after_" + strings.TrimPrefix(name, API_PREFIX)

	// Global stats.
	m.prometheusScope.Counter("overall_after_count").Inc(1)
	m.prometheusScope.Timer("overall_after_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter("overall_after_count").Inc(1)
	m.PrometheusScope.Timer("overall_after_latency_ms").Record(elapsed / time.Millisecond)

	// Per-endpoint stats.
	m.prometheusScope.Counter(name + "_count").Inc(1)
	m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter(name + "_count").Inc(1)
	m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)

	// Error stats if applicable.
	if isErr {
		m.prometheusScope.Counter("overall_after_errors").Inc(1)
		m.prometheusScope.Counter(name + "_errors").Inc(1)
		m.PrometheusScope.Counter("overall_after_errors").Inc(1)
		m.PrometheusScope.Counter(name + "_errors").Inc(1)
	}
}

func (m *Metrics) Message(recvBytes int64, isErr bool) {
func (m *LocalMetrics) Message(recvBytes int64, isErr bool) {
	//name = strings.TrimPrefix(name, API_PREFIX)

	// Increment ongoing statistics for current measurement window.
@@ -255,83 +303,83 @@ func (m *Metrics) Message(recvBytes int64, isErr bool) {
	//m.currentSentBytes.Add(sentBytes)

	// Global stats.
	m.prometheusScope.Counter("overall_count").Inc(1)
	m.prometheusScope.Counter("overall_message_count").Inc(1)
	m.prometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter("overall_message_recv_bytes").Inc(recvBytes)
	//m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	//m.prometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond)
	m.PrometheusScope.Counter("overall_count").Inc(1)
	m.PrometheusScope.Counter("overall_message_count").Inc(1)
	m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter("overall_message_recv_bytes").Inc(recvBytes)
	//m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	//m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond)

	// Per-message stats.
	//m.prometheusScope.Counter(name + "_count").Inc(1)
	//m.prometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes)
	//m.prometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes)
	//m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)
	//m.PrometheusScope.Counter(name + "_count").Inc(1)
	//m.PrometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes)
	//m.PrometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes)
	//m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)

	// Error stats if applicable.
	if isErr {
		m.prometheusScope.Counter("overall_errors").Inc(1)
		m.prometheusScope.Counter("overall_message_errors").Inc(1)
		//m.prometheusScope.Counter(name + "_errors").Inc(1)
		m.PrometheusScope.Counter("overall_errors").Inc(1)
		m.PrometheusScope.Counter("overall_message_errors").Inc(1)
		//m.PrometheusScope.Counter(name + "_errors").Inc(1)
	}
}

func (m *Metrics) MessageBytesSent(sentBytes int64) {
func (m *LocalMetrics) MessageBytesSent(sentBytes int64) {
	// Increment ongoing statistics for current measurement window.
	m.currentSentBytes.Add(sentBytes)

	// Global stats.
	m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Counter("overall_message_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Counter("overall_message_sent_bytes").Inc(sentBytes)
}

// Set the absolute value of currently allocated Lua runtime VMs.
func (m *Metrics) GaugeRuntimes(value float64) {
	m.prometheusScope.Gauge("lua_runtimes").Update(value)
func (m *LocalMetrics) GaugeRuntimes(value float64) {
	m.PrometheusScope.Gauge("lua_runtimes").Update(value)
}

// Set the absolute value of currently allocated Lua runtime VMs.
func (m *Metrics) GaugeLuaRuntimes(value float64) {
	m.prometheusScope.Gauge("lua_runtimes").Update(value)
func (m *LocalMetrics) GaugeLuaRuntimes(value float64) {
	m.PrometheusScope.Gauge("lua_runtimes").Update(value)
}

// Set the absolute value of currently allocated JavaScript runtime VMs.
func (m *Metrics) GaugeJsRuntimes(value float64) {
	m.prometheusScope.Gauge("javascript_runtimes").Update(value)
func (m *LocalMetrics) GaugeJsRuntimes(value float64) {
	m.PrometheusScope.Gauge("javascript_runtimes").Update(value)
}

// Set the absolute value of currently running authoritative matches.
func (m *Metrics) GaugeAuthoritativeMatches(value float64) {
	m.prometheusScope.Gauge("authoritative_matches").Update(value)
func (m *LocalMetrics) GaugeAuthoritativeMatches(value float64) {
	m.PrometheusScope.Gauge("authoritative_matches").Update(value)
}

// Increment the number of dropped events.
func (m *Metrics) CountDroppedEvents(delta int64) {
	m.prometheusScope.Counter("dropped_events").Inc(delta)
func (m *LocalMetrics) CountDroppedEvents(delta int64) {
	m.PrometheusScope.Counter("dropped_events").Inc(delta)
}

// Increment the number of opened WS connections.
func (m *Metrics) CountWebsocketOpened(delta int64) {
	m.prometheusScope.Counter("socket_ws_opened").Inc(delta)
func (m *LocalMetrics) CountWebsocketOpened(delta int64) {
	m.PrometheusScope.Counter("socket_ws_opened").Inc(delta)
}

// Increment the number of closed WS connections.
func (m *Metrics) CountWebsocketClosed(delta int64) {
	m.prometheusScope.Counter("socket_ws_closed").Inc(delta)
func (m *LocalMetrics) CountWebsocketClosed(delta int64) {
	m.PrometheusScope.Counter("socket_ws_closed").Inc(delta)
}

// Set the absolute value of currently active sessions.
func (m *Metrics) GaugeSessions(value float64) {
	m.prometheusScope.Gauge("sessions").Update(value)
func (m *LocalMetrics) GaugeSessions(value float64) {
	m.PrometheusScope.Gauge("sessions").Update(value)
}

// Set the absolute value of currently tracked presences.
func (m *Metrics) GaugePresences(value float64) {
	m.prometheusScope.Gauge("presences").Update(value)
func (m *LocalMetrics) GaugePresences(value float64) {
	m.PrometheusScope.Gauge("presences").Update(value)
}

// CustomCounter adds the given delta to a counter with the specified name and tags.
func (m *Metrics) CustomCounter(name string, tags map[string]string, delta int64) {
func (m *LocalMetrics) CustomCounter(name string, tags map[string]string, delta int64) {
	scope := m.prometheusCustomScope
	if len(tags) != 0 {
		scope = scope.Tagged(tags)
@@ -340,7 +388,7 @@ func (m *Metrics) CustomCounter(name string, tags map[string]string, delta int64
}

// CustomGauge sets the given value to a gauge with the specified name and tags.
func (m *Metrics) CustomGauge(name string, tags map[string]string, value float64) {
func (m *LocalMetrics) CustomGauge(name string, tags map[string]string, value float64) {
	scope := m.prometheusCustomScope
	if len(tags) != 0 {
		scope = scope.Tagged(tags)
@@ -349,7 +397,7 @@ func (m *Metrics) CustomGauge(name string, tags map[string]string, value float64
}

// CustomTimer records the given value to a timer with the specified name and tags.
func (m *Metrics) CustomTimer(name string, tags map[string]string, value time.Duration) {
func (m *LocalMetrics) CustomTimer(name string, tags map[string]string, value time.Duration) {
	scope := m.prometheusCustomScope
	if len(tags) != 0 {
		scope = scope.Tagged(tags)
+3 −2
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ package server
import (
	"context"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/stats"
)
@@ -30,7 +31,7 @@ type metricsGrpcHandlerData struct {
}

type MetricsGrpcHandler struct {
	metrics *Metrics
	MetricsFn func(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool)
}

// TagRPC can attach some information to the given context.
@@ -51,7 +52,7 @@ func (m *MetricsGrpcHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
	case *stats.OutPayload:
		atomic.AddInt64(&data.sentBytes, int64(rs.WireLength))
	case *stats.End:
		m.metrics.Api(data.fullMethodName, rs.EndTime.Sub(rs.BeginTime), data.recvBytes, data.sentBytes, rs.Error != nil)
		m.MetricsFn(data.fullMethodName, rs.EndTime.Sub(rs.BeginTime), data.recvBytes, data.sentBytes, rs.Error != nil)
	}
}

Loading