Commit fbe9d808 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Limit maximum number of concurrent leaderboard/tournament callback executions.

parent 79b71854
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Developer console presence count is no longer added together across nodes.
- Runtime create tournament calls always return any existing tournament after repeated calls with the same ID.
- Upgrade to Go 1.13.4 and Debian buster-slim for base docker images.
- Limit maximum number of concurrent leaderboard/tournament callback executions.

### Fixed
- Correctly handle errors when concurrently writing new storage objects.
+1 −1
Original line number Diff line number Diff line
@@ -122,7 +122,7 @@ func main() {
	router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler)
	leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db)
	leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache)
	leaderboardScheduler := server.NewLocalLeaderboardScheduler(logger, db, leaderboardCache, leaderboardRankCache)
	leaderboardScheduler := server.NewLocalLeaderboardScheduler(logger, db, config, leaderboardCache, leaderboardRankCache)
	matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, tracker, router, config.GetName())
	tracker.SetMatchJoinListener(matchRegistry.Join)
	tracker.SetMatchLeaveListener(matchRegistry.Leave)
+15 −5
Original line number Diff line number Diff line
@@ -213,6 +213,12 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
	if config.GetTracker().EventQueueSize < 1 {
		logger.Fatal("Tracker presence event queue size must be >= 1", zap.Int("tracker.event_queue_size", config.GetTracker().EventQueueSize))
	}
	if config.GetLeaderboard().CallbackQueueSize < 1 {
		logger.Fatal("Leaderboard callback queue stack size must be >= 1", zap.Int("leaderboard.callback_queue_size", config.GetLeaderboard().CallbackQueueSize))
	}
	if config.GetLeaderboard().CallbackQueueWorkers < 1 {
		logger.Fatal("Leaderboard callback queue workers must be >= 1", zap.Int("leaderboard.callback_queue_workers", config.GetLeaderboard().CallbackQueueWorkers))
	}

	// If the runtime path is not overridden, set it to `datadir/modules`.
	if config.GetRuntime().Path == "" {
@@ -606,7 +612,7 @@ type RuntimeConfig struct {
	MaxCount          int               `yaml:"max_count" json:"max_count" usage:"Maximum number of runtime instances to allocate. Default 256."`
	CallStackSize     int               `yaml:"call_stack_size" json:"call_stack_size" usage:"Size of each runtime instance's call stack. Default 128."`
	RegistrySize      int               `yaml:"registry_size" json:"registry_size" usage:"Size of each runtime instance's registry. Default 512."`
	EventQueueSize    int               `yaml:"event_queue_size" json:"event_queue_size" usage:"Size of the event queue buffer. Default 8192."`
	EventQueueSize    int               `yaml:"event_queue_size" json:"event_queue_size" usage:"Size of the event queue buffer. Default 65536."`
	EventQueueWorkers int               `yaml:"event_queue_workers" json:"event_queue_workers" usage:"Number of workers to use for concurrent processing of events. Default 8."`
}

@@ -632,7 +638,7 @@ type MatchConfig struct {
	CallQueueSize        int `yaml:"call_queue_size" json:"call_queue_size" usage:"Size of the authoritative match buffer that sequences calls to match handler callbacks to ensure no overlaps. Default 128."`
	JoinAttemptQueueSize int `yaml:"join_attempt_queue_size" json:"join_attempt_queue_size" usage:"Size of the authoritative match buffer that limits the number of in-progress join attempts. Default 128."`
	DeferredQueueSize    int `yaml:"deferred_queue_size" json:"deferred_queue_size" usage:"Size of the authoritative match buffer that holds deferred message broadcasts until the end of each loop execution. Default 128."`
	JoinMarkerDeadlineMs int `yaml:"join_marker_deadline_ms" json:"join_marker_deadline_ms" usage:"Deadline in milliseconds that client authoritative match joins will wait for match handlers to acknowledge joins. Default 5000."`
	JoinMarkerDeadlineMs int `yaml:"join_marker_deadline_ms" json:"join_marker_deadline_ms" usage:"Deadline in milliseconds that client authoritative match joins will wait for match handlers to acknowledge joins. Default 15000."`
}

// NewMatchConfig creates a new MatchConfig struct.
@@ -642,7 +648,7 @@ func NewMatchConfig() *MatchConfig {
		CallQueueSize:        128,
		JoinAttemptQueueSize: 128,
		DeferredQueueSize:    128,
		JoinMarkerDeadlineMs: 5000,
		JoinMarkerDeadlineMs: 15000,
	}
}

