Commit 2940ddb5 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Better batching of wallet updates.

parent 171587a5
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -7,9 +7,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
### Added
- Make authoritative match join attempt marker deadline configurable.

### Changed
- Better batching of wallet updates.

### Fixed
- Correctly register deferred messages sent from authoritative matches.
- Correctly cancel Lua authoritative match context when match initialization fails.
- Improve decoding of Steam authentication responses to correctly unwrap payload.

## [2.3.0] - 2018-12-31
### Added
+67 −20
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ import (
	"database/sql"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"

	"github.com/cockroachdb/cockroach-go/crdb"
	"github.com/gofrs/uuid"
@@ -74,6 +76,15 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
		return nil
	}

	params := make([]interface{}, 0, len(updates))
	statements := make([]string, 0, len(updates))
	for _, update := range updates {
		params = append(params, update.UserID)
		statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID")
	}

	query := "SELECT id, wallet FROM users WHERE id IN (" + strings.Join(statements, ",") + ")"

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
@@ -81,41 +92,61 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
	}

	if err = crdb.ExecuteInTx(ctx, tx, func() error {
		for _, update := range updates {
		// Select the wallets from the DB and decode them.
		wallets := make(map[string]map[string]interface{}, len(updates))
		rows, err := tx.QueryContext(ctx, query, params...)
		if err != nil {
			logger.Debug("Error retrieving user wallets.", zap.Error(err))
			return err
		}
		for rows.Next() {
			var id string
			var wallet sql.NullString
			query := "SELECT wallet FROM users WHERE id = $1::UUID"
			err := tx.QueryRowContext(ctx, query, update.UserID).Scan(&wallet)
			err = rows.Scan(&id, &wallet)
			if err != nil {
				logger.Debug("Error retrieving user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err))
				rows.Close()
				logger.Debug("Error reading user wallets.", zap.Error(err))
				return err
			}

			var walletMap map[string]interface{}
			err = json.Unmarshal([]byte(wallet.String), &walletMap)
			if err != nil {
				logger.Debug("Error converting current user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err))
				rows.Close()
				logger.Debug("Error converting user wallet.", zap.String("user_id", id), zap.Error(err))
				return err
			}

			wallets[id] = walletMap
		}
		rows.Close()

		// Prepare the set of wallet updates and ledger updates.
		updatedWallets := make(map[string][]byte, len(updates))
		if updateLedger {
			statements = make([]string, 0, len(updates))
			params = make([]interface{}, 0, len(updates)*4)
		}
		for _, update := range updates {
			userID := update.UserID.String()
			walletMap, ok := wallets[userID]
			if !ok {
				// Wallet update for a user that does not exist. Skip it.
				continue
			}
			walletMap, err = applyWalletUpdate(walletMap, update.Changeset, "")
			if err != nil {
				// Programmer error, no need to log.
				return err
			}

			walletData, err := json.Marshal(walletMap)
			if err != nil {
				logger.Debug("Error converting new user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err))
				return err
			}

			query = "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1::UUID"
			_, err = tx.ExecContext(ctx, query, update.UserID, string(walletData))
			if err != nil {
				logger.Debug("Error writing user wallet.", zap.String("user_id", update.UserID.String()), zap.Error(err))
				logger.Debug("Error converting new user wallet.", zap.String("user_id", userID), zap.Error(err))
				return err
			}
			updatedWallets[userID] = walletData

			// Prepare ledger updates if needed.
			if updateLedger {
				changesetData, err := json.Marshal(update.Changeset)
				if err != nil {
@@ -123,13 +154,29 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
					return err
				}

				query = "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES ($1::UUID, $2::UUID, $3, $4)"
				_, err = tx.ExecContext(ctx, query, uuid.Must(uuid.NewV4()), update.UserID, changesetData, update.Metadata)
				params = append(params, uuid.Must(uuid.NewV4()), userID, changesetData, update.Metadata)
				statements = append(statements, fmt.Sprintf("($%v::UUID, $%v, $%v, $%v)", strconv.Itoa(len(params)-3), strconv.Itoa(len(params)-2), strconv.Itoa(len(params)-1), strconv.Itoa(len(params))))
			}
		}

		// Write the updated wallets.
		query = "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1"
		for userID, updatedWallet := range updatedWallets {
			_, err = tx.ExecContext(ctx, query, userID, updatedWallet)
			if err != nil {
					logger.Debug("Error writing user wallet ledger.", zap.String("user_id", update.UserID.String()), zap.Error(err))
				logger.Debug("Error writing user wallet.", zap.String("user_id", userID), zap.Error(err))
				return err
			}
		}

		// Write the ledger updates, if any.
		if updateLedger && (len(statements) > 0) {
			query = "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES " + strings.Join(statements, ", ")
			_, err = tx.ExecContext(ctx, query, params...)
			if err != nil {
				logger.Debug("Error writing user wallet ledgers.", zap.Error(err))
				return err
			}
		}
		return nil
	}); err != nil {
+13 −4
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ package server

import (
	"bytes"
	"context"
	"github.com/golang/protobuf/proto"
	"sync"

@@ -161,33 +162,41 @@ type LocalTracker struct {
	jsonpbMarshaler    *jsonpb.Marshaler
	name               string
	eventsCh           chan *PresenceEvent
	stopCh             chan struct{}
	presencesByStream  map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta
	presencesBySession map[uuid.UUID]map[presenceCompact]PresenceMeta

	ctx         context.Context
	ctxCancelFn context.CancelFunc
}

func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry *SessionRegistry, jsonpbMarshaler *jsonpb.Marshaler) Tracker {
	ctx, ctxCancelFn := context.WithCancel(context.Background())

	t := &LocalTracker{
		logger:             logger,
		sessionRegistry:    sessionRegistry,
		jsonpbMarshaler:    jsonpbMarshaler,
		name:               config.GetName(),
		eventsCh:           make(chan *PresenceEvent, config.GetTracker().EventQueueSize),
		stopCh:             make(chan struct{}),
		presencesByStream:  make(map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta),
		presencesBySession: make(map[uuid.UUID]map[presenceCompact]PresenceMeta),

		ctx:         ctx,
		ctxCancelFn: ctxCancelFn,
	}

	go func() {
		// Asynchronously process and dispatch presence events.
		for {
			select {
			case <-t.stopCh:
			case <-t.ctx.Done():
				return
			case e := <-t.eventsCh:
				t.processEvent(e)
			}
		}
	}()

	return t
}

@@ -201,7 +210,7 @@ func (t *LocalTracker) SetMatchLeaveListener(f func(id uuid.UUID, leaves []*Matc

func (t *LocalTracker) Stop() {
	// No need to explicitly clean up the events channel, just let the application exit.
	close(t.stopCh)
	t.ctxCancelFn()
}

func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID, meta PresenceMeta, allowIfFirstForSession bool) (bool, bool) {