Commit f4cfa6a6 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Expose more metrics for socket activity. (#476)

parent 8a03ecf8
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
## [Unreleased]
### Added
- Event contexts now contain user information for external events.
- Expose more metrics for socket activity.
- New [Docker release](https://hub.docker.com/repository/docker/heroiclabs/nakama-dsym) of the server with debug symbols enabled.

### Fixed
+44 −0
Original line number Diff line number Diff line
@@ -165,8 +165,11 @@ func (m *Metrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes i

	// Global stats.
	m.prometheusScope.Counter("overall_count").Inc(1)
	m.prometheusScope.Counter("overall_request_count").Inc(1)
	m.prometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond)

	// Per-endpoint stats.
@@ -178,6 +181,7 @@ func (m *Metrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes i
	// Error stats if applicable.
	if isErr {
		m.prometheusScope.Counter("overall_errors").Inc(1)
		m.prometheusScope.Counter("overall_request_errors").Inc(1)
		m.prometheusScope.Counter(name + "_errors").Inc(1)
	}
}
@@ -218,6 +222,46 @@ func (m *Metrics) ApiAfter(name string, elapsed time.Duration, isErr bool) {
	}
}

func (m *Metrics) Message(recvBytes int64, isErr bool) {
	//name = strings.TrimPrefix(name, API_PREFIX)

	// Increment ongoing statistics for current measurement window.
	//m.currentMsTotal.Add(int64(elapsed / time.Millisecond))
	m.currentReqCount.Inc()
	m.currentRecvBytes.Add(recvBytes)
	//m.currentSentBytes.Add(sentBytes)

	// Global stats.
	m.prometheusScope.Counter("overall_count").Inc(1)
	m.prometheusScope.Counter("overall_message_count").Inc(1)
	m.prometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.prometheusScope.Counter("overall_message_recv_bytes").Inc(recvBytes)
	//m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	//m.prometheusScope.Timer("overall_latency_ms").Record(elapsed / time.Millisecond)

	// Per-message stats.
	//m.prometheusScope.Counter(name + "_count").Inc(1)
	//m.prometheusScope.Counter(name + "_recv_bytes").Inc(recvBytes)
	//m.prometheusScope.Counter(name + "_sent_bytes").Inc(sentBytes)
	//m.prometheusScope.Timer(name + "_latency_ms").Record(elapsed / time.Millisecond)

	// Error stats if applicable.
	if isErr {
		m.prometheusScope.Counter("overall_errors").Inc(1)
		m.prometheusScope.Counter("overall_message_errors").Inc(1)
		//m.prometheusScope.Counter(name + "_errors").Inc(1)
	}
}

func (m *Metrics) MessageBytesSent(sentBytes int64) {
	// Increment ongoing statistics for current measurement window.
	m.currentSentBytes.Add(sentBytes)

	// Global stats.
	m.prometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.prometheusScope.Counter("overall_message_sent_bytes").Inc(sentBytes)
}

// Set the absolute value of currently allocated Lua runtime VMs.
func (m *Metrics) GaugeRuntimes(value float64) {
	m.prometheusScope.Gauge("lua_runtimes").Update(value)
+1 −1
Original line number Diff line number Diff line
@@ -138,7 +138,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
				}}}, true)
				return true
			} else if hookResult == nil {
				// if result is nil, requested resource is disabled. Sessions calling disabled resources will be close.
				// If result is nil, requested resource is disabled. Sessions calling disabled resources will be closed.
				logger.Warn("Intercepted a disabled resource.", zap.String("resource", messageName))
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_UNRECOGNIZED_PAYLOAD),
+16 −1
Original line number Diff line number Diff line
@@ -62,6 +62,7 @@ type sessionWS struct {
	sessionRegistry SessionRegistry
	matchmaker      Matchmaker
	tracker         Tracker
	metrics         *Metrics
	pipeline        *Pipeline
	runtime         *Runtime

@@ -73,7 +74,7 @@ type sessionWS struct {
	outgoingCh             chan []byte
}

func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessionID, userID uuid.UUID, username string, vars map[string]string, expiry int64, clientIP string, clientPort string, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, conn *websocket.Conn, sessionRegistry SessionRegistry, matchmaker Matchmaker, tracker Tracker, pipeline *Pipeline, runtime *Runtime) Session {
func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessionID, userID uuid.UUID, username string, vars map[string]string, expiry int64, clientIP string, clientPort string, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, conn *websocket.Conn, sessionRegistry SessionRegistry, matchmaker Matchmaker, tracker Tracker, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) Session {
	sessionLogger := logger.With(zap.String("uid", userID.String()), zap.String("sid", sessionID.String()))

	sessionLogger.Info("New WebSocket session connected", zap.Uint8("format", uint8(format)))
@@ -110,6 +111,7 @@ func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessi
		sessionRegistry: sessionRegistry,
		matchmaker:      matchmaker,
		tracker:         tracker,
		metrics:         metrics,
		pipeline:        pipeline,
		runtime:         runtime,

@@ -183,6 +185,7 @@ func (s *sessionWS) Consume() {
	go s.processOutgoing()

	var reason string
	var data []byte

IncomingLoop:
	for {
@@ -211,6 +214,7 @@ IncomingLoop:
			s.receivedMessageCounter = s.config.GetSocket().PingBackoffThreshold
			if !s.maybeResetPingTimer() {
				// Problems resetting the ping timer indicate an error so we need to close the loop.
				reason = "error updating ping timer"
				break
			}
		}
@@ -244,6 +248,14 @@ IncomingLoop:
				break IncomingLoop
			}
		}

		// Update incoming message metrics.
		s.metrics.Message(int64(len(data)), false)
	}

	if reason != "" {
		// Update incoming message metrics.
		s.metrics.Message(int64(len(data)), true)
	}

	s.Close(reason)
@@ -317,6 +329,9 @@ OutgoingLoop:
				break OutgoingLoop
			}
			s.Unlock()

			// Update outgoing message metrics.
			s.metrics.MessageBytesSent(int64(len(payload)))
		}
	}

+4 −6
Original line number Diff line number Diff line
@@ -15,17 +15,15 @@
package server

import (
	"context"
	"net"
	"net/http"

	"github.com/gofrs/uuid"
	"github.com/golang/protobuf/jsonpb"
	"github.com/gorilla/websocket"
	"go.uber.org/zap"
	"net"
	"net/http"
)

var SocketWsStatsCtx = context.Background()

func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, matchmaker Matchmaker, tracker Tracker, metrics *Metrics, runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, pipeline *Pipeline) func(http.ResponseWriter, *http.Request) {
	upgrader := &websocket.Upgrader{
		ReadBufferSize:  config.GetSocket().ReadBufferSizeBytes,
@@ -88,7 +86,7 @@ func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry Sess
		metrics.CountWebsocketOpened(1)

		// Wrap the connection for application handling.
		session := NewSessionWS(logger, config, format, sessionID, userID, username, vars, expiry, clientIP, clientPort, jsonpbMarshaler, jsonpbUnmarshaler, conn, sessionRegistry, matchmaker, tracker, pipeline, runtime)
		session := NewSessionWS(logger, config, format, sessionID, userID, username, vars, expiry, clientIP, clientPort, jsonpbMarshaler, jsonpbUnmarshaler, conn, sessionRegistry, matchmaker, tracker, metrics, pipeline, runtime)

		// Add to the session registry.
		sessionRegistry.Add(session)