Unverified Commit f25f4d6d authored by Fernando Takagi's avatar Fernando Takagi Committed by GitHub
Browse files

Better scaling for high number of tickets on matchmaker. (#1013)

parent 6ab0d2d3
Loading
Loading
Loading
Loading
+22 −22
Original line number Diff line number Diff line
@@ -314,17 +314,6 @@ func (m *LocalMatchmaker) Process() {

	m.Unlock()

	indexReader, err := m.indexWriter.Reader()
	if err != nil {
		m.logger.Error("error accessing index reader", zap.Error(err))
		return
	}
	defer func() {
		if err := indexReader.Close(); err != nil {
			m.logger.Error("error closing index reader", zap.Error(err))
		}
	}()

	var threshold bool
	var timer *time.Timer
	if m.active.Load() == 1 && m.revThresholdFn != nil {
@@ -386,14 +375,22 @@ func (m *LocalMatchmaker) Process() {
		// matches are equivalent, the longest waiting tickets first.
		searchRequest.SortBy([]string{"-_score", "created_at"})

		indexReader, err := m.indexWriter.Reader()
		if err != nil {
			m.logger.Error("error accessing index reader", zap.Error(err))
			continue
		}

		result, err := indexReader.Search(m.ctx, searchRequest)
		if err != nil {
			_ = indexReader.Close()
			m.logger.Error("error searching index", zap.Error(err))
			continue
		}

		blugeMatches, err := IterateBlugeMatches(result, map[string]struct{}{}, m.logger)
		if err != nil {
			_ = indexReader.Close()
			m.logger.Error("error iterating search results", zap.Error(err))
			continue
		}
@@ -592,21 +589,32 @@ func (m *LocalMatchmaker) Process() {

				matchedEntries = append(matchedEntries, currentMatchedEntries)

				var batchSize int
				batch := bluge.NewBatch()
				// Mark tickets as unavailable for further use in this process iteration.
				for _, currentMatchedEntry := range currentMatchedEntries {
					if _, found := selectedTickets[currentMatchedEntry.Ticket]; found {
						continue
					}
					selectedTickets[currentMatchedEntry.Ticket] = struct{}{}
					batchSize++
					batch.Delete(bluge.Identifier(currentMatchedEntry.Ticket))
				}
				if batchSize > 0 {
					if err := m.indexWriter.Batch(batch); err != nil {
						m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err))
					}
				}

				break
			}
		}
		err = indexReader.Close()
		if err != nil {
			m.logger.Error("error closing index reader", zap.Error(err))
			continue
		}
	}

	var batchSize int
	batch := bluge.NewBatch()

	m.Lock()

@@ -636,8 +644,6 @@ func (m *LocalMatchmaker) Process() {
		ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries))
		for _, entry := range currentMatchedEntries {
			if _, ok := ticketsToDelete[entry.Ticket]; !ok {
				batchSize++
				batch.Delete(bluge.Identifier(entry.Ticket))
				ticketsToDelete[entry.Ticket] = struct{}{}
			}
			delete(m.indexes, entry.Ticket)
@@ -662,12 +668,6 @@ func (m *LocalMatchmaker) Process() {
		}
	}

	if batchSize > 0 {
		if err := m.indexWriter.Batch(batch); err != nil {
			m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err))
		}
	}

	m.Unlock()

	if matchedEntriesCount := len(matchedEntries); matchedEntriesCount > 0 {