Commit 2a0c1564 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve metric names for existing non-RPC aggregate data.

parent 7cc1329e
Loading
Loading
Loading
Loading
+23 −12
Original line number Diff line number Diff line
@@ -37,6 +37,8 @@ import (
	"github.com/satori/go.uuid"
	"go.opencensus.io/plugin/ocgrpc"
	"go.opencensus.io/plugin/ochttp"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	"go.opencensus.io/trace"
	"go.uber.org/zap"
	"golang.org/x/net/context"
@@ -146,16 +148,6 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j
	// Special case routes. Do NOT enable compression on WebSocket route, it results in "http: response.Write on hijacked connection" errors.
	grpcGatewayRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }).Methods("GET")
	grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, sessionRegistry, matchmaker, tracker, jsonpbMarshaler, jsonpbUnmarshaler, pipeline)).Methods("GET")
	// TODO restore when admin endpoints are available.
	//grpcGatewayRouter.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) {
	//	zpages.Handler.ServeHTTP(w, r)
	//})
	//grpcGatewayRouter.HandleFunc("/tracez", func(w http.ResponseWriter, r *http.Request) {
	//	zpages.Handler.ServeHTTP(w, r)
	//})
	//grpcGatewayRouter.HandleFunc("/public/", func(w http.ResponseWriter, r *http.Request) {
	//	zpages.Handler.ServeHTTP(w, r)
	//})

	// Enable stats recording on all request paths except:
	// "/" is not tracked at all.
@@ -254,9 +246,19 @@ func apiInterceptorFunc(logger *zap.Logger, config Config, runtimePool *RuntimeP
			methodName = parts[2]
		}

		span := trace.NewSpan(fmt.Sprintf("nakama.api-before.Nakama.%v", methodName), nil, trace.StartOptions{})
		// Stats measurement start boundary.
		name := fmt.Sprintf("nakama.api-before.Nakama.%v", methodName)
		statsCtx, _ := tag.New(context.Background(), tag.Upsert(MetricsFunction, name))
		startNanos := time.Now().UTC().UnixNano()
		span := trace.NewSpan(name, nil, trace.StartOptions{})

		// Actual before hook function execution.
		beforeHookResult, hookErr := invokeReqBeforeHook(logger, config, runtimePool, jsonpbMarshaler, jsonpbUnmarshaler, "", uid, username, expiry, info.FullMethod, req)

		// Stats measurement end boundary.
		span.End()
		// The UnixNano delta is in nanoseconds; divide by time.Millisecond (1e6 ns) so the
		// recorded value matches the millisecond unit declared on MetricsApiTimeSpentMsec.
		stats.Record(statsCtx, MetricsApiTimeSpentMsec.M(float64(time.Now().UTC().UnixNano()-startNanos)/float64(time.Millisecond)), MetricsApiCount.M(1))

		if hookErr != nil {
			return nil, hookErr
		} else if beforeHookResult == nil {
@@ -270,9 +272,18 @@ func apiInterceptorFunc(logger *zap.Logger, config Config, runtimePool *RuntimeP

		handlerResult, handlerErr := handler(ctx, beforeHookResult)
		if handlerErr == nil {
			span := trace.NewSpan(fmt.Sprintf("nakama.api-after.Nakama.%v", methodName), nil, trace.StartOptions{})
			// Stats measurement start boundary.
			name := fmt.Sprintf("nakama.api-after.Nakama.%v", methodName)
			statsCtx, _ := tag.New(context.Background(), tag.Upsert(MetricsFunction, name))
			startNanos := time.Now().UTC().UnixNano()
			span := trace.NewSpan(name, nil, trace.StartOptions{})

			// Actual after hook function execution.
			invokeReqAfterHook(logger, config, runtimePool, jsonpbMarshaler, "", uid, username, expiry, info.FullMethod, handlerResult)

			// Stats measurement end boundary.
			span.End()
			// The UnixNano delta is in nanoseconds; divide by time.Millisecond (1e6 ns) so the
			// recorded value matches the millisecond unit declared on MetricsApiTimeSpentMsec.
			stats.Record(statsCtx, MetricsApiTimeSpentMsec.M(float64(time.Now().UTC().UnixNano()-startNanos)/float64(time.Millisecond)), MetricsApiCount.M(1))
		}
		return handlerResult, handlerErr
	}
+10 −0
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ import (
	"github.com/grpc-ecosystem/grpc-gateway/runtime"
	"github.com/heroiclabs/nakama/console"
	"go.opencensus.io/plugin/ocgrpc"
	"go.opencensus.io/zpages"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
@@ -86,6 +87,15 @@ func StartConsoleServer(logger *zap.Logger, startupLogger *zap.Logger, config Co
	grpcGatewayRouter := mux.NewRouter()
	//TODO serve HTML content here.
	grpcGatewayRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }).Methods("GET")
	grpcGatewayRouter.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) {
		zpages.Handler.ServeHTTP(w, r)
	})
	grpcGatewayRouter.HandleFunc("/tracez", func(w http.ResponseWriter, r *http.Request) {
		zpages.Handler.ServeHTTP(w, r)
	})
	grpcGatewayRouter.HandleFunc("/public/", func(w http.ResponseWriter, r *http.Request) {
		zpages.Handler.ServeHTTP(w, r)
	})
	// Enable compression on gateway responses.
	handlerWithGzip := handlers.CompressHandler(grpcGateway)
	grpcGatewayRouter.NewRoute().Handler(handlerWithGzip)
