Commit 267c0ac9 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Allow more time for match handlers to ack joins before returning to clients.

parent 070c74c1
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Lua runtime group create function now sets the correct default max size if one is not specified.
- Improve socket session close semantics.
- Session logging now prints correct remote address if available when the connection is through a proxy.
- Authoritative match join attempts now wait until the handler acknowledges the join before returning to clients.

### Fixed
- Correctly report execution mode in Lua runtime after hooks.
+22 −58
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@ package server
import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/gofrs/uuid"
@@ -26,58 +25,6 @@ import (
	"go.uber.org/zap"
)

type MatchPresenceList struct {
	sync.RWMutex
	presences []*PresenceID
}

func (m *MatchPresenceList) Join(joins []*MatchPresence) {
	m.Lock()
	for _, join := range joins {
		m.presences = append(m.presences, &PresenceID{
			Node:      join.Node,
			SessionID: join.SessionID,
		})
	}
	m.Unlock()
}

func (m *MatchPresenceList) Leave(leaves []*MatchPresence) {
	m.Lock()
	for _, leave := range leaves {
		for i, presenceID := range m.presences {
			if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node {
				m.presences = append(m.presences[:i], m.presences[i+1:]...)
				break
			}
		}
	}
	m.Unlock()
}

func (m *MatchPresenceList) Contains(presence *PresenceID) bool {
	var found bool
	m.RLock()
	for _, p := range m.presences {
		if p.SessionID == presence.SessionID && p.Node == p.Node {
			found = true
			break
		}
	}
	m.RUnlock()
	return found
}

func (m *MatchPresenceList) List() []*PresenceID {
	m.RLock()
	list := make([]*PresenceID, 0, len(m.presences))
	for _, presence := range m.presences {
		list = append(list, presence)
	}
	m.RUnlock()
	return list
}

type MatchDataMessage struct {
	UserID      uuid.UUID
	SessionID   uuid.UUID
@@ -125,6 +72,7 @@ type MatchHandler struct {
	tracker       Tracker
	router        MessageRouter

	JoinMarkerList *MatchJoinMarkerList
	presenceList   *MatchPresenceList
	core           RuntimeMatchCore

@@ -148,7 +96,7 @@ type MatchHandler struct {
	deferredCh chan *DeferredMessage

	// Configuration set by match init.
	Rate int
	Rate int64

	// Match state.
	state interface{}
@@ -184,6 +132,9 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		logger:        logger,
		matchRegistry: matchRegistry,

		JoinMarkerList: &MatchJoinMarkerList{
			joinMarkers: make(map[uuid.UUID]*MatchJoinMarker),
		},
		presenceList: presenceList,
		core:         core,

@@ -206,7 +157,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		stopCh:        make(chan struct{}),
		stopped:       atomic.NewBool(false),

		Rate: rateInt,
		Rate: int64(rateInt),

		state: state,
	}
@@ -320,11 +271,16 @@ func loop(mh *MatchHandler) {
		return
	}

	// Every 30 seconds clear expired join markers.
	if mh.tick%(mh.Rate*30) == 0 {
		mh.JoinMarkerList.ClearExpired(mh.tick)
	}

	mh.state = state
	mh.tick++
}

func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool {
func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool {
	if mh.stopped.Load() {
		return false
	}
@@ -359,6 +315,10 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan *Mat
		}

		mh.state = state
		if allow {
			// Keep join markers for up to 120 seconds.
			mh.JoinMarkerList.Add(sessionID, mh.tick+(mh.Rate*120))
		}
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.core.Label()}
	}

