diff --git a/main.go b/main.go index 6d05ebf79f947ea68b569177787de16026488cc7..c813da82e28e08b589996e35f85f250cecd060c7 100644 --- a/main.go +++ b/main.go @@ -130,7 +130,7 @@ func main() { // Start up server components. cookie := newOrLoadCookie(config) - metrics := server.NewMetrics(logger, startupLogger, db, config) + metrics := server.NewLocalMetrics(logger, startupLogger, db, config) sessionRegistry := server.NewLocalSessionRegistry(metrics) sessionCache := server.NewLocalSessionCache(config) statusRegistry := server.NewStatusRegistry(logger, config, sessionRegistry, jsonpbMarshaler) @@ -217,13 +217,13 @@ func main() { // Gracefully stop remaining server components. apiServer.Stop() consoleServer.Stop() - metrics.Stop(logger) matchmaker.Stop() leaderboardScheduler.Stop() tracker.Stop() statusRegistry.Stop() sessionCache.Stop() sessionRegistry.Stop() + metrics.Stop(logger) if gaenabled { _ = ga.SendSessionStop(telemetryClient, gacode, cookie) diff --git a/server/api.go b/server/api.go index 0a1052b1e289c9b09b74919b25eaa90b4ee4c7e2..b59e75cc69737bc6951df63d71d6d2d22e3335a8 100644 --- a/server/api.go +++ b/server/api.go @@ -73,13 +73,13 @@ type ApiServer struct { matchRegistry MatchRegistry tracker Tracker router MessageRouter - metrics *Metrics + metrics Metrics runtime *Runtime grpcServer *grpc.Server grpcGatewayServer *http.Server } -func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer { +func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, metrics Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer { var gatewayContextTimeoutMs string if config.GetSocket().IdleTimeoutMs > 500 { // Ensure the GRPC Gateway timeout is just under the idle timeout (if possible) to ensure it has priority. @@ -91,7 +91,7 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, p } serverOpts := []grpc.ServerOption{ - grpc.StatsHandler(&MetricsGrpcHandler{metrics: metrics}), + grpc.StatsHandler(&MetricsGrpcHandler{MetricsFn: metrics.Api}), grpc.MaxRecvMsgSize(int(config.GetSocket().MaxRequestSizeBytes)), grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { ctx, err := securityInterceptorFunc(logger, config, sessionCache, ctx, req, info) @@ -538,7 +538,7 @@ func extractClientAddress(logger *zap.Logger, clientAddr string) (string, string return clientIP, clientPort } -func traceApiBefore(ctx context.Context, logger *zap.Logger, metrics *Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) error { +func traceApiBefore(ctx context.Context, logger *zap.Logger, metrics Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) error { clientIP, clientPort := extractClientAddressFromContext(logger, ctx) start := time.Now() @@ -550,7 +550,7 @@ func traceApiBefore(ctx context.Context, logger *zap.Logger, metrics *Metrics, f return err } -func traceApiAfter(ctx context.Context, logger *zap.Logger, metrics *Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) { +func traceApiAfter(ctx context.Context, logger *zap.Logger, metrics Metrics, fullMethodName string, fn func(clientIP, clientPort string) error) { clientIP, clientPort := extractClientAddressFromContext(logger, ctx) start := time.Now() diff --git a/server/match_registry.go b/server/match_registry.go index 1d21a2abc9b1bc138c2dc18d18891751465c0aa2..f46b67caf9f74a961eb36b6a1135723b1e1c53fe 100644 --- a/server/match_registry.go +++ b/server/match_registry.go @@ -131,7 +131,7 @@ type LocalMatchRegistry struct { sessionRegistry SessionRegistry tracker Tracker router MessageRouter - metrics *Metrics + metrics Metrics node string ctx context.Context @@ -148,7 +148,7 @@ type LocalMatchRegistry struct { stoppedCh chan struct{} } -func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, sessionRegistry SessionRegistry, tracker Tracker, router MessageRouter, metrics *Metrics, node string) MatchRegistry { +func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, sessionRegistry SessionRegistry, tracker Tracker, router MessageRouter, metrics Metrics, node string) MatchRegistry { mapping := bleve.NewIndexMapping() mapping.DefaultAnalyzer = keyword.Name diff --git a/server/metrics.go b/server/metrics.go index cffe97d8b911104292da77ec5649557debfd2240..89ca0a2da18ddd39fe0c4194b078f13419265cef 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -30,43 +30,75 @@ import ( "go.uber.org/zap" ) -type Metrics struct { +type Metrics interface { + Stop(logger *zap.Logger) + + SnapshotLatencyMs() float64 + SnapshotRateSec() float64 + SnapshotRecvKbSec() float64 + SnapshotSentKbSec() float64 + + Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) + ApiBefore(name string, elapsed time.Duration, isErr bool) + ApiAfter(name string, elapsed time.Duration, isErr bool) + + Message(recvBytes int64, isErr bool) + MessageBytesSent(sentBytes int64) + + GaugeRuntimes(value float64) + GaugeLuaRuntimes(value float64) + GaugeJsRuntimes(value float64) + GaugeAuthoritativeMatches(value float64) + CountDroppedEvents(delta int64) + CountWebsocketOpened(delta int64) + CountWebsocketClosed(delta int64) + GaugeSessions(value float64) + GaugePresences(value float64) + + CustomCounter(name string, tags map[string]string, delta int64) + CustomGauge(name string, tags map[string]string, value float64) + CustomTimer(name string, tags map[string]string, value time.Duration) +} + +var _ Metrics = &LocalMetrics{} + +type LocalMetrics struct { logger *zap.Logger config Config db *sql.DB cancelFn context.CancelFunc - SnapshotLatencyMs *atomic.Float64 - SnapshotRateSec *atomic.Float64 - SnapshotRecvKbSec *atomic.Float64 - SnapshotSentKbSec *atomic.Float64 + snapshotLatencyMs *atomic.Float64 + snapshotRateSec *atomic.Float64 + snapshotRecvKbSec *atomic.Float64 + snapshotSentKbSec *atomic.Float64 currentReqCount *atomic.Int64 currentMsTotal *atomic.Int64 currentRecvBytes *atomic.Int64 currentSentBytes *atomic.Int64 - prometheusScope tally.Scope + PrometheusScope tally.Scope prometheusCustomScope tally.Scope prometheusCloser io.Closer prometheusHTTPServer *http.Server } -func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *Metrics { +func NewLocalMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *LocalMetrics { ctx, cancelFn := context.WithCancel(context.Background()) - m := &Metrics{ + m := &LocalMetrics{ logger: logger, config: config, db: db, cancelFn: cancelFn, - SnapshotLatencyMs: atomic.NewFloat64(0), - SnapshotRateSec: atomic.NewFloat64(0), - SnapshotRecvKbSec: atomic.NewFloat64(0), - SnapshotSentKbSec: atomic.NewFloat64(0), + snapshotLatencyMs: atomic.NewFloat64(0), + snapshotRateSec: atomic.NewFloat64(0), + snapshotRecvKbSec: atomic.NewFloat64(0), + snapshotSentKbSec: atomic.NewFloat64(0), currentMsTotal: atomic.NewInt64(0), currentReqCount: atomic.NewInt64(0), @@ -87,13 +119,13 @@ func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *M sentBytes := float64(m.currentSentBytes.Swap(0)) if reqCount > 0 { - m.SnapshotLatencyMs.Store(totalMs / reqCount) + m.snapshotLatencyMs.Store(totalMs / reqCount) } else { - m.SnapshotLatencyMs.Store(0) + m.snapshotLatencyMs.Store(0) } - m.SnapshotRateSec.Store(reqCount / snapshotFrequencySec) - m.SnapshotRecvKbSec.Store((recvBytes / 1024) / snapshotFrequencySec) - m.SnapshotSentKbSec.Store((sentBytes / 1024) / snapshotFrequencySec) + m.snapshotRateSec.Store(reqCount / snapshotFrequencySec) + m.snapshotRecvKbSec.Store((recvBytes / 1024) / snapshotFrequencySec) + m.snapshotSentKbSec.Store((sentBytes / 1024) / snapshotFrequencySec) } } }() @@ -108,14 +140,14 @@ func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *M if namespace := config.GetMetrics().Namespace; namespace != "" { tags["namespace"] = namespace } - m.prometheusScope, m.prometheusCloser = tally.NewRootScope(tally.ScopeOptions{ + m.PrometheusScope, m.prometheusCloser = tally.NewRootScope(tally.ScopeOptions{ Prefix: config.GetMetrics().Prefix, Tags: tags, CachedReporter: reporter, Separator: prometheus.DefaultSeparator, SanitizeOptions: &prometheus.DefaultSanitizerOpts, }, time.Duration(config.GetMetrics().ReportingFreqSec)*time.Second) - m.prometheusCustomScope = m.prometheusScope.SubScope(config.GetMetrics().CustomPrefix) + m.prometheusCustomScope = m.PrometheusScope.SubScope(config.GetMetrics().CustomPrefix) // Check if exposing Prometheus metrics directly is enabled. if config.GetMetrics().PrometheusPort > 0 { @@ -144,25 +176,25 @@ func NewMetrics(logger, startupLogger *zap.Logger, db *sql.DB, config Config) *M return m } -func (m *Metrics) refreshDBStats(next http.Handler) http.Handler { +func (m *LocalMetrics) refreshDBStats(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { dbStats := m.db.Stats() - m.prometheusScope.Gauge("db_max_open_conns").Update(float64(dbStats.MaxOpenConnections)) - m.prometheusScope.Gauge("db_total_open_conns").Update(float64(dbStats.OpenConnections)) - m.prometheusScope.Gauge("db_in_use_conns").Update(float64(dbStats.InUse)) - m.prometheusScope.Gauge("db_idle_conns").Update(float64(dbStats.Idle)) - m.prometheusScope.Gauge("db_total_wait_count").Update(float64(dbStats.WaitCount)) - m.prometheusScope.Gauge("db_total_wait_time_nanos").Update(float64(dbStats.WaitDuration)) - m.prometheusScope.Gauge("db_total_max_idle_closed").Update(float64(dbStats.MaxIdleClosed)) - m.prometheusScope.Gauge("db_total_max_idle_time_closed").Update(float64(dbStats.MaxIdleTimeClosed)) - m.prometheusScope.Gauge("db_total_max_lifetime_closed").Update(float64(dbStats.MaxLifetimeClosed)) + m.PrometheusScope.Gauge("db_max_open_conns").Update(float64(dbStats.MaxOpenConnections)) + m.PrometheusScope.Gauge("db_total_open_conns").Update(float64(dbStats.OpenConnections)) + m.PrometheusScope.Gauge("db_in_use_conns").Update(float64(dbStats.InUse)) + m.PrometheusScope.Gauge("db_idle_conns").Update(float64(dbStats.Idle)) + m.PrometheusScope.Gauge("db_total_wait_count").Update(float64(dbStats.WaitCount)) + m.PrometheusScope.Gauge("db_total_wait_time_nanos").Update(float64(dbStats.WaitDuration)) + m.PrometheusScope.Gauge("db_total_max_idle_closed").Update(float64(dbStats.MaxIdleClosed)) + m.PrometheusScope.Gauge("db_total_max_idle_time_closed").Update(float64(dbStats.MaxIdleTimeClosed)) + m.PrometheusScope.Gauge("db_total_max_lifetime_closed").Update(float64(dbStats.MaxLifetimeClosed)) next.ServeHTTP(w, r) }) } -func (m *Metrics) Stop(logger *zap.Logger) { +func (m *LocalMetrics) Stop(logger *zap.Logger) { if m.prometheusHTTPServer != nil { // Stop Prometheus server if one is running. if err := m.prometheusHTTPServer.Shutdown(context.Background()); err != nil { @@ -177,7 +209,23 @@ func (m *Metrics) Stop(logger *zap.Logger) { m.cancelFn() } -func (m *Metrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) { +func (m *LocalMetrics) SnapshotLatencyMs() float64 { + return m.snapshotLatencyMs.Load() +} + +func (m *LocalMetrics) SnapshotRateSec() float64 { + return m.snapshotRateSec.Load() +} + +func (m *LocalMetrics) SnapshotRecvKbSec() float64 { + return m.snapshotRecvKbSec.Load() +} + +func (m *LocalMetrics) SnapshotSentKbSec() float64 { + return m.snapshotSentKbSec.Load() +} + +func (m *LocalMetrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) { name = strings.TrimPrefix(name, API_PREFIX) // Increment ongoing statistics for current measurement window. @@ -187,65 +235,65 @@ func (m *Metrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes i m.currentSentBytes.Add(sentBytes) // Global stats. - m.prometheusScope.Counter("overall_count").Inc(1) - m.prometheusScope.Counter("overall_request_count").Inc(1) - m.prometheusScope.Counter("overall_recv_bytes").Inc(recvBytes) - m.prometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes) - m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) - m.prometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes) - m.prometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter("overall_count").Inc(1) + m.PrometheusScope.Counter("overall_request_count").Inc(1) + m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes) + m.PrometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes) + m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond) // Per-endpoint stats. - m.prometheusScope.Counter(name + "_count").Inc(1) - m.prometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes) - m.prometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes) - m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter(name + "_count").Inc(1) + m.PrometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes) + m.PrometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) // Error stats if applicable. if isErr { - m.prometheusScope.Counter("overall_errors").Inc(1) - m.prometheusScope.Counter("overall_request_errors").Inc(1) - m.prometheusScope.Counter(name + "_errors").Inc(1) + m.PrometheusScope.Counter("overall_errors").Inc(1) + m.PrometheusScope.Counter("overall_request_errors").Inc(1) + m.PrometheusScope.Counter(name + "_errors").Inc(1) } } -func (m *Metrics) ApiBefore(name string, elapsed time.Duration, isErr bool) { +func (m *LocalMetrics) ApiBefore(name string, elapsed time.Duration, isErr bool) { name = "before_" + strings.TrimPrefix(name, API_PREFIX) // Global stats. - m.prometheusScope.Counter("overall_before_count").Inc(1) - m.prometheusScope.Timer("overall_before_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter("overall_before_count").Inc(1) + m.PrometheusScope.Timer("overall_before_latency_ms").Record(elapsed / time.Millisecond) // Per-endpoint stats. - m.prometheusScope.Counter(name + "_count").Inc(1) - m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter(name + "_count").Inc(1) + m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) // Error stats if applicable. if isErr { - m.prometheusScope.Counter("overall_before_errors").Inc(1) - m.prometheusScope.Counter(name + "_errors").Inc(1) + m.PrometheusScope.Counter("overall_before_errors").Inc(1) + m.PrometheusScope.Counter(name + "_errors").Inc(1) } } -func (m *Metrics) ApiAfter(name string, elapsed time.Duration, isErr bool) { +func (m *LocalMetrics) ApiAfter(name string, elapsed time.Duration, isErr bool) { name = "after_" + strings.TrimPrefix(name, API_PREFIX) // Global stats. - m.prometheusScope.Counter("overall_after_count").Inc(1) - m.prometheusScope.Timer("overall_after_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter("overall_after_count").Inc(1) + m.PrometheusScope.Timer("overall_after_latency_ms").Record(elapsed / time.Millisecond) // Per-endpoint stats. - m.prometheusScope.Counter(name + "_count").Inc(1) - m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter(name + "_count").Inc(1) + m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) // Error stats if applicable. if isErr { - m.prometheusScope.Counter("overall_after_errors").Inc(1) - m.prometheusScope.Counter(name + "_errors").Inc(1) + m.PrometheusScope.Counter("overall_after_errors").Inc(1) + m.PrometheusScope.Counter(name + "_errors").Inc(1) } } -func (m *Metrics) Message(recvBytes int64, isErr bool) { +func (m *LocalMetrics) Message(recvBytes int64, isErr bool) { //name = strings.TrimPrefix(name, API_PREFIX) // Increment ongoing statistics for current measurement window. @@ -255,83 +303,83 @@ func (m *Metrics) Message(recvBytes int64, isErr bool) { //m.currentSentBytes.Add(sentBytes) // Global stats. - m.prometheusScope.Counter("overall_count").Inc(1) - m.prometheusScope.Counter("overall_message_count").Inc(1) - m.prometheusScope.Counter("overall_recv_bytes").Inc(recvBytes) - m.prometheusScope.Counter("overall_message_recv_bytes").Inc(recvBytes) - //m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) - //m.prometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond) + m.PrometheusScope.Counter("overall_count").Inc(1) + m.PrometheusScope.Counter("overall_message_count").Inc(1) + m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes) + m.PrometheusScope.Counter("overall_message_recv_bytes").Inc(recvBytes) + //m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) + //m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond) // Per-message stats. - //m.prometheusScope.Counter(name + "_count").Inc(1) - //m.prometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes) - //m.prometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes) - //m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) + //m.PrometheusScope.Counter(name + "_count").Inc(1) + //m.PrometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes) + //m.PrometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes) + //m.PrometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond) // Error stats if applicable. if isErr { - m.prometheusScope.Counter("overall_errors").Inc(1) - m.prometheusScope.Counter("overall_message_errors").Inc(1) - //m.prometheusScope.Counter(name + "_errors").Inc(1) + m.PrometheusScope.Counter("overall_errors").Inc(1) + m.PrometheusScope.Counter("overall_message_errors").Inc(1) + //m.PrometheusScope.Counter(name + "_errors").Inc(1) } } -func (m *Metrics) MessageBytesSent(sentBytes int64) { +func (m *LocalMetrics) MessageBytesSent(sentBytes int64) { // Increment ongoing statistics for current measurement window. m.currentSentBytes.Add(sentBytes) // Global stats. - m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) - m.prometheusScope.Counter("overall_message_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes) + m.PrometheusScope.Counter("overall_message_sent_bytes").Inc(sentBytes) } // Set the absolute value of currently allocated Lua runtime VMs. -func (m *Metrics) GaugeRuntimes(value float64) { - m.prometheusScope.Gauge("lua_runtimes").Update(value) +func (m *LocalMetrics) GaugeRuntimes(value float64) { + m.PrometheusScope.Gauge("lua_runtimes").Update(value) } // Set the absolute value of currently allocated Lua runtime VMs. -func (m *Metrics) GaugeLuaRuntimes(value float64) { - m.prometheusScope.Gauge("lua_runtimes").Update(value) +func (m *LocalMetrics) GaugeLuaRuntimes(value float64) { + m.PrometheusScope.Gauge("lua_runtimes").Update(value) } // Set the absolute value of currently allocated JavaScript runtime VMs. -func (m *Metrics) GaugeJsRuntimes(value float64) { - m.prometheusScope.Gauge("javascript_runtimes").Update(value) +func (m *LocalMetrics) GaugeJsRuntimes(value float64) { + m.PrometheusScope.Gauge("javascript_runtimes").Update(value) } // Set the absolute value of currently running authoritative matches. -func (m *Metrics) GaugeAuthoritativeMatches(value float64) { - m.prometheusScope.Gauge("authoritative_matches").Update(value) +func (m *LocalMetrics) GaugeAuthoritativeMatches(value float64) { + m.PrometheusScope.Gauge("authoritative_matches").Update(value) } // Increment the number of dropped events. -func (m *Metrics) CountDroppedEvents(delta int64) { - m.prometheusScope.Counter("dropped_events").Inc(delta) +func (m *LocalMetrics) CountDroppedEvents(delta int64) { + m.PrometheusScope.Counter("dropped_events").Inc(delta) } // Increment the number of opened WS connections. -func (m *Metrics) CountWebsocketOpened(delta int64) { - m.prometheusScope.Counter("socket_ws_opened").Inc(delta) +func (m *LocalMetrics) CountWebsocketOpened(delta int64) { + m.PrometheusScope.Counter("socket_ws_opened").Inc(delta) } // Increment the number of closed WS connections. -func (m *Metrics) CountWebsocketClosed(delta int64) { - m.prometheusScope.Counter("socket_ws_closed").Inc(delta) +func (m *LocalMetrics) CountWebsocketClosed(delta int64) { + m.PrometheusScope.Counter("socket_ws_closed").Inc(delta) } // Set the absolute value of currently active sessions. -func (m *Metrics) GaugeSessions(value float64) { - m.prometheusScope.Gauge("sessions").Update(value) +func (m *LocalMetrics) GaugeSessions(value float64) { + m.PrometheusScope.Gauge("sessions").Update(value) } // Set the absolute value of currently tracked presences. -func (m *Metrics) GaugePresences(value float64) { - m.prometheusScope.Gauge("presences").Update(value) +func (m *LocalMetrics) GaugePresences(value float64) { + m.PrometheusScope.Gauge("presences").Update(value) } // CustomCounter adds the given delta to a counter with the specified name and tags. -func (m *Metrics) CustomCounter(name string, tags map[string]string, delta int64) { +func (m *LocalMetrics) CustomCounter(name string, tags map[string]string, delta int64) { scope := m.prometheusCustomScope if len(tags) != 0 { scope = scope.Tagged(tags) @@ -340,7 +388,7 @@ func (m *Metrics) CustomCounter(name string, tags map[string]string, delta int64 } // CustomGauge sets the given value to a gauge with the specified name and tags. -func (m *Metrics) CustomGauge(name string, tags map[string]string, value float64) { +func (m *LocalMetrics) CustomGauge(name string, tags map[string]string, value float64) { scope := m.prometheusCustomScope if len(tags) != 0 { scope = scope.Tagged(tags) @@ -349,7 +397,7 @@ func (m *Metrics) CustomGauge(name string, tags map[string]string, value float64 } // CustomTimer records the given value to a timer with the specified name and tags. -func (m *Metrics) CustomTimer(name string, tags map[string]string, value time.Duration) { +func (m *LocalMetrics) CustomTimer(name string, tags map[string]string, value time.Duration) { scope := m.prometheusCustomScope if len(tags) != 0 { scope = scope.Tagged(tags) diff --git a/server/metrics_grpc_handler.go b/server/metrics_grpc_handler.go index ff8be96592eceeba04b92454d9fedb8a8168d00a..9443fed7e45a0f9a5f375a93e903602d031da7dc 100644 --- a/server/metrics_grpc_handler.go +++ b/server/metrics_grpc_handler.go @@ -17,6 +17,7 @@ package server import ( "context" "sync/atomic" + "time" "google.golang.org/grpc/stats" ) @@ -30,7 +31,7 @@ type metricsGrpcHandlerData struct { } type MetricsGrpcHandler struct { - metrics *Metrics + MetricsFn func(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) } // TagRPC can attach some information to the given context. @@ -51,7 +52,7 @@ func (m *MetricsGrpcHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: atomic.AddInt64(&data.sentBytes, int64(rs.WireLength)) case *stats.End: - m.metrics.Api(data.fullMethodName, rs.EndTime.Sub(rs.BeginTime), data.recvBytes, data.sentBytes, rs.Error != nil) + m.MetricsFn(data.fullMethodName, rs.EndTime.Sub(rs.BeginTime), data.recvBytes, data.sentBytes, rs.Error != nil) } } diff --git a/server/runtime.go b/server/runtime.go index 646183ef53e8ded44f1bf2d6d63e9fd081f3d941..49d1c456133c1a3e9fe83a20e61035fb1e56c0a7 100644 --- a/server/runtime.go +++ b/server/runtime.go @@ -564,7 +564,7 @@ func CheckRuntime(logger *zap.Logger, config Config) error { return nil } -func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics *Metrics, streamManager StreamManager, router MessageRouter) (*Runtime, *RuntimeInfo, error) { +func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter) (*Runtime, *RuntimeInfo, error) { runtimeConfig := config.GetRuntime() startupLogger.Info("Initialising runtime", zap.String("path", runtimeConfig.Path)) diff --git a/server/runtime_event.go b/server/runtime_event.go index be92d08dd9309bc785022a33b20541da732bfab0..f173c515845441f4115de56872c88221b52f8043 100644 --- a/server/runtime_event.go +++ b/server/runtime_event.go @@ -22,7 +22,7 @@ import ( type RuntimeEventQueue struct { logger *zap.Logger - metrics *Metrics + metrics Metrics ch chan func() @@ -30,7 +30,7 @@ type RuntimeEventQueue struct { ctxCancelFn context.CancelFunc } -func NewRuntimeEventQueue(logger *zap.Logger, config Config, metrics *Metrics) *RuntimeEventQueue { +func NewRuntimeEventQueue(logger *zap.Logger, config Config, metrics Metrics) *RuntimeEventQueue { b := &RuntimeEventQueue{ logger: logger, metrics: metrics, diff --git a/server/runtime_go.go b/server/runtime_go.go index 7fb9c69a9e6d714656128a5ee0f3b97aecf78327..566db89bcc091dfc1bdea96a26da827786b9a13e 100644 --- a/server/runtime_go.go +++ b/server/runtime_go.go @@ -2327,7 +2327,7 @@ func (ri *RuntimeGoInitializer) RegisterMatch(name string, fn func(ctx context.C return nil } -func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics *Metrics, streamManager StreamManager, router MessageRouter, rootPath string, paths []string, eventQueue *RuntimeEventQueue, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, *RuntimeEventFunctions, func() []string, error) { +func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, rootPath string, paths []string, eventQueue *RuntimeEventQueue, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, *RuntimeEventFunctions, func() []string, error) { runtimeLogger := NewRuntimeGoLogger(logger) node := config.GetName() env := config.GetRuntime().Environment diff --git a/server/runtime_go_nakama.go b/server/runtime_go_nakama.go index b59547fc98a601a2d7dab1d39ecdd8ae6aa65e35..42d28664681f0e407e2549de5ad4bb499e551199 100644 --- a/server/runtime_go_nakama.go +++ b/server/runtime_go_nakama.go @@ -54,7 +54,7 @@ type RuntimeGoNakamaModule struct { sessionCache SessionCache matchRegistry MatchRegistry tracker Tracker - metrics *Metrics + metrics Metrics streamManager StreamManager router MessageRouter @@ -65,7 +65,7 @@ type RuntimeGoNakamaModule struct { matchCreateFn RuntimeMatchCreateFunction } -func NewRuntimeGoNakamaModule(logger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics *Metrics, streamManager StreamManager, router MessageRouter) *RuntimeGoNakamaModule { +func NewRuntimeGoNakamaModule(logger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter) *RuntimeGoNakamaModule { return &RuntimeGoNakamaModule{ logger: logger, db: db, diff --git a/server/runtime_javascript.go b/server/runtime_javascript.go index 355e16e576bab8c89c760b11fafe5e3da6eb88df..c5a39f768c9b49744ee9d94013eba2ce5deb1ece 100644 --- a/server/runtime_javascript.go +++ b/server/runtime_javascript.go @@ -143,7 +143,7 @@ type RuntimeProviderJS struct { maxCount uint32 currentCount *atomic.Uint32 newFn func() *RuntimeJS - metrics *Metrics + metrics Metrics } func (rp *RuntimeProviderJS) Rpc(ctx context.Context, id string, queryParams map[string][]string, userID, username string, vars map[string]string, expiry int64, sessionID, clientIP, clientPort, lang, payload string) (string, error, codes.Code) { @@ -570,7 +570,7 @@ func (rp *RuntimeProviderJS) Put(r *RuntimeJS) { } } -func NewRuntimeProviderJS(logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics *Metrics, streamManager StreamManager, router MessageRouter, eventFn RuntimeEventCustomFunction, path, entrypoint string, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, error) { +func NewRuntimeProviderJS(logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, eventFn RuntimeEventCustomFunction, path, entrypoint string, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, error) { startupLogger.Info("Initialising JavaScript runtime provider", zap.String("path", path), zap.String("entrypoint", entrypoint)) modCache, err := cacheJavascriptModules(startupLogger, path, entrypoint) diff --git a/server/runtime_lua.go b/server/runtime_lua.go index b0e9d5bc65f53ae115ffcbb08ba656d4a5e7b01c..60f73012c8be463e1432c494cfdd87683f80525c 100644 --- a/server/runtime_lua.go +++ b/server/runtime_lua.go @@ -94,7 +94,7 @@ type RuntimeProviderLua struct { sessionRegistry SessionRegistry matchRegistry MatchRegistry tracker Tracker - metrics *Metrics + metrics Metrics router MessageRouter stdLibs map[string]lua.LGFunction @@ -107,7 +107,7 @@ type RuntimeProviderLua struct { statsCtx context.Context } -func NewRuntimeProviderLua(logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics *Metrics, streamManager StreamManager, router MessageRouter, eventFn RuntimeEventCustomFunction, rootPath string, paths []string, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, error) { +func NewRuntimeProviderLua(logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, eventFn RuntimeEventCustomFunction, rootPath string, paths []string, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, error) { startupLogger.Info("Initialising Lua runtime provider", zap.String("path", rootPath)) // Load Lua modules into memory by reading the file contents. No evaluation/execution at this stage. diff --git a/server/session_registry.go b/server/session_registry.go index 8d65743b465d1a4f6bdfc12713d44a4e4c29d87c..42bc64c403dd0b7ebd73f8636756150ecdd669a2 100644 --- a/server/session_registry.go +++ b/server/session_registry.go @@ -66,13 +66,13 @@ type SessionRegistry interface { } type LocalSessionRegistry struct { - metrics *Metrics + metrics Metrics sessions *sync.Map sessionCount *atomic.Int32 } -func NewLocalSessionRegistry(metrics *Metrics) SessionRegistry { +func NewLocalSessionRegistry(metrics Metrics) SessionRegistry { return &LocalSessionRegistry{ metrics: metrics, diff --git a/server/session_ws.go b/server/session_ws.go index 0ebdcfb426dc6198588b4e4438acfab3fd7a18b1..f10b39d79f74a00a5ab1bbf831dbcab646445f7b 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -62,7 +62,7 @@ type sessionWS struct { statusRegistry *StatusRegistry matchmaker Matchmaker tracker Tracker - metrics *Metrics + metrics Metrics pipeline *Pipeline runtime *Runtime @@ -74,7 +74,7 @@ type sessionWS struct { outgoingCh chan []byte } -func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessionID, userID uuid.UUID, username string, vars map[string]string, expiry int64, clientIP, clientPort, lang string, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, conn *websocket.Conn, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, matchmaker Matchmaker, tracker Tracker, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) Session { +func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessionID, userID uuid.UUID, username string, vars map[string]string, expiry int64, clientIP, clientPort, lang string, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, conn *websocket.Conn, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, matchmaker Matchmaker, tracker Tracker, metrics Metrics, pipeline *Pipeline, runtime *Runtime) Session { sessionLogger := logger.With(zap.String("uid", userID.String()), zap.String("sid", sessionID.String())) sessionLogger.Info("New WebSocket session connected", zap.Uint8("format", uint8(format))) diff --git a/server/socket_ws.go b/server/socket_ws.go index e39ec6480257d04f6bb7cc8b0b44c3aac2abaeb2..9b54e62dce332541b7d3dd16c9dd2c4c3697c2af 100644 --- a/server/socket_ws.go +++ b/server/socket_ws.go @@ -25,7 +25,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchmaker Matchmaker, tracker Tracker, metrics *Metrics, runtime *Runtime, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, pipeline *Pipeline) func(http.ResponseWriter, *http.Request) { +func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchmaker Matchmaker, tracker Tracker, metrics Metrics, runtime *Runtime, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, pipeline *Pipeline) func(http.ResponseWriter, *http.Request) { upgrader := &websocket.Upgrader{ ReadBufferSize: config.GetSocket().ReadBufferSizeBytes, WriteBufferSize: config.GetSocket().WriteBufferSizeBytes, diff --git a/server/status_handler.go b/server/status_handler.go index cdc2d53276112254fb7e7172c85369c2c5b07e1f..1a8cf60dac64a5755cdd624d7db581879d0c9380 100644 --- a/server/status_handler.go +++ b/server/status_handler.go @@ -31,11 +31,11 @@ type LocalStatusHandler struct { sessionRegistry SessionRegistry matchRegistry MatchRegistry tracker Tracker - metrics *Metrics + metrics Metrics node string } -func NewLocalStatusHandler(logger *zap.Logger, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics *Metrics, node string) StatusHandler { +func NewLocalStatusHandler(logger *zap.Logger, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, node string) StatusHandler { return &LocalStatusHandler{ logger: logger, sessionRegistry: sessionRegistry, @@ -55,10 +55,10 @@ func (s *LocalStatusHandler) GetStatus(ctx context.Context) ([]*console.StatusLi PresenceCount: int32(s.tracker.Count()), MatchCount: int32(s.matchRegistry.Count()), GoroutineCount: int32(runtime.NumGoroutine()), - AvgLatencyMs: s.metrics.SnapshotLatencyMs.Load(), - AvgRateSec: s.metrics.SnapshotRateSec.Load(), - AvgInputKbs: s.metrics.SnapshotRecvKbSec.Load(), - AvgOutputKbs: s.metrics.SnapshotSentKbSec.Load(), + AvgLatencyMs: s.metrics.SnapshotLatencyMs(), + AvgRateSec: s.metrics.SnapshotRateSec(), + AvgInputKbs: s.metrics.SnapshotRecvKbSec(), + AvgOutputKbs: s.metrics.SnapshotSentKbSec(), }, }, nil } diff --git a/server/tracker.go b/server/tracker.go index 412773065bf6b2a59c1de77f46ba271a1b4a3f9d..b691143794d0ed68a268d840b916bb05bac1dc1e 100644 --- a/server/tracker.go +++ b/server/tracker.go @@ -181,7 +181,7 @@ type LocalTracker struct { partyLeaveListener func(id uuid.UUID, leaves []*Presence) sessionRegistry SessionRegistry statusRegistry *StatusRegistry - metrics *Metrics + metrics Metrics protojsonMarshaler *protojson.MarshalOptions name string eventsCh chan *PresenceEvent @@ -193,7 +193,7 @@ type LocalTracker struct { ctxCancelFn context.CancelFunc } -func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, metrics *Metrics, protojsonMarshaler *protojson.MarshalOptions) Tracker { +func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, metrics Metrics, protojsonMarshaler *protojson.MarshalOptions) Tracker { ctx, ctxCancelFn := context.WithCancel(context.Background()) t := &LocalTracker{