Unverified Commit 21e3cbda authored by Maxim Ivanov's avatar Maxim Ivanov Committed by GitHub
Browse files

ExecuteInTx PostgreSQL version (#1045)

* Enforce contract of ExecuteInTx at the API level

Previously ExecuteInTx accepted open transaction, but required
users never to execute any commands on it prior to calling
ExecuteInTx. This API change enforces this contract by making
ExecuteInTx to open transaction internally and pass it to the
callback func.

* Implement PG version of ExecuteInTx which does fewer roundtrips to the Server

PostgreSQL doesn't benefit from SAVEPOINT/ROLLBACK logic like CockroachDB
does. With this change Nakama checks server DB engine and enables CockroachDB
optimization only when necessary.

There are 2 behviour change in the PG version of ExecuteInTx:

- it retries on all "Class 40" (a.k.a retriable) codes, not just
  serialization error:

	40000 	transaction_rollback
	40002 	transaction_integrity_constraint_violation
	40001 	serialization_failure
	40003 	statement_completion_unknown
	40P01 	deadlock_detected

- It doesn't ignore COMMIT result code anymore
parent e08c03e1
Loading
Loading
Loading
Loading
+5 −10
Original line number Diff line number Diff line
@@ -21,6 +21,10 @@ import (
	"encoding/base64"
	"encoding/gob"
	"encoding/json"
	"regexp"
	"strconv"
	"strings"

	"github.com/gofrs/uuid"
	"github.com/heroiclabs/nakama-common/api"
	"github.com/heroiclabs/nakama/v3/console"
@@ -31,9 +35,6 @@ import (
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/timestamppb"
	"regexp"
	"strconv"
	"strings"
)

var validTrigramFilterRegex = regexp.MustCompile("^%?[^%]{3,}%?$")
@@ -692,13 +693,7 @@ func (s *ConsoleServer) UpdateAccount(ctx context.Context, in *console.UpdateAcc
		return &emptypb.Empty{}, nil
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		s.logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, status.Error(codes.Internal, "An error occurred while trying to update the user.")
	}

	if err = ExecuteInTx(ctx, tx, func() error {
	if err = ExecuteInTx(ctx, s.db, func(tx *sql.Tx) error {
		for oldDeviceID, newDeviceID := range in.DeviceIds {
			if newDeviceID == "" {
				query := `DELETE FROM user_device WHERE id = $2 AND user_id = $1
+2 −14
Original line number Diff line number Diff line
@@ -317,13 +317,7 @@ func (s *ConsoleServer) DemoteGroupMember(ctx context.Context, in *console.Updat
		var message *api.ChannelMessage
		ts := time.Now().Unix()

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			logger.Error("Could not begin database transaction.", zap.Error(err))
			return err
		}

		if err := ExecuteInTx(ctx, tx, func() error {
		if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
			query := ""
			if myState == 0 {
				// Ensure we aren't removing the last superadmin when deleting authoritatively.
@@ -463,13 +457,7 @@ func (s *ConsoleServer) PromoteGroupMember(ctx context.Context, in *console.Upda
		var message *api.ChannelMessage
		ts := time.Now().Unix()

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			logger.Error("Could not begin database transaction.", zap.Error(err))
			return err
		}

		if err := ExecuteInTx(ctx, tx, func() error {
		if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
			if uid == caller {
				return errors.New("cannot promote self")
			}
+2 −7
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ package server

import (
	"context"
	"database/sql"

	"github.com/gofrs/uuid"
	"github.com/heroiclabs/nakama/v3/console"
@@ -96,13 +97,7 @@ func (s *ConsoleServer) UnlinkDevice(ctx context.Context, in *console.UnlinkDevi
		return nil, status.Error(codes.InvalidArgument, "Requires a valid device ID.")
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		s.logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, status.Error(codes.Internal, "Could not unlink Device ID.")
	}

	err = ExecuteInTx(ctx, tx, func() error {
	err = ExecuteInTx(ctx, s.db, func(tx *sql.Tx) error {
		query := `DELETE FROM user_device WHERE id = $2 AND user_id = $1
AND (EXISTS (SELECT id FROM users WHERE id = $1 AND
    (apple_id IS NOT NULL
+4 −16
Original line number Diff line number Diff line
@@ -243,13 +243,7 @@ WHERE u.id IN (` + strings.Join(statements, ",") + `)`
}

func UpdateAccounts(ctx context.Context, logger *zap.Logger, db *sql.DB, updates []*accountUpdate) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return err
	}

	if err = ExecuteInTx(ctx, tx, func() error {
	if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
		updateErr := updateAccounts(ctx, logger, tx, updates)
		if updateErr != nil {
			return updateErr
@@ -473,14 +467,8 @@ func ExportAccount(ctx context.Context, logger *zap.Logger, db *sql.DB, userID u
func DeleteAccount(ctx context.Context, logger *zap.Logger, db *sql.DB, config Config, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, tracker Tracker, userID uuid.UUID, recorded bool) error {
	ts := time.Now().UTC().Unix()

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return err
	}

	var deleted bool
	if err := ExecuteInTx(ctx, tx, func() error {
	if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
		count, err := DeleteUser(ctx, tx, userID)
		if err != nil {
			logger.Debug("Could not delete user", zap.Error(err), zap.String("user_id", userID.String()))
@@ -520,11 +508,11 @@ func DeleteAccount(ctx context.Context, logger *zap.Logger, db *sql.DB, config C

	if deleted {
		// Logout and disconnect.
		if err = SessionLogout(config, sessionCache, userID, "", ""); err != nil {
		if err := SessionLogout(config, sessionCache, userID, "", ""); err != nil {
			return err
		}
		for _, presence := range tracker.ListPresenceIDByStream(PresenceStream{Mode: StreamModeNotifications, Subject: userID}) {
			if err = sessionRegistry.Disconnect(ctx, presence.SessionID, false); err != nil {
			if err := sessionRegistry.Disconnect(ctx, presence.SessionID, false); err != nil {
				return err
			}
		}
+3 −21
Original line number Diff line number Diff line
@@ -225,13 +225,7 @@ func AuthenticateDevice(ctx context.Context, logger *zap.Logger, db *sql.DB, dev
	// Create a new account.
	userID := uuid.Must(uuid.NewV4()).String()

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return "", "", false, status.Error(codes.Internal, "Error finding or creating user account.")
	}

	err = ExecuteInTx(ctx, tx, func() error {
	err = ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
		query := `
INSERT INTO users (id, username, create_time, update_time)
SELECT $1 AS id,
@@ -848,13 +842,7 @@ func importSteamFriends(ctx context.Context, logger *zap.Logger, db *sql.DB, mes
	}

	var friendUserIDs []uuid.UUID
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return status.Error(codes.Internal, "Error importing Steam friends.")
	}

	err = ExecuteInTx(ctx, tx, func() error {
	err = ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
		if reset {
			if err := resetUserFriends(ctx, tx, userID); err != nil {
				logger.Error("Could not reset user friends", zap.Error(err))
@@ -930,13 +918,7 @@ func importFacebookFriends(ctx context.Context, logger *zap.Logger, db *sql.DB,
	}

	var friendUserIDs []uuid.UUID
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return status.Error(codes.Internal, "Error importing Facebook friends.")
	}

	err = ExecuteInTx(ctx, tx, func() error {
	err = ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
		if reset {
			if err := resetUserFriends(ctx, tx, userID); err != nil {
				logger.Error("Could not reset user friends", zap.Error(err))
Loading