From 4f95134cf28b4d455c73a82fba4e56ff021d9e11 Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Tue, 28 Feb 2023 15:06:17 +0000 Subject: [PATCH] Better handling of matchmaker operations while the interval process is running. --- CHANGELOG.md | 1 + server/matchmaker.go | 379 +++++++++++++++++++------------------- server/matchmaker_test.go | 4 +- 3 files changed, 194 insertions(+), 190 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 058b57a5e..a26910350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr ### Changed - Improve graceful shutdown of Google IAP receipt processor. - If In-App Purchases validation contain mismatching userIDs, do not return an error. +- Better handling of matchmaker operations while the interval process is running. ### Fixed - Consistent validation of override operator in runtime leaderboard record writes. diff --git a/server/matchmaker.go b/server/matchmaker.go index cef50ed22..71ea6e02e 100644 --- a/server/matchmaker.go +++ b/server/matchmaker.go @@ -23,7 +23,6 @@ import ( "time" "github.com/blugelabs/bluge" - "github.com/blugelabs/bluge/index" "github.com/gofrs/uuid" jwt "github.com/golang-jwt/jwt/v4" "github.com/heroiclabs/nakama-common/rtapi" @@ -107,6 +106,7 @@ type MatchmakerIndex struct { StringProperties map[string]string `json:"-"` NumericProperties map[string]float64 `json:"-"` ParsedQuery bluge.Query `json:"-"` + Entries []*MatchmakerEntry `json:"-"` } type MatchmakerExtract struct { @@ -199,20 +199,17 @@ type LocalMatchmaker struct { ctxCancelFn context.CancelFunc matchedEntriesFn func([][]*MatchmakerEntry) - batch *index.Batch indexWriter *bluge.Writer // All tickets for a session ID. sessionTickets map[string]map[string]struct{} // All tickets for a party ID. partyTickets map[string]map[string]struct{} - // All entries for a given ticket. - entries map[string][]*MatchmakerEntry // Index for each ticket. indexes map[string]*MatchmakerIndex // Indexes that have not yet reached their max interval count. activeIndexes map[string]*MatchmakerIndex // Reverse lookup cache for mutual matching. - revCache map[string]map[string]bool + revCache *MapOf[string, map[string]bool] revThresholdFn func() *time.Timer } @@ -238,14 +235,12 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router ctx: ctx, ctxCancelFn: ctxCancelFn, - batch: bluge.NewBatch(), indexWriter: indexWriter, sessionTickets: make(map[string]map[string]struct{}), partyTickets: make(map[string]map[string]struct{}), - entries: make(map[string][]*MatchmakerEntry), indexes: make(map[string]*MatchmakerIndex), activeIndexes: make(map[string]*MatchmakerIndex), - revCache: make(map[string]map[string]bool), + revCache: &MapOf[string, map[string]bool]{}, } if revThreshold := m.config.GetMatchmaker().RevThreshold; revThreshold > 0 && m.config.GetMatchmaker().RevPrecision { @@ -290,30 +285,55 @@ func (m *LocalMatchmaker) Process() { matchedEntries := make([][]*MatchmakerEntry, 0, 5) startTime := time.Now() - - m.Lock() - - activeIndexCount := len(m.activeIndexes) - indexCount := len(m.indexes) - + var activeIndexCount, indexCount int defer func() { m.metrics.Matchmaker(float64(indexCount), float64(activeIndexCount), time.Now().Sub(startTime)) }() + expiredActiveIndexes := make([]string, 0, 10) + + m.Lock() + + activeIndexCount = len(m.activeIndexes) + indexCount = len(m.indexes) + // No active matchmaking tickets, the pool may be non-empty but there are no new tickets to check/query with. if activeIndexCount == 0 { m.Unlock() return } + activeIndexesCopy := make(map[string]*MatchmakerIndex, activeIndexCount) + for ticket, activeIndex := range m.activeIndexes { + activeIndexesCopy[ticket] = activeIndex + } + indexesCopy := make(map[string]*MatchmakerIndex, indexCount) + for ticket, index := range m.indexes { + indexesCopy[ticket] = index + } + + m.Unlock() + + indexReader, err := m.indexWriter.Reader() + if err != nil { + m.logger.Error("error accessing index reader", zap.Error(err)) + return + } + defer func() { + if err := indexReader.Close(); err != nil { + m.logger.Error("error closing index reader", zap.Error(err)) + } + }() + var threshold bool var timer *time.Timer - if m.revThresholdFn != nil { + if m.active.Load() == 1 && m.revThresholdFn != nil { timer = m.revThresholdFn() defer timer.Stop() } - for ticket, index := range m.activeIndexes { + selectedTickets := make(map[string]struct{}, activeIndexCount*2) + for ticket, activeIndex := range activeIndexesCopy { if !threshold && timer != nil { select { case <-timer.C: @@ -322,12 +342,12 @@ func (m *LocalMatchmaker) Process() { } } - index.Intervals++ - lastInterval := index.Intervals >= m.config.GetMatchmaker().MaxIntervals || index.MinCount == index.MaxCount + activeIndex.Intervals++ + lastInterval := activeIndex.Intervals >= m.config.GetMatchmaker().MaxIntervals || activeIndex.MinCount == activeIndex.MaxCount if lastInterval { // Drop from active indexes if it has reached its max intervals, or if its min/max counts are equal. In the // latter case keeping it active would have the same result as leaving it in the pool, so this saves work. - delete(m.activeIndexes, ticket) + expiredActiveIndexes = append(expiredActiveIndexes, ticket) } if m.active.Load() != 1 { @@ -337,61 +357,55 @@ func (m *LocalMatchmaker) Process() { indexQuery := bluge.NewBooleanQuery() // Results must match the query string. - indexQuery.AddMust(index.ParsedQuery) + indexQuery.AddMust(activeIndex.ParsedQuery) // Results must also have compatible min/max ranges, for example 2-4 must not match with 6-8. minCountRange := bluge.NewNumericRangeInclusiveQuery( - float64(index.MinCount), math.Inf(1), true, true). + float64(activeIndex.MinCount), math.Inf(1), true, true). SetField("min_count") indexQuery.AddMust(minCountRange) maxCountRange := bluge.NewNumericRangeInclusiveQuery( - math.Inf(-1), float64(index.MaxCount), true, true). + math.Inf(-1), float64(activeIndex.MaxCount), true, true). SetField("max_count") indexQuery.AddMust(maxCountRange) // Results must not include the current party, if any. - if index.PartyId != "" { - partyIdQuery := bluge.NewTermQuery(index.PartyId) + if activeIndex.PartyId != "" { + partyIdQuery := bluge.NewTermQuery(activeIndex.PartyId) partyIdQuery.SetField("party_id") indexQuery.AddMustNot(partyIdQuery) } - searchRequest := bluge.NewTopNSearch(len(m.indexes), indexQuery) + searchRequest := bluge.NewTopNSearch(indexCount, indexQuery) // Sort results to try and select the best match, or if the // matches are equivalent, the longest waiting tickets first. searchRequest.SortBy([]string{"-_score", "created_at"}) - indexReader, err := m.indexWriter.Reader() - if err != nil { - m.logger.Error("error accessing index reader", zap.Error(err)) - continue - } - result, err := indexReader.Search(m.ctx, searchRequest) if err != nil { - _ = indexReader.Close() m.logger.Error("error searching index", zap.Error(err)) continue } blugeMatches, err := IterateBlugeMatches(result, map[string]struct{}{}, m.logger) if err != nil { - _ = indexReader.Close() m.logger.Error("error iterating search results", zap.Error(err)) continue } - err = indexReader.Close() - if err != nil { - m.logger.Error("error closing index reader", zap.Error(err)) - continue - } - - for idx, hit := range blugeMatches.Hits { - if hit.ID == ticket { + for i := 0; i < len(blugeMatches.Hits); i++ { + hitTicket := blugeMatches.Hits[i].ID + if hitTicket == ticket { // Remove the current ticket. - blugeMatches.Hits = append(blugeMatches.Hits[:idx], blugeMatches.Hits[idx+1:]...) - break + blugeMatches.Hits = append(blugeMatches.Hits[:i], blugeMatches.Hits[i+1:]...) + if len(selectedTickets) == 0 { + break + } + i-- + } else if _, found := selectedTickets[hitTicket]; found { + // Ticket has already been selected for another match during this process iteration. + blugeMatches.Hits = append(blugeMatches.Hits[:i], blugeMatches.Hits[i+1:]...) + i-- } } @@ -399,7 +413,7 @@ func (m *LocalMatchmaker) Process() { entryCombos := make([][]*MatchmakerEntry, 0, 5) lastHitCounter := len(blugeMatches.Hits) - 1 for hitCounter, hit := range blugeMatches.Hits { - hitIndex, ok := m.indexes[hit.ID] + hitIndex, ok := indexesCopy[hit.ID] if !ok { // Ticket did not exist, should not happen. m.logger.Warn("matchmaker process missing index", zap.String("ticket", hit.ID)) @@ -407,7 +421,7 @@ func (m *LocalMatchmaker) Process() { } if !threshold && m.config.GetMatchmaker().RevPrecision { - outerMutualMatch, err := validateMatch(m, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) + outerMutualMatch, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) if err != nil { m.logger.Error("error validating mutual match", zap.Error(err)) continue @@ -417,14 +431,14 @@ func (m *LocalMatchmaker) Process() { } } - if index.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { + if activeIndex.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { // This match would be less than the search hit's preferred max, and they can still wait. Let them wait more. continue } // Check if there are overlapping session IDs, and if so these tickets are ineligible to match together. var sessionIdConflict bool - for sessionID := range index.SessionIDs { + for sessionID := range activeIndex.SessionIDs { if _, found := hitIndex.SessionIDs[sessionID]; found { sessionIdConflict = true break @@ -434,26 +448,19 @@ func (m *LocalMatchmaker) Process() { continue } - entries, ok := m.entries[hit.ID] - if !ok { - // Ticket did not exist, should not happen. - m.logger.Warn("matchmaker process missing entries", zap.String("ticket", hit.ID)) - continue - } - var foundComboIdx int var foundCombo []*MatchmakerEntry - var mutualMatchConflict bool for entryComboIdx, entryCombo := range entryCombos { - if len(entryCombo)+len(entries)+index.Count <= index.MaxCount { - // There is room in this combo for these entries. Check if there are session ID conflicts with current combo. + if len(entryCombo)+len(hitIndex.Entries)+activeIndex.Count <= activeIndex.MaxCount { + // There is room in this combo for these entries. Check if there are session ID or mutual match conflicts with current combo. + var mutualMatchConflict bool for _, entry := range entryCombo { if _, found := hitIndex.SessionIDs[entry.Presence.SessionId]; found { sessionIdConflict = true break } if !threshold && m.config.GetMatchmaker().RevPrecision { - entryMatchesSearchHitQuery, err := validateMatch(m, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) + entryMatchesSearchHitQuery, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) @@ -464,8 +471,8 @@ func (m *LocalMatchmaker) Process() { break } // MatchmakerEntry does not have the query, read it out of indexes. - if entriesIndexEntry, ok := m.indexes[entry.Ticket]; ok { - searchHitMatchesEntryQuery, err := validateMatch(m, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) + if entriesIndexEntry, ok := indexesCopy[entry.Ticket]; ok { + searchHitMatchesEntryQuery, err := validateMatch(m.ctx, m.revCache, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) @@ -484,7 +491,7 @@ func (m *LocalMatchmaker) Process() { continue } - entryCombo = append(entryCombo, entries...) + entryCombo = append(entryCombo, hitIndex.Entries...) entryCombos[entryComboIdx] = entryCombo foundCombo = entryCombo @@ -492,10 +499,10 @@ func (m *LocalMatchmaker) Process() { break } } - // Either processing first hit, or current hit entries combined with previous hits may tip over index.MaxCount. + // Either processing first hit, or current hit entries combined with previous hits may tip over activeIndex.MaxCount. if foundCombo == nil { - entryCombo := make([]*MatchmakerEntry, len(entries)) - copy(entryCombo, entries) + entryCombo := make([]*MatchmakerEntry, len(hitIndex.Entries)) + copy(entryCombo, hitIndex.Entries) entryCombos = append(entryCombos, entryCombo) foundCombo = entryCombo @@ -507,8 +514,8 @@ func (m *LocalMatchmaker) Process() { // * The combo at least satisfies the min count. // * The combo does not exceed the max count. // * There are no more hits that may further fill the found combo, so we get as close as possible to the max count. - if l := len(foundCombo) + index.Count; l == index.MaxCount || (lastInterval && l >= index.MinCount && l <= index.MaxCount && hitCounter >= lastHitCounter) { - if rem := l % index.CountMultiple; rem != 0 { + if l := len(foundCombo) + activeIndex.Count; l == activeIndex.MaxCount || (lastInterval && l >= activeIndex.MinCount && l <= activeIndex.MaxCount && hitCounter >= lastHitCounter) { + if rem := l % activeIndex.CountMultiple; rem != 0 { // The size of the combination being considered does not satisfy the count multiple. // Attempt to adjust the combo by removing the smallest possible number of entries. // Prefer keeping entries that have been in the matchmaker the longest, if possible. @@ -516,7 +523,7 @@ func (m *LocalMatchmaker) Process() { for _, e := range foundCombo { // Only tickets individually less <= the removable size are considered. // For example removing a party of 3 when we're only looking to remove 2 is not allowed. - if foundIndex, ok := m.indexes[e.Ticket]; ok && foundIndex.Count <= rem { + if foundIndex, ok := indexesCopy[e.Ticket]; ok && foundIndex.Count <= rem { eligibleIndexesUniq[foundIndex] = struct{}{} } } @@ -548,9 +555,9 @@ func (m *LocalMatchmaker) Process() { } // We've removed something, update the known size of the currently considered combo. - l = len(foundCombo) + index.Count + l = len(foundCombo) + activeIndex.Count - if l%index.CountMultiple != 0 { + if l%activeIndex.CountMultiple != 0 { // Removal was insufficient, the combo is still not valid for the required multiple. continue } @@ -563,7 +570,7 @@ func (m *LocalMatchmaker) Process() { // For any condition failures it does not matter which specific condition is not met. var conditionFailed bool for _, e := range foundCombo { - if foundIndex, ok := m.indexes[e.Ticket]; ok && (foundIndex.MinCount > l || foundIndex.MaxCount < l || l%foundIndex.CountMultiple != 0) { + if foundIndex, ok := indexesCopy[e.Ticket]; ok && (foundIndex.MinCount > l || foundIndex.MaxCount < l || l%foundIndex.CountMultiple != 0) { conditionFailed = true break } @@ -573,58 +580,89 @@ func (m *LocalMatchmaker) Process() { } // Found a suitable match. - entries, ok := m.entries[ticket] - if !ok { - // Ticket did not exist, should not happen. - m.logger.Warn("matchmaker process missing entries", zap.String("ticket", hit.ID)) - break - } - currentMatchedEntries := append(foundCombo, entries...) + currentMatchedEntries := append(foundCombo, activeIndex.Entries...) // Remove the found combos from currently tracked list. entryCombos = append(entryCombos[:foundComboIdx], entryCombos[foundComboIdx+1:]...) matchedEntries = append(matchedEntries, currentMatchedEntries) - // Remove all entries/indexes that have just matched. It must be done here so any following process iterations - // cannot pick up the same tickets to match against. - ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries)) - for _, entry := range currentMatchedEntries { - if _, ok := ticketsToDelete[entry.Ticket]; !ok { - m.batch.Delete(bluge.Identifier(entry.Ticket)) - ticketsToDelete[entry.Ticket] = struct{}{} - } - delete(m.entries, entry.Ticket) - delete(m.indexes, entry.Ticket) - delete(m.activeIndexes, entry.Ticket) - delete(m.revCache, entry.Ticket) - if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { - if l := len(sessionTickets); l <= 1 { - delete(m.sessionTickets, entry.Presence.SessionId) - } else { - delete(sessionTickets, entry.Ticket) - } - } - if entry.PartyId != "" { - if partyTickets, ok := m.partyTickets[entry.PartyId]; ok { - if l := len(partyTickets); l <= 1 { - delete(m.partyTickets, entry.PartyId) - } else { - delete(partyTickets, entry.Ticket) - } - } + // Mark tickets as unavailable for further use in this process iteration. + for _, currentMatchedEntry := range currentMatchedEntries { + if _, found := selectedTickets[currentMatchedEntry.Ticket]; found { + continue } + selectedTickets[currentMatchedEntry.Ticket] = struct{}{} } - if err := m.indexWriter.Batch(m.batch); err != nil { - m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err)) - } - m.batch.Reset() break } } } + var batchSize int + batch := bluge.NewBatch() + + m.Lock() + + for _, ticket := range expiredActiveIndexes { + delete(m.activeIndexes, ticket) + } + + for i := 0; i < len(matchedEntries); i++ { + // Check that the current matched entries are all still present and eligible for the match to be formed. + currentMatchedEntries := matchedEntries[i] + var incomplete bool + for _, entry := range currentMatchedEntries { + if _, found := m.indexes[entry.Ticket]; !found { + incomplete = true + break + } + } + if incomplete { + matchedEntries[i] = matchedEntries[len(matchedEntries)-1] + matchedEntries[len(matchedEntries)-1] = nil + matchedEntries = matchedEntries[:len(matchedEntries)-1] + i-- + continue + } + + // Remove all entries/indexes that have just matched. + ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries)) + for _, entry := range currentMatchedEntries { + if _, ok := ticketsToDelete[entry.Ticket]; !ok { + batchSize++ + batch.Delete(bluge.Identifier(entry.Ticket)) + ticketsToDelete[entry.Ticket] = struct{}{} + } + delete(m.indexes, entry.Ticket) + delete(m.activeIndexes, entry.Ticket) + m.revCache.Delete(entry.Ticket) + if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { + if l := len(sessionTickets); l <= 1 { + delete(m.sessionTickets, entry.Presence.SessionId) + } else { + delete(sessionTickets, entry.Ticket) + } + } + if entry.PartyId != "" { + if partyTickets, ok := m.partyTickets[entry.PartyId]; ok { + if l := len(partyTickets); l <= 1 { + delete(m.partyTickets, entry.PartyId) + } else { + delete(partyTickets, entry.Ticket) + } + } + } + } + } + + if batchSize > 0 { + if err := m.indexWriter.Batch(batch); err != nil { + m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err)) + } + } + m.Unlock() if matchedEntriesCount := len(matchedEntries); matchedEntriesCount > 0 { @@ -789,14 +827,14 @@ func (m *LocalMatchmaker) Add(ctx context.Context, presences []*MatchmakerPresen return "", 0, runtime.ErrMatchmakerIndex } - entries := make([]*MatchmakerEntry, 0, len(presences)) + index.Entries = make([]*MatchmakerEntry, 0, len(presences)) for _, presence := range presences { if _, ok := m.sessionTickets[presence.SessionId]; ok { m.sessionTickets[presence.SessionId][ticket] = struct{}{} } else { m.sessionTickets[presence.SessionId] = map[string]struct{}{ticket: {}} } - entries = append(entries, &MatchmakerEntry{ + index.Entries = append(index.Entries, &MatchmakerEntry{ Ticket: ticket, Presence: presence, Properties: properties, @@ -812,9 +850,9 @@ func (m *LocalMatchmaker) Add(ctx context.Context, presences []*MatchmakerPresen m.partyTickets[partyId] = map[string]struct{}{ticket: {}} } } - m.entries[ticket] = entries m.indexes[ticket] = index m.activeIndexes[ticket] = index + m.revCache.Store(ticket, make(map[string]bool, 10)) m.Unlock() return ticket, createdAt, nil @@ -830,7 +868,6 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { batch := bluge.NewBatch() indexes := make(map[string]*MatchmakerIndex, len(extracts)) - entries := make(map[string][]*MatchmakerEntry, len(extracts)) for _, extract := range extracts { parsedQuery, err := ParseQueryString(extract.Query) @@ -890,9 +927,9 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { batch.Insert(matchmakerIndexDoc) - extractEntries := make([]*MatchmakerEntry, 0, len(extract.Presences)) + index.Entries = make([]*MatchmakerEntry, 0, len(extract.Presences)) for _, presence := range extract.Presences { - extractEntries = append(extractEntries, &MatchmakerEntry{ + index.Entries = append(index.Entries, &MatchmakerEntry{ Ticket: extract.Ticket, Presence: presence, Properties: properties, @@ -901,7 +938,6 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { NumericProperties: extract.NumericProperties, }) } - entries[extract.Ticket] = extractEntries indexes[extract.Ticket] = index } @@ -914,6 +950,7 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { } for ticket, index := range indexes { m.indexes[ticket] = index + m.revCache.Store(ticket, make(map[string]bool, 10)) if index.Intervals < m.config.GetMatchmaker().MaxIntervals { m.activeIndexes[ticket] = index } @@ -924,10 +961,7 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { m.partyTickets[index.PartyId] = map[string]struct{}{ticket: {}} } } - } - for ticket, ticketEntries := range entries { - m.entries[ticket] = ticketEntries - for _, entry := range ticketEntries { + for _, entry := range index.Entries { if _, ok := m.sessionTickets[entry.Presence.SessionId]; ok { m.sessionTickets[entry.Presence.SessionId][ticket] = struct{}{} } else { @@ -953,14 +987,9 @@ func (m *LocalMatchmaker) Extract() []*MatchmakerExtract { if index.Node != m.node { continue } - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker extract found ticket with no entries", zap.String("ticket", ticket)) - continue - } extract := &MatchmakerExtract{ - Presences: make([]*MatchmakerPresence, 0, len(entries)), + Presences: make([]*MatchmakerPresence, 0, len(index.Entries)), SessionID: index.SessionID, PartyId: index.PartyId, Query: index.Query, @@ -975,7 +1004,7 @@ func (m *LocalMatchmaker) Extract() []*MatchmakerExtract { CreatedAt: index.CreatedAt, Node: index.Node, } - for _, entry := range entries { + for _, entry := range index.Entries { extract.Presences = append(extract.Presences, entry.Presence) } @@ -998,13 +1027,7 @@ func (m *LocalMatchmaker) RemoveSession(sessionID, ticket string) error { } delete(m.indexes, ticket) - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker remove session found ticket with no entries", zap.String("ticket", ticket)) - } - delete(m.entries, ticket) - - for _, entry := range entries { + for _, entry := range index.Entries { if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { if l := len(sessionTickets); l <= 1 { delete(m.sessionTickets, entry.Presence.SessionId) @@ -1025,7 +1048,7 @@ func (m *LocalMatchmaker) RemoveSession(sessionID, ticket string) error { } delete(m.activeIndexes, ticket) - delete(m.revCache, ticket) + m.revCache.Delete(ticket) if err := m.indexWriter.Delete(bluge.Identifier(ticket)); err != nil { m.Unlock() @@ -1062,15 +1085,9 @@ func (m *LocalMatchmaker) RemoveSessionAll(sessionID string) error { delete(m.indexes, ticket) delete(m.activeIndexes, ticket) - delete(m.revCache, ticket) - - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker remove session all found ticket with no entries", zap.String("ticket", ticket)) - } - delete(m.entries, ticket) + m.revCache.Delete(ticket) - for _, entry := range entries { + for _, entry := range index.Entries { if entry.Presence.SessionId == sessionID { // Already deleted above. continue @@ -1115,13 +1132,7 @@ func (m *LocalMatchmaker) RemoveParty(partyID, ticket string) error { } delete(m.indexes, ticket) - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker remove party found ticket with no entries", zap.String("ticket", ticket)) - } - delete(m.entries, ticket) - - for _, entry := range entries { + for _, entry := range index.Entries { if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { if l := len(sessionTickets); l <= 1 { delete(m.sessionTickets, entry.Presence.SessionId) @@ -1140,7 +1151,7 @@ func (m *LocalMatchmaker) RemoveParty(partyID, ticket string) error { } delete(m.activeIndexes, ticket) - delete(m.revCache, ticket) + m.revCache.Delete(ticket) if err := m.indexWriter.Delete(bluge.Identifier(ticket)); err != nil { m.Unlock() @@ -1168,7 +1179,7 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error { for ticket := range partyTickets { batch.Delete(bluge.Identifier(ticket)) - _, ok := m.indexes[ticket] + partyIndex, ok := m.indexes[ticket] if !ok { // Ticket did not exist, should not happen. m.logger.Warn("matchmaker remove party all found ticket with no index", zap.String("ticket", ticket)) @@ -1177,15 +1188,9 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error { delete(m.indexes, ticket) delete(m.activeIndexes, ticket) - delete(m.revCache, ticket) + m.revCache.Delete(ticket) - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker remove party all found ticket with no entries", zap.String("ticket", ticket)) - } - delete(m.entries, ticket) - - for _, entry := range entries { + for _, entry := range partyIndex.Entries { if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { if l := len(sessionTickets); l <= 1 { delete(m.sessionTickets, entry.Presence.SessionId) @@ -1210,6 +1215,7 @@ func (m *LocalMatchmaker) RemoveAll(node string) { m.Lock() + var removedCount uint32 for ticket, index := range m.indexes { if index.Node != node { continue @@ -1217,10 +1223,11 @@ func (m *LocalMatchmaker) RemoveAll(node string) { batch.Delete(bluge.Identifier(ticket)) + removedCount++ delete(m.indexes, ticket) delete(m.activeIndexes, ticket) - delete(m.revCache, ticket) + m.revCache.Delete(ticket) if index.PartyId != "" { partyTickets, ok := m.partyTickets[index.PartyId] @@ -1233,13 +1240,7 @@ func (m *LocalMatchmaker) RemoveAll(node string) { } } - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker remove all found ticket with no entries", zap.String("ticket", ticket)) - } - delete(m.entries, ticket) - - for _, entry := range entries { + for _, entry := range index.Entries { if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { if l := len(sessionTickets); l <= 1 { delete(m.sessionTickets, entry.Presence.SessionId) @@ -1250,6 +1251,11 @@ func (m *LocalMatchmaker) RemoveAll(node string) { } } + if removedCount == 0 { + m.Unlock() + return + } + err := m.indexWriter.Batch(batch) m.Unlock() if err != nil { @@ -1262,6 +1268,7 @@ func (m *LocalMatchmaker) Remove(tickets []string) { m.Lock() + var removedCount uint32 for _, ticket := range tickets { index, found := m.indexes[ticket] if !found { @@ -1270,10 +1277,11 @@ func (m *LocalMatchmaker) Remove(tickets []string) { batch.Delete(bluge.Identifier(ticket)) + removedCount++ delete(m.indexes, ticket) delete(m.activeIndexes, ticket) - delete(m.revCache, ticket) + m.revCache.Delete(ticket) if index.PartyId != "" { partyTickets, ok := m.partyTickets[index.PartyId] @@ -1286,13 +1294,7 @@ func (m *LocalMatchmaker) Remove(tickets []string) { } } - entries, ok := m.entries[ticket] - if !ok { - m.logger.Warn("matchmaker remove all found ticket with no entries", zap.String("ticket", ticket)) - } - delete(m.entries, ticket) - - for _, entry := range entries { + for _, entry := range index.Entries { if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok { if l := len(sessionTickets); l <= 1 { delete(m.sessionTickets, entry.Presence.SessionId) @@ -1303,6 +1305,11 @@ func (m *LocalMatchmaker) Remove(tickets []string) { } } + if removedCount == 0 { + m.Unlock() + return + } + err := m.indexWriter.Batch(batch) m.Unlock() if err != nil { @@ -1326,12 +1333,14 @@ func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error) return rv, nil } -func validateMatch(m *LocalMatchmaker, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { - cache, found := m.revCache[fromTicket] - if found { - if cachedResult, seenBefore := cache[toTicket]; seenBefore { - return cachedResult, nil - } +func validateMatch(ctx context.Context, revCache *MapOf[string, map[string]bool], r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { + cache, found := revCache.Load(fromTicket) + if !found { + return false, nil + } + + if cachedResult, seenBefore := cache[toTicket]; seenBefore { + return cachedResult, nil } idQuery := bluge.NewTermQuery(toTicket).SetField("_id") @@ -1340,18 +1349,14 @@ func validateMatch(m *LocalMatchmaker, r *bluge.Reader, fromTicketQuery bluge.Qu topQuery.AddMust(idQuery, fromTicketQuery) req := bluge.NewTopNSearch(0, topQuery).WithStandardAggregations() - dmi, err := r.Search(m.ctx, req) + dmi, err := r.Search(ctx, req) if err != nil { return false, err } valid := dmi.Aggregations().Count() == 1 - if found { - cache[toTicket] = valid - } else { - m.revCache[fromTicket] = map[string]bool{toTicket: valid} - } + cache[toTicket] = valid return valid, nil } diff --git a/server/matchmaker_test.go b/server/matchmaker_test.go index db30ba4c1..9b85d193f 100644 --- a/server/matchmaker_test.go +++ b/server/matchmaker_test.go @@ -1716,14 +1716,12 @@ func NewLocalBenchMatchmaker(logger, startupLogger *zap.Logger, config Config, r ctx: ctx, ctxCancelFn: ctxCancelFn, - batch: bluge.NewBatch(), indexWriter: indexWriter, sessionTickets: make(map[string]map[string]struct{}), partyTickets: make(map[string]map[string]struct{}), - entries: make(map[string][]*MatchmakerEntry), indexes: make(map[string]*MatchmakerIndex), activeIndexes: make(map[string]*MatchmakerIndex), - revCache: make(map[string]map[string]bool), + revCache: &MapOf[string, map[string]bool]{}, } if tickerActive { -- GitLab