Commit 2d466c60 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve transaction retry behaviour. (#190)

parent 425c599a
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -18,6 +18,11 @@
  packages = ["quantile"]
  revision = "3a771d992973f24aa725d07868b467d1ddfceafb"

[[projects]]
  name = "github.com/cockroachdb/cockroach-go"
  packages = ["crdb"]
  revision = "59c0560478b705bf9bd12f9252224a0fad7c87df"

[[projects]]
  name = "github.com/davecgh/go-spew"
  packages = ["spew"]
@@ -392,6 +397,6 @@
[solve-meta]
  analyzer-name = "dep"
  analyzer-version = 1
  inputs-digest = "92727e62a718744c1d17c1a4e2c19e4979ccc4d50296bb91c11f5f40b28a6e34"
  inputs-digest = "2bfb07763f890649c350608b5262df72b50c5e2c8e8552d2d147d45b59c3e586"
  solver-name = "gps-cdcl"
  solver-version = 1
+4 −0
Original line number Diff line number Diff line
@@ -61,3 +61,7 @@
[[constraint]]
  name = "github.com/stretchr/testify"
  version = "~1.2.1"

[[constraint]]
  name = "github.com/cockroachdb/cockroach-go"
  revision = "59c0560478b705bf9bd12f9252224a0fad7c87df"
+18 −8
Original line number Diff line number Diff line
@@ -15,10 +15,10 @@
package server

import (
	"database/sql"
	"strconv"
	"strings"

	"github.com/cockroachdb/cockroach-go/crdb"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/heroiclabs/nakama/api"
	"github.com/lib/pq"
@@ -72,37 +72,47 @@ func (s *ApiServer) LinkDevice(ctx context.Context, in *api.AccountDevice) (*emp
		return nil, status.Error(codes.InvalidArgument, "Device ID invalid, must be 10-128 bytes.")
	}

	fnErr := Transact(s.logger, s.db, func(tx *sql.Tx) error {
	tx, err := s.db.Begin()
	if err != nil {
		s.logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, status.Error(codes.Internal, "Error linking Device ID.")
	}

	err = crdb.ExecuteInTx(ctx, tx, func() error {
		userID := ctx.Value(ctxUserIDKey{})

		var dbDeviceIdLinkedUser int64
		err := tx.QueryRow("SELECT COUNT(id) FROM user_device WHERE id = $1 AND user_id = $2 LIMIT 1", deviceID, userID).Scan(&dbDeviceIdLinkedUser)
		if err != nil {
			s.logger.Error("Cannot link device ID.", zap.Error(err), zap.Any("input", in))
			return status.Error(codes.Internal, "Error linking Device ID.")
			return err
		}

		if dbDeviceIdLinkedUser == 0 {
			_, err = tx.Exec("INSERT INTO user_device (id, user_id) VALUES ($1, $2)", deviceID, userID)
			if err != nil {
				if e, ok := err.(*pq.Error); ok && e.Code == dbErrorUniqueViolation {
					return status.Error(codes.AlreadyExists, "Device ID already in use.")
					return StatusError(codes.AlreadyExists, "Device ID already in use.", err)
				}
				s.logger.Error("Cannot link device ID.", zap.Error(err), zap.Any("input", in))
				return status.Error(codes.Internal, "Error linking Device ID.")
				return err
			}
		}

		_, err = tx.Exec("UPDATE users SET update_time = now() WHERE id = $1", userID)
		if err != nil {
			s.logger.Error("Cannot update users table while linking.", zap.Error(err), zap.Any("input", in))
			return status.Error(codes.Internal, "Error linking Device ID.")
			return err
		}
		return nil
	})

	if fnErr != nil {
		return nil, fnErr
	if err != nil {
		if e, ok := err.(*statusError); ok {
			return nil, e.Status()
		}
		s.logger.Error("Error in database transaction.", zap.Error(err))
		return nil, status.Error(codes.Internal, "Error linking Device ID.")
	}

	return &empty.Empty{}, nil
+18 −8
Original line number Diff line number Diff line
@@ -15,10 +15,10 @@
package server

import (
	"database/sql"
	"strconv"
	"strings"

	"github.com/cockroachdb/cockroach-go/crdb"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/heroiclabs/nakama/api"
	"go.uber.org/zap"
@@ -61,7 +61,13 @@ func (s *ApiServer) UnlinkDevice(ctx context.Context, in *api.AccountDevice) (*e
		return nil, status.Error(codes.InvalidArgument, "A device ID must be supplied.")
	}

	fnErr := Transact(s.logger, s.db, func(tx *sql.Tx) error {
	tx, err := s.db.Begin()
	if err != nil {
		s.logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, status.Error(codes.Internal, "Could not unlink Device ID.")
	}

	err = crdb.ExecuteInTx(ctx, tx, func() error {
		userID := ctx.Value(ctxUserIDKey{})

		query := `DELETE FROM user_device WHERE id = $2 AND user_id = $1
@@ -77,26 +83,30 @@ AND (EXISTS (SELECT id FROM users WHERE id = $1 AND
		res, err := tx.Exec(query, userID, in.Id)
		if err != nil {
			s.logger.Error("Could not unlink device ID.", zap.Error(err), zap.Any("input", in))
			return status.Error(codes.Internal, "Could not unlink Device ID.")
			return err
		}
		if count, _ := res.RowsAffected(); count == 0 {
			return status.Error(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.")
			return StatusError(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.", ErrRowsAffectedCount)
		}

		res, err = tx.Exec("UPDATE users SET update_time = now() WHERE id = $1", userID)
		if err != nil {
			s.logger.Error("Could not unlink device ID.", zap.Error(err), zap.Any("input", in))
			return status.Error(codes.Internal, "Could not unlink Device ID.")
			return err
		}
		if count, _ := res.RowsAffected(); count == 0 {
			return status.Error(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.")
			return StatusError(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.", ErrRowsAffectedCount)
		}

		return nil
	})

	if fnErr != nil {
		return nil, fnErr
	if err != nil {
		if e, ok := err.(*statusError); ok {
			return nil, e.Status()
		}
		s.logger.Error("Error in database transaction.", zap.Error(err))
		return nil, status.Error(codes.Internal, "Could not unlink device ID.")
	}

	return &empty.Empty{}, nil
+29 −10
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import (

	"errors"

	"context"
	"github.com/cockroachdb/cockroach-go/crdb"
	"github.com/golang/protobuf/ptypes/timestamp"
	"github.com/heroiclabs/nakama/api"
	"github.com/heroiclabs/nakama/social"
@@ -142,7 +144,14 @@ func AuthenticateDevice(logger *zap.Logger, db *sql.DB, deviceID, username strin

	// Create a new account.
	userID := uuid.Must(uuid.NewV4()).String()
	fnErr := Transact(logger, db, func(tx *sql.Tx) error {

	tx, err := db.Begin()
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return "", "", false, status.Error(codes.Internal, "Error finding or creating user account.")
	}

	err = crdb.ExecuteInTx(context.Background(), tx, func() error {
		query := `
INSERT INTO users (id, username, create_time, update_time)
SELECT $1 AS id,
@@ -159,35 +168,39 @@ WHERE NOT EXISTS
			if err == sql.ErrNoRows {
				// A concurrent write has inserted this device ID.
				logger.Debug("Did not insert new user as device ID already exists.", zap.Error(err), zap.String("deviceID", deviceID), zap.String("username", username), zap.Bool("create", create))
				return status.Error(codes.Internal, "Error finding or creating user account.")
				return StatusError(codes.Internal, "Error finding or creating user account.", err)
			} else if e, ok := err.(*pq.Error); ok && e.Code == dbErrorUniqueViolation && strings.Contains(e.Message, "users_username_key") {
				return status.Error(codes.AlreadyExists, "Username is already in use.")
				return StatusError(codes.AlreadyExists, "Username is already in use.", err)
			}
			logger.Error("Cannot find or create user with device ID.", zap.Error(err), zap.String("deviceID", deviceID), zap.String("username", username), zap.Bool("create", create))
			return status.Error(codes.Internal, "Error finding or creating user account.")
			return err
		}

		if rowsAffectedCount, _ := result.RowsAffected(); rowsAffectedCount != 1 {
			logger.Error("Did not insert new user.", zap.Int64("rows_affected", rowsAffectedCount))
			return status.Error(codes.Internal, "Error finding or creating user account.")
			return StatusError(codes.Internal, "Error finding or creating user account.", ErrRowsAffectedCount)
		}

		query = "INSERT INTO user_device (id, user_id) VALUES ($1, $2)"
		result, err = tx.Exec(query, deviceID, userID)
		if err != nil {
			logger.Error("Cannot add device ID.", zap.Error(err), zap.String("deviceID", deviceID), zap.String("username", username), zap.Bool("create", create))
			return status.Error(codes.Internal, "Error finding or creating user account.")
			return err
		}

		if rowsAffectedCount, _ := result.RowsAffected(); rowsAffectedCount != 1 {
			logger.Error("Did not insert new user.", zap.Int64("rows_affected", rowsAffectedCount))
			return status.Error(codes.Internal, "Error finding or creating user account.")
			return StatusError(codes.Internal, "Error finding or creating user account.", ErrRowsAffectedCount)
		}

		return nil
	})
	if fnErr != nil {
		return "", "", false, fnErr
	if err != nil {
		if e, ok := err.(*statusError); ok {
			return "", "", false, e.Status()
		}
		logger.Error("Error in database transaction.", zap.Error(err))
		return "", "", false, status.Error(codes.Internal, "Error finding or creating user account.")
	}

	return userID, username, true, nil
@@ -541,7 +554,13 @@ func importFacebookFriends(logger *zap.Logger, db *sql.DB, messageRouter Message
	position := time.Now().UTC().Unix()
	friendUserIDs := make([]uuid.UUID, 0)

	err = Transact(logger, db, func(tx *sql.Tx) error {
	tx, err := db.Begin()
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return status.Error(codes.Internal, "Error importing Facebook friends.")
	}

	err = crdb.ExecuteInTx(context.Background(), tx, func() error {
		if reset {
			// Reset all friends for the current user, replacing them entirely with their Facebook friends.
			// Note: will NOT remove blocked users.
Loading