Commit 13af91f4 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve matchmaker and match registry memory reuse.

parent 75cf6cac
Loading
Loading
Loading
Loading
+11 −6
Original line number Diff line number Diff line
@@ -265,6 +265,9 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
	if config.GetMatchmaker().MaxIntervals < 1 {
		logger.Fatal("Matchmaker max intervals must be >= 1", zap.Int("matchmaker.max_intervals", config.GetMatchmaker().MaxIntervals))
	}
	if config.GetMatchmaker().BatchPoolSize < 1 {
		logger.Fatal("Matchmaker batch pool size must be >= 1", zap.Int("matchmaker.batch_pool_size", config.GetMatchmaker().BatchPoolSize))
	}

	// If the runtime path is not overridden, set it to `datadir/modules`.
	if config.GetRuntime().Path == "" {
@@ -877,6 +880,7 @@ type MatchmakerConfig struct {
	MaxTickets    int `yaml:"max_tickets" json:"max_tickets" usage:"Maximum number of concurrent matchmaking tickets allowed per session or party. Default 3."`
	IntervalSec   int `yaml:"interval_sec" json:"interval_sec" usage:"How quickly the matchmaker attempts to form matches, in seconds. Default 15."`
	MaxIntervals  int `yaml:"max_intervals" json:"max_intervals" usage:"How many intervals the matchmaker attempts to find matches at the max player count, before allowing min count. Default 2."`
	BatchPoolSize int `yaml:"batch_pool_size" json:"batch_pool_size" usage:"Number of concurrent indexing batches that will be allocated."`
}

func NewMatchmakerConfig() *MatchmakerConfig {
@@ -884,5 +888,6 @@ func NewMatchmakerConfig() *MatchmakerConfig {
		MaxTickets:    3,
		IntervalSec:   15,
		MaxIntervals:  2,
		BatchPoolSize: 32,
	}
}
+80 −66
Original line number Diff line number Diff line
@@ -119,6 +119,7 @@ func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config

	nowTime := time.Now().UTC()

	go func() {
		skippedLeaderboards := make([]string, 0, 10)
		leaderboards := leaderboardCache.GetAllLeaderboards()
		cachedLeaderboards := make([]string, 0, len(leaderboards))
@@ -145,12 +146,17 @@ func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config
			}

			// Prepare structure to receive rank data.
		rankCache := &RankCache{
			key := LeaderboardWithExpiry{LeaderboardId: leaderboard.Id, Expiry: expiryUnix}
			cache.Lock()
			rankCache, found := cache.cache[key]
			if !found {
				rankCache = &RankCache{
					owners: make(map[uuid.UUID]skiplist.Interface),
					cache:  skiplist.New(),
				}
		key := LeaderboardWithExpiry{LeaderboardId: leaderboard.Id, Expiry: expiryUnix}
				cache.cache[key] = rankCache
			}
			cache.Unlock()

			// Look up all active records for this leaderboard.
			query := `
@@ -159,8 +165,8 @@ FROM leaderboard_record
WHERE leaderboard_id = $1 AND expiry_time = $2`
			rows, err := db.Query(query, leaderboard.Id, time.Unix(expiryUnix, 0).UTC())
			if err != nil {
			startupLogger.Fatal("Failed to caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
			return nil
				startupLogger.Error("Failed to caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
				continue
			}

			// Process the records.
@@ -170,13 +176,13 @@ WHERE leaderboard_id = $1 AND expiry_time = $2`
				var subscore int64

				if err = rows.Scan(&ownerIDStr, &score, &subscore); err != nil {
				startupLogger.Fatal("Failed to scan leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
				return nil
					startupLogger.Error("Failed to scan leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
					break
				}
				ownerID, err := uuid.FromString(ownerIDStr)
				if err != nil {
				startupLogger.Fatal("Failed to parse scanned leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.String("owner_id", ownerIDStr), zap.Error(err))
				return nil
					startupLogger.Error("Failed to parse scanned leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.String("owner_id", ownerIDStr), zap.Error(err))
					break
				}

				// Prepare new rank data for this leaderboard entry.
@@ -195,13 +201,21 @@ WHERE leaderboard_id = $1 AND expiry_time = $2`
					}
				}

				rankCache.Lock()
				if _, alreadyInserted := rankCache.owners[ownerID]; alreadyInserted {
					rankCache.Unlock()
					continue
				}
				rankCache.owners[ownerID] = rankData
				rankCache.cache.Insert(rankData)
				rankCache.Unlock()
			}
			_ = rows.Close()
		}

		startupLogger.Info("Leaderboard rank cache initialization completed successfully", zap.Strings("cached", cachedLeaderboards), zap.Strings("skipped", skippedLeaderboards))
	}()

	return cache
}

+4 −3
Original line number Diff line number Diff line
@@ -181,13 +181,14 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, ses

	go func() {
		ticker := time.NewTicker(time.Duration(config.GetMatch().LabelUpdateIntervalMs) * time.Millisecond)
		batch := r.index.NewBatch()
		for {
			select {
			case <-ctx.Done():
				ticker.Stop()
				return
			case <-ticker.C:
				r.processLabelUpdates()
				r.processLabelUpdates(batch)
			}
		}
	}()
@@ -195,7 +196,7 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, ses
	return r
}

