Commit b5082743 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Mark authoritative match messages with a receive timestamp. (#210)

parent fe8a589e
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -10,6 +10,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- New runtime pool min/max size options.
- New user ban and unban functions.
- RPC functions triggered by HTTP GET requests now expose any custom query parameters.
- Authoritative match messages now carry a receive timestamp field.
- Track additional metrics for function calls, before/after hooks, and internal components.

### Changed
- The avatar URL fields in various domain objects now support up to 512 characters for FBIG.
+31 −4
Original line number Diff line number Diff line
@@ -36,6 +36,8 @@ import (
	"github.com/heroiclabs/nakama/social"
	"github.com/satori/go.uuid"
	"go.opencensus.io/plugin/ocgrpc"
	"go.opencensus.io/plugin/ochttp"
	"go.opencensus.io/trace"
	"go.uber.org/zap"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
@@ -129,6 +131,7 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j
	dialOpts := []grpc.DialOption{
		//TODO (mo, zyro): Do we need to pass the statsHandler here as well?
		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(int(config.GetSocket().MaxMessageSizeBytes))),
		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
	}
	if config.GetSocket().TLSCert != nil {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewServerTLSFromCert(&config.GetSocket().TLSCert[0])))
@@ -144,14 +147,28 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j
	grpcGatewayRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }).Methods("GET")
	grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, sessionRegistry, matchmaker, tracker, jsonpbMarshaler, jsonpbUnmarshaler, pipeline)).Methods("GET")
	// TODO restore when admin endpoints are available.
	//grpcGatewayRouter.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
	//	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	//	zpages.WriteHTMLRpczPage(w)
	//grpcGatewayRouter.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) {
	//	zpages.Handler.ServeHTTP(w, r)
	//})
	//grpcGatewayRouter.HandleFunc("/tracez", func(w http.ResponseWriter, r *http.Request) {
	//	zpages.Handler.ServeHTTP(w, r)
	//})
	//grpcGatewayRouter.HandleFunc("/public/", func(w http.ResponseWriter, r *http.Request) {
	//	zpages.Handler.ServeHTTP(w, r)
	//})

	// Enable stats recording on all request paths except:
	// "/" is not tracked at all.
	// "/ws" implements its own separate tracking.
	handlerWithStats := &ochttp.Handler{
		Handler:          grpcGateway,
		IsPublicEndpoint: true,
	}

	// Default to passing request to GRPC Gateway.
	// Enable max size check on requests coming arriving the gateway.
	// Enable compression on responses sent by the gateway.
	handlerWithGzip := handlers.CompressHandler(grpcGateway)
	handlerWithGzip := handlers.CompressHandler(handlerWithStats)
	maxMessageSizeBytes := config.GetSocket().MaxMessageSizeBytes
	handlerWithMaxBody := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Check max body size before decompressing incoming request body.
