Commit fbd0cc7f authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Clean up join marker handling.

parent f405964f
Loading
Loading
Loading
Loading
+54 −42
Original line number Diff line number Diff line
@@ -72,7 +72,7 @@ type MatchHandler struct {
	router        MessageRouter

	JoinMarkerList *MatchJoinMarkerList
	presenceList   *MatchPresenceList
	PresenceList   *MatchPresenceList
	core           RuntimeMatchCore

	// Identification not (directly) controlled by match init.
@@ -102,9 +102,7 @@ type MatchHandler struct {
}

func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, params map[string]interface{}) (*MatchHandler, error) {
	presenceList := &MatchPresenceList{
		presences: make([]*PresenceID, 0, 10),
	}
	presenceList := NewMatchPresenceList()

	deferredCh := make(chan *DeferredMessage, config.GetMatch().DeferredQueueSize)
	deferMessageFn := func(msg *DeferredMessage) error {
@@ -132,10 +130,8 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
		matchRegistry: matchRegistry,
		router:        router,

		JoinMarkerList: &MatchJoinMarkerList{
			joinMarkers: make(map[uuid.UUID]*MatchJoinMarker),
		},
		presenceList: presenceList,
		JoinMarkerList: NewMatchJoinMarkerList(config, int64(rateInt)),
		PresenceList:   presenceList,
		core:           core,

		ID:    id,
@@ -277,7 +273,11 @@ func loop(mh *MatchHandler) {

	// Every 30 seconds clear expired join markers.
	if mh.tick%(mh.Rate*30) == 0 {
		mh.JoinMarkerList.ClearExpired(mh.tick)
		presences := mh.JoinMarkerList.ClearExpired(mh.tick)
		if len(presences) != 0 {
			// Doesn't matter if the call queue was full here. If the match is being closed then leaves don't matter anyway.
			mh.QueueLeave(presences)
		}
	}

	mh.state = state
@@ -320,9 +320,11 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M

		mh.state = state
		if allow {
			// Keep join markers for up to 120 seconds.
			mh.JoinMarkerList.Add(sessionID, mh.tick+(mh.Rate*120))
			presence := &MatchPresence{Node: node, UserID: userID, SessionID: sessionID, Username: username}
			mh.JoinMarkerList.Add(presence, mh.tick)
			mh.QueueJoin([]*MatchPresence{presence}, false)
		}
		// Signal client.
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.core.Label()}
	}

@@ -337,7 +339,7 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M
	}
}

