Commit b0bcf00a authored by Andrei Mihu's avatar Andrei Mihu
Browse files

New deferred broadcast dispatcher function.

parent 6e07e5a0
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Go runtime authoritative matches now also print Match IDs in log lines generated within the match.
- Allow client email authentication requests to optionally authenticate with username/password instead of email/password.
- Allow runtime email authentication calls to authenticate with username/password instead of email/password.
- New authoritative match dispatcher function to defer message broadcasts until the end of the tick.

### Changed
- Replace standard logger supplied to the Go runtime with a more powerful interface.
+1 −0
Original line number Diff line number Diff line
@@ -671,6 +671,7 @@ type MatchData interface {

type MatchDispatcher interface {
	BroadcastMessage(opCode int64, data []byte, presences []Presence, sender Presence) error
	BroadcastMessageDeferred(opCode int64, data []byte, presences []Presence, sender Presence) error
	MatchKick(presences []Presence) error
	MatchLabelUpdate(label string) error
}
+11 −12
Original line number Diff line number Diff line
@@ -20,10 +20,9 @@ import (
	"github.com/heroiclabs/nakama/api"
	"github.com/heroiclabs/nakama/rtapi"
	"github.com/heroiclabs/nakama/runtime"
	"log"
)

func InitModule(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, initializer runtime.Initializer) error {
func InitModule(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, initializer runtime.Initializer) error {
	if err := initializer.RegisterRpc("go_echo_sample", rpcEcho); err != nil {
		return err
	}
@@ -33,7 +32,7 @@ func InitModule(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.
	if err := initializer.RegisterAfterGetAccount(afterGetAccount); err != nil {
		return err
	}
	if err := initializer.RegisterMatch("match", func(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error) {
	if err := initializer.RegisterMatch("match", func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error) {
		return &Match{}, nil
	}); err != nil {
		return err
@@ -41,17 +40,17 @@ func InitModule(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.
	return nil
}

func rpcEcho(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
func rpcEcho(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, payload string) (string, error) {
	logger.Print("RUNNING IN GO")
	return payload, nil
}

func beforeChannelJoin(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, envelope *rtapi.Envelope) (*rtapi.Envelope, error) {
func beforeChannelJoin(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, envelope *rtapi.Envelope) (*rtapi.Envelope, error) {
	logger.Printf("Intercepted request to join channel '%v'", envelope.GetChannelJoin().Target)
	return envelope, nil
}

func afterGetAccount(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, in *api.Account) error {
func afterGetAccount(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, in *api.Account) error {
	logger.Printf("Intercepted response to get account '%v'", in)
	return nil
}
@@ -62,7 +61,7 @@ type MatchState struct {

type Match struct{}

func (m *Match) MatchInit(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, params map[string]interface{}) (interface{}, int, string) {
func (m *Match) MatchInit(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, params map[string]interface{}) (interface{}, int, string) {
	var debug bool
	if d, ok := params["debug"]; ok {
		if dv, ok := d.(bool); ok {
@@ -82,7 +81,7 @@ func (m *Match) MatchInit(ctx context.Context, logger *log.Logger, db *sql.DB, n
	return state, tickRate, label
}

func (m *Match) MatchJoinAttempt(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, presence runtime.Presence, metadata map[string]string) (interface{}, bool, string) {
func (m *Match) MatchJoinAttempt(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, presence runtime.Presence, metadata map[string]string) (interface{}, bool, string) {
	if state.(*MatchState).debug {
		logger.Printf("match join attempt username %v user_id %v session_id %v node %v with metadata %v", presence.GetUsername(), presence.GetUserId(), presence.GetSessionId(), presence.GetNodeId(), metadata)
	}
@@ -90,7 +89,7 @@ func (m *Match) MatchJoinAttempt(ctx context.Context, logger *log.Logger, db *sq
	return state, true, ""
}

func (m *Match) MatchJoin(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, presences []runtime.Presence) interface{} {
func (m *Match) MatchJoin(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, presences []runtime.Presence) interface{} {
	if state.(*MatchState).debug {
		for _, presence := range presences {
			logger.Printf("match join username %v user_id %v session_id %v node %v", presence.GetUsername(), presence.GetUserId(), presence.GetSessionId(), presence.GetNodeId())
@@ -100,7 +99,7 @@ func (m *Match) MatchJoin(ctx context.Context, logger *log.Logger, db *sql.DB, n
	return state
}

func (m *Match) MatchLeave(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, presences []runtime.Presence) interface{} {
func (m *Match) MatchLeave(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, presences []runtime.Presence) interface{} {
	if state.(*MatchState).debug {
		for _, presence := range presences {
			logger.Printf("match leave username %v user_id %v session_id %v node %v", presence.GetUsername(), presence.GetUserId(), presence.GetSessionId(), presence.GetNodeId())
@@ -110,7 +109,7 @@ func (m *Match) MatchLeave(ctx context.Context, logger *log.Logger, db *sql.DB,
	return state
}

func (m *Match) MatchLoop(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, messages []runtime.MatchData) interface{} {
func (m *Match) MatchLoop(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, messages []runtime.MatchData) interface{} {
	if state.(*MatchState).debug {
		logger.Printf("match loop match_id %v tick %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), tick)
		logger.Printf("match loop match_id %v message count %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), len(messages))
@@ -122,7 +121,7 @@ func (m *Match) MatchLoop(ctx context.Context, logger *log.Logger, db *sql.DB, n
	return state
}

func (m *Match) MatchTerminate(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, graceSeconds int) interface{} {
func (m *Match) MatchTerminate(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, graceSeconds int) interface{} {
	if state.(*MatchState).debug {
		logger.Printf("match terminate match_id %v tick %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), tick)
		logger.Printf("match terminate match_id %v grace seconds %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), graceSeconds)
+5 −0
Original line number Diff line number Diff line
@@ -144,6 +144,9 @@ func ParseArgs(logger *zap.Logger, args []string) Config {
	if mainConfig.GetMatch().JoinAttemptQueueSize < 1 {
		logger.Fatal("Match join attempt queue size must be >= 1", zap.Int("match.join_attempt_queue_size", mainConfig.GetMatch().JoinAttemptQueueSize))
	}
	if mainConfig.GetMatch().DeferredQueueSize < 1 {
		logger.Fatal("Match deferred queue size must be >= 1", zap.Int("match.deferred_queue_size", mainConfig.GetMatch().DeferredQueueSize))
	}

	// If the runtime path is not overridden, set it to `datadir/modules`.
	if mainConfig.GetRuntime().Path == "" {
@@ -474,6 +477,7 @@ type MatchConfig struct {
	InputQueueSize       int `yaml:"input_queue_size" json:"input_queue_size" usage:"Size of the authoritative match buffer that stores client messages until they can be processed by the next tick. Default 128."`
	CallQueueSize        int `yaml:"call_queue_size" json:"call_queue_size" usage:"Size of the authoritative match buffer that sequences calls to match handler callbacks to ensure no overlaps. Default 128."`
	JoinAttemptQueueSize int `yaml:"join_attempt_queue_size" json:"join_attempt_queue_size" usage:"Size of the authoritative match buffer that limits the number of in-progress join attempts. Default 128."`
	DeferredQueueSize    int `yaml:"deferred_queue_size" json:"deferred_queue_size" usage:"Size of the authoritative match buffer that holds deferred message broadcasts until the end of each loop execution. Default 128."`
}

// NewMatchConfig creates a new MatchConfig struct.
@@ -482,6 +486,7 @@ func NewMatchConfig() *MatchConfig {
		InputQueueSize:       128,
		CallQueueSize:        128,
		JoinAttemptQueueSize: 128,
		DeferredQueueSize:    128,
	}
}

+33 −12
Original line number Diff line number Diff line
@@ -145,20 +145,31 @@ type MatchHandler struct {
	stopCh        chan struct{}
	stopped       *atomic.Bool

	deferredCh chan *DeferredMessage

	// Configuration set by match init.
	Label *atomic.String
	Rate int

	// Match state.
	state interface{}
}

func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, core RuntimeMatchCore, label *atomic.String, id uuid.UUID, node string, params map[string]interface{}) (*MatchHandler, error) {
func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, core RuntimeMatchCore, id uuid.UUID, node string, params map[string]interface{}) (*MatchHandler, error) {
	presenceList := &MatchPresenceList{
		presences: make([]*PresenceID, 0, 10),
	}

	state, rateInt, labelStr, err := core.MatchInit(presenceList, params)
	deferredCh := make(chan *DeferredMessage, config.GetMatch().DeferredQueueSize)
	deferMessageFn := func(msg *DeferredMessage) error {
		select {
		case deferredCh <- msg:
			return nil
		default:
			return ErrDeferredBroadcastFull
		}
	}

	state, rateInt, err := core.MatchInit(presenceList, deferMessageFn, params)
	if err != nil {
		core.Cancel()
		return nil, err
@@ -167,11 +178,6 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		core.Cancel()
		return nil, errors.New("Match initial state must not be nil")
	}
	err = matchRegistry.UpdateMatchLabel(id, labelStr)
	if err != nil {
		return nil, err
	}
	label.Store(labelStr)

	// Construct the match.
	mh := &MatchHandler{
@@ -196,10 +202,10 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		// Ticker below.
		callCh:        make(chan func(mh *MatchHandler), config.GetMatch().CallQueueSize),
		joinAttemptCh: make(chan func(mh *MatchHandler), config.GetMatch().JoinAttemptQueueSize),
		deferredCh:    deferredCh,
		stopCh:        make(chan struct{}),
		stopped:       atomic.NewBool(false),

		Label: label,
		Rate: rateInt,

		state: state,
@@ -287,12 +293,27 @@ func loop(mh *MatchHandler) {
		return
	}

	// Execute the loop.
	state, err := mh.core.MatchLoop(mh.tick, mh.state, mh.inputCh)
	if err != nil {
		mh.Stop()
		mh.logger.Warn("Stopping match after error from match_loop execution", zap.Int64("tick", mh.tick), zap.Error(err))
		return
	}

	// Broadcast any deferred messages before checking for nil state, to make sure any final messages are sent.
	deferredCount := len(mh.deferredCh)
	if deferredCount != 0 {
		deferredMessages := make([]*DeferredMessage, deferredCount)
		for i := 0; i < deferredCount; i++ {
			msg := <-mh.deferredCh
			deferredMessages[i] = msg
		}

		mh.router.SendDeferred(mh.logger, true, StreamModeMatchAuthoritative, deferredMessages)
	}

	// Check if we need to stop the match.
	if state == nil {
		mh.Stop()
		mh.logger.Info("Match loop returned nil or no state, stopping match")
@@ -338,7 +359,7 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *Mat
		}

		mh.state = state
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.Label.Load()}
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.core.Label()}
	}

	select {
Loading