Commit 7f4afe35 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Additional matchmaker options.

parent 9807cc45
Loading
Loading
Loading
Loading
+6 −4
Original line number Diff line number Diff line
@@ -933,6 +933,7 @@ type MatchmakerConfig struct {
	IntervalSec   int  `yaml:"interval_sec" json:"interval_sec" usage:"How quickly the matchmaker attempts to form matches, in seconds. Default 15."`
	MaxIntervals  int  `yaml:"max_intervals" json:"max_intervals" usage:"How many intervals the matchmaker attempts to find matches at the max player count, before allowing min count. Default 2."`
	BatchPoolSize int  `yaml:"batch_pool_size" json:"batch_pool_size" usage:"Number of concurrent indexing batches that will be allocated."`
	RevPrecision  bool `yaml:"rev_precision" json:"rev_precision" usage:"Reverse matching precision. Default true."`
}

func NewMatchmakerConfig() *MatchmakerConfig {
@@ -941,6 +942,7 @@ func NewMatchmakerConfig() *MatchmakerConfig {
		IntervalSec:   15,
		MaxIntervals:  2,
		BatchPoolSize: 32,
		RevPrecision:  true,
	}
}

+1 −1
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ func loggerForTest(t *testing.T) *zap.Logger {

// loggerForBenchmark allows for easily adjusting log output produced by tests in one place
func loggerForBenchmark(b *testing.B) *zap.Logger {
	return NewJSONLogger(os.Stdout, zapcore.ErrorLevel, JSONFormat)
	return NewJSONLogger(os.Stdout, zapcore.WarnLevel, JSONFormat)
}

type fatalable interface {
+35 −11
Original line number Diff line number Diff line
@@ -204,6 +204,7 @@ type LocalMatchmaker struct {
	entries          map[string][]*MatchmakerEntry
	indexes          map[string]*MatchmakerIndex
	activeIndexes    map[string]*MatchmakerIndex
	revCache         map[string]map[string]bool
}

func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router MessageRouter, runtime *Runtime) Matchmaker {
@@ -234,6 +235,7 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
		entries:        make(map[string][]*MatchmakerEntry),
		indexes:        make(map[string]*MatchmakerIndex),
		activeIndexes:  make(map[string]*MatchmakerIndex),
		revCache:       make(map[string]map[string]bool),
	}

	go func() {
@@ -368,7 +370,7 @@ func (m *LocalMatchmaker) Process() {
				continue
			}

			outerMutualMatch, err := validateMatch(m.ctx, indexReader, hitIndex.Query, ticket)
			outerMutualMatch, err := validateMatch(m, indexReader, hitIndex.Query, hit.ID, ticket)
			if err != nil {
				m.logger.Error("error validating mutual match", zap.Error(err))
				continue
@@ -412,7 +414,7 @@ func (m *LocalMatchmaker) Process() {
							sessionIdConflict = true
							break
						}
						entryMatchesSearchHitQuery, err := validateMatch(m.ctx, indexReader, hitIndex.Query, entry.Ticket)
						entryMatchesSearchHitQuery, err := validateMatch(m, indexReader, hitIndex.Query, hit.ID, entry.Ticket)
						if err != nil {
							mutualMatchConflict = true
							m.logger.Error("error validating mutual match", zap.Error(err))
@@ -424,7 +426,7 @@ func (m *LocalMatchmaker) Process() {
						}
						// MatchmakerEntry does not have the query, read it out of indexes.
						if entriesIndexEntry, ok := m.indexes[entry.Ticket]; ok {
							searchHitMatchesEntryQuery, err := validateMatch(m.ctx, indexReader, entriesIndexEntry.Query, hit.ID)
							searchHitMatchesEntryQuery, err := validateMatch(m, indexReader, entriesIndexEntry.Query, entry.Ticket, hit.ID)
							if err != nil {
								mutualMatchConflict = true
								m.logger.Error("error validating mutual match", zap.Error(err))
@@ -547,6 +549,7 @@ func (m *LocalMatchmaker) Process() {
					delete(m.entries, entry.Ticket)
					delete(m.indexes, entry.Ticket)
					delete(m.activeIndexes, entry.Ticket)
					delete(m.revCache, entry.Ticket)
					if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok {
						if l := len(sessionTickets); l <= 1 {
							delete(m.sessionTickets, entry.Presence.SessionId)
@@ -945,6 +948,7 @@ func (m *LocalMatchmaker) RemoveSession(sessionID, ticket string) error {
	}

	delete(m.activeIndexes, ticket)
	delete(m.revCache, ticket)

	if err := m.indexWriter.Delete(bluge.Identifier(ticket)); err != nil {
		m.Unlock()
@@ -981,6 +985,7 @@ func (m *LocalMatchmaker) RemoveSessionAll(sessionID string) error {
		delete(m.indexes, ticket)

		delete(m.activeIndexes, ticket)
		delete(m.revCache, ticket)

		entries, ok := m.entries[ticket]
		if !ok {
@@ -1058,6 +1063,7 @@ func (m *LocalMatchmaker) RemoveParty(partyID, ticket string) error {
	}

	delete(m.activeIndexes, ticket)
	delete(m.revCache, ticket)

	if err := m.indexWriter.Delete(bluge.Identifier(ticket)); err != nil {
		m.Unlock()
@@ -1094,6 +1100,7 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error {
		delete(m.indexes, ticket)

		delete(m.activeIndexes, ticket)
		delete(m.revCache, ticket)

		entries, ok := m.entries[ticket]
		if !ok {
@@ -1136,6 +1143,7 @@ func (m *LocalMatchmaker) RemoveAll(node string) {
		delete(m.indexes, ticket)

		delete(m.activeIndexes, ticket)
		delete(m.revCache, ticket)

		if index.PartyId != "" {
			partyTickets, ok := m.partyTickets[index.PartyId]
@@ -1188,6 +1196,7 @@ func (m *LocalMatchmaker) Remove(tickets []string) {
		delete(m.indexes, ticket)

		delete(m.activeIndexes, ticket)
		delete(m.revCache, ticket)

		if index.PartyId != "" {
			partyTickets, ok := m.partyTickets[index.PartyId]
@@ -1240,26 +1249,41 @@ func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error)
	return rv, nil
}

func validateMatch(ctx context.Context, r *bluge.Reader, queryStr string, ticket string) (bool, error) {
func validateMatch(m *LocalMatchmaker, r *bluge.Reader, queryStr string, fromTicket, toTicket string) (bool, error) {
	if !m.config.GetMatchmaker().RevPrecision {
		return true, nil
	}

	cache, found := m.revCache[fromTicket]
	if found {
		if cachedResult, seenBefore := cache[toTicket]; seenBefore {
			return cachedResult, nil
		}
	}

	ticketQuery, err := ParseQueryString(queryStr)
	if err != nil {
		return false, err
	}

	idQuery := bluge.NewTermQuery(ticket).SetField("_id")
	idQuery := bluge.NewTermQuery(toTicket).SetField("_id")

	topQuery := bluge.NewBooleanQuery()
	topQuery.AddMust(ticketQuery, idQuery)
	topQuery.AddMust(idQuery, ticketQuery)

	req := bluge.NewTopNSearch(0, topQuery).WithStandardAggregations()
	dmi, err := r.Search(ctx, req)
	dmi, err := r.Search(m.ctx, req)
	if err != nil {
		return false, err
	}

	if dmi.Aggregations().Count() != 1 {
		return false, nil
	valid := dmi.Aggregations().Count() == 1

	if found {
		cache[toTicket] = valid
	} else {
		m.revCache[fromTicket] = map[string]bool{toTicket: valid}
	}

	return true, nil
	return valid, nil
}
+323 −173

File changed.

Preview size limit exceeded, changes collapsed.

+1 −1
Original line number Diff line number Diff line
@@ -59,7 +59,7 @@ func createTestPartyHandler(t *testing.T, logger *zap.Logger) (*PartyHandler, fu

	node := "node1"

	mm, cleanup, _ := createTestMatchmaker(t, logger, nil)
	mm, cleanup, _ := createTestMatchmaker(t, logger, true, nil)
	tt := testTracker{}
	tsm := testStreamManager{}
	dmr := DummyMessageRouter{}