Commit 19b9ecf5 authored by Maxim Ivanov's avatar Maxim Ivanov
Browse files

chore(matchmaker): run matchmaking loop on index, not entries

atomic unit of matchmaking is matchmaking index, but MM logic used to operate
on entries, which required converting to index in multiple places.

This commits makes MM loop to operate on indexes, producing final entries
only at the very end.
parent ccf77ab5
Loading
Loading
Loading
Loading
+61 −78
Original line number Diff line number Diff line
@@ -400,7 +400,7 @@ func (m *LocalMatchmaker) Process() {
		}

		// Form possible combinations, in case multiple matches might be suitable.
		entryCombos := make([][]*MatchmakerEntry, 0, 5)
		entryCombos := make([][]*MatchmakerIndex, 0, 5)
		lastHitCounter := len(blugeMatches.Hits) - 1
		for hitCounter, hit := range blugeMatches.Hits {
			hitIndex, ok := m.indexes[hit.ID]
@@ -419,34 +419,18 @@ func (m *LocalMatchmaker) Process() {
				continue
			}

			hitEntries, ok := m.entries[hit.ID]
			if !ok {
				// Ticket did not exist, should not happen.
				m.logger.Warn("matchmaker process missing entries", zap.String("ticket", hit.ID))
				continue
			}

			var foundCombo []*MatchmakerEntry
			var foundCombo []*MatchmakerIndex
		combos:
			for entryComboIdx, entryCombo := range entryCombos {
				if len(entryCombo)+len(hitEntries)+index.Count <= index.MaxCount {
				if sumCount(entryCombo)+hitIndex.Count+index.Count <= index.MaxCount {
					// There is room in this combo for these entries. Check if there are session ID conflicts with current combo.
					entryIndexesUniq := make(map[*MatchmakerIndex]struct{}, len(entryCombo))
					for _, e := range entryCombo {
						if foundIndex, ok := m.indexes[e.Ticket]; ok {
							entryIndexesUniq[foundIndex] = struct{}{}
						} else {
							m.logger.Warn("matchmaker missing index entry for entry combo")
						}
					}

					for eix := range entryIndexesUniq {
					for _, eix := range entryCombo {
						if m.doesConflictWithIndex(threshold, indexReader, hitIndex, eix, true) {
							continue combos
						}
					}

					entryCombo = append(entryCombo, hitEntries...)
					entryCombo = append(entryCombo, hitIndex)
					entryCombos[entryComboIdx] = entryCombo

					foundCombo = entryCombo
@@ -455,8 +439,7 @@ func (m *LocalMatchmaker) Process() {
			}
			// Either processing first hit, or current hit entries combined with previous hits may tip over index.MaxCount.
			if foundCombo == nil {
				entryCombo := make([]*MatchmakerEntry, len(hitEntries))
				copy(entryCombo, hitEntries)
				entryCombo := append(make([]*MatchmakerIndex, 0, 5), hitIndex)
				entryCombos = append(entryCombos, entryCombo)

				foundCombo = entryCombo
@@ -467,7 +450,7 @@ func (m *LocalMatchmaker) Process() {
			// * The combo at least satisfies the min count.
			// * The combo does not exceed the max count.
			// * There are no more hits that may further fill the found combo, so we get as close as possible to the max count.
			if l := len(foundCombo) + index.Count; l == index.MaxCount || (lastInterval && l >= index.MinCount && l <= index.MaxCount && hitCounter >= lastHitCounter) {
			if l := sumCount(foundCombo) + index.Count; l == index.MaxCount || (lastInterval && l >= index.MinCount && l <= index.MaxCount && hitCounter >= lastHitCounter) {
				if rem := l % index.CountMultiple; rem != 0 {
					foundCombo, ok = m.roundToCountMultiple(foundCombo, rem, index)
					if !ok {
@@ -480,25 +463,17 @@ func (m *LocalMatchmaker) Process() {
				// * The found combo size satisfies the maximum count.
				// * The found combo size satisfies the count multiple.
				// For any condition failures it does not matter which specific condition is not met.
				var conditionFailed bool
				for _, e := range foundCombo {
					if foundIndex, ok := m.indexes[e.Ticket]; ok && (foundIndex.MinCount > l || foundIndex.MaxCount < l || l%foundIndex.CountMultiple != 0) {
						conditionFailed = true
						break
					}
				}
				if conditionFailed {
				for _, foundIndex := range foundCombo {
					if foundIndex.MinCount > l || foundIndex.MaxCount < l || l%foundIndex.CountMultiple != 0 {
						continue
					}
				}

				// Found a suitable match.
				ticketEntries, ok := m.entries[ticket]
				matchedEntries, ok = m.finalizeMatchMaking(index, foundCombo)
				if !ok {
					// Ticket did not exist, should not happen.
					m.logger.Warn("matchmaker process missing entries", zap.String("ticket", hit.ID))
					break
					continue // original code had break here
				}
				matchedEntries = m.finalizeMatchMaking(ticketEntries, foundCombo)

				break
			}
@@ -619,24 +594,19 @@ func (m *LocalMatchmaker) sendMatchMakingResult(matchedEntries [][]*MatchmakerEn
}

// remove rem entries to satisfy CountMultiple condition
func (m *LocalMatchmaker) roundToCountMultiple(foundCombo []*MatchmakerEntry, rem int, index *MatchmakerIndex) ([]*MatchmakerEntry, bool) {
func (m *LocalMatchmaker) roundToCountMultiple(foundCombo []*MatchmakerIndex, rem int, index *MatchmakerIndex) ([]*MatchmakerIndex, bool) {
	// The size of the combination being considered does not satisfy the count multiple.
	// Attempt to adjust the combo by removing the smallest possible number of entries.
	// Prefer keeping entries that have been in the matchmaker the longest, if possible.
	eligibleIndexesUniq := make(map[*MatchmakerIndex]struct{}, len(foundCombo))
	for _, e := range foundCombo {
	eligibleIndexes := make([]*MatchmakerIndex, len(foundCombo))
	for _, foundIndex := range foundCombo {
		// Only tickets individually less <= the removable size are considered.
		// For example removing a party of 3 when we're only looking to remove 2 is not allowed.
		if foundIndex, ok := m.indexes[e.Ticket]; ok && foundIndex.Count <= rem {
			eligibleIndexesUniq[foundIndex] = struct{}{}
		if foundIndex.Count <= rem {
			eligibleIndexes = append(eligibleIndexes, foundIndex)
		}
	}

	eligibleIndexes := make([]*MatchmakerIndex, 0, len(eligibleIndexesUniq))
	for idx := range eligibleIndexesUniq {
		eligibleIndexes = append(eligibleIndexes, idx)
	}

	eligibleGroups := groupIndexes(eligibleIndexes, rem)
	if len(eligibleGroups) == 0 {
		// No possible combination to remove, unlikely but guard.
@@ -653,58 +623,63 @@ func (m *LocalMatchmaker) roundToCountMultiple(foundCombo []*MatchmakerEntry, re
				foundCombo[i] = foundCombo[len(foundCombo)-1]
				foundCombo[len(foundCombo)-1] = nil
				foundCombo = foundCombo[:len(foundCombo)-1]
				i--
				break
			}
		}
	}

	if (len(foundCombo)+index.Count)%index.CountMultiple != 0 {
	if (sumCount(foundCombo)+index.Count)%index.CountMultiple != 0 {
		// Removal was insufficient, the combo is still not valid for the required multiple.
		return nil, false
	}
	return foundCombo, true
}

func (m *LocalMatchmaker) finalizeMatchMaking(entries []*MatchmakerEntry, foundCombo []*MatchmakerEntry) [][]*MatchmakerEntry {
	matchedEntries := make([][]*MatchmakerEntry, 0, 5)
	currentMatchedEntries := append(foundCombo, entries...)
func (m *LocalMatchmaker) finalizeMatchMaking(index *MatchmakerIndex, foundCombo []*MatchmakerIndex) ([][]*MatchmakerEntry, bool) {
	matchedEntries := make([]*MatchmakerEntry, 0, sumCount(foundCombo)+index.Count)

	matchedEntries = append(matchedEntries, currentMatchedEntries)
	matchedIndexes := append(foundCombo, index)
	for _, idx := range matchedIndexes {
		entries, ok := m.entries[idx.Ticket]
		if !ok {
			// Ticket did not exist, should not happen.
			m.logger.Warn("matchmaker process missing entries", zap.String("ticket", idx.Ticket))
			return nil, false
		}
		matchedEntries = append(matchedEntries, entries...)

		// Remove all entries/indexes that have just matched. It must be done here so any following process iterations
		// cannot pick up the same tickets to match against.
	ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries))
	for _, entry := range currentMatchedEntries {
		if _, ok := ticketsToDelete[entry.Ticket]; !ok {
			m.batch.Delete(bluge.Identifier(entry.Ticket))
			ticketsToDelete[entry.Ticket] = struct{}{}
		}
		delete(m.entries, entry.Ticket)
		delete(m.indexes, entry.Ticket)
		delete(m.activeIndexes, entry.Ticket)
		delete(m.revCache, entry.Ticket)
		if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok {
		m.batch.Delete(bluge.Identifier(idx.Ticket))
		delete(m.entries, idx.Ticket)
		delete(m.indexes, idx.Ticket)
		delete(m.activeIndexes, idx.Ticket)
		delete(m.revCache, idx.Ticket)

		for sid := range idx.SessionIDs {
			if sessionTickets, ok := m.sessionTickets[sid]; ok {
				if l := len(sessionTickets); l <= 1 {
				delete(m.sessionTickets, entry.Presence.SessionId)
					delete(m.sessionTickets, sid)
				} else {
				delete(sessionTickets, entry.Ticket)
					delete(sessionTickets, idx.Ticket)
				}
			}
		if entry.PartyId != "" {
			if partyTickets, ok := m.partyTickets[entry.PartyId]; ok {
		}

		if partyTickets, ok := m.partyTickets[idx.PartyId]; ok {
			if l := len(partyTickets); l <= 1 {
					delete(m.partyTickets, entry.PartyId)
				delete(m.partyTickets, idx.PartyId)
			} else {
					delete(partyTickets, entry.Ticket)
				}
				delete(partyTickets, idx.Ticket)
			}
		}
	}

	if err := m.indexWriter.Batch(m.batch); err != nil {
		m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err))
	}
	m.batch.Reset()
	return matchedEntries
	return [][]*MatchmakerEntry{matchedEntries}, true
}

func (m *LocalMatchmaker) Add(ctx context.Context, presences []*MatchmakerPresence, sessionID, partyId, query string, minCount, maxCount, countMultiple int, stringProperties map[string]string, numericProperties map[string]float64) (string, int64, error) {
@@ -1366,3 +1341,11 @@ func validateMatch(m *LocalMatchmaker, r *bluge.Reader, fromTicketQuery bluge.Qu

	return valid, nil
}

func sumCount(combo []*MatchmakerIndex) int {
	result := 0
	for _, idx := range combo {
		result += idx.Count
	}
	return result
}