func (r *LocalMatchRegistry) processLabelUpdates() {
func (r *LocalMatchRegistry) processLabelUpdates(batch *bleve.Batch) {
	r.pendingUpdatesMutex.Lock()
	if len(r.pendingUpdates) == 0 {
		r.pendingUpdatesMutex.Unlock()
@@ -205,7 +206,6 @@ func (r *LocalMatchRegistry) processLabelUpdates() {
	r.pendingUpdates = make(map[string]*MatchIndexEntry, len(pendingUpdates)+10)
	r.pendingUpdatesMutex.Unlock()

	batch := r.index.NewBatch()
	for id, op := range pendingUpdates {
		if op == nil {
			batch.Delete(id)
@@ -219,6 +219,7 @@ func (r *LocalMatchRegistry) processLabelUpdates() {
	if err := r.index.Batch(batch); err != nil {
		r.logger.Error("error processing match label updates", zap.Error(err))
	}
	batch.Reset()
}

func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) {
+38 −17
Original line number Diff line number Diff line
@@ -135,6 +135,7 @@ type LocalMatchmaker struct {
	ctxCancelFn context.CancelFunc

	index          bleve.Index
	batchPool      chan *bleve.Batch
	sessionTickets map[string]map[string]struct{}
	partyTickets   map[string]map[string]struct{}
	entries        map[string][]*MatchmakerEntry
@@ -165,6 +166,7 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
		ctxCancelFn: ctxCancelFn,

		index:          index,
		batchPool:      make(chan *bleve.Batch, config.GetMatchmaker().BatchPoolSize),
		sessionTickets: make(map[string]map[string]struct{}),
		partyTickets:   make(map[string]map[string]struct{}),
		entries:        make(map[string][]*MatchmakerEntry),
@@ -172,14 +174,19 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
		activeIndexes:  make(map[string]*MatchmakerIndex),
	}

	for i := 0; i < config.GetMatchmaker().BatchPoolSize; i++ {
		m.batchPool <- m.index.NewBatch()
	}

	go func() {
		ticker := time.NewTicker(time.Duration(config.GetMatchmaker().IntervalSec) * time.Second)
		batch := m.index.NewBatch()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				m.process()
				m.process(batch)
			}
		}
	}()
@@ -192,7 +199,7 @@ func (m *LocalMatchmaker) Stop() {
	m.ctxCancelFn()
}

func (m *LocalMatchmaker) process() {
func (m *LocalMatchmaker) process(batch *bleve.Batch) {
	matchedEntries := make([][]*MatchmakerEntry, 0, 5)

	m.Lock()
@@ -327,7 +334,6 @@ func (m *LocalMatchmaker) process() {

				// Remove all entries/indexes that have just matched. It must be done here so any following process iterations
				// cannot pick up the same tickets to match against.
				batch := m.index.NewBatch()
				ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries))
				for _, entry := range currentMatchedEntries {
					if _, ok := ticketsToDelete[entry.Ticket]; !ok {
@@ -357,6 +363,7 @@ func (m *LocalMatchmaker) process() {
				if err := m.index.Batch(batch); err != nil {
					m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err))
				}
				batch.Reset()

				break
			}
@@ -572,17 +579,19 @@ func (m *LocalMatchmaker) RemoveSession(sessionID, ticket string) error {
}

func (m *LocalMatchmaker) RemoveSessionAll(sessionID string) error {
	batch := <-m.batchPool

	m.Lock()

	sessionTickets, ok := m.sessionTickets[sessionID]
	if !ok {
		// Session does not have any active matchmaking tickets.
		m.Unlock()
		m.batchPool <- batch
		return nil
	}
	delete(m.sessionTickets, sessionID)

	batch := m.index.NewBatch()
	for ticket := range sessionTickets {
		batch.Delete(ticket)

@@ -627,15 +636,20 @@ func (m *LocalMatchmaker) RemoveSessionAll(sessionID string) error {
		}
	}

	if batch.Size() > 0 {
		if err := m.index.Batch(batch); err != nil {
	if batch.Size() == 0 {
		m.Unlock()
			m.logger.Error("error deleting matchmaker entries batch", zap.Error(err))
			return ErrMatchmakerDelete
		}
		m.batchPool <- batch
		return nil
	}

	err := m.index.Batch(batch)
	m.Unlock()
	batch.Reset()
	m.batchPool <- batch
	if err != nil {
		m.logger.Error("error deleting matchmaker entries batch", zap.Error(err))
		return ErrMatchmakerDelete
	}
	return nil
}

@@ -687,17 +701,19 @@ func (m *LocalMatchmaker) RemoveParty(partyID, ticket string) error {
}

func (m *LocalMatchmaker) RemovePartyAll(partyID string) error {
	batch := <-m.batchPool

	m.Lock()

	partyTickets, ok := m.partyTickets[partyID]
	if !ok {
		// Party does not have any active matchmaking tickets.
		m.Unlock()
		m.batchPool <- batch
		return nil
	}
	delete(m.partyTickets, partyID)

	batch := m.index.NewBatch()
	for ticket := range partyTickets {
		batch.Delete(ticket)

@@ -728,14 +744,19 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error {
		}
	}

	if batch.Size() > 0 {
		if err := m.index.Batch(batch); err != nil {
	if batch.Size() == 0 {
		m.Unlock()
			m.logger.Error("error deleting matchmaker entries batch", zap.Error(err))
			return ErrMatchmakerDelete
		}
		m.batchPool <- batch
		return nil
	}

	err := m.index.Batch(batch)
	m.Unlock()
	batch.Reset()
	m.batchPool <- batch
	if err != nil {
		m.logger.Error("error deleting matchmaker entries batch", zap.Error(err))
		return ErrMatchmakerDelete
	}
	return nil
}