@@ -690,11 +696,15 @@ func NewConsoleConfig() *ConsoleConfig {
// LeaderboardConfig is configuration relevant to the leaderboard system.
type LeaderboardConfig struct {
	BlacklistRankCache   []string `yaml:"blacklist_rank_cache" json:"blacklist_rank_cache" usage:"Disable rank cache for leaderboards with matching identifiers. To disable rank cache entirely, use '*', otherwise leave blank to enable rank cache."`
	CallbackQueueSize    int      `yaml:"callback_queue_size" json:"callback_queue_size" usage:"Size of the leaderboard and tournament callback queue that sequences expiry/reset/end invocations. Default 65536."`
	CallbackQueueWorkers int      `yaml:"callback_queue_workers" json:"callback_queue_workers" usage:"Number of workers to use for concurrent processing of leaderboard and tournament callbacks. Default 8."`
}

// NewLeaderboardConfig creates a new LeaderboardConfig struct.
func NewLeaderboardConfig() *LeaderboardConfig {
	return &LeaderboardConfig{
		BlacklistRankCache:   []string{},
		CallbackQueueSize:    65536,
		CallbackQueueWorkers: 8,
	}
}
+110 −71
Original line number Diff line number Diff line
@@ -25,6 +25,13 @@ import (
	"go.uber.org/zap"
)

type LeaderboardSchedulerCallback struct {
	id          string
	leaderboard *Leaderboard
	ts          int64
	t           time.Time
}

type LeaderboardScheduler interface {
	Start(runtime *Runtime)
	Pause()
@@ -37,26 +44,33 @@ type LocalLeaderboardScheduler struct {
	sync.Mutex
	logger    *zap.Logger
	db        *sql.DB
	config    Config
	cache     LeaderboardCache
	rankCache LeaderboardRankCache
	runtime   *Runtime

	fnLeaderboardReset RuntimeLeaderboardResetFunction
	fnTournamentReset  RuntimeTournamentResetFunction
	fnTournamentEnd    RuntimeTournamentEndFunction

	endActiveTimer *time.Timer
	expiryTimer    *time.Timer
	lastEnd        int64
	lastExpiry     int64

	started bool
	queue   chan *LeaderboardSchedulerCallback
	active  *atomic.Uint32

	ctx         context.Context
	ctxCancelFn context.CancelFunc
}

func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, cache LeaderboardCache, rankCache LeaderboardRankCache) LeaderboardScheduler {
func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, config Config, cache LeaderboardCache, rankCache LeaderboardRankCache) LeaderboardScheduler {
	ctx, ctxCancelFn := context.WithCancel(context.Background())
	return &LocalLeaderboardScheduler{
		logger:    logger,
		db:        db,
		config:    config,
		cache:     cache,
		rankCache: rankCache,

@@ -65,6 +79,7 @@ func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, cache Leaderbo
		// lastEnd only initialized when needed.
		// lastExpiry only initialized when needed.

		queue:  make(chan *LeaderboardSchedulerCallback, config.GetLeaderboard().CallbackQueueSize),
		active: atomic.NewUint32(1),

		ctx:         ctx,
@@ -74,8 +89,18 @@ func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, cache Leaderbo

func (ls *LocalLeaderboardScheduler) Start(runtime *Runtime) {
	ls.logger.Info("Leaderboard scheduler start")
	ls.started = true

	// Capture callback references, if any are registered.
	ls.fnLeaderboardReset = runtime.LeaderboardReset()
	ls.fnTournamentReset = runtime.TournamentReset()
	ls.fnTournamentEnd = runtime.TournamentEnd()

	// Start the required number of callback workers.
	for i := 0; i < ls.config.GetLeaderboard().CallbackQueueWorkers; i++ {
		go ls.invokeCallback()
	}

	ls.runtime = runtime
	ls.Update()
}

@@ -141,7 +166,7 @@ func (ls *LocalLeaderboardScheduler) Stop() {
}

func (ls *LocalLeaderboardScheduler) Update() {
	if ls.runtime == nil {
	if !ls.started {
		// In case the update is called during runtime VM init, skip setting timers until ready.
		return
	}
@@ -239,13 +264,13 @@ func (ls *LocalLeaderboardScheduler) Update() {
	if endActiveDuration > -1 {
		ls.logger.Debug("Setting timer to run end active function", zap.Duration("end_active", endActiveDuration), zap.Strings("ids", endActiveLeaderboardIds))
		ls.endActiveTimer = time.AfterFunc(endActiveDuration, func() {
			ls.invokeEndActiveElapse(time.Unix(earliestEndActive, 0).UTC(), endActiveLeaderboardIds)
			ls.queueEndActiveElapse(time.Unix(earliestEndActive, 0).UTC(), endActiveLeaderboardIds)
		})
	}
	if expiryDuration > -1 {
		ls.logger.Debug("Setting timer to run expiry function", zap.Duration("expiry", expiryDuration), zap.Strings("ids", expiryLeaderboardIds))
		ls.expiryTimer = time.AfterFunc(expiryDuration, func() {
			ls.invokeExpiryElapse(time.Unix(earliestExpiry, 0).UTC(), expiryLeaderboardIds)
			ls.queueExpiryElapse(time.Unix(earliestExpiry, 0).UTC(), expiryLeaderboardIds)
		})
	}
	ls.Unlock()
@@ -253,15 +278,14 @@ func (ls *LocalLeaderboardScheduler) Update() {
	ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds)))
}

func (ls *LocalLeaderboardScheduler) invokeEndActiveElapse(t time.Time, ids []string) {
func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) {
	if ls.active.Load() != 1 {
		// Not active.
		return
	}

	// Skip processing if there is no tournament end callback registered.
	fn := ls.runtime.TournamentEnd()
	if fn == nil {
	if ls.fnTournamentEnd == nil {
		return
	}

@@ -280,41 +304,21 @@ func (ls *LocalLeaderboardScheduler) invokeEndActiveElapse(t time.Time, ids []st

	ls.logger.Info("Leaderboard scheduler end active", zap.Int("count", len(ids)))

	go func() {
		// Process the current set of tournament ends.
		for _, id := range ids {
		query := `SELECT 
id, sort_order, reset_schedule, metadata, create_time, 
category, description, duration, end_time, max_size, max_num_score, title, size, start_time
FROM leaderboard
WHERE id = $1`
		row := ls.db.QueryRowContext(ls.ctx, query, id)
		tournament, err := parseTournament(row, t)
		if err != nil {
			if err != sql.ErrNoRows {
				// Do not log if tournament was deleted before it reached the scheduler here.
				ls.logger.Error("Error retrieving tournament to invoke end callback", zap.Error(err), zap.String("id", id))
			}
			continue
		}

		// Trigger callback on a goroutine so any extended processing does not block future scheduling.
		go func() {
			if err := fn(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
				ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err))
			// Will block if the queue is full.
			ls.queue <- &LeaderboardSchedulerCallback{id: id, ts: ts}
		}
	}()
}
}

func (ls *LocalLeaderboardScheduler) invokeExpiryElapse(t time.Time, ids []string) {
func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string) {
	if ls.active.Load() != 1 {
		// Not active.
		return
	}

	fnLeaderboardReset := ls.runtime.LeaderboardReset()
	fnTournamentReset := ls.runtime.TournamentReset()

	ts := t.Unix()
	tMinusOne := time.Unix(ts-1, 0).UTC()

@@ -333,14 +337,34 @@ func (ls *LocalLeaderboardScheduler) invokeExpiryElapse(t time.Time, ids []strin

	ls.logger.Info("Leaderboard scheduler expiry reset", zap.Int("count", len(ids)))

	// Process the current set of leaderboard and tournament resets.
	go func() {
		// Queue the current set of leaderboard and tournament resets.
		// Executes inside a goroutine to ensure further invocation timings are not skewed.
		for _, id := range ids {
		leaderboardOrTournament := ls.cache.Get(id)
		if leaderboardOrTournament == nil {
			leaderboard := ls.cache.Get(id)
			if leaderboard == nil {
				// Cached entry was deleted before it reached the scheduler here.
				continue
			}
		if leaderboardOrTournament.IsTournament() {
			if !leaderboard.IsTournament() && ls.fnLeaderboardReset == nil {
				// Skip further processing if there is no leaderboard reset callback registered.
				// Tournaments have some processing to do even if no callback is registered.
				continue
			}
			// Will block if queue is full.
			ls.queue <- &LeaderboardSchedulerCallback{leaderboard: leaderboard, ts: ts, t: tMinusOne}
		}
	}()
}

func (ls *LocalLeaderboardScheduler) invokeCallback() {
	for {
		select {
		case <-ls.ctx.Done():
			return
		case callback := <-ls.queue:
			if callback.leaderboard != nil {
				if callback.leaderboard.IsTournament() {
					// Tournament, fetch most up to date info for size etc.
					// Some processing is needed even if there is no runtime callback registered for tournament reset.
					query := `SELECT 
@@ -348,35 +372,50 @@ id, sort_order, reset_schedule, metadata, create_time,
category, description, duration, end_time, max_size, max_num_score, title, size, start_time
FROM leaderboard
WHERE id = $1`
			row := ls.db.QueryRowContext(ls.ctx, query, id)
			tournament, err := parseTournament(row, tMinusOne)
					row := ls.db.QueryRowContext(ls.ctx, query, callback.id)
					tournament, err := parseTournament(row, callback.t)
					if err != nil {
				ls.logger.Error("Error retrieving tournament to invoke reset callback", zap.Error(err), zap.String("id", id))
						ls.logger.Error("Error retrieving tournament to invoke reset callback", zap.Error(err), zap.String("id", callback.id))
						continue
					}

					// Reset tournament size in DB to make it immediately usable for the next active period.
			if _, err := ls.db.ExecContext(ls.ctx, "UPDATE leaderboard SET size = 0 WHERE id = $1", id); err != nil {
				ls.logger.Error("Could not reset leaderboard size", zap.Error(err), zap.String("id", id))
					if _, err := ls.db.ExecContext(ls.ctx, "UPDATE leaderboard SET size = 0 WHERE id = $1", callback.id); err != nil {
						ls.logger.Error("Could not reset leaderboard size", zap.Error(err), zap.String("id", callback.id))
					}

			if fnTournamentReset != nil {
				// Trigger callback on a goroutine so any extended processing does not block future scheduling.
				go func() {
					if err := fnTournamentReset(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
					if ls.fnTournamentReset != nil {
						if err := ls.fnTournamentReset(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
							ls.logger.Warn("Failed to invoke tournament reset callback", zap.Error(err))
						}
				}()
					}
				} else {
					// Leaderboard.
			if fnLeaderboardReset != nil {
				// Trigger callback on a goroutine so any extended processing does not block future scheduling.
				go func() {
					if err := fnLeaderboardReset(ls.ctx, leaderboardOrTournament, ts); err != nil {
					// fnLeaderboardReset cannot be nil here, if it was the callback would not be queued at all.
					if err := ls.fnLeaderboardReset(ls.ctx, callback.leaderboard, callback.ts); err != nil {
						ls.logger.Warn("Failed to invoke leaderboard reset callback", zap.Error(err))
					}
				}()
				}
			} else {
				query := `SELECT 
id, sort_order, reset_schedule, metadata, create_time, 
category, description, duration, end_time, max_size, max_num_score, title, size, start_time
FROM leaderboard
WHERE id = $1`
				row := ls.db.QueryRowContext(ls.ctx, query, callback.id)
				tournament, err := parseTournament(row, callback.t)
				if err != nil {
					if err != sql.ErrNoRows {
						// Do not log if tournament was deleted before it reached the scheduler here.
						ls.logger.Error("Error retrieving tournament to invoke end callback", zap.Error(err), zap.String("id", callback.id))
					}
					continue
				}

				// fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all.
				if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
					ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err))
				}
			}
		}
	}