func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool {
func (mh *MatchHandler) QueueJoin(joins []*MatchPresence, mark bool) bool {
	if mh.stopped.Load() {
		return false
	}
@@ -347,13 +349,17 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool {
			return
		}

		mh.presenceList.Join(joins)

		// Just marking joins.
		if mark {
			for _, join := range joins {
				mh.JoinMarkerList.Mark(join.SessionID)
			}
			return
		}

		state, err := mh.core.MatchJoin(mh.tick, mh.state, joins)
		processed := mh.PresenceList.Join(joins)
		if len(processed) != 0 {
			state, err := mh.core.MatchJoin(mh.tick, mh.state, processed)
			if err != nil {
				mh.Stop()
				mh.logger.Warn("Stopping match after error from match_join execution", zap.Int64("tick", mh.tick), zap.Error(err))
@@ -367,6 +373,7 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence) bool {

			mh.state = state
		}
	}

	return mh.queueCall(join)
}
@@ -381,7 +388,11 @@ func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool {
			return
		}

		mh.presenceList.Leave(leaves)
		processed := mh.PresenceList.Leave(leaves)
		if len(processed) != 0 {
			for _, leave := range processed {
				mh.JoinMarkerList.Mark(leave.SessionID)
			}

			state, err := mh.core.MatchLeave(mh.tick, mh.state, leaves)
			if err != nil {
@@ -397,6 +408,7 @@ func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool {

			mh.state = state
		}
	}

	return mh.queueCall(leave)
}
+65 −36
Original line number Diff line number Diff line
@@ -52,84 +52,109 @@ func (p *MatchPresence) GetStatus() string {

// Used to monitor when match presences begin and complete their match join process.
type MatchJoinMarker struct {
	presence   *MatchPresence
	expiryTick int64
	marked     *atomic.Bool
	ch         chan struct{}
}

type MatchJoinMarkerList struct {
	sync.RWMutex
	expiryDelayMs int64
	tickRate      int64
	joinMarkers   map[uuid.UUID]*MatchJoinMarker
}

func (m *MatchJoinMarkerList) Add(sessionID uuid.UUID, expiryTick int64) {
	m.Lock()
	m.joinMarkers[sessionID] = &MatchJoinMarker{
		expiryTick: expiryTick,
		marked:     atomic.NewBool(false),
		ch:         make(chan struct{}),
func NewMatchJoinMarkerList(config Config, tickRate int64) *MatchJoinMarkerList {
	return &MatchJoinMarkerList{
		expiryDelayMs: int64(config.GetMatch().JoinMarkerDeadlineMs),
		tickRate:      tickRate,
		joinMarkers:   make(map[uuid.UUID]*MatchJoinMarker),
	}
	m.Unlock()
}

func (m *MatchJoinMarkerList) Get(sessionID uuid.UUID) <-chan struct{} {
	var ch chan struct{}
	m.RLock()
	if joinMarker, ok := m.joinMarkers[sessionID]; ok {
		ch = joinMarker.ch
func (m *MatchJoinMarkerList) Add(presence *MatchPresence, currentTick int64) {
	m.Lock()
	m.joinMarkers[presence.SessionID] = &MatchJoinMarker{
		presence:   presence,
		expiryTick: currentTick + (m.tickRate * (m.expiryDelayMs / 1000)),
	}
	m.RUnlock()
	return ch
	m.Unlock()
}

func (m *MatchJoinMarkerList) Mark(sessionID uuid.UUID) {
	m.RLock()
	if joinMarker, ok := m.joinMarkers[sessionID]; ok {
		if joinMarker.marked.CAS(false, true) {
			close(joinMarker.ch)
		}
	}
	m.RUnlock()
	m.Lock()
	delete(m.joinMarkers, sessionID)
	m.Unlock()
}

func (m *MatchJoinMarkerList) ClearExpired(tick int64) {
func (m *MatchJoinMarkerList) ClearExpired(tick int64) []*MatchPresence {
	presences := make([]*MatchPresence, 0)
	m.Lock()
	for sessionID, joinMarker := range m.joinMarkers {
		if joinMarker.expiryTick <= tick {
			presences = append(presences, joinMarker.presence)
			delete(m.joinMarkers, sessionID)
		}
	}
	m.Unlock()
	return presences
}

// Maintains the match presences for routing and validation purposes.
type MatchPresenceList struct {
	sync.RWMutex
	size        *atomic.Int32
	presences   []*PresenceID
	presenceMap map[uuid.UUID]struct{}
}

func NewMatchPresenceList() *MatchPresenceList {
	return &MatchPresenceList{
		size:        atomic.NewInt32(0),
		presences:   make([]*PresenceID, 0, 10),
		presenceMap: make(map[uuid.UUID]struct{}, 10),
	}
}

func (m *MatchPresenceList) Join(joins []*MatchPresence) {
func (m *MatchPresenceList) Join(joins []*MatchPresence) []*MatchPresence {
	processed := make([]*MatchPresence, 0, len(joins))
	m.Lock()
	for _, join := range joins {
		if _, ok := m.presenceMap[join.SessionID]; !ok {
			m.presences = append(m.presences, &PresenceID{
				Node:      join.Node,
				SessionID: join.SessionID,
			})
			m.presenceMap[join.SessionID] = struct{}{}
			processed = append(processed, join)
		}
	}
	m.Unlock()
	if l := len(processed); l != 0 {
		m.size.Add(int32(l))
	}
	return processed
}

func (m *MatchPresenceList) Leave(leaves []*MatchPresence) {
func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence {
	processed := make([]*MatchPresence, 0, len(leaves))
	m.Lock()
	for _, leave := range leaves {
		if _, ok := m.presenceMap[leave.SessionID]; ok {
			for i, presenceID := range m.presences {
				if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node {
					m.presences = append(m.presences[:i], m.presences[i+1:]...)
					break
				}
			}
			delete(m.presenceMap, leave.SessionID)
			processed = append(processed, leave)
		}
	}
	m.Unlock()
	if l := len(processed); l != 0 {
		m.size.Sub(int32(l))
	}
	return processed
}

func (m *MatchPresenceList) Contains(presence *PresenceID) bool {
@@ -154,3 +179,7 @@ func (m *MatchPresenceList) List() []*PresenceID {
	m.RUnlock()
	return list
}

func (m *MatchPresenceList) Size() int {
	return int(m.size.Load())
}
+1 −32
Original line number Diff line number Diff line
@@ -93,8 +93,6 @@ type MatchRegistry interface {
	// Pass a data payload (usually from a user) to the appropriate match handler.
	// Assumes that the data sender has already been validated as a match participant before this call.
	SendData(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, opCode int64, data []byte, receiveTime int64)
	// Wait for the match to confirm a user has completed their join process.
	AwaitJoinMarker(ctx context.Context, id uuid.UUID, node string, sessionID uuid.UUID, fromNode string) error
}

type LocalMatchRegistry struct {
@@ -567,7 +565,7 @@ func (r *LocalMatchRegistry) Join(id uuid.UUID, presences []*MatchPresence) {
	}

	// Doesn't matter if the call queue was full here. If the match is being closed then joins don't matter anyway.
	mh.QueueJoin(presences)
	mh.QueueJoin(presences, true)
}

func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) {
@@ -617,32 +615,3 @@ func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, session
		ReceiveTime: receiveTime,
	})
}

func (r *LocalMatchRegistry) AwaitJoinMarker(ctx context.Context, id uuid.UUID, node string, sessionID uuid.UUID, fromNode string) error {
	if node != r.node {
		return ErrNoJoinMarker
	}

	var mh *MatchHandler
	var ok bool
	r.RLock()
	mh, ok = r.matches[id]
	r.RUnlock()
	if !ok {
		return ErrNoJoinMarker
	}

	ch := mh.JoinMarkerList.Get(sessionID)
	if ch == nil {
		return ErrNoJoinMarker
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch:
		// Join marker received.
	}

	return nil
}
+0 −7
Original line number Diff line number Diff line
@@ -15,7 +15,6 @@
package server

import (
	"context"
	"fmt"
	"strings"

@@ -207,12 +206,6 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
		}
		if mode == StreamModeMatchAuthoritative {
			// If we've reached here, it was an accepted authoritative join.
			ctx, ctxCancelFn := context.WithTimeout(session.Context(), time.Duration(p.config.GetMatch().JoinMarkerDeadlineMs)*time.Millisecond)
			if err := p.matchRegistry.AwaitJoinMarker(ctx, matchID, node, session.ID(), p.node); err != nil {
				// There was an error or a timeout waiting for the join marker, return to the client anyway since the tracker update was successful.
				logger.Error("Error waiting for match join marker", zap.Error(err))
			}
			ctxCancelFn()
			label = &wrappers.StringValue{Value: l}
		}
		meta = &m