Commit c3931524 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Clean up match join handling.

parent 12d05213
Loading
Loading
Loading
Loading
+32 −17
Original line number Diff line number Diff line
@@ -103,15 +103,20 @@ func (m *MatchJoinMarkerList) ClearExpired(tick int64) []*MatchPresence {
type MatchPresenceList struct {
	sync.RWMutex
	size        *atomic.Int32
	presences   []*PresenceID
	presenceMap map[uuid.UUID]struct{}
	presences   []*MatchPresenceListItem
	presenceMap map[uuid.UUID]string
}

type MatchPresenceListItem struct {
	PresenceID *PresenceID
	Presence   *MatchPresence
}

func NewMatchPresenceList() *MatchPresenceList {
	return &MatchPresenceList{
		size:        atomic.NewInt32(0),
		presences:   make([]*PresenceID, 0, 10),
		presenceMap: make(map[uuid.UUID]struct{}, 10),
		presences:   make([]*MatchPresenceListItem, 0, 10),
		presenceMap: make(map[uuid.UUID]string, 10),
	}
}

@@ -120,11 +125,14 @@ func (m *MatchPresenceList) Join(joins []*MatchPresence) []*MatchPresence {
	m.Lock()
	for _, join := range joins {
		if _, ok := m.presenceMap[join.SessionID]; !ok {
			m.presences = append(m.presences, &PresenceID{
			m.presences = append(m.presences, &MatchPresenceListItem{
				PresenceID: &PresenceID{
					Node:      join.Node,
					SessionID: join.SessionID,
				},
				Presence: join,
			})
			m.presenceMap[join.SessionID] = struct{}{}
			m.presenceMap[join.SessionID] = join.Node
			processed = append(processed, join)
		}
	}
@@ -140,8 +148,8 @@ func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence {
	m.Lock()
	for _, leave := range leaves {
		if _, ok := m.presenceMap[leave.SessionID]; ok {
			for i, presenceID := range m.presences {
				if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node {
			for i, presence := range m.presences {
				if presence.PresenceID.SessionID == leave.SessionID && presence.PresenceID.Node == leave.Node {
					m.presences = append(m.presences[:i], m.presences[i+1:]...)
					break
				}
@@ -160,21 +168,28 @@ func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence {
func (m *MatchPresenceList) Contains(presence *PresenceID) bool {
	var found bool
	m.RLock()
	for _, p := range m.presences {
		if p.SessionID == presence.SessionID && p.Node == p.Node {
			found = true
			break
		}
	if node, ok := m.presenceMap[presence.SessionID]; ok {
		found = node == presence.Node
	}
	m.RUnlock()
	return found
}

func (m *MatchPresenceList) List() []*PresenceID {
func (m *MatchPresenceList) ListPresenceIDs() []*PresenceID {
	m.RLock()
	list := make([]*PresenceID, 0, len(m.presences))
	for _, presence := range m.presences {
		list = append(list, presence)
		list = append(list, presence.PresenceID)
	}
	m.RUnlock()
	return list
}

func (m *MatchPresenceList) ListPresences() []*MatchPresence {
	m.RLock()
	list := make([]*MatchPresence, 0, len(m.presences))
	for _, presence := range m.presences {
		list = append(list, presence.Presence)
	}
	m.RUnlock()
	return list
+13 −9
Original line number Diff line number Diff line
@@ -78,8 +78,8 @@ type MatchRegistry interface {
	// Returns the total number of currently active authoritative matches.
	Count() int

	// Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, a reason for any rejection, and the match label.
	JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string)
	// Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, if it's a new user for this match, a reason for any rejection, the match label, and the list of existing match participants.
	JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, bool, string, string, []*MatchPresence)
	// Notify a match handler that one or more users have successfully joined the match.
	// Expects that the caller has already determined the match is hosted on the current node.
	Join(id uuid.UUID, presences []*MatchPresence)
@@ -94,7 +94,6 @@ type MatchRegistry interface {
}

type LocalMatchRegistry struct {
	sync.RWMutex
	logger  *zap.Logger
	config  Config
	tracker Tracker
@@ -455,21 +454,26 @@ func (r *LocalMatchRegistry) Count() int {
	return int(r.matchCount.Load())
}

func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string) {
func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, bool, string, string, []*MatchPresence) {
	if node != r.node {
		return false, false, "", ""
		return false, false, false, "", "", nil
	}

	m, ok := r.matches.Load(id)
	if !ok {
		return false, false, "", ""
		return false, false, false, "", "", nil
	}
	mh := m.(*MatchHandler)

	if mh.PresenceList.Contains(&PresenceID{Node: fromNode, SessionID: sessionID}) {
		// The user is already part of this match.
		return true, true, false, "", mh.Label(), mh.PresenceList.ListPresences()
	}

	resultCh := make(chan *MatchJoinResult, 1)
	if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) {
		// The match call queue was full, so will be closed and therefore can't be joined.
		return true, false, "Match is not currently accepting join requests", ""
		return true, false, false, "Match is not currently accepting join requests", "", nil
	}

	// Set up a limit to how long the call will wait, default is 10 seconds.
@@ -477,12 +481,12 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node
	select {
	case <-timer.C:
		// The join attempt has timed out, join is assumed to be rejected.
		return true, false, "", ""
		return true, false, false, "", "", nil
	case r := <-resultCh:
		// Doesn't matter if the timer has fired concurrently, we're in the desired case anyway.
		timer.Stop()
		// The join attempt has returned a result.
		return true, r.Allow, r.Reason, r.Label
		return true, r.Allow, true, r.Reason, r.Label, mh.PresenceList.ListPresences()
	}
}

+67 −68
Original line number Diff line number Diff line
@@ -145,16 +145,16 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
		return
	}

	// Decide if it's an authoritative or relayed match.
	mode := StreamModeMatchRelayed
	if node != "" {
		mode = StreamModeMatchAuthoritative
	}

	var mode uint8
	var label *wrappers.StringValue
	var presences []*rtapi.UserPresence
	username := session.Username()
	if node == "" {
		// Relayed match.
		mode = StreamModeMatchRelayed
		stream := PresenceStream{Mode: mode, Subject: matchID, Label: node}

	// Relayed matches must 'exist' by already having some members, unless they're being joined via a token.
	if mode == StreamModeMatchRelayed && !allowEmpty && !p.tracker.StreamExists(stream) {
		if !allowEmpty && !p.tracker.StreamExists(stream) {
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_MATCH_NOT_FOUND),
				Message: "Match not found",
@@ -162,20 +162,38 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
			return
		}

	var label *wrappers.StringValue
	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), stream, session.UserID())
	isNew := meta == nil
		isNew := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), stream, session.UserID()) == nil
		if isNew {
		username := session.Username()
		found := true
		allow := true
		var reason string
		var l string
		// The user is not yet part of the match, attempt to join.
		if mode == StreamModeMatchAuthoritative {
			// If it's an authoritative match, ask the match handler if it will allow the join.
			found, allow, reason, l = p.matchRegistry.JoinAttempt(session.Context(), matchID, node, session.UserID(), session.ID(), username, p.node, incoming.Metadata)
			m := PresenceMeta{
				Username: username,
				Format:   session.Format(),
			}
			if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success {
				// Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply.
				return
			}
		}

		// Whether the user has just (successfully) joined the match or was already a member, return the match info anyway.
		ps := p.tracker.ListByStream(stream, false, true)
		presences = make([]*rtapi.UserPresence, 0, len(ps))
		for _, p := range ps {
			if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() {
				// Ensure the user themselves does not appear in the list of existing match presences.
				// Only for new joins, not if the user is joining a match they're already part of.
				continue
			}
			presences = append(presences, &rtapi.UserPresence{
				UserId:    p.UserID.String(),
				SessionId: p.ID.SessionID.String(),
				Username:  p.Meta.Username,
			})
		}
	} else {
		// Authoritative match.
		mode = StreamModeMatchAuthoritative

		found, allow, isNew, reason, l, ps := p.matchRegistry.JoinAttempt(session.Context(), matchID, node, session.UserID(), session.ID(), username, p.node, incoming.Metadata)
		if !found {
			// Match did not exist.
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -196,50 +214,31 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
			}}})
			return
		}

		if isNew {
			stream := PresenceStream{Mode: mode, Subject: matchID, Label: node}
			m := PresenceMeta{
			Username: username,
				Username: session.Username(),
				Format:   session.Format(),
			}
		if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success {
			// Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply.
			return
		}
		if mode == StreamModeMatchAuthoritative {
			// If we've reached here, it was an accepted authoritative join.
			label = &wrappers.StringValue{Value: l}
		}
		meta = &m
	} else if mode == StreamModeMatchAuthoritative {
		// The user was already in the match, and it's an authoritative match.
		// Look up the match label to return it anyway.
		l, err := p.matchRegistry.GetMatchLabel(session.Context(), matchID, node)
		if err != nil {
			// There was a problem looking up the label.
			logger.Error("Error looking up match label", zap.String("match_id", matchIDString), zap.String("node", node), zap.Error(err))
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: "Match label lookup failed.",
			}}})
			return
		}
		label = &wrappers.StringValue{Value: l}
			p.tracker.Track(session.ID(), stream, session.UserID(), m, false)
		}

	// Whether the user has just (successfully) joined the match or was already a member, return the match info anyway.
	ps := p.tracker.ListByStream(stream, false, true)
	presences := make([]*rtapi.UserPresence, 0, len(ps))
		label = &wrappers.StringValue{Value: l}
		presences = make([]*rtapi.UserPresence, 0, len(ps))
		for _, p := range ps {
		if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() {
			if isNew && p.UserID == session.UserID() && p.SessionID == session.ID() {
				// Ensure the user themselves does not appear in the list of existing match presences.
				// Only for new joins, not if the user is joining a match they're already part of.
				continue
			}
			presences = append(presences, &rtapi.UserPresence{
				UserId:    p.UserID.String(),
			SessionId: p.ID.SessionID.String(),
			Username:  p.Meta.Username,
				SessionId: p.SessionID.String(),
				Username:  p.Username,
			})
		}
	}

	session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Match{Match: &rtapi.Match{
		MatchId:       matchIDString,
@@ -250,7 +249,7 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
		Self: &rtapi.UserPresence{
			UserId:    session.UserID().String(),
			SessionId: session.ID().String(),
			Username:  meta.Username,
			Username:  username,
		},
	}}})
}
+2 −2
Original line number Diff line number Diff line
@@ -253,7 +253,7 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen
			}
		} else {
			// Validate multiple filtered recipients.
			actualPresenceIDs := r.presenceList.List()
			actualPresenceIDs := r.presenceList.ListPresenceIDs()
			for i := 0; i < len(presenceIDs); i++ {
				found := false
				presenceID := presenceIDs[i]
@@ -288,7 +288,7 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen
	}}}

	if presenceIDs == nil {
		presenceIDs = r.presenceList.List()
		presenceIDs = r.presenceList.ListPresenceIDs()
	}

	return presenceIDs, msg, nil
+2 −2
Original line number Diff line number Diff line
@@ -674,7 +674,7 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, *
				return nil, nil
			}
		} else {
			actualPresenceIDs := r.presenceList.List()
			actualPresenceIDs := r.presenceList.ListPresenceIDs()
			for i := 0; i < len(presenceIDs); i++ {
				found := false
				presenceID := presenceIDs[i]
@@ -709,7 +709,7 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, *
	}}}

	if presenceIDs == nil {
		presenceIDs = r.presenceList.List()
		presenceIDs = r.presenceList.ListPresenceIDs()
	}

	return presenceIDs, msg