+93 −3
Original line number Diff line number Diff line
@@ -22,20 +22,112 @@ import (

	"github.com/gorilla/handlers"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	ocprometheus "go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/exporter/stackdriver"
	"go.opencensus.io/plugin/ocgrpc"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
	"go.uber.org/zap"
)

// Package-level OpenCensus measures and tag keys shared by the API, realtime
// socket and runtime pool code when recording stats.
//
// NOTE(review): the error result of every constructor below is discarded with
// `_`; if a constructor fails, the corresponding variable is left at whatever
// value the call returned and the failure is never reported — confirm this is
// acceptable.
var (
	// Metrics stats measurements.
	// Each measure is created with a name, a human-readable description and a unit.
	// NOTE(review): the runtime count name starts with "nakama/" while every other
	// measure starts with "nakama.<area>/" — confirm whether this is intentional.
	MetricsRuntimeCount, _          = stats.Int64("nakama/runtime/count", "Number of pooled runtime instances", stats.UnitNone)
	MetricsSocketWsTimeSpentMsec, _ = stats.Float64("nakama.socket/ws/server_elapsed_time", "Elapsed time in msecs spent in WebSocket connections", stats.UnitMilliseconds)
	MetricsSocketWsOpenCount, _     = stats.Int64("nakama.socket/ws/open_count", "Number of opened WebSocket connections", stats.UnitNone)
	MetricsSocketWsCloseCount, _    = stats.Int64("nakama.socket/ws/close_count", "Number of closed WebSocket connections", stats.UnitNone)
	MetricsApiTimeSpentMsec, _      = stats.Float64("nakama.api/server/server_elapsed_time", "Elapsed time in msecs spent in API functions", stats.UnitMilliseconds)
	MetricsApiCount, _              = stats.Int64("nakama.api/server/request_count", "Number of calls to API functions", stats.UnitNone)
	MetricsRtapiTimeSpentMsec, _    = stats.Float64("nakama.rtapi/server/server_elapsed_time", "Elapsed time in msecs spent in realtime socket functions", stats.UnitMilliseconds)
	MetricsRtapiCount, _            = stats.Int64("nakama.rtapi/server/request_count", "Number of calls to realtime socket functions", stats.UnitNone)

	// Metrics stats tag keys.
	// MetricsFunction is the "function" tag key; the API and realtime measures are
	// recorded with it so their views can be broken down per function name.
	MetricsFunction, _ = tag.NewKey("function")
)

// Metrics holds the state owned by the metrics subsystem created by NewMetrics.
type Metrics struct {
	// prometheusHTTPServer is an HTTP server owned by this instance.
	// NOTE(review): presumably the server exposing the Prometheus exporter
	// endpoint — confirm against initPrometheus.
	prometheusHTTPServer *http.Server
}