@@ -385,6 +345,10 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool {

		mh.presenceList.Join(joins)

		for _, join := range joins {
			mh.JoinMarkerList.Mark(join.SessionID)
		}

		state, err := mh.core.MatchJoin(mh.tick, mh.state, joins)
		if err != nil {
			mh.Stop()
+151 −0
Original line number Diff line number Diff line
// Copyright 2018 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"github.com/gofrs/uuid"
	"sync"
)

// Represents routing and identify information for a single match participant.
type MatchPresence struct {
	Node      string
	UserID    uuid.UUID
	SessionID uuid.UUID
	Username  string
}

func (p *MatchPresence) GetUserId() string {
	return p.UserID.String()
}
func (p *MatchPresence) GetSessionId() string {
	return p.SessionID.String()
}
func (p *MatchPresence) GetNodeId() string {
	return p.Node
}
func (p *MatchPresence) GetHidden() bool {
	return false
}
func (p *MatchPresence) GetPersistence() bool {
	return false
}
func (p *MatchPresence) GetUsername() string {
	return p.Username
}
func (p *MatchPresence) GetStatus() string {
	return ""
}

// Used to monitor when match presences begin and complete their match join process.
type MatchJoinMarker struct {
	expiryTick int64
	ch         chan struct{}
}

type MatchJoinMarkerList struct {
	sync.RWMutex
	joinMarkers map[uuid.UUID]*MatchJoinMarker
}

func (m *MatchJoinMarkerList) Add(sessionID uuid.UUID, expiryTick int64) {
	m.Lock()
	m.joinMarkers[sessionID] = &MatchJoinMarker{
		expiryTick: expiryTick,
		ch:         make(chan struct{}),
	}
	m.Unlock()
}

func (m *MatchJoinMarkerList) Get(sessionID uuid.UUID) <-chan struct{} {
	var ch chan struct{}
	m.RLock()
	if joinMarker, ok := m.joinMarkers[sessionID]; ok {
		ch = joinMarker.ch
	}
	m.RUnlock()
	return ch
}

func (m *MatchJoinMarkerList) Mark(sessionID uuid.UUID) {
	m.RLock()
	if joinMarker, ok := m.joinMarkers[sessionID]; ok {
		close(joinMarker.ch)
	}
	m.RUnlock()
}

func (m *MatchJoinMarkerList) ClearExpired(tick int64) {
	m.Lock()
	for sessionID, joinMarker := range m.joinMarkers {
		if joinMarker.expiryTick <= tick {
			delete(m.joinMarkers, sessionID)
		}
	}
	m.Unlock()
}

// Maintains the match presences for routing and validation purposes.
type MatchPresenceList struct {
	sync.RWMutex
	presences []*PresenceID
}

func (m *MatchPresenceList) Join(joins []*MatchPresence) {
	m.Lock()
	for _, join := range joins {
		m.presences = append(m.presences, &PresenceID{
			Node:      join.Node,
			SessionID: join.SessionID,
		})
	}
	m.Unlock()
}

func (m *MatchPresenceList) Leave(leaves []*MatchPresence) {
	m.Lock()
	for _, leave := range leaves {
		for i, presenceID := range m.presences {
			if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node {
				m.presences = append(m.presences[:i], m.presences[i+1:]...)
				break
			}
		}
	}
	m.Unlock()
}

func (m *MatchPresenceList) Contains(presence *PresenceID) bool {
	var found bool
	m.RLock()
	for _, p := range m.presences {
		if p.SessionID == presence.SessionID && p.Node == p.Node {
			found = true
			break
		}
	}
	m.RUnlock()
	return found
}

func (m *MatchPresenceList) List() []*PresenceID {
	m.RLock()
	list := make([]*PresenceID, 0, len(m.presences))
	for _, presence := range m.presences {
		list = append(list, presence)
	}
	m.RUnlock()
	return list
}
+32 −29
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ var (

	ErrMatchLabelTooLong     = errors.New("match label too long, must be 0-2048 bytes")
	ErrDeferredBroadcastFull = errors.New("too many deferred message broadcasts per tick")
	ErrNoJoinMarker          = errors.New("no join marker received")
)

type MatchIndexEntry struct {
@@ -51,35 +52,6 @@ type MatchIndexEntry struct {
	LabelString string                 `json:"label_string"`
}

type MatchPresence struct {
	Node      string
	UserID    uuid.UUID
	SessionID uuid.UUID
	Username  string
}

func (p *MatchPresence) GetUserId() string {
	return p.UserID.String()
}
func (p *MatchPresence) GetSessionId() string {
	return p.SessionID.String()
}
func (p *MatchPresence) GetNodeId() string {
	return p.Node
}
func (p *MatchPresence) GetHidden() bool {
	return false
}
func (p *MatchPresence) GetPersistence() bool {
	return false
}
func (p *MatchPresence) GetUsername() string {
	return p.Username
}
func (p *MatchPresence) GetStatus() string {
	return ""
}

type MatchJoinResult struct {
	Allow  bool
	Reason string
@@ -119,6 +91,8 @@ type MatchRegistry interface {
	// Pass a data payload (usually from a user) to the appropriate match handler.
	// Assumes that the data sender has already been validated as a match participant before this call.
	SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte, receiveTime int64)
	// Wait for the match to confirm a user has completed their join process.
	AwaitJoinMarker(ctx context.Context, id uuid.UUID, node string, sessionID uuid.UUID, fromNode string) error
}

type LocalMatchRegistry struct {
@@ -631,3 +605,32 @@ func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, session
		ReceiveTime: receiveTime,
	})
}

func (r *LocalMatchRegistry) AwaitJoinMarker(ctx context.Context, id uuid.UUID, node string, sessionID uuid.UUID, fromNode string) error {
	if node != r.node {
		return ErrNoJoinMarker
	}

	var mh *MatchHandler
	var ok bool
	r.RLock()
	mh, ok = r.matches[id]
	r.RUnlock()
	if !ok {
		return ErrNoJoinMarker
	}

	ch := mh.JoinMarkerList.Get(sessionID)
	if ch == nil {
		return ErrNoJoinMarker
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch:
		// Join marker received.
	}

	return nil
}
+15 −4
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
package server

import (
	"context"
	"fmt"
	"strings"

@@ -196,10 +197,6 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
			}}})
			return
		}
		if mode == StreamModeMatchAuthoritative {
			// If we've reached here, it was an accepted authoritative join.
			label = &wrappers.StringValue{Value: l}
		}
		m := PresenceMeta{
			Username: username,
			Format:   session.Format(),
@@ -208,6 +205,20 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
			// Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply.
			return
		}
		if mode == StreamModeMatchAuthoritative {
			// If we've reached here, it was an accepted authoritative join.
			ctx, ctxCancelFn := context.WithTimeout(session.Context(), 5*time.Second)
			if err := p.matchRegistry.AwaitJoinMarker(ctx, matchID, node, session.ID(), p.node); err != nil {
				if err != context.Canceled {
					ctxCancelFn()
				}
				// There was an error or a timeout waiting for the join marker, return to the client anyway since the tracker update was successful.
				logger.Error("Error waiting for match join marker", zap.Error(err))
			} else {
				ctxCancelFn()
			}
			label = &wrappers.StringValue{Value: l}
		}
		meta = &m
	} else if mode == StreamModeMatchAuthoritative {
		// The user was already in the match, and it's an authoritative match.
Loading