Commit 4b10efdc authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Handle query and parameter resets on wallet update retries.

parent e67097c9
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -24,6 +24,9 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Expired tournaments will no longer be listed nor any records will be returned.
- Unlink device identifiers on console user account details page.
- Add missing index drop on migrate down.
- Handle query and parameter resets on wallet update retries.
- Reset list of friend IDs in Facebook import when retrying the operation.
- Reset notifications in friend add when retrying the operation.

## [2.5.1] - 2019-05-03
### Changed
+4 −1
Original line number Diff line number Diff line
@@ -633,7 +633,7 @@ func importFacebookFriends(ctx context.Context, logger *zap.Logger, db *sql.DB,
		return nil
	}

	friendUserIDs := make([]uuid.UUID, 0)
	var friendUserIDs []uuid.UUID

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
@@ -731,6 +731,9 @@ func importFacebookFriends(ctx context.Context, logger *zap.Logger, db *sql.DB,
		}
		_ = rows.Close()

		// If the transaction is retried ensure we wipe any friend user IDs that may have been recorded by previous attempts.
		friendUserIDs = make([]uuid.UUID, 0)

		for _, friendID := range possibleFriendIDs {
			position := fmt.Sprintf("%v", time.Now().UTC().UnixNano())

+4 −1
Original line number Diff line number Diff line
@@ -150,7 +150,7 @@ func AddFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, messageRout
		uniqueFriendIDs[fid] = struct{}{}
	}

	notificationToSend := make(map[string]bool)
	var notificationToSend map[string]bool

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
@@ -159,6 +159,9 @@ func AddFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, messageRout
	}

	if err = ExecuteInTx(ctx, tx, func() error {
		// If the transaction is retried ensure we wipe any notifications that may have been prepared by previous attempts.
		notificationToSend = make(map[string]bool)

		for id := range uniqueFriendIDs {
			isFriendAccept, addFriendErr := addFriend(ctx, logger, tx, userID, id)
			if addFriendErr == nil {
+1 −0
Original line number Diff line number Diff line
@@ -461,6 +461,7 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au
	}

	if err = ExecuteInTx(ctx, tx, func() error {
		// If the transaction is retried ensure we wipe any acks that may have been prepared by previous attempts.
		acks = make([]*api.StorageObjectAck, 0, ops.Len())

		for _, op := range ops {
+10 −10
Original line number Diff line number Diff line
@@ -75,14 +75,14 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
		return nil
	}

	params := make([]interface{}, 0, len(updates))
	statements := make([]string, 0, len(updates))
	initialParams := make([]interface{}, 0, len(updates))
	initialStatements := make([]string, 0, len(updates))
	for _, update := range updates {
		params = append(params, update.UserID)
		statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID")
		initialParams = append(initialParams, update.UserID)
		initialStatements = append(initialStatements, "$"+strconv.Itoa(len(initialParams))+"::UUID")
	}

	query := "SELECT id, wallet FROM users WHERE id IN (" + strings.Join(statements, ",") + ")"
	initialQuery := "SELECT id, wallet FROM users WHERE id IN (" + strings.Join(initialStatements, ",") + ")"

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
@@ -93,7 +93,7 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
	if err = ExecuteInTx(ctx, tx, func() error {
		// Select the wallets from the DB and decode them.
		wallets := make(map[string]map[string]interface{}, len(updates))
		rows, err := tx.QueryContext(ctx, query, params...)
		rows, err := tx.QueryContext(ctx, initialQuery, initialParams...)
		if err != nil {
			logger.Debug("Error retrieving user wallets.", zap.Error(err))
			return err
@@ -123,6 +123,8 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
		// Prepare the set of wallet updates and ledger updates.
		updatedWallets := make(map[string][]byte, len(updates))
		updateOrder := make([]string, 0, len(updates))
		var statements []string
		var params []interface{}
		if updateLedger {
			statements = make([]string, 0, len(updates))
			params = make([]interface{}, 0, len(updates)*4)
@@ -165,7 +167,6 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
			sort.Strings(updateOrder)

			// Write the updated wallets.
			query = "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1"
			for _, userID := range updateOrder {
				updatedWallet, ok := updatedWallets[userID]
				if !ok {
@@ -173,7 +174,7 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
					logger.Warn("Missing wallet update for user.", zap.String("user_id", userID))
					continue
				}
				_, err = tx.ExecContext(ctx, query, userID, updatedWallet)
				_, err = tx.ExecContext(ctx, "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1", userID, updatedWallet)
				if err != nil {
					logger.Debug("Error writing user wallet.", zap.String("user_id", userID), zap.Error(err))
					return err
@@ -182,8 +183,7 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates

			// Write the ledger updates, if any.
			if updateLedger && (len(statements) > 0) {
				query = "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES " + strings.Join(statements, ", ")
				_, err = tx.ExecContext(ctx, query, params...)
				_, err = tx.ExecContext(ctx, "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES "+strings.Join(statements, ", "), params...)
				if err != nil {
					logger.Debug("Error writing user wallet ledgers.", zap.Error(err))
					return err