Unverified Commit 75dee504 authored by Simon Esposito's avatar Simon Esposito Committed by GitHub
Browse files

Ensure concurrent leaderboard creation is idempotent (#857)

Upgrade db migrate lib and use embed data sources.

Resolves #821
parent 6d236b13
Loading
Loading
Loading
Loading
+4 −5
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ require (
	github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
	github.com/jackc/pgtype v1.8.1
	github.com/jackc/pgx/v4 v4.13.0
	github.com/rubenv/sql-migrate v0.0.0-20210408115534-a32ed26c37ea
	github.com/rubenv/sql-migrate v1.1.1
	github.com/stretchr/testify v1.7.0
	github.com/uber-go/tally/v4 v4.1.1
	go.uber.org/atomic v1.9.0
@@ -50,6 +50,7 @@ require (
	github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
	github.com/felixge/httpsnoop v1.0.1 // indirect
	github.com/ghodss/yaml v1.0.0 // indirect
	github.com/go-gorp/gorp/v3 v3.0.2 // indirect
	github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
	github.com/golang/glog v1.0.0 // indirect
	github.com/golang/protobuf v1.5.2 // indirect
@@ -68,11 +69,9 @@ require (
	github.com/prometheus/common v0.29.0 // indirect
	github.com/prometheus/procfs v0.6.0 // indirect
	github.com/twmb/murmur3 v1.1.6 // indirect
	github.com/ziutek/mymysql v1.5.4 // indirect
	go.uber.org/multierr v1.6.0 // indirect
	golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
	golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
	golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
	golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e // indirect
	golang.org/x/text v0.3.7 // indirect
	gopkg.in/gorp.v1 v1.7.2 // indirect
	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
+174 −53

File changed.

Preview size limit exceeded, changes collapsed.

+1 −0
Original line number Diff line number Diff line
@@ -75,6 +75,7 @@ func main() {
			return
		case "migrate":
			migrate.Parse(os.Args[2:], tmpLogger)
			return
		case "check":
			// Parse any command line args to look up runtime path.
			// Use full config structure even if not all of its options are available in this command.
+7 −42
Original line number Diff line number Diff line
@@ -20,11 +20,9 @@ import (
	"errors"
	"flag"
	"fmt"
	"io/ioutil"
	"math"
	"net/url"
	"os"
	"path"
	"strings"
	"time"

@@ -58,7 +56,7 @@ type migrationService struct {
	dbAddress    string
	limit        int
	loggerFormat server.LoggingFormat
	migrations   *migrate.AssetMigrationSource
	migrations   *migrate.EmbedFileSystemMigrationSource
	db           *sql.DB
}

@@ -66,25 +64,9 @@ func StartupCheck(logger *zap.Logger, db *sql.DB) {
	migrate.SetTable(migrationTable)
	migrate.SetIgnoreUnknown(true)

	ms := &migrate.AssetMigrationSource{
		Asset: func(_path string) ([]byte, error) {
			f, err := sqlMigrateFS.Open(path.Join("sql", _path))
			if err != nil {
				return nil, err
			}
			return ioutil.ReadAll(f)
		},
		AssetDir: func(_path string) ([]string, error) {
			entries, err := sqlMigrateFS.ReadDir(path.Join("sql", _path))
			if err != nil {
				return nil, err
			}
			files := make([]string, 0, len(entries))
			for _, dirEntry := range entries {
				files = append(files, dirEntry.Name())
			}
			return files, nil
		},
	ms := &migrate.EmbedFileSystemMigrationSource{
		FileSystem: sqlMigrateFS,
		Root:       "sql",
	}

	migrations, err := ms.FindMigrations()
@@ -113,25 +95,9 @@ func Parse(args []string, tmpLogger *zap.Logger) {
	migrate.SetTable(migrationTable)
	migrate.SetIgnoreUnknown(true)
	ms := &migrationService{
		migrations: &migrate.AssetMigrationSource{
			Asset: func(_path string) ([]byte, error) {
				f, err := sqlMigrateFS.Open(path.Join("sql", _path))
				if err != nil {
					return nil, err
				}
				return ioutil.ReadAll(f)
			},
			AssetDir: func(_path string) ([]string, error) {
				entries, err := sqlMigrateFS.ReadDir(path.Join("sql", _path))
				if err != nil {
					return nil, err
				}
				files := make([]string, 0, len(entries))
				for _, dirEntry := range entries {
					files = append(files, dirEntry.Name())
				}
				return files, nil
			},
		migrations: &migrate.EmbedFileSystemMigrationSource{
			FileSystem: sqlMigrateFS,
			Root:       "sql",
		},
	}

@@ -237,7 +203,6 @@ func Parse(args []string, tmpLogger *zap.Logger) {

	exec(logger)
	db.Close()
	os.Exit(0)
}

func (ms *migrationService) up(logger *zap.Logger) {
+20 −2
Original line number Diff line number Diff line
@@ -18,7 +18,9 @@ import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/jackc/pgconn"
	"log"
	"sort"
	"strconv"
@@ -331,9 +333,25 @@ func (l *LocalLeaderboardCache) Create(ctx context.Context, id string, authorita
	var createTime pgtype.Timestamptz
	err = l.db.QueryRowContext(ctx, query, params...).Scan(&createTime)
	if err != nil {
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) && pgErr.Code == dbErrorUniqueViolation {
			// Concurrent attempt at creating the leaderboard, to keep idempotency query the existing leaderboard data.
			if err = l.db.QueryRowContext(ctx, "SELECT authoritative, sort_order, operator, COALESCE(reset_schedule, ''), metadata, create_time FROM leaderboard WHERE id = $1", id).Scan(&authoritative, &sortOrder, &operator, &resetSchedule, &metadata, &createTime); err != nil {
				l.logger.Error("Error retrieving leaderboard", zap.Error(err))
				return nil, err
			}
			if resetSchedule != "" {
				expr, err = cronexpr.Parse(resetSchedule)
				if err != nil {
					l.logger.Error("Error parsing leaderboard reset schedule", zap.Error(err))
					return nil, err
				}
			}
		} else {
			l.logger.Error("Error creating leaderboard", zap.Error(err))
			return nil, err
		}
	}

	// Then add to cache.
	leaderboard := &Leaderboard{
Loading