Commit 21f3b474 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Handle updates during leaderboard schedule reset window.

parent b5e020f6
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -4,6 +4,8 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org).

## [Unreleased]
### Fixed
- Handle updates during leaderboard schedule reset window.

## [2.6.0] - 2019-07-01
### Added
+55 −27
Original line number Diff line number Diff line
@@ -40,10 +40,11 @@ type LocalLeaderboardScheduler struct {
	cache     LeaderboardCache
	rankCache LeaderboardRankCache
	runtime   *Runtime

	endActiveTimer *time.Timer
	expiryTimer    *time.Timer
	nearEndActiveIds []string
	nearExpiryIds    []string
	lastEnd        int64
	lastExpiry     int64

	active *atomic.Uint32

@@ -59,6 +60,11 @@ func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, cache Leaderbo
		cache:     cache,
		rankCache: rankCache,

		// endActiveTimer only initialized when needed.
		// expiryTimer only initialized when needed.
		// lastEnd only initialized when needed.
		// lastExpiry only initialized when needed.

		active: atomic.NewUint32(1),

		ctx:         ctx,
@@ -67,11 +73,15 @@ func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, cache Leaderbo
}

func (ls *LocalLeaderboardScheduler) Start(runtime *Runtime) {
	ls.logger.Info("Leaderboard scheduler start")

	ls.runtime = runtime
	ls.Update()
}

func (ls *LocalLeaderboardScheduler) Pause() {
	ls.logger.Info("Leaderboard scheduler pause")

	if !ls.active.CAS(1, 0) {
		// Already paused.
		return
@@ -98,6 +108,8 @@ func (ls *LocalLeaderboardScheduler) Pause() {
}

func (ls *LocalLeaderboardScheduler) Resume() {
	ls.logger.Info("Leaderboard scheduler resume")

	if !ls.active.CAS(0, 1) {
		// Already active.
		return
@@ -198,19 +210,16 @@ func (ls *LocalLeaderboardScheduler) Update() {

	endActiveDuration := time.Duration(-1)
	if earliestEndActive > -1 {
		endActiveDuration = time.Unix(earliestEndActive, 0).UTC().Sub(now) + time.Second
		endActiveDuration = time.Unix(earliestEndActive, 0).UTC().Sub(now)
	}

	expiryDuration := time.Duration(-1)
	if earliestExpiry > -1 {
		expiryDuration = time.Unix(earliestExpiry, 0).UTC().Sub(now) + time.Second
		expiryDuration = time.Unix(earliestExpiry, 0).UTC().Sub(now)
	}

	// Replace IDs earmarked for end and expiry, and restart timers as needed.
	ls.Lock()
	ls.nearEndActiveIds = endActiveLeaderboardIds
	ls.nearExpiryIds = expiryLeaderboardIds

	if ls.endActiveTimer != nil {
		if !ls.endActiveTimer.Stop() {
			select {
@@ -228,21 +237,23 @@ func (ls *LocalLeaderboardScheduler) Update() {
		}
	}
	if endActiveDuration > -1 {
		ls.logger.Debug("Setting timer to run end active function", zap.Duration("end_active", endActiveDuration), zap.Strings("ids", ls.nearEndActiveIds))
		ls.logger.Debug("Setting timer to run end active function", zap.Duration("end_active", endActiveDuration), zap.Strings("ids", endActiveLeaderboardIds))
		ls.endActiveTimer = time.AfterFunc(endActiveDuration, func() {
			ls.invokeEndActiveElapse(time.Unix(earliestEndActive-1, 0).UTC())
			ls.invokeEndActiveElapse(time.Unix(earliestEndActive, 0).UTC(), endActiveLeaderboardIds)
		})
	}
	if expiryDuration > -1 {
		ls.logger.Debug("Setting timer to run expiry function", zap.Duration("expiry", expiryDuration), zap.Strings("ids", ls.nearExpiryIds))
		ls.logger.Debug("Setting timer to run expiry function", zap.Duration("expiry", expiryDuration), zap.Strings("ids", expiryLeaderboardIds))
		ls.expiryTimer = time.AfterFunc(expiryDuration, func() {
			ls.invokeExpiryElapse(time.Unix(earliestExpiry-1, 0).UTC())
			ls.invokeExpiryElapse(time.Unix(earliestExpiry, 0).UTC(), expiryLeaderboardIds)
		})
	}
	ls.Unlock()

	ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds)))
}

func (ls *LocalLeaderboardScheduler) invokeEndActiveElapse(t time.Time) {
func (ls *LocalLeaderboardScheduler) invokeEndActiveElapse(t time.Time, ids []string) {
	if ls.active.Load() != 1 {
		// Not active.
		return
@@ -254,12 +265,20 @@ func (ls *LocalLeaderboardScheduler) invokeEndActiveElapse(t time.Time) {
		return
	}

	// Immediately schedule the next invocation to avoid any gaps caused by time spent processing below.
	ls.Update()

	ts := t.Unix()
	ls.Lock()
	ids := ls.nearEndActiveIds
	if ls.lastEnd != 0 && ls.lastEnd >= ts {
		// Avoid running duplicate or delayed scheduling.
		ls.Unlock()
		return
	}
	ls.lastEnd = ts
	ls.Unlock()

	// Immediately schedule the next invocation to avoid any gaps caused by time spent processing below.
	ls.Update()
	ls.logger.Info("Leaderboard scheduler end active", zap.Int("count", len(ids)))

	// Process the current set of tournament ends.
	for _, id := range ids {
@@ -284,7 +303,7 @@ WHERE id = $1`
	}
}

func (ls *LocalLeaderboardScheduler) invokeExpiryElapse(t time.Time) {
func (ls *LocalLeaderboardScheduler) invokeExpiryElapse(t time.Time, ids []string) {
	if ls.active.Load() != 1 {
		// Not active.
		return
@@ -293,14 +312,23 @@ func (ls *LocalLeaderboardScheduler) invokeExpiryElapse(t time.Time) {
	fnLeaderboardReset := ls.runtime.LeaderboardReset()
	fnTournamentReset := ls.runtime.TournamentReset()

	ls.Lock()
	ids := ls.nearExpiryIds
	ls.Unlock()
	ts := t.Unix()

	// Immediately schedule the next invocation to avoid any gaps caused by time spent processing below.
	ls.rankCache.TrimExpired(t.Unix())
	ls.rankCache.TrimExpired(ts)
	ls.Update()

	ls.Lock()
	if ls.lastExpiry != 0 && ls.lastExpiry >= ts {
		// Avoid running duplicate or delayed scheduling.
		ls.Unlock()
		return
	}
	ls.lastExpiry = ts
	ls.Unlock()

	ls.logger.Info("Leaderboard scheduler expiry reset", zap.Int("count", len(ids)))

	// Process the current set of leaderboard and tournament resets.
	for _, id := range ids {
		leaderboardOrTournament := ls.cache.Get(id)