Commit c661bc46 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Authoritative match dispatcher is no longer usable after match stops.

parent 90e09236
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -16,6 +16,9 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Update IAP validation example for Android Publisher v3 API.
- Relayed multiplayer matches allow echoing messages back to sender if they're in the filter list.
- Upgrade Facebook authentication to use version 5.0 of the Facebook Graph API.
- Upgrade devconsole serialize-javascript (2.1.1) dependency.
- Ensure authoritative match dispatcher is no longer usable after match stops.
- Deferred message broadcasts now process just before match ends if match handler functions return an error.

### Fixed
- Correctly read pagination cursor in notification listings.
+40 −19
Original line number Diff line number Diff line
@@ -105,7 +105,7 @@ type MatchHandler struct {
	state interface{}
}

func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, params map[string]interface{}) (*MatchHandler, error) {
func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) {
	presenceList := NewMatchPresenceList()

	deferredCh := make(chan *DeferredMessage, config.GetMatch().DeferredQueueSize)
@@ -155,7 +155,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		joinAttemptCh: make(chan func(mh *MatchHandler), config.GetMatch().JoinAttemptQueueSize),
		deferredCh:    deferredCh,
		stopCh:        make(chan struct{}),
		stopped:       atomic.NewBool(false),
		stopped:       stopped,

		Rate: int64(rateInt),

@@ -203,6 +203,10 @@ func (mh *MatchHandler) Close() {
	if !mh.stopped.CAS(false, true) {
		return
	}

	// Ensure any remaining deferred broadcasts are sent.
	mh.processDeferred()

	mh.core.Cancel()
	close(mh.stopCh)
	mh.ticker.Stop()
@@ -256,20 +260,11 @@ func loop(mh *MatchHandler) {
		return
	}

	// Broadcast any deferred messages before checking for nil state, to make sure any final messages are sent.
	deferredCount := len(mh.deferredCh)
	if deferredCount != 0 {
		deferredMessages := make([]*DeferredMessage, deferredCount)
		for i := 0; i < deferredCount; i++ {
			msg := <-mh.deferredCh
			deferredMessages[i] = msg
		}

		mh.router.SendDeferred(mh.logger, deferredMessages)
	}

	// Check if we need to stop the match.
	if state == nil {
	if state != nil {
		// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
		mh.processDeferred()
	} else {
		mh.Stop()
		mh.logger.Info("Match loop returned nil or no state, stopping match")
		return
@@ -288,6 +283,19 @@ func loop(mh *MatchHandler) {
	mh.tick++
}

func (mh *MatchHandler) processDeferred() {
	deferredCount := len(mh.deferredCh)
	if deferredCount != 0 {
		deferredMessages := make([]*DeferredMessage, deferredCount)
		for i := 0; i < deferredCount; i++ {
			msg := <-mh.deferredCh
			deferredMessages[i] = msg
		}

		mh.router.SendDeferred(mh.logger, deferredMessages)
	}
}

func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool {
	if mh.stopped.Load() {
		return false
@@ -315,7 +323,11 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M
			resultCh <- &MatchJoinResult{Allow: false}
			return
		}
		if state == nil {

		if state != nil {
			// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
			mh.processDeferred()
		} else {
			mh.Stop()
			mh.logger.Info("Match join attempt returned nil or no state, stopping match")
			resultCh <- &MatchJoinResult{Allow: false}
@@ -369,7 +381,10 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence, mark bool) bool {
				mh.logger.Warn("Stopping match after error from match_join execution", zap.Int64("tick", mh.tick), zap.Error(err))
				return
			}
			if state == nil {
			if state != nil {
				// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
				mh.processDeferred()
			} else {
				mh.Stop()
				mh.logger.Info("Match join returned nil or no state, stopping match")
				return
@@ -404,7 +419,10 @@ func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool {
				mh.logger.Warn("Stopping match after error from match_leave execution", zap.Int("tick", int(mh.tick)), zap.Error(err))
				return
			}
			if state == nil {
			if state != nil {
				// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
				mh.processDeferred()
			} else {
				mh.Stop()
				mh.logger.Info("Match leave returned nil or no state, stopping match")
				return
@@ -433,7 +451,10 @@ func (mh *MatchHandler) QueueTerminate(graceSeconds int) bool {
			mh.logger.Warn("Stopping match after error from match_terminate execution", zap.Int("tick", int(mh.tick)), zap.Error(err))
			return
		}
		if state == nil {
		if state != nil {
			// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
			mh.processDeferred()
		} else {
			mh.Stop()
			mh.logger.Info("Match terminate returned nil or no state, stopping match")
			return
+6 −5
Original line number Diff line number Diff line
@@ -60,7 +60,7 @@ type MatchRegistry interface {
	// Create and start a new match, given a Lua module name or registered Go match function.
	CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error)
	// Register and initialise a match that's ready to run.
	NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error)
	NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error)
	// Return a match handler by ID, only from the local node.
	GetMatch(id uuid.UUID) *MatchHandler
	// Remove a tracked match and ensure all its presences are cleaned up.
@@ -136,8 +136,9 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tra
func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) {
	id := uuid.Must(uuid.NewV4())
	matchLogger := logger.With(zap.String("mid", id.String()))
	stopped := atomic.NewBool(false)

	core, err := createFn(ctx, matchLogger, id, r.node, module)
	core, err := createFn(ctx, matchLogger, id, r.node, stopped, module)
	if err != nil {
		return "", err
	}
@@ -146,7 +147,7 @@ func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger
	}

	// Start the match.
	mh, err := r.NewMatch(matchLogger, id, core, params)
	mh, err := r.NewMatch(matchLogger, id, core, stopped, params)
	if err != nil {
		return "", fmt.Errorf("error creating match: %v", err.Error())
	}
@@ -154,13 +155,13 @@ func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger
	return mh.IDStr, nil
}

func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error) {
func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) {
	if r.stopped.Load() {
		// Server is shutting down, reject new matches.
		return nil, errors.New("shutdown in progress")
	}

	match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, params)
	match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, stopped, params)
	if err != nil {
		return nil, err
	}
+2 −1
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ package server
import (
	"context"
	"database/sql"
	"go.uber.org/atomic"
	"os"
	"path/filepath"
	"strings"
@@ -167,7 +168,7 @@ type (

	RuntimeMatchmakerMatchedFunction func(ctx context.Context, entries []*MatchmakerEntry) (string, bool, error)

	RuntimeMatchCreateFunction       func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error)
	RuntimeMatchCreateFunction       func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error)
	RuntimeMatchDeferMessageFunction func(msg *DeferredMessage) error

	RuntimeTournamentEndFunction   func(ctx context.Context, tournament *api.Tournament, end, reset int64) error
+3 −2
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ import (
	"database/sql"
	"errors"
	"github.com/golang/protobuf/jsonpb"
	"go.uber.org/atomic"
	"path/filepath"
	"plugin"
	"strings"
@@ -1820,7 +1821,7 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbM

	match := make(map[string]func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error), 0)
	matchLock := &sync.RWMutex{}
	matchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) {
	matchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) {
		matchLock.RLock()
		fn, ok := match[name]
		matchLock.RUnlock()
@@ -1835,7 +1836,7 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbM
			return nil, err
		}

		return NewRuntimeGoMatchCore(logger, matchRegistry, router, id, node, db, env, nk, match)
		return NewRuntimeGoMatchCore(logger, matchRegistry, router, id, node, stopped, db, env, nk, match)
	}
	nk.SetMatchCreateFn(matchCreateFn)
	matchNamesListFn := func() []string {
Loading