From 267c0ac999d46d9e101dc0f3e8a945aebdeb3f9c Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Sat, 29 Dec 2018 21:39:43 +0000 Subject: [PATCH] Allow more time for match handlers to ack joins before returning to clients. --- CHANGELOG.md | 1 + server/match_handler.go | 80 +++++----------- server/match_presence.go | 151 +++++++++++++++++++++++++++++++ server/match_registry.go | 61 +++++++------ server/pipeline_match.go | 19 +++- server/runtime.go | 2 +- server/runtime_go_match_core.go | 2 +- server/runtime_lua_match_core.go | 2 +- 8 files changed, 224 insertions(+), 94 deletions(-) create mode 100644 server/match_presence.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c5edcb26d..f66a29984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Lua runtime group create function now sets the correct default max size if one is not specified. - Improve socket session close semantics. - Session logging now prints correct remote address if available when the connection is through a proxy. +- Authoritative match join attempts now wait until the handler acknowledges the join before returning to clients. ### Fixed - Correctly report execution mode in Lua runtime after hooks. diff --git a/server/match_handler.go b/server/match_handler.go index e5739cc11..e6680daa6 100644 --- a/server/match_handler.go +++ b/server/match_handler.go @@ -17,7 +17,6 @@ package server import ( "context" "fmt" - "sync" "time" "github.com/gofrs/uuid" @@ -26,58 +25,6 @@ import ( "go.uber.org/zap" ) -type MatchPresenceList struct { - sync.RWMutex - presences []*PresenceID -} - -func (m *MatchPresenceList) Join(joins []*MatchPresence) { - m.Lock() - for _, join := range joins { - m.presences = append(m.presences, &PresenceID{ - Node: join.Node, - SessionID: join.SessionID, - }) - } - m.Unlock() -} - -func (m *MatchPresenceList) Leave(leaves []*MatchPresence) { - m.Lock() - for _, leave := range leaves { - for i, presenceID := range m.presences { - if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node { - m.presences = append(m.presences[:i], m.presences[i+1:]...) - break - } - } - } - m.Unlock() -} - -func (m *MatchPresenceList) Contains(presence *PresenceID) bool { - var found bool - m.RLock() - for _, p := range m.presences { - if p.SessionID == presence.SessionID && p.Node == p.Node { - found = true - break - } - } - m.RUnlock() - return found -} - -func (m *MatchPresenceList) List() []*PresenceID { - m.RLock() - list := make([]*PresenceID, 0, len(m.presences)) - for _, presence := range m.presences { - list = append(list, presence) - } - m.RUnlock() - return list -} - type MatchDataMessage struct { UserID uuid.UUID SessionID uuid.UUID @@ -125,8 +72,9 @@ type MatchHandler struct { tracker Tracker router MessageRouter - presenceList *MatchPresenceList - core RuntimeMatchCore + JoinMarkerList *MatchJoinMarkerList + presenceList *MatchPresenceList + core RuntimeMatchCore // Identification not (directly) controlled by match init. ID uuid.UUID @@ -148,7 +96,7 @@ type MatchHandler struct { deferredCh chan *DeferredMessage // Configuration set by match init. - Rate int + Rate int64 // Match state. state interface{} @@ -184,6 +132,9 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis logger: logger, matchRegistry: matchRegistry, + JoinMarkerList: &MatchJoinMarkerList{ + joinMarkers: make(map[uuid.UUID]*MatchJoinMarker), + }, presenceList: presenceList, core: core, @@ -206,7 +157,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis stopCh: make(chan struct{}), stopped: atomic.NewBool(false), - Rate: rateInt, + Rate: int64(rateInt), state: state, } @@ -320,11 +271,16 @@ func loop(mh *MatchHandler) { return } + // Every 30 seconds clear expired join markers. + if mh.tick%(mh.Rate*30) == 0 { + mh.JoinMarkerList.ClearExpired(mh.tick) + } + mh.state = state mh.tick++ } -func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool { +func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool { if mh.stopped.Load() { return false } @@ -359,6 +315,10 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *Mat } mh.state = state + if allow { + // Keep join markers for up to 120 seconds. + mh.JoinMarkerList.Add(sessionID, mh.tick+(mh.Rate*120)) + } resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.core.Label()} } @@ -385,6 +345,10 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool { mh.presenceList.Join(joins) + for _, join := range joins { + mh.JoinMarkerList.Mark(join.SessionID) + } + state, err := mh.core.MatchJoin(mh.tick, mh.state, joins) if err != nil { mh.Stop() diff --git a/server/match_presence.go b/server/match_presence.go new file mode 100644 index 000000000..9860dfc2b --- /dev/null +++ b/server/match_presence.go @@ -0,0 +1,151 @@ +// Copyright 2018 The Nakama Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "github.com/gofrs/uuid" + "sync" +) + +// Represents routing and identify information for a single match participant. +type MatchPresence struct { + Node string + UserID uuid.UUID + SessionID uuid.UUID + Username string +} + +func (p *MatchPresence) GetUserId() string { + return p.UserID.String() +} +func (p *MatchPresence) GetSessionId() string { + return p.SessionID.String() +} +func (p *MatchPresence) GetNodeId() string { + return p.Node +} +func (p *MatchPresence) GetHidden() bool { + return false +} +func (p *MatchPresence) GetPersistence() bool { + return false +} +func (p *MatchPresence) GetUsername() string { + return p.Username +} +func (p *MatchPresence) GetStatus() string { + return "" +} + +// Used to monitor when match presences begin and complete their match join process. +type MatchJoinMarker struct { + expiryTick int64 + ch chan struct{} +} + +type MatchJoinMarkerList struct { + sync.RWMutex + joinMarkers map[uuid.UUID]*MatchJoinMarker +} + +func (m *MatchJoinMarkerList) Add(sessionID uuid.UUID, expiryTick int64) { + m.Lock() + m.joinMarkers[sessionID] = &MatchJoinMarker{ + expiryTick: expiryTick, + ch: make(chan struct{}), + } + m.Unlock() +} + +func (m *MatchJoinMarkerList) Get(sessionID uuid.UUID) <-chan struct{} { + var ch chan struct{} + m.RLock() + if joinMarker, ok := m.joinMarkers[sessionID]; ok { + ch = joinMarker.ch + } + m.RUnlock() + return ch +} + +func (m *MatchJoinMarkerList) Mark(sessionID uuid.UUID) { + m.RLock() + if joinMarker, ok := m.joinMarkers[sessionID]; ok { + close(joinMarker.ch) + } + m.RUnlock() +} + +func (m *MatchJoinMarkerList) ClearExpired(tick int64) { + m.Lock() + for sessionID, joinMarker := range m.joinMarkers { + if joinMarker.expiryTick <= tick { + delete(m.joinMarkers, sessionID) + } + } + m.Unlock() +} + +// Maintains the match presences for routing and validation purposes. +type MatchPresenceList struct { + sync.RWMutex + presences []*PresenceID +} + +func (m *MatchPresenceList) Join(joins []*MatchPresence) { + m.Lock() + for _, join := range joins { + m.presences = append(m.presences, &PresenceID{ + Node: join.Node, + SessionID: join.SessionID, + }) + } + m.Unlock() +} + +func (m *MatchPresenceList) Leave(leaves []*MatchPresence) { + m.Lock() + for _, leave := range leaves { + for i, presenceID := range m.presences { + if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node { + m.presences = append(m.presences[:i], m.presences[i+1:]...) + break + } + } + } + m.Unlock() +} + +func (m *MatchPresenceList) Contains(presence *PresenceID) bool { + var found bool + m.RLock() + for _, p := range m.presences { + if p.SessionID == presence.SessionID && p.Node == p.Node { + found = true + break + } + } + m.RUnlock() + return found +} + +func (m *MatchPresenceList) List() []*PresenceID { + m.RLock() + list := make([]*PresenceID, 0, len(m.presences)) + for _, presence := range m.presences { + list = append(list, presence) + } + m.RUnlock() + return list +} diff --git a/server/match_registry.go b/server/match_registry.go index 43aa660f5..858351026 100644 --- a/server/match_registry.go +++ b/server/match_registry.go @@ -43,6 +43,7 @@ var ( ErrMatchLabelTooLong = errors.New("match label too long, must be 0-2048 bytes") ErrDeferredBroadcastFull = errors.New("too many deferred message broadcasts per tick") + ErrNoJoinMarker = errors.New("no join marker received") ) type MatchIndexEntry struct { @@ -51,35 +52,6 @@ type MatchIndexEntry struct { LabelString string `json:"label_string"` } -type MatchPresence struct { - Node string - UserID uuid.UUID - SessionID uuid.UUID - Username string -} - -func (p *MatchPresence) GetUserId() string { - return p.UserID.String() -} -func (p *MatchPresence) GetSessionId() string { - return p.SessionID.String() -} -func (p *MatchPresence) GetNodeId() string { - return p.Node -} -func (p *MatchPresence) GetHidden() bool { - return false -} -func (p *MatchPresence) GetPersistence() bool { - return false -} -func (p *MatchPresence) GetUsername() string { - return p.Username -} -func (p *MatchPresence) GetStatus() string { - return "" -} - type MatchJoinResult struct { Allow bool Reason string @@ -119,6 +91,8 @@ type MatchRegistry interface { // Pass a data payload (usually from a user) to the appropriate match handler. // Assumes that the data sender has already been validated as a match participant before this call. SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte, receiveTime int64) + // Wait for the match to confirm a user has completed their join process. + AwaitJoinMarker(ctx context.Context, id uuid.UUID, node string, sessionID uuid.UUID, fromNode string) error } type LocalMatchRegistry struct { @@ -631,3 +605,32 @@ func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, session ReceiveTime: receiveTime, }) } + +func (r *LocalMatchRegistry) AwaitJoinMarker(ctx context.Context, id uuid.UUID, node string, sessionID uuid.UUID, fromNode string) error { + if node != r.node { + return ErrNoJoinMarker + } + + var mh *MatchHandler + var ok bool + r.RLock() + mh, ok = r.matches[id] + r.RUnlock() + if !ok { + return ErrNoJoinMarker + } + + ch := mh.JoinMarkerList.Get(sessionID) + if ch == nil { + return ErrNoJoinMarker + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ch: + // Join marker received. + } + + return nil +} diff --git a/server/pipeline_match.go b/server/pipeline_match.go index 1bb178c6f..2f67104c5 100644 --- a/server/pipeline_match.go +++ b/server/pipeline_match.go @@ -15,6 +15,7 @@ package server import ( + "context" "fmt" "strings" @@ -196,10 +197,6 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap }}}) return } - if mode == StreamModeMatchAuthoritative { - // If we've reached here, it was an accepted authoritative join. - label = &wrappers.StringValue{Value: l} - } m := PresenceMeta{ Username: username, Format: session.Format(), @@ -208,6 +205,20 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap // Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply. return } + if mode == StreamModeMatchAuthoritative { + // If we've reached here, it was an accepted authoritative join. + ctx, ctxCancelFn := context.WithTimeout(session.Context(), 5*time.Second) + if err := p.matchRegistry.AwaitJoinMarker(ctx, matchID, node, session.ID(), p.node); err != nil { + if err != context.Canceled { + ctxCancelFn() + } + // There was an error or a timeout waiting for the join marker, return to the client anyway since the tracker update was successful. + logger.Error("Error waiting for match join marker", zap.Error(err)) + } else { + ctxCancelFn() + } + label = &wrappers.StringValue{Value: l} + } meta = &m } else if mode == StreamModeMatchAuthoritative { // The user was already in the match, and it's an authoritative match. diff --git a/server/runtime.go b/server/runtime.go index dc6088b41..77b88b57e 100644 --- a/server/runtime.go +++ b/server/runtime.go @@ -219,7 +219,7 @@ type RuntimeMatchCore interface { MatchJoinAttempt(tick int64, state interface{}, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) (interface{}, bool, string, error) MatchJoin(tick int64, state interface{}, joins []*MatchPresence) (interface{}, error) MatchLeave(tick int64, state interface{}, leaves []*MatchPresence) (interface{}, error) - MatchLoop(tick int64, state interface{}, inputCh chan *MatchDataMessage) (interface{}, error) + MatchLoop(tick int64, state interface{}, inputCh <-chan *MatchDataMessage) (interface{}, error) MatchTerminate(tick int64, state interface{}, graceSeconds int) (interface{}, error) Label() string Cancel() diff --git a/server/runtime_go_match_core.go b/server/runtime_go_match_core.go index f560a72a1..7c7426f02 100644 --- a/server/runtime_go_match_core.go +++ b/server/runtime_go_match_core.go @@ -142,7 +142,7 @@ func (r *RuntimeGoMatchCore) MatchLeave(tick int64, state interface{}, leaves [] return newState, nil } -func (r *RuntimeGoMatchCore) MatchLoop(tick int64, state interface{}, inputCh chan *MatchDataMessage) (interface{}, error) { +func (r *RuntimeGoMatchCore) MatchLoop(tick int64, state interface{}, inputCh <-chan *MatchDataMessage) (interface{}, error) { // Drain the input queue into a slice. size := len(inputCh) messages := make([]runtime.MatchData, size) diff --git a/server/runtime_lua_match_core.go b/server/runtime_lua_match_core.go index 021ec0e6f..f3c72aa96 100644 --- a/server/runtime_lua_match_core.go +++ b/server/runtime_lua_match_core.go @@ -406,7 +406,7 @@ func (r *RuntimeLuaMatchCore) MatchLeave(tick int64, state interface{}, leaves [ return newState, nil } -func (r *RuntimeLuaMatchCore) MatchLoop(tick int64, state interface{}, inputCh chan *MatchDataMessage) (interface{}, error) { +func (r *RuntimeLuaMatchCore) MatchLoop(tick int64, state interface{}, inputCh <-chan *MatchDataMessage) (interface{}, error) { // Drain the input queue into a Lua table. size := len(inputCh) input := r.vm.CreateTable(size, 0) -- GitLab