func NewMetrics(logger, startupLogger *zap.Logger, config Config) *Metrics {
	m := &Metrics{}

	// Runtime pool size view. Callers record the number of runtimes added per
	// measurement (the pool minimum at warm-up, then 1 per extra allocation), so
	// the values must be summed; a count aggregation would only tally how many
	// times a measurement was recorded, not how many runtimes exist.
	if err := view.Subscribe(&view.View{
		Name:        "nakama/runtime/count",
		Description: "Number of pooled runtime instances",
		TagKeys:     []tag.Key{},
		Measure:     MetricsRuntimeCount,
		Aggregation: view.Sum(),
	}); err != nil {
		startupLogger.Fatal("Error subscribing runtime count metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.socket/ws/server_elapsed_time",
		Description: "Elapsed time in msecs spent in WebSocket connections",
		TagKeys:     []tag.Key{},
		Measure:     MetricsSocketWsTimeSpentMsec,
		Aggregation: ocgrpc.DefaultMillisecondsDistribution,
	}); err != nil {
		startupLogger.Fatal("Error subscribing socket ws elapsed time metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.socket/ws/open_count",
		Description: "Number of opened WebSocket connections",
		TagKeys:     []tag.Key{},
		Measure:     MetricsSocketWsOpenCount,
		Aggregation: view.Count(),
	}); err != nil {
		startupLogger.Fatal("Error subscribing socket ws opened count metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.socket/ws/close_count",
		Description: "Number of closed WebSocket connections",
		TagKeys:     []tag.Key{},
		Measure:     MetricsSocketWsCloseCount,
		Aggregation: view.Count(),
	}); err != nil {
		startupLogger.Fatal("Error subscribing socket ws count metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.api/server/server_elapsed_time",
		Description: "Elapsed time in msecs spent in API functions",
		TagKeys:     []tag.Key{MetricsFunction},
		Measure:     MetricsApiTimeSpentMsec,
		Aggregation: ocgrpc.DefaultMillisecondsDistribution,
	}); err != nil {
		startupLogger.Fatal("Error subscribing api elapsed time metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.api/server/request_count",
		Description: "Number of calls to API functions",
		TagKeys:     []tag.Key{MetricsFunction},
		Measure:     MetricsApiCount,
		Aggregation: view.Count(),
	}); err != nil {
		startupLogger.Fatal("Error subscribing api request count metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.rtapi/server/server_elapsed_time",
		Description: "Elapsed time in msecs spent in realtime socket functions",
		TagKeys:     []tag.Key{MetricsFunction},
		Measure:     MetricsRtapiTimeSpentMsec,
		Aggregation: ocgrpc.DefaultMillisecondsDistribution,
	}); err != nil {
		startupLogger.Fatal("Error subscribing rtapi elapsed time metrics view", zap.Error(err))
	}
	if err := view.Subscribe(&view.View{
		Name:        "nakama.rtapi/server/request_count",
		Description: "Number of calls to realtime socket functions",
		TagKeys:     []tag.Key{MetricsFunction},
		Measure:     MetricsRtapiCount,
		Aggregation: view.Count(),
	}); err != nil {
		startupLogger.Fatal("Error subscribing rtapi request count metrics view", zap.Error(err))
	}

	view.SetReportingPeriod(time.Duration(config.GetMetrics().ReportingFreqSec) * time.Second)

	if config.GetMetrics().StackdriverProjectID != "" {
		m.initStackdriver(logger, startupLogger, config)
	}
@@ -72,10 +164,8 @@ func (m *Metrics) initPrometheus(logger, startupLogger *zap.Logger, config Confi
		prefix += "-" + config.GetMetrics().Namespace
	}

	registry := prometheus.NewRegistry()
	exporter, err := ocprometheus.NewExporter(ocprometheus.Options{
		Namespace: prefix,
		Registry:  registry,
		OnError: func(err error) {
			logger.Error("Could not upload data to Prometheus", zap.Error(err))
		},
+36 −4
Original line number Diff line number Diff line
@@ -18,10 +18,15 @@ import (
	"database/sql"
	"fmt"

	"context"
	"github.com/golang/protobuf/jsonpb"
	"github.com/heroiclabs/nakama/rtapi"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	"go.opencensus.io/trace"
	"go.uber.org/zap"
	"strings"
	"time"
)

type Pipeline struct {
@@ -106,7 +111,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		pipelineName = "matchmakerRemove"
	case *rtapi.Envelope_Rpc:
		pipelineFn = p.rpc
		pipelineName = fmt.Sprintf("rpc.%v", envelope.GetRpc().Id)
		pipelineName = fmt.Sprintf("rpc.%v", strings.ToLower(envelope.GetRpc().Id))
	case *rtapi.Envelope_StatusFollow:
		pipelineFn = p.statusFollow
		pipelineName = "statusFollow"
@@ -135,9 +140,18 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
	default:
		messageName = fmt.Sprintf("%T", envelope.Message)

		span := trace.NewSpan(fmt.Sprintf("nakama.rtapi-before.%v", pipelineName), nil, trace.StartOptions{})
		// Stats measurement start boundary.
		name := fmt.Sprintf("nakama.rtapi-before.%v", pipelineName)
		statsCtx, _ := tag.New(context.Background(), tag.Upsert(MetricsFunction, name))
		startNanos := time.Now().UTC().UnixNano()
		span := trace.NewSpan(name, nil, trace.StartOptions{})

		// Actual before hook function execution.
		hookResult, hookErr := invokeReqBeforeHook(logger, p.config, p.runtimePool, p.jsonpbMarshaler, p.jsonpbUnmarshaler, session.ID().String(), session.UserID(), session.Username(), session.Expiry(), messageName, envelope)

		// Stats measurement end boundary.
		span.End()
		// The UnixNano delta is in nanoseconds; divide by time.Millisecond (1e6 ns) so the
		// recorded value matches the millisecond unit declared on MetricsRtapiTimeSpentMsec.
		stats.Record(statsCtx, MetricsRtapiTimeSpentMsec.M(float64(time.Now().UTC().UnixNano()-startNanos)/float64(time.Millisecond)), MetricsRtapiCount.M(1))

		if hookErr != nil {
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -167,14 +181,32 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		envelope = resultCast
	}

	span := trace.NewSpan(fmt.Sprintf("nakama.rtapi.%v", pipelineName), nil, trace.StartOptions{})
	// Stats measurement start boundary.
	name := fmt.Sprintf("nakama.rtapi.%v", pipelineName)
	statsCtx, _ := tag.New(context.Background(), tag.Upsert(MetricsFunction, name))
	startNanos := time.Now().UTC().UnixNano()
	span := trace.NewSpan(name, nil, trace.StartOptions{})

	// Actual function execution.
	pipelineFn(logger, session, envelope)

	// Stats measurement end boundary.
	span.End()
	// The UnixNano delta is in nanoseconds; divide by time.Millisecond (1e6 ns) so the
	// recorded value matches the millisecond unit declared on MetricsRtapiTimeSpentMsec.
	stats.Record(statsCtx, MetricsRtapiTimeSpentMsec.M(float64(time.Now().UTC().UnixNano()-startNanos)/float64(time.Millisecond)), MetricsRtapiCount.M(1))

	if messageName != "" {
		span := trace.NewSpan(fmt.Sprintf("nakama.rtapi-after.%v", pipelineName), nil, trace.StartOptions{})
		// Stats measurement start boundary.
		name := fmt.Sprintf("nakama.rtapi-after.%v", pipelineName)
		statsCtx, _ := tag.New(context.Background(), tag.Upsert(MetricsFunction, name))
		startNanos := time.Now().UTC().UnixNano()
		span := trace.NewSpan(name, nil, trace.StartOptions{})

		// Actual after hook function execution.
		invokeReqAfterHook(logger, p.config, p.runtimePool, p.jsonpbMarshaler, session.ID().String(), session.UserID(), session.Username(), session.Expiry(), messageName, envelope)

		// Stats measurement end boundary.
		span.End()
		// The UnixNano delta is in nanoseconds; divide by time.Millisecond (1e6 ns) so the
		// recorded value matches the millisecond unit declared on MetricsRtapiTimeSpentMsec.
		stats.Record(statsCtx, MetricsRtapiTimeSpentMsec.M(float64(time.Now().UTC().UnixNano()-startNanos)/float64(time.Millisecond)), MetricsRtapiCount.M(1))
	}

	return true
+4 −11
Original line number Diff line number Diff line
@@ -57,15 +57,9 @@ type RuntimePool struct {
	newFn        func() *Runtime

	statsCtx context.Context
	statsRuntimeCount *stats.Int64Measure
}

func NewRuntimePool(logger, startupLogger *zap.Logger, db *sql.DB, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, router MessageRouter, stdLibs map[string]lua.LGFunction, moduleCache *ModuleCache, regCallbacks *RegCallbacks, once *sync.Once) *RuntimePool {
	statsRuntimeCount, err := stats.Int64("nakama.runtime.count", "Number of pooled runtime instances.", stats.UnitNone)
	if err != nil {
		startupLogger.Fatal("Error creating stats entry for runtime count", zap.Error(err))
	}

	rp := &RuntimePool{
		logger:       logger,
		regCallbacks: regCallbacks,
@@ -82,7 +76,6 @@ func NewRuntimePool(logger, startupLogger *zap.Logger, db *sql.DB, config Config
			return r
		},
		statsCtx: context.Background(),
		statsRuntimeCount: statsRuntimeCount,
	}

	// Warm up the pool.
@@ -92,7 +85,7 @@ func NewRuntimePool(logger, startupLogger *zap.Logger, db *sql.DB, config Config
		for i := 0; i < config.GetRuntime().MinCount; i++ {
			rp.poolCh <- rp.newFn()
		}
		stats.Record(rp.statsCtx, rp.statsRuntimeCount.M(int64(config.GetRuntime().MinCount)))
		stats.Record(rp.statsCtx, MetricsRuntimeCount.M(int64(config.GetRuntime().MinCount)))
	}
	startupLogger.Info("Allocated minimum runtime pool")

@@ -138,7 +131,7 @@ func (rp *RuntimePool) Get() *Runtime {
			// Allocate a new runtime.
			rp.currentCount++
			rp.Unlock()
			stats.Record(rp.statsCtx, rp.statsRuntimeCount.M(1))
			stats.Record(rp.statsCtx, MetricsRuntimeCount.M(1))
			return rp.newFn()
		}
	}
Loading