Commit 89caf473 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve authoritative match join attempt and stop semantics.

parent 4144913c
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -8,10 +8,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Lua runtime tournament listings now return duration, end active, and end time fields.
- Lua runtime tournament end hooks now contain duration, end active, and end time fields.
- Lua runtime tournament reset hooks now contain duration, end active, and end time fields.
- Separate configuration for maximum number of concurrent join requests to authoritative matches.

### Changed
- Rejoining a match the user is already part of will now return the match label.
- Allow tournament joins before the start of the tournament active period.
- Authoritative matches now complete their stop phase faster to avoid unnecessary processing.
- Authoritative match join attempts now have their own bounded queue and no longer count towards the match call queue limit.

### Fixed
- Correctly report execution mode in Lua runtime after hooks.
+9 −4
Original line number Diff line number Diff line
@@ -141,6 +141,9 @@ func ParseArgs(logger *zap.Logger, args []string) Config {
	if mainConfig.GetMatch().CallQueueSize < 1 {
		logger.Fatal("Match call queue size must be >= 1", zap.Int("match.call_queue_size", mainConfig.GetMatch().CallQueueSize))
	}
	if mainConfig.GetMatch().JoinAttemptQueueSize < 1 {
		logger.Fatal("Match join attempt queue size must be >= 1", zap.Int("match.join_attempt_queue_size", mainConfig.GetMatch().JoinAttemptQueueSize))
	}

	// If the runtime path is not overridden, set it to `datadir/modules`.
	if mainConfig.GetRuntime().Path == "" {
@@ -457,6 +460,7 @@ func NewRuntimeConfig() *RuntimeConfig {
type MatchConfig struct {
	InputQueueSize       int `yaml:"input_queue_size" json:"input_queue_size" usage:"Size of the authoritative match buffer that stores client messages until they can be processed by the next tick. Default 128."`
	CallQueueSize        int `yaml:"call_queue_size" json:"call_queue_size" usage:"Size of the authoritative match buffer that sequences calls to match handler callbacks to ensure no overlaps. Default 128."`
	JoinAttemptQueueSize int `yaml:"join_attempt_queue_size" json:"join_attempt_queue_size" usage:"Size of the authoritative match buffer that limits the number of in-progress join attempts. Default 128."`
}

// NewMatchConfig creates a new MatchConfig struct.
@@ -464,6 +468,7 @@ func NewMatchConfig() *MatchConfig {
	return &MatchConfig{
		InputQueueSize:       128,
		CallQueueSize:        128,
		JoinAttemptQueueSize: 128,
	}
}

+64 −19
Original line number Diff line number Diff line
@@ -87,6 +87,7 @@ type MatchHandler struct {
	inputCh       chan *MatchDataMessage
	ticker        *time.Ticker
	callCh        chan func(*MatchHandler)
	joinAttemptCh chan func(*MatchHandler)
	stopCh        chan struct{}
	stopped       *atomic.Bool

@@ -135,6 +136,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		inputCh: make(chan *MatchDataMessage, config.GetMatch().InputQueueSize),
		// Ticker below.
		callCh:        make(chan func(mh *MatchHandler), config.GetMatch().CallQueueSize),
		joinAttemptCh: make(chan func(mh *MatchHandler), config.GetMatch().JoinAttemptQueueSize),
		stopCh:        make(chan struct{}),
		stopped:       atomic.NewBool(false),

@@ -156,12 +158,15 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
				return
			case <-mh.ticker.C:
				// Tick, queue a match loop invocation.
				if !mh.QueueCall(loop) {
				if !mh.queueCall(loop) {
					return
				}
			case call := <-mh.callCh:
				// An invocation to one of the match functions.
				// An invocation to one of the match functions, not including join attempts.
				call(mh)
			case joinAttempt := <-mh.joinAttemptCh:
				// An invocation to the join attempt match function.
				joinAttempt(mh)
			}
		}
	}()
@@ -187,7 +192,11 @@ func (mh *MatchHandler) Close() {
	mh.ticker.Stop()
}

func (mh *MatchHandler) QueueCall(f func(*MatchHandler)) bool {
func (mh *MatchHandler) queueCall(f func(*MatchHandler)) bool {
	if mh.stopped.Load() {
		return false
	}

	select {
	case mh.callCh <- f:
		return true
@@ -200,6 +209,10 @@ func (mh *MatchHandler) QueueCall(f func(*MatchHandler)) bool {
}

func (mh *MatchHandler) QueueData(m *MatchDataMessage) {
	if mh.stopped.Load() {
		return
	}

	select {
	case mh.inputCh <- m:
		return
@@ -231,8 +244,12 @@ func loop(mh *MatchHandler) {
	mh.tick++
}

func JoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) func(mh *MatchHandler) {
	return func(mh *MatchHandler) {
func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool {
	if mh.stopped.Load() {
		return false
	}

	joinAttempt := func(mh *MatchHandler) {
		select {
		case <-ctx.Done():
			// Do not process the match join attempt through the match handler if the client has gone away between
@@ -264,10 +281,24 @@ func JoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, se
		mh.state = state
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.Label.Load()}
	}

	select {
	case mh.joinAttemptCh <- joinAttempt:
		return true
	default:
		// Match join queue is full, the handler isn't processing these fast enough or there are just too many.
		// Not necessarily a match processing speed problem, so the match is not closed for these.
		mh.logger.Warn("Match handler join attempt queue full")
		return false
	}
}

func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool {
	if mh.stopped.Load() {
		return false
	}

func Join(joins []*MatchPresence) func(mh *MatchHandler) {
	return func(mh *MatchHandler) {
	join := func(mh *MatchHandler) {
		if mh.stopped.Load() {
			return
		}
@@ -286,10 +317,16 @@ func Join(joins []*MatchPresence) func(mh *MatchHandler) {

		mh.state = state
	}

	return mh.queueCall(join)
}

func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool {
	if mh.stopped.Load() {
		return false
	}

func Leave(leaves []*MatchPresence) func(mh *MatchHandler) {
	return func(mh *MatchHandler) {
	leave := func(mh *MatchHandler) {
		if mh.stopped.Load() {
			return
		}
@@ -308,10 +345,16 @@ func Leave(leaves []*MatchPresence) func(mh *MatchHandler) {

		mh.state = state
	}

	return mh.queueCall(leave)
}

func (mh *MatchHandler) QueueTerminate(graceSeconds int) bool {
	if mh.stopped.Load() {
		return false
	}

func Terminate(graceSeconds int) func(mh *MatchHandler) {
	return func(mh *MatchHandler) {
	terminate := func(mh *MatchHandler) {
		if mh.stopped.Load() {
			return
		}
@@ -335,4 +378,6 @@ func Terminate(graceSeconds int) func(mh *MatchHandler) {
			mh.Stop()
		}
	}

	return mh.queueCall(terminate)
}
+5 −5
Original line number Diff line number Diff line
@@ -537,7 +537,7 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} {

	for _, mh := range r.matches {
		// Don't care if the call queue is full, match is supposed to end anyway.
		mh.QueueCall(Terminate(graceSeconds))
		mh.QueueTerminate(graceSeconds)
	}
	r.RUnlock()
	return r.stoppedCh
@@ -558,9 +558,9 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node
	}

	resultCh := make(chan *MatchJoinResult, 1)
	if !mh.QueueCall(JoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata)) {
	if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) {
		// The match call queue was full, so will be closed and therefore can't be joined.
		return true, false, "", ""
		return true, false, "Match is not currently accepting join requests", ""
	}

	// Set up a limit to how long the call will wait, default is 10 seconds.
@@ -588,7 +588,7 @@ func (r *LocalMatchRegistry) Join(id uuid.UUID, presences []*MatchPresence) {
	}

	// Doesn't matter if the call queue was full here. If the match is being closed then joins don't matter anyway.
	mh.QueueCall(Join(presences))
	mh.QueueJoin(presences)
}

func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) {
@@ -602,7 +602,7 @@ func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) {
	}

	// Doesn't matter if the call queue was full here. If the match is being closed then leaves don't matter anyway.
	mh.QueueCall(Leave(presences))
	mh.QueueLeave(presences)
}

func (r *LocalMatchRegistry) Kick(stream PresenceStream, presences []*MatchPresence) {