Commit 2a6cdf9c authored by Fernando Takagi's avatar Fernando Takagi
Browse files

Add tournament termination function and guarding against race conditions on End callback

parent f4490d90
Loading
Loading
Loading
Loading
+23 −0
Original line number Diff line number Diff line
@@ -84,6 +84,29 @@ func TournamentDelete(ctx context.Context, cache LeaderboardCache, rankCache Lea
	return nil
}

func TournamentTerminate(ctx context.Context, cache LeaderboardCache, rankCache LeaderboardRankCache, scheduler LeaderboardScheduler, leaderboardId string) error {
	ts := time.Now().Unix()
	tMinusOne := time.Unix(ts-1, 0).UTC()

	if err := TournamentDelete(ctx, cache, rankCache, scheduler, leaderboardId); err != nil {
		return err
	}
	leaderboard := cache.Get(leaderboardId)
	if leaderboard == nil || !leaderboard.IsTournament() {
		// If it does not exist treat it as success.
		return nil
	}

	if leaderboard.EndTime > 0 && leaderboard.EndTime < ts {
		// Tournament has ended, callback has already run.
		return nil
	}

	scheduler.QueueCallback(&LeaderboardSchedulerCallback{id: leaderboardId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End})

	return nil
}

func TournamentAddAttempt(ctx context.Context, logger *zap.Logger, db *sql.DB, cache LeaderboardCache, leaderboardId string, owner string, count int) error {
	if count == 0 {
		// No-op.
+19 −17
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ const (
)

type Leaderboard struct {
	sync.Mutex         // Guarding EndCallbackInvoked
	Id                 string
	Authoritative      bool
	SortOrder          int
@@ -60,6 +61,7 @@ type Leaderboard struct {
	MaxNumScore        int
	Title              string
	StartTime          int64
	EndCallbackInvoked bool
}

func (l *Leaderboard) IsTournament() bool {
+40 −7
Original line number Diff line number Diff line
@@ -26,11 +26,19 @@ import (
	"go.uber.org/zap"
)

type CallbackType int

const (
	Expiry CallbackType = iota // Reset
	End
)

type LeaderboardSchedulerCallback struct {
	id           string
	leaderboard  *Leaderboard
	ts           int64
	t            time.Time
	callbackType CallbackType
}

type LeaderboardScheduler interface {
@@ -39,6 +47,7 @@ type LeaderboardScheduler interface {
	Resume()
	Stop()
	Update()
	QueueCallback(callback *LeaderboardSchedulerCallback)
}

type LocalLeaderboardScheduler struct {
@@ -279,6 +288,10 @@ func (ls *LocalLeaderboardScheduler) Update() {
	ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds)))
}

func (ls *LocalLeaderboardScheduler) QueueCallback(c *LeaderboardSchedulerCallback) {
	ls.queue <- c
}

func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) {
	if ls.active.Load() != 1 {
		// Not active.
@@ -311,8 +324,15 @@ func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []str
		// Process the current set of tournament ends.
		for _, id := range ids {
			currentId := id

			leaderboard := ls.cache.Get(id)
			if leaderboard == nil {
				// Cached entry was deleted before it reached the scheduler here.
				continue
			}

			// Will block if the queue is full.
			ls.queue <- &LeaderboardSchedulerCallback{id: currentId, ts: ts, t: tMinusOne}
			ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End}
		}
	}()
}
@@ -357,7 +377,7 @@ func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string
				continue
			}
			// Will block if queue is full.
			ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne}
			ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: Expiry}
		}
	}()
}
@@ -368,7 +388,7 @@ func (ls *LocalLeaderboardScheduler) invokeCallback() {
		case <-ls.ctx.Done():
			return
		case callback := <-ls.queue:
			if callback.leaderboard != nil {
			if callback.callbackType == Expiry {
				if callback.leaderboard.IsTournament() {
					// Tournament, fetch most up to date info for size etc.
					// Some processing is needed even if there is no runtime callback registered for tournament reset.
@@ -412,6 +432,11 @@ WHERE id = $1`
					}
				}
			} else {
				// Skip processing if there is no tournament end callback registered.
				if ls.fnTournamentEnd == nil {
					continue
				}

				query := `SELECT
id, sort_order, operator, reset_schedule, metadata, create_time,
category, description, duration, end_time, max_size, max_num_score, title, size, start_time
@@ -427,10 +452,18 @@ WHERE id = $1`
					continue
				}

				callback.leaderboard.Lock()
				if callback.leaderboard.EndCallbackInvoked {
					callback.leaderboard.Unlock()
					// already activated once
					continue
				}
				// fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all.
				if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
					ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err))
				}
				callback.leaderboard.EndCallbackInvoked = true
				callback.leaderboard.Unlock()
			}
		}
	}