From 2940ddb50e3adc0a63adc689bb4e8ada42b7bbe5 Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Fri, 4 Jan 2019 15:41:29 +0000 Subject: [PATCH] Better batching of wallet updates. --- CHANGELOG.md | 4 ++ server/core_wallet.go | 87 +++++++++++++++++++++++++++++++++---------- server/tracker.go | 17 +++++++-- 3 files changed, 84 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08b5601de..310eb21af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr ### Added - Make authoritative match join attempt marker deadline configurable. +### Changed +- Better batching of wallet updates. + ### Fixed - Correctly register deferred messages sent from authoritative matches. - Correctly cancel Lua authoritative match context when match initialization fails. +- Improve decoding of Steam authentication responses to correctly unwrap payload. ## [2.3.0] - 2018-12-31 ### Added diff --git a/server/core_wallet.go b/server/core_wallet.go index 2a0401a91..d52d6d5c6 100644 --- a/server/core_wallet.go +++ b/server/core_wallet.go @@ -19,6 +19,8 @@ import ( "database/sql" "encoding/json" "fmt" + "strconv" + "strings" "github.com/cockroachdb/cockroach-go/crdb" "github.com/gofrs/uuid" @@ -74,6 +76,15 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates return nil } + params := make([]interface{}, 0, len(updates)) + statements := make([]string, 0, len(updates)) + for _, update := range updates { + params = append(params, update.UserID) + statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID") + } + + query := "SELECT id, wallet FROM users WHERE id IN (" + strings.Join(statements, ",") + ")" + tx, err := db.BeginTx(ctx, nil) if err != nil { logger.Error("Could not begin database transaction.", zap.Error(err)) @@ -81,41 +92,61 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates } if err = crdb.ExecuteInTx(ctx, tx, func() error { - for _, update := range updates { + // Select the wallets from the DB and decode them. + wallets := make(map[string]map[string]interface{}, len(updates)) + rows, err := tx.QueryContext(ctx, query, params...) + if err != nil { + logger.Debug("Error retrieving user wallets.", zap.Error(err)) + return err + } + for rows.Next() { + var id string var wallet sql.NullString - query := "SELECT wallet FROM users WHERE id = $1::UUID" - err := tx.QueryRowContext(ctx, query, update.UserID).Scan(&wallet) + err = rows.Scan(&id, &wallet) if err != nil { - logger.Debug("Error retrieving user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err)) + rows.Close() + logger.Debug("Error reading user wallets.", zap.Error(err)) return err } var walletMap map[string]interface{} err = json.Unmarshal([]byte(wallet.String), &walletMap) if err != nil { - logger.Debug("Error converting current user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err)) + rows.Close() + logger.Debug("Error converting user wallet.", zap.String("user_id", id), zap.Error(err)) return err } + wallets[id] = walletMap + } + rows.Close() + + // Prepare the set of wallet updates and ledger updates. + updatedWallets := make(map[string][]byte, len(updates)) + if updateLedger { + statements = make([]string, 0, len(updates)) + params = make([]interface{}, 0, len(updates)*4) + } + for _, update := range updates { + userID := update.UserID.String() + walletMap, ok := wallets[userID] + if !ok { + // Wallet update for a user that does not exist. Skip it. + continue + } walletMap, err = applyWalletUpdate(walletMap, update.Changeset, "") if err != nil { // Programmer error, no need to log. return err } - walletData, err := json.Marshal(walletMap) if err != nil { - logger.Debug("Error converting new user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err)) - return err - } - - query = "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1::UUID" - _, err = tx.ExecContext(ctx, query, update.UserID, string(walletData)) - if err != nil { - logger.Debug("Error writing user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err)) + logger.Debug("Error converting new user wallet.", zap.String("user_id", userID), zap.Error(err)) return err } + updatedWallets[userID] = walletData + // Prepare ledger updates if needed. if updateLedger { changesetData, err := json.Marshal(update.Changeset) if err != nil { @@ -123,12 +154,28 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates return err } - query = "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES ($1::UUID, $2::UUID, $3, $4)" - _, err = tx.ExecContext(ctx, query, uuid.Must(uuid.NewV4()), update.UserID, changesetData, update.Metadata) - if err != nil { - logger.Debug("Error writing user wallet ledger.", zap.String("user_id", update.UserID.String()), zap.Error(err)) - return err - } + params = append(params, uuid.Must(uuid.NewV4()), userID, changesetData, update.Metadata) + statements = append(statements, fmt.Sprintf("($%v::UUID, $%v, $%v, $%v)", strconv.Itoa(len(params)-3), strconv.Itoa(len(params)-2), strconv.Itoa(len(params)-1), strconv.Itoa(len(params)))) + } + } + + // Write the updated wallets. + query = "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1" + for userID, updatedWallet := range updatedWallets { + _, err = tx.ExecContext(ctx, query, userID, updatedWallet) + if err != nil { + logger.Debug("Error writing user wallet.", zap.String("user_id", userID), zap.Error(err)) + return err + } + } + + // Write the ledger updates, if any. + if updateLedger && (len(statements) > 0) { + query = "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES " + strings.Join(statements, ", ") + _, err = tx.ExecContext(ctx, query, params...) + if err != nil { + logger.Debug("Error writing user wallet ledgers.", zap.Error(err)) + return err } } return nil diff --git a/server/tracker.go b/server/tracker.go index 816691c0f..264c589d1 100644 --- a/server/tracker.go +++ b/server/tracker.go @@ -16,6 +16,7 @@ package server import ( "bytes" + "context" "github.com/golang/protobuf/proto" "sync" @@ -161,33 +162,41 @@ type LocalTracker struct { jsonpbMarshaler *jsonpb.Marshaler name string eventsCh chan *PresenceEvent - stopCh chan struct{} presencesByStream map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta presencesBySession map[uuid.UUID]map[presenceCompact]PresenceMeta + + ctx context.Context + ctxCancelFn context.CancelFunc } func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry *SessionRegistry, jsonpbMarshaler *jsonpb.Marshaler) Tracker { + ctx, ctxCancelFn := context.WithCancel(context.Background()) + t := &LocalTracker{ logger: logger, sessionRegistry: sessionRegistry, jsonpbMarshaler: jsonpbMarshaler, name: config.GetName(), eventsCh: make(chan *PresenceEvent, config.GetTracker().EventQueueSize), - stopCh: make(chan struct{}), presencesByStream: make(map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta), presencesBySession: make(map[uuid.UUID]map[presenceCompact]PresenceMeta), + + ctx: ctx, + ctxCancelFn: ctxCancelFn, } + go func() { // Asynchronously process and dispatch presence events. for { select { - case <-t.stopCh: + case <-t.ctx.Done(): return case e := <-t.eventsCh: t.processEvent(e) } } }() + return t } @@ -201,7 +210,7 @@ func (t *LocalTracker) SetMatchLeaveListener(f func(id uuid.UUID, leaves []*Matc func (t *LocalTracker) Stop() { // No need to explicitly clean up the events channel, just let the application exit. - close(t.stopCh) + t.ctxCancelFn() } func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID, meta PresenceMeta, allowIfFirstForSession bool) (bool, bool) { -- GitLab