Loading server/config.go +1 −1 Original line number Diff line number Diff line Loading @@ -946,7 +946,7 @@ func NewMatchmakerConfig() *MatchmakerConfig { IntervalSec: 15, MaxIntervals: 2, BatchPoolSize: 32, RevPrecision: true, RevPrecision: false, RevThreshold: 1, } } Loading server/matchmaker.go +90 −83 Original line number Diff line number Diff line Loading @@ -400,7 +400,8 @@ func (m *LocalMatchmaker) Process() { continue } outerMutualMatch, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) if !threshold && m.config.GetMatchmaker().RevPrecision { outerMutualMatch, err := validateMatch(m, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) if err != nil { m.logger.Error("error validating mutual match", zap.Error(err)) continue Loading @@ -408,6 +409,7 @@ func (m *LocalMatchmaker) Process() { // This search hit is not a mutual match with the outer ticket. continue } } if index.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { // This match would be less than the search hit's preferred max, and they can still wait. Let them wait more. Loading Loading @@ -444,7 +446,8 @@ func (m *LocalMatchmaker) Process() { sessionIdConflict = true break } entryMatchesSearchHitQuery, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) if !threshold && m.config.GetMatchmaker().RevPrecision { entryMatchesSearchHitQuery, err := validateMatch(m, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) Loading @@ -456,7 +459,7 @@ func (m *LocalMatchmaker) Process() { } // MatchmakerEntry does not have the query, read it out of indexes. if entriesIndexEntry, ok := m.indexes[entry.Ticket]; ok { searchHitMatchesEntryQuery, err := validateMatch(m, threshold, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) searchHitMatchesEntryQuery, err := validateMatch(m, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) Loading @@ -469,7 +472,7 @@ func (m *LocalMatchmaker) Process() { } else { m.logger.Warn("matchmaker missing index entry for entry combo") } } } if sessionIdConflict || mutualMatchConflict { continue Loading Loading @@ -609,7 +612,11 @@ func (m *LocalMatchmaker) Process() { m.Unlock() if matchedEntriesCount := len(matchedEntries); matchedEntriesCount > 0 { wg := &sync.WaitGroup{} wg.Add(matchedEntriesCount) for _, entries := range matchedEntries { go func(entries []*MatchmakerEntry) { var tokenOrMatchID string var isMatchID bool var err error Loading Loading @@ -664,9 +671,13 @@ func (m *LocalMatchmaker) Process() { // Route outgoing message. m.router.SendToPresenceIDs(m.logger, []*PresenceID{{Node: entry.Presence.Node, SessionID: entry.Presence.SessionID}}, outgoing, true) } wg.Done() }(entries) } wg.Wait() if m.matchedEntriesFn != nil { go m.matchedEntriesFn(matchedEntries) } if m.matchedEntriesFn != nil && len(matchedEntries) > 0 { m.matchedEntriesFn(matchedEntries) } } Loading Loading @@ -1293,11 +1304,7 @@ func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error) return rv, nil } func validateMatch(m *LocalMatchmaker, threshold bool, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { if threshold || !m.config.GetMatchmaker().RevPrecision { return true, nil } func validateMatch(m *LocalMatchmaker, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { cache, found := m.revCache[fromTicket] if found { if cachedResult, seenBefore := cache[toTicket]; seenBefore { Loading Loading
server/config.go +1 −1 Original line number Diff line number Diff line Loading @@ -946,7 +946,7 @@ func NewMatchmakerConfig() *MatchmakerConfig { IntervalSec: 15, MaxIntervals: 2, BatchPoolSize: 32, RevPrecision: true, RevPrecision: false, RevThreshold: 1, } } Loading
server/matchmaker.go +90 −83 Original line number Diff line number Diff line Loading @@ -400,7 +400,8 @@ func (m *LocalMatchmaker) Process() { continue } outerMutualMatch, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) if !threshold && m.config.GetMatchmaker().RevPrecision { outerMutualMatch, err := validateMatch(m, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) if err != nil { m.logger.Error("error validating mutual match", zap.Error(err)) continue Loading @@ -408,6 +409,7 @@ func (m *LocalMatchmaker) Process() { // This search hit is not a mutual match with the outer ticket. continue } } if index.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { // This match would be less than the search hit's preferred max, and they can still wait. Let them wait more. Loading Loading @@ -444,7 +446,8 @@ func (m *LocalMatchmaker) Process() { sessionIdConflict = true break } entryMatchesSearchHitQuery, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) if !threshold && m.config.GetMatchmaker().RevPrecision { entryMatchesSearchHitQuery, err := validateMatch(m, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) Loading @@ -456,7 +459,7 @@ func (m *LocalMatchmaker) Process() { } // MatchmakerEntry does not have the query, read it out of indexes. if entriesIndexEntry, ok := m.indexes[entry.Ticket]; ok { searchHitMatchesEntryQuery, err := validateMatch(m, threshold, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) searchHitMatchesEntryQuery, err := validateMatch(m, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) Loading @@ -469,7 +472,7 @@ func (m *LocalMatchmaker) Process() { } else { m.logger.Warn("matchmaker missing index entry for entry combo") } } } if sessionIdConflict || mutualMatchConflict { continue Loading Loading @@ -609,7 +612,11 @@ func (m *LocalMatchmaker) Process() { m.Unlock() if matchedEntriesCount := len(matchedEntries); matchedEntriesCount > 0 { wg := &sync.WaitGroup{} wg.Add(matchedEntriesCount) for _, entries := range matchedEntries { go func(entries []*MatchmakerEntry) { var tokenOrMatchID string var isMatchID bool var err error Loading Loading @@ -664,9 +671,13 @@ func (m *LocalMatchmaker) Process() { // Route outgoing message. m.router.SendToPresenceIDs(m.logger, []*PresenceID{{Node: entry.Presence.Node, SessionID: entry.Presence.SessionID}}, outgoing, true) } wg.Done() }(entries) } wg.Wait() if m.matchedEntriesFn != nil { go m.matchedEntriesFn(matchedEntries) } if m.matchedEntriesFn != nil && len(matchedEntries) > 0 { m.matchedEntriesFn(matchedEntries) } } Loading Loading @@ -1293,11 +1304,7 @@ func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error) return rv, nil } func validateMatch(m *LocalMatchmaker, threshold bool, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { if threshold || !m.config.GetMatchmaker().RevPrecision { return true, nil } func validateMatch(m *LocalMatchmaker, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { cache, found := m.revCache[fromTicket] if found { if cachedResult, seenBefore := cache[toTicket]; seenBefore { Loading