Commit 931d2cdc authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve leaderboard rank cache population at startup.

parent 6ced5f30
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Use crypto random to seed global random instance if possible.
- Allow migrate subcommand to use database names that contain dashes.
- Add senderID param to channelIdBuild function.
- Improve leaderboard rank cache population at startup.

### Changed
- JavaScript global variables are made immutable after the `InitModule` function is invoked.
+1 −1
Original line number Diff line number Diff line
@@ -143,7 +143,7 @@ func main() {
	tracker := server.StartLocalTracker(logger, config, sessionRegistry, statusRegistry, metrics, jsonpbMarshaler)
	router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler)
	leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db)
	leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache)
	leaderboardRankCache := server.NewLocalLeaderboardRankCache(ctx, startupLogger, db, config.GetLeaderboard(), leaderboardCache)
	leaderboardScheduler := server.NewLocalLeaderboardScheduler(logger, db, config, leaderboardCache, leaderboardRankCache)
	matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, sessionRegistry, tracker, router, metrics, config.GetName())
	tracker.SetMatchJoinListener(matchRegistry.Join)
+66 −40
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
package server

import (
	"context"
	"database/sql"
	"sync"
	"time"
@@ -100,7 +101,7 @@ type LocalLeaderboardRankCache struct {

var _ LeaderboardRankCache = &LocalLeaderboardRankCache{}

func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config *LeaderboardConfig, leaderboardCache LeaderboardCache) LeaderboardRankCache {
func NewLocalLeaderboardRankCache(ctx context.Context, startupLogger *zap.Logger, db *sql.DB, config *LeaderboardConfig, leaderboardCache LeaderboardCache) LeaderboardRankCache {
	cache := &LocalLeaderboardRankCache{
		blacklistIds: make(map[string]struct{}, len(config.BlacklistRankCache)),
		blacklistAll: len(config.BlacklistRankCache) == 1 && config.BlacklistRankCache[0] == "*",
@@ -156,29 +157,40 @@ func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config
			}
			cache.Unlock()

			expiryTime := time.Unix(expiryUnix, 0).UTC()

			// Look up all active records for this leaderboard.
			query := `
SELECT owner_id, score, subscore
FROM leaderboard_record
WHERE leaderboard_id = $1 AND expiry_time = $2`
			rows, err := db.Query(query, leaderboard.Id, time.Unix(expiryUnix, 0).UTC())
			var score int64
			var subscore int64
			var ownerIDStr string
			for {
				ranks := make(map[uuid.UUID]skiplist.Interface, 10_000)

				query := "SELECT owner_id, score, subscore FROM leaderboard_record WHERE leaderboard_id = $1 AND expiry_time = $2"
				params := []interface{}{leaderboard.Id, expiryTime}
				if ownerIDStr != "" {
					query += " AND (leaderboard_id, expiry_time, score, subscore, owner_id) > ($1, $2, $3, $4, $5)"
					params = append(params, score, subscore, ownerIDStr)
				}
				// Does not need to be in leaderboard order, sorting is done in the rank cache structure anyway.
				query += " ORDER BY leaderboard_id ASC, expiry_time ASC, score ASC, subscore ASC, owner_id ASC LIMIT 10000"

				rows, err := db.QueryContext(ctx, query, params...)
				if err != nil {
					startupLogger.Error("Failed to caching leaderboard ranks", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
				continue
					break
				}

			// Process the records.
				// Read score information.
				for rows.Next() {
				var ownerIDStr string
				var score int64
				var subscore int64

					if err = rows.Scan(&ownerIDStr, &score, &subscore); err != nil {
						_ = rows.Close()
						startupLogger.Error("Failed to scan leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.Error(err))
						break
					}
					ownerID, err := uuid.FromString(ownerIDStr)
					if err != nil {
						_ = rows.Close()
						startupLogger.Error("Failed to parse scanned leaderboard rank data", zap.String("leaderboard_id", leaderboard.Id), zap.String("owner_id", ownerIDStr), zap.Error(err))
						break
					}
@@ -198,17 +210,31 @@ WHERE leaderboard_id = $1 AND expiry_time = $2`
							Subscore: subscore,
						}
					}
					ranks[ownerID] = rankData
				}
				_ = rows.Close()

				rankCount := len(ranks)
				if rankCount == 0 {
					// Empty batch of results, end pagination for this leaderboard.
					break
				}
				// Insert into rank cache in batches.
				rankCache.Lock()
				for ownerID, rankData := range ranks {
					if _, alreadyInserted := rankCache.owners[ownerID]; alreadyInserted {
					rankCache.Unlock()
						continue
					}
					rankCache.owners[ownerID] = rankData
					rankCache.cache.Insert(rankData)
				}
				rankCache.Unlock()

				// Stop pagination when reaching the last (incomplete) page.
				if rankCount < 10_000 {
					break
				}
			}
			_ = rows.Close()
		}

		startupLogger.Info("Leaderboard rank cache initialization completed successfully", zap.Strings("cached", cachedLeaderboards), zap.Strings("skipped", skippedLeaderboards))