diff --git a/server/matchmaker.go b/server/matchmaker.go index 05942fbabdb41a3814e537a63f2e74d31616607c..bd34c6a772590ba69a403015d30461376e5bbd33 100644 --- a/server/matchmaker.go +++ b/server/matchmaker.go @@ -314,17 +314,6 @@ func (m *LocalMatchmaker) Process() { m.Unlock() - indexReader, err := m.indexWriter.Reader() - if err != nil { - m.logger.Error("error accessing index reader", zap.Error(err)) - return - } - defer func() { - if err := indexReader.Close(); err != nil { - m.logger.Error("error closing index reader", zap.Error(err)) - } - }() - var threshold bool var timer *time.Timer if m.active.Load() == 1 && m.revThresholdFn != nil { @@ -386,14 +375,22 @@ func (m *LocalMatchmaker) Process() { // matches are equivalent, the longest waiting tickets first. searchRequest.SortBy([]string{"-_score", "created_at"}) + indexReader, err := m.indexWriter.Reader() + if err != nil { + m.logger.Error("error accessing index reader", zap.Error(err)) + continue + } + result, err := indexReader.Search(m.ctx, searchRequest) if err != nil { + _ = indexReader.Close() m.logger.Error("error searching index", zap.Error(err)) continue } blugeMatches, err := IterateBlugeMatches(result, map[string]struct{}{}, m.logger) if err != nil { + _ = indexReader.Close() m.logger.Error("error iterating search results", zap.Error(err)) continue } @@ -592,22 +589,33 @@ func (m *LocalMatchmaker) Process() { matchedEntries = append(matchedEntries, currentMatchedEntries) + var batchSize int + batch := bluge.NewBatch() // Mark tickets as unavailable for further use in this process iteration. for _, currentMatchedEntry := range currentMatchedEntries { if _, found := selectedTickets[currentMatchedEntry.Ticket]; found { continue } selectedTickets[currentMatchedEntry.Ticket] = struct{}{} + batchSize++ + batch.Delete(bluge.Identifier(currentMatchedEntry.Ticket)) + } + if batchSize > 0 { + if err := m.indexWriter.Batch(batch); err != nil { + m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err)) + } } break } } + err = indexReader.Close() + if err != nil { + m.logger.Error("error closing index reader", zap.Error(err)) + continue + } } - var batchSize int - batch := bluge.NewBatch() - m.Lock() for _, ticket := range expiredActiveIndexes { @@ -636,8 +644,6 @@ func (m *LocalMatchmaker) Process() { ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries)) for _, entry := range currentMatchedEntries { if _, ok := ticketsToDelete[entry.Ticket]; !ok { - batchSize++ - batch.Delete(bluge.Identifier(entry.Ticket)) ticketsToDelete[entry.Ticket] = struct{}{} } delete(m.indexes, entry.Ticket) @@ -662,12 +668,6 @@ func (m *LocalMatchmaker) Process() { } } - if batchSize > 0 { - if err := m.indexWriter.Batch(batch); err != nil { - m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err)) - } - } - m.Unlock() if matchedEntriesCount := len(matchedEntries); matchedEntriesCount > 0 {