From 89caf473b915c47187fa7d5bb9f6bbdbf64b9a8f Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Fri, 7 Dec 2018 18:07:42 +0000 Subject: [PATCH] Improve authoritative match join attempt and stop semantics. --- CHANGELOG.md | 3 ++ server/config.go | 13 +++++-- server/match_handler.go | 83 +++++++++++++++++++++++++++++++--------- server/match_registry.go | 10 ++--- 4 files changed, 81 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c46e54d20..13f3a139b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Lua runtime tournament listings now return duration, end active, and end time fields. - Lua runtime tournament end hooks now contain duration, end active, and end time fields. - Lua runtime tournament reset hooks now contain duration, end active, and end time fields. +- Separate configuration for maximum number of concurrent join requests to authoritative matches. ### Changed - Rejoining a match the user is already part of will now return the match label. - Allow tournament joins before the start of the tournament active period. +- Authoritative matches now complete their stop phase faster to avoid unnecessary processing. +- Authoritative match join attempts now have their own bounded queue and no longer count towards the match call queue limit. ### Fixed - Correctly report execution mode in Lua runtime after hooks. diff --git a/server/config.go b/server/config.go index e7901c85f..c806850bb 100644 --- a/server/config.go +++ b/server/config.go @@ -141,6 +141,9 @@ func ParseArgs(logger *zap.Logger, args []string) Config { if mainConfig.GetMatch().CallQueueSize < 1 { logger.Fatal("Match call queue size must be >= 1", zap.Int("match.call_queue_size", mainConfig.GetMatch().CallQueueSize)) } + if mainConfig.GetMatch().JoinAttemptQueueSize < 1 { + logger.Fatal("Match join attempt queue size must be >= 1", zap.Int("match.join_attempt_queue_size", mainConfig.GetMatch().JoinAttemptQueueSize)) + } // If the runtime path is not overridden, set it to `datadir/modules`. if mainConfig.GetRuntime().Path == "" { @@ -455,15 +458,17 @@ func NewRuntimeConfig() *RuntimeConfig { // MatchConfig is configuration relevant to authoritative realtime multiplayer matches. type MatchConfig struct { - InputQueueSize int `yaml:"input_queue_size" json:"input_queue_size" usage:"Size of the authoritative match buffer that stores client messages until they can be processed by the next tick. Default 128."` - CallQueueSize int `yaml:"call_queue_size" json:"call_queue_size" usage:"Size of the authoritative match buffer that sequences calls to match handler callbacks to ensure no overlaps. Default 128."` + InputQueueSize int `yaml:"input_queue_size" json:"input_queue_size" usage:"Size of the authoritative match buffer that stores client messages until they can be processed by the next tick. Default 128."` + CallQueueSize int `yaml:"call_queue_size" json:"call_queue_size" usage:"Size of the authoritative match buffer that sequences calls to match handler callbacks to ensure no overlaps. Default 128."` + JoinAttemptQueueSize int `yaml:"join_attempt_queue_size" json:"join_attempt_queue_size" usage:"Size of the authoritative match buffer that limits the number of in-progress join attempts. Default 128."` } // NewMatchConfig creates a new MatchConfig struct. func NewMatchConfig() *MatchConfig { return &MatchConfig{ - InputQueueSize: 128, - CallQueueSize: 128, + InputQueueSize: 128, + CallQueueSize: 128, + JoinAttemptQueueSize: 128, } } diff --git a/server/match_handler.go b/server/match_handler.go index 5bfdc040d..3c7a90023 100644 --- a/server/match_handler.go +++ b/server/match_handler.go @@ -84,11 +84,12 @@ type MatchHandler struct { tick int64 // Control elements. - inputCh chan *MatchDataMessage - ticker *time.Ticker - callCh chan func(*MatchHandler) - stopCh chan struct{} - stopped *atomic.Bool + inputCh chan *MatchDataMessage + ticker *time.Ticker + callCh chan func(*MatchHandler) + joinAttemptCh chan func(*MatchHandler) + stopCh chan struct{} + stopped *atomic.Bool // Configuration set by match init. Label *atomic.String @@ -134,9 +135,10 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis inputCh: make(chan *MatchDataMessage, config.GetMatch().InputQueueSize), // Ticker below. - callCh: make(chan func(mh *MatchHandler), config.GetMatch().CallQueueSize), - stopCh: make(chan struct{}), - stopped: atomic.NewBool(false), + callCh: make(chan func(mh *MatchHandler), config.GetMatch().CallQueueSize), + joinAttemptCh: make(chan func(mh *MatchHandler), config.GetMatch().JoinAttemptQueueSize), + stopCh: make(chan struct{}), + stopped: atomic.NewBool(false), Label: label, Rate: rateInt, @@ -156,12 +158,15 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis return case <-mh.ticker.C: // Tick, queue a match loop invocation. - if !mh.QueueCall(loop) { + if !mh.queueCall(loop) { return } case call := <-mh.callCh: - // An invocation to one of the match functions. + // An invocation to one of the match functions, not including join attempts. call(mh) + case joinAttempt := <-mh.joinAttemptCh: + // An invocation to the join attempt match function. + joinAttempt(mh) } } }() @@ -187,7 +192,11 @@ func (mh *MatchHandler) Close() { mh.ticker.Stop() } -func (mh *MatchHandler) QueueCall(f func(*MatchHandler)) bool { +func (mh *MatchHandler) queueCall(f func(*MatchHandler)) bool { + if mh.stopped.Load() { + return false + } + select { case mh.callCh <- f: return true @@ -200,6 +209,10 @@ func (mh *MatchHandler) QueueCall(f func(*MatchHandler)) bool { } func (mh *MatchHandler) QueueData(m *MatchDataMessage) { + if mh.stopped.Load() { + return + } + select { case mh.inputCh <- m: return @@ -231,8 +244,12 @@ func loop(mh *MatchHandler) { mh.tick++ } -func JoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) func(mh *MatchHandler) { - return func(mh *MatchHandler) { +func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool { + if mh.stopped.Load() { + return false + } + + joinAttempt := func(mh *MatchHandler) { select { case <-ctx.Done(): // Do not process the match join attempt through the match handler if the client has gone away between @@ -264,10 +281,24 @@ func JoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, se mh.state = state resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.Label.Load()} } + + select { + case mh.joinAttemptCh <- joinAttempt: + return true + default: + // Match join queue is full, the handler isn't processing these fast enough or there are just too many. + // Not necessarily a match processing speed problem, so the match is not closed for these. + mh.logger.Warn("Match handler join attempt queue full") + return false + } } -func Join(joins []*MatchPresence) func(mh *MatchHandler) { - return func(mh *MatchHandler) { +func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool { + if mh.stopped.Load() { + return false + } + + join := func(mh *MatchHandler) { if mh.stopped.Load() { return } @@ -286,10 +317,16 @@ func Join(joins []*MatchPresence) func(mh *MatchHandler) { mh.state = state } + + return mh.queueCall(join) } -func Leave(leaves []*MatchPresence) func(mh *MatchHandler) { - return func(mh *MatchHandler) { +func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool { + if mh.stopped.Load() { + return false + } + + leave := func(mh *MatchHandler) { if mh.stopped.Load() { return } @@ -308,10 +345,16 @@ func Leave(leaves []*MatchPresence) func(mh *MatchHandler) { mh.state = state } + + return mh.queueCall(leave) } -func Terminate(graceSeconds int) func(mh *MatchHandler) { - return func(mh *MatchHandler) { +func (mh *MatchHandler) QueueTerminate(graceSeconds int) bool { + if mh.stopped.Load() { + return false + } + + terminate := func(mh *MatchHandler) { if mh.stopped.Load() { return } @@ -335,4 +378,6 @@ func Terminate(graceSeconds int) func(mh *MatchHandler) { mh.Stop() } } + + return mh.queueCall(terminate) } diff --git a/server/match_registry.go b/server/match_registry.go index 1b21fdf9e..9c0faa23f 100644 --- a/server/match_registry.go +++ b/server/match_registry.go @@ -537,7 +537,7 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} { for _, mh := range r.matches { // Don't care if the call queue is full, match is supposed to end anyway. - mh.QueueCall(Terminate(graceSeconds)) + mh.QueueTerminate(graceSeconds) } r.RUnlock() return r.stoppedCh @@ -558,9 +558,9 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node } resultCh := make(chan *MatchJoinResult, 1) - if !mh.QueueCall(JoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata)) { + if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) { // The match call queue was full, so will be closed and therefore can't be joined. - return true, false, "", "" + return true, false, "Match is not currently accepting join requests", "" } // Set up a limit to how long the call will wait, default is 10 seconds. @@ -588,7 +588,7 @@ func (r *LocalMatchRegistry) Join(id uuid.UUID, presences []*MatchPresence) { } // Doesn't matter if the call queue was full here. If the match is being closed then joins don't matter anyway. - mh.QueueCall(Join(presences)) + mh.QueueJoin(presences) } func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) { @@ -602,7 +602,7 @@ func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) { } // Doesn't matter if the call queue was full here. If the match is being closed then leaves don't matter anyway. - mh.QueueCall(Leave(presences)) + mh.QueueLeave(presences) } func (r *LocalMatchRegistry) Kick(stream PresenceStream, presences []*MatchPresence) { -- GitLab