Commit fec55117 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve transaction retry handling. (#191)

parent 68050952
Loading
Loading
Loading
Loading
+66 −10
Original line number Diff line number Diff line
@@ -11,6 +11,12 @@
  packages = ["."]
  revision = "97c69685293dce4c0a2d0b19535179bbc976e4d2"

[[projects]]
  branch = "master"
  name = "github.com/cockroachdb/cockroach-go"
  packages = ["crdb"]
  revision = "59c0560478b705bf9bd12f9252224a0fad7c87df"

[[projects]]
  name = "github.com/davecgh/go-spew"
  packages = ["spew"]
@@ -41,7 +47,12 @@

[[projects]]
  name = "github.com/gogo/protobuf"
  packages = ["jsonpb","proto","sortkeys","types"]
  packages = [
    "jsonpb",
    "proto",
    "sortkeys",
    "types"
  ]
  revision = "100ba4e885062801d56799d78530b73b178a78f3"
  version = "v0.4"

@@ -82,7 +93,10 @@

[[projects]]
  name = "github.com/lib/pq"
  packages = [".","oid"]
  packages = [
    ".",
    "oid"
  ]
  revision = "22cb3e4c487ce6242e2b03369219e5631eed1221"

[[projects]]
@@ -92,7 +106,10 @@

[[projects]]
  name = "github.com/rubenv/sql-migrate"
  packages = [".","sqlparse"]
  packages = [
    ".",
    "sqlparse"
  ]
  revision = "a3ed23a40ebd39f82bf2a36768ed7d595f2bdc1e"

[[projects]]
@@ -114,7 +131,12 @@

[[projects]]
  name = "github.com/yuin/gopher-lua"
  packages = [".","ast","parse","pm"]
  packages = [
    ".",
    "ast",
    "parse",
    "pm"
  ]
  revision = "2243d714d6c94951d8ccca8c851836ff47d401c9"

[[projects]]
@@ -125,28 +147,62 @@

[[projects]]
  name = "go.uber.org/zap"
  packages = [".","buffer","internal/bufferpool","internal/color","internal/exit","internal/multierror","zapcore"]
  packages = [
    ".",
    "buffer",
    "internal/bufferpool",
    "internal/color",
    "internal/exit",
    "internal/multierror",
    "zapcore"
  ]
  revision = "9cabc84638b70e564c3dab2766efcb1ded2aac9f"
  version = "v1.4.1"

[[projects]]
  name = "golang.org/x/crypto"
  packages = ["bcrypt","blowfish","chacha20poly1305","chacha20poly1305/internal/chacha20","poly1305"]
  packages = [
    "bcrypt",
    "blowfish",
    "chacha20poly1305",
    "chacha20poly1305/internal/chacha20",
    "poly1305"
  ]
  revision = "f6b343c37ca80bfa8ea539da67a0b621f84fab1d"

[[projects]]
  name = "golang.org/x/net"
  packages = ["context","context/ctxhttp"]
  packages = [
    "context",
    "context/ctxhttp"
  ]
  revision = "69d4b8aa71caaaa75c3dfc11211d1be495abec7c"

[[projects]]
  name = "golang.org/x/oauth2"
  packages = [".","google","internal","jws","jwt"]
  packages = [
    ".",
    "google",
    "internal",
    "jws",
    "jwt"
  ]
  revision = "cce311a261e6fcf29de72ca96827bdb0b7d9c9e6"

[[projects]]
  name = "google.golang.org/appengine"
  packages = [".","internal","internal/app_identity","internal/base","internal/datastore","internal/log","internal/modules","internal/remote_api","internal/urlfetch","urlfetch"]
  packages = [
    ".",
    "internal",
    "internal/app_identity",
    "internal/base",
    "internal/datastore",
    "internal/log",
    "internal/modules",
    "internal/remote_api",
    "internal/urlfetch",
    "urlfetch"
  ]
  revision = "ad2570cd3913654e00c5f0183b39d2f998e54046"

[[projects]]
@@ -158,6 +214,6 @@
[solve-meta]
  analyzer-name = "dep"
  analyzer-version = 1
  inputs-digest = "35278ed3991c30453da43a354426a228439d974e91cb9dd5f6992f71681f7306"
  inputs-digest = "fb1120d629db08a0def85cdcc7fc257216e137c50ca001a2b03ae090629f5e7f"
  solver-name = "gps-cdcl"
  solver-version = 1
+3 −0
Original line number Diff line number Diff line
@@ -68,3 +68,6 @@

[[constraint]]
  name = "github.com/wirepair/netcode"

[[constraint]]
  name = "github.com/cockroachdb/cockroach-go"
+186 −157
Original line number Diff line number Diff line
@@ -26,6 +26,8 @@ import (
	"encoding/gob"
	"nakama/pkg/jsonpatch"

	"context"
	"github.com/cockroachdb/cockroach-go/crdb"
	"go.uber.org/zap"
)

@@ -67,6 +69,45 @@ type StorageKeyUpdate struct {
	Patch           jsonpatch.ExtendedPatch
}

var (
	ErrRowsAffectedCount = errors.New("rows_affected_count")
	ErrBadInput          = errors.New("bad input")
	ErrRejected          = errors.New("rejected")
)

// A type that wraps an outgoing client-facing error together with an underlying cause error.
type statusError struct {
	code   Error_Code
	status error
	cause  error
}

// Implement the error interface.
func (s *statusError) Error() string {
	return s.status.Error()
}

// Implement the crdb.ErrorCauser interface to allow the crdb.ExecuteInTx wrapper to figure out whether to retry or not.
func (s *statusError) Cause() error {
	return s.cause
}

func (s *statusError) Code() Error_Code {
	return s.code
}

func (s *statusError) Status() error {
	return s.status
}

func StatusError(code Error_Code, status, cause error) error {
	return &statusError{
		code:   code,
		status: status,
		cause:  cause,
	}
}

func StorageList(logger *zap.Logger, db *sql.DB, caller string, userID string, bucket string, collection string, limit int64, cursor string) ([]*StorageData, string, Error_Code, error) {
	// We list by at least User ID, or bucket as a list criteria.
	if userID == "" && bucket == "" {
@@ -369,6 +410,7 @@ func StorageWrite(logger *zap.Logger, db *sql.DB, caller string, data []*Storage
		return nil, RUNTIME_EXCEPTION, errors.New("Could not write storage")
	}

	err = crdb.ExecuteInTx(context.Background(), tx, func() error {
		// Execute each storage write.
		for i, d := range data {
			id := generateNewId()
@@ -410,19 +452,12 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at
			res, err := tx.Exec(query, params...)
			if err != nil {
				logger.Error("Could not write storage, exec error", zap.Error(err))
			if e := tx.Rollback(); e != nil {
				logger.Error("Could not write storage, rollback error", zap.Error(e))
			}
			return nil, RUNTIME_EXCEPTION, errors.New("Could not write storage")
				return err
			}

			// Check there was exactly 1 row affected.
			if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
			err = tx.Rollback()
			if err != nil {
				logger.Error("Could not write storage, rollback error", zap.Error(err))
			}
			return nil, STORAGE_REJECTED, errors.New("Storage write rejected: not found, version check failed, or permission denied")
				return StatusError(STORAGE_REJECTED, errors.New("Storage write rejected: not found, version check failed, or permission denied"), ErrRowsAffectedCount)
			}

			keys[i] = &StorageKey{
@@ -434,9 +469,14 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at
			}
		}

	err = tx.Commit()
		return nil
	})

	if err != nil {
		logger.Error("Could not write storage, commit error", zap.Error(err))
		if e, ok := err.(*statusError); ok {
			return nil, e.Code(), e.Status()
		}
		logger.Error("Could not write storage, transaction error", zap.Error(err))
		return nil, RUNTIME_EXCEPTION, errors.New("Could not write storage")
	}

@@ -462,30 +502,31 @@ func StorageUpdate(logger *zap.Logger, db *sql.DB, caller string, updates []*Sto
		return nil, RUNTIME_EXCEPTION, errors.New("Could not update storage")
	}

	err = crdb.ExecuteInTx(context.Background(), tx, func() error {
		// Process each update one by one.
		for i, update := range updates {
			// Check the storage identifiers.
			if update.Key.Bucket == "" || update.Key.Collection == "" || update.Key.Record == "" {
			return nil, BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: Invalid values for bucket, collection, or record", i))
				return StatusError(BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: Invalid values for bucket, collection, or record", i)), ErrBadInput)
			}

			// Check permission values.
			if update.PermissionRead < 0 || update.PermissionRead > 2 {
			return nil, BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: Invalid read permission", i))
				return StatusError(BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: Invalid read permission", i)), ErrBadInput)
			}
			if update.PermissionWrite < 0 || update.PermissionWrite > 1 {
			return nil, BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: Invalid write permission", i))
				return StatusError(BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: Invalid write permission", i)), ErrBadInput)
			}

			// If a user ID is provided, validate the format.
			if update.Key.UserId != "" {
				if caller != "" && caller != update.Key.UserId {
					// If the caller is a client, only allow them to write their own data.
				return nil, BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: A client can only write their own records", i))
					return StatusError(BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: A client can only write their own records", i)), ErrBadInput)
				}
			} else if caller != "" {
				// If the caller is a client, do not allow them to write global data.
			return nil, BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: A client cannot write global records", i))
				return StatusError(BAD_INPUT, errors.New(fmt.Sprintf("Invalid update index %v: A client cannot write global records", i)), ErrBadInput)
			}

			query := `
@@ -507,24 +548,18 @@ WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND delet
				// Only fail on critical database or row scan errors.
				// If no row was available we still allow storage updates to perform fresh inserts.
				logger.Error("Could not update storage, query row error", zap.Error(err))
			if e := tx.Rollback(); e != nil {
				logger.Error("Could not update storage, rollback error", zap.Error(e))
			}
			return nil, RUNTIME_EXCEPTION, errors.New("Could not update storage")
				return err
			}

			// Check if we need an immediate version compare.
			// If-None-Match and there's an existing version OR If-Match and the existing version doesn't match.
			if update.Key.Version != "" && ((update.Key.Version == "*" && version != "") || (update.Key.Version != "*" && update.Key.Version != version)) {
			if e := tx.Rollback(); e != nil {
				logger.Error("Could not update storage, rollback error", zap.Error(e))
			}
			return nil, STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: not found, version check failed, or permission denied", i))
				return StatusError(STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: not found, version check failed, or permission denied", i)), ErrRejected)
			}

			// Check write permission if caller is not script runtime.
			if caller != "" && write.Valid && write.Int64 != 1 {
			return nil, STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: not found, version check failed, or permission denied", i))
				return StatusError(STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: not found, version check failed, or permission denied", i)), ErrRejected)
			}

			// Allow updates to create new records.
@@ -535,10 +570,7 @@ WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND delet
			// Apply the patch operations.
			newValue, err := update.Patch.Apply(value)
			if err != nil {
			if e := tx.Rollback(); e != nil {
				logger.Error("Could not update storage, rollback error", zap.Error(e))
			}
			return nil, STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: %v", i, err.Error()))
				return StatusError(STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: %v", i, err.Error())), ErrRejected)
			}
			newVersion := fmt.Sprintf("%x", sha256.Sum256(newValue))

@@ -566,19 +598,12 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at
			res, err := tx.Exec(query, params...)
			if err != nil {
				logger.Error("Could not update storage, exec error", zap.Error(err))
			if e := tx.Rollback(); e != nil {
				logger.Error("Could not update storage, rollback error", zap.Error(e))
			}
			return nil, RUNTIME_EXCEPTION, errors.New("Could not update storage")
				return err
			}

			// Check there was exactly 1 row affected.
			if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
			err = tx.Rollback()
			if err != nil {
				logger.Error("Could not update storage, rollback error", zap.Error(err))
			}
			return nil, STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: not found, version check failed, or permission denied", i))
				return StatusError(STORAGE_REJECTED, errors.New(fmt.Sprintf("Storage update index %v rejected: not found, version check failed, or permission denied", i)), ErrRowsAffectedCount)
			}

			keys[i] = &StorageKey{
@@ -590,10 +615,14 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at
			}
		}

	// Commit the transaction.
	err = tx.Commit()
		return nil
	})

	if err != nil {
		logger.Error("Could not update storage, commit error", zap.Error(err))
		if e, ok := err.(*statusError); ok {
			return nil, e.Code(), e.Status()
		}
		logger.Error("Could not write storage, transaction error", zap.Error(err))
		return nil, RUNTIME_EXCEPTION, errors.New("Could not update storage")
	}