@@ -227,7 +244,15 @@ func apiInterceptorFunc(logger *zap.Logger, config Config, runtimePool *RuntimeP
			expiry = ctx.Value(ctxExpiryKey{}).(int64)
		}

		// Method name to use for before/after stats.
		var methodName string
		if parts := strings.SplitN(info.FullMethod, "/", 3); len(parts) == 3 {
			methodName = parts[2]
		}

		span := trace.NewSpan(fmt.Sprintf("nakama.api-before.Nakama.%v", methodName), nil, trace.StartOptions{})
		beforeHookResult, hookErr := invokeReqBeforeHook(logger, config, runtimePool, jsonpbMarshaler, jsonpbUnmarshaler, "", uid, username, expiry, info.FullMethod, req)
		span.End()
		if hookErr != nil {
			return nil, hookErr
		} else if beforeHookResult == nil {
@@ -241,7 +266,9 @@ func apiInterceptorFunc(logger *zap.Logger, config Config, runtimePool *RuntimeP

		handlerResult, handlerErr := handler(ctx, beforeHookResult)
		if handlerErr == nil {
			span := trace.NewSpan(fmt.Sprintf("nakama.api-after.Nakama.%v", methodName), nil, trace.StartOptions{})
			invokeReqAfterHook(logger, config, runtimePool, jsonpbMarshaler, "", uid, username, expiry, info.FullMethod, handlerResult)
			span.End()
		}
		return handlerResult, handlerErr
	}
+22 −12
Original line number Diff line number Diff line
@@ -105,6 +105,12 @@ func ParseArgs(logger *zap.Logger, args []string) Config {
	if mainConfig.GetRuntime().MinCount > mainConfig.GetRuntime().MaxCount {
		logger.Fatal("Minimum runtime instance count must be less than or equal to maximum runtime instance count", zap.Int("runtime.min_count", mainConfig.GetRuntime().MinCount), zap.Int("runtime.max_count", mainConfig.GetRuntime().MaxCount))
	}
	if mainConfig.GetRuntime().CallStackSize < 1 {
		logger.Fatal("Runtime instance call stack size must be >= 1", zap.Int("runtime.call_stack_size", mainConfig.GetRuntime().CallStackSize))
	}
	if mainConfig.GetRuntime().RegistrySize < 128 {
		logger.Fatal("Runtime instance registry size must be >= 128", zap.Int("runtime.registry_size", mainConfig.GetRuntime().RegistrySize))
	}

	// If the runtime path is not overridden, set it to `datadir/modules`.
	if mainConfig.GetRuntime().Path == "" {
@@ -374,7 +380,9 @@ type RuntimeConfig struct {
	Path          string   `yaml:"path" json:"path" usage:"Path for the server to scan for *.lua files."`
	HTTPKey       string   `yaml:"http_key" json:"http_key" usage:"Runtime HTTP Invocation key."`
	MinCount      int      `yaml:"min_count" json:"min_count" usage:"Minimum number of runtime instances to allocate. Default 16."`
	MaxCount    int      `yaml:"max_count" json:"max_count" usage:"Maximum number of runtime instances to allocate. Default 65536."`
	MaxCount      int      `yaml:"max_count" json:"max_count" usage:"Maximum number of runtime instances to allocate. Default 256."`
	CallStackSize int      `yaml:"call_stack_size" json:"call_stack_size" usage:"Size of each runtime instance's call stack. Default 128."`
	RegistrySize  int      `yaml:"registry_size" json:"registry_size" usage:"Size of each runtime instance's registry. Default 512."`
}

// NewRuntimeConfig creates a new RuntimeConfig struct.
@@ -385,7 +393,9 @@ func NewRuntimeConfig() *RuntimeConfig {
		Path:          "",
		HTTPKey:       "defaultkey",
		MinCount:      16,
		MaxCount:    65536,
		MaxCount:      256,
		CallStackSize: 128,
		RegistrySize:  512,
	}
}

+11 −9
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@ type MatchDataMessage struct {
	Node        string
	OpCode      int64
	Data        []byte
	ReceiveTime int64
}

type MatchHandler struct {
@@ -83,8 +84,8 @@ type MatchHandler struct {
func NewMatchHandler(logger *zap.Logger, db *sql.DB, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, router MessageRouter, stdLibs map[string]lua.LGFunction, once *sync.Once, id uuid.UUID, node string, name string, params interface{}) (*MatchHandler, error) {
	// Set up the Lua VM that will handle this match.
	vm := lua.NewState(lua.Options{
		CallStackSize:       1024,
		RegistrySize:        1024,
		CallStackSize:       config.GetRuntime().CallStackSize,
		RegistrySize:        config.GetRuntime().RegistrySize,
		SkipOpenLibs:        true,
		IncludeGoStackTrace: true,
	})
@@ -326,7 +327,7 @@ func loop(mh *MatchHandler) {
		presence.RawSetString("username", lua.LString(msg.Username))
		presence.RawSetString("node", lua.LString(msg.Node))

		in := mh.vm.CreateTable(0, 3)
		in := mh.vm.CreateTable(0, 4)
		in.RawSetString("sender", presence)
		in.RawSetString("op_code", lua.LNumber(msg.OpCode))
		if msg.Data != nil {
@@ -334,6 +335,7 @@ func loop(mh *MatchHandler) {
		} else {
			in.RawSetString("data", lua.LNil)
		}
		in.RawSetString("receive_time_ms", lua.LNumber(msg.ReceiveTime))

		input.RawSetInt(i, in)
	}
+9 −8
Original line number Diff line number Diff line
@@ -72,7 +72,7 @@ type MatchRegistry interface {
	Kick(stream PresenceStream, presences []*MatchPresence)
	// Pass a data payload (usually from a user) to the appropriate match handler.
	// Assumes that the data sender has already been validated as a match participant before this call.
	SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte)
	SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte, receiveTime int64)
}

type LocalMatchRegistry struct {
@@ -324,7 +324,7 @@ func (r *LocalMatchRegistry) Kick(stream PresenceStream, presences []*MatchPrese
	}
}

func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte) {
func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte, receiveTime int64) {
	if node != r.node {
		return
	}
@@ -345,5 +345,6 @@ func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, session
		Node:        node,
		OpCode:      opCode,
		Data:        data,
		ReceiveTime: receiveTime,
	})
}
Loading