From 2166d6e1885174ff2eb49ce34df78d879a64205b Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Wed, 25 Apr 2018 22:26:38 +0100 Subject: [PATCH] Nakama 1.4.3 release. --- CHANGELOG.md | 4 ++ Makefile | 2 +- server/core_storage.go | 142 +++++++++++++++++------------------------ 3 files changed, 65 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 838d2a756..400e016cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p ## [Unreleased] +## [1.4.3] - 2018-04-25 +### Changed +- Improve storage remove transactional behaviour. + ## [1.4.2] - 2018-04-22 ### Changed - Improve handling of transactional retries. diff --git a/Makefile b/Makefile index 118443137..d51bc0f67 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ # limitations under the License. BINNAME := nakama -VERSION := 1.5.0-dev +VERSION := 1.4.3 BUILDDIR := build COMMITID := $(shell git rev-parse --short HEAD 2>/dev/null || echo nosha) DOCKERDIR := install/docker/nakama diff --git a/server/core_storage.go b/server/core_storage.go index 9f7673cb5..f49bd1642 100644 --- a/server/core_storage.go +++ b/server/core_storage.go @@ -73,6 +73,7 @@ var ( ErrRowsAffectedCount = errors.New("rows_affected_count") ErrBadInput = errors.New("bad input") ErrRejected = errors.New("rejected") + ErrNoDeletes = errors.New("no deletes") ) // A type that wraps an outgoing client-facing error together with an underlying cause error. @@ -451,7 +452,6 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at // Execute the query. res, err := tx.Exec(query, params...) if err != nil { - logger.Error("Could not write storage, exec error", zap.Error(err)) return err } @@ -547,7 +547,6 @@ WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND delet if err != nil && err != sql.ErrNoRows { // Only fail on critical database or row scan errors. // If no row was available we still allow storage updates to perform fresh inserts. - logger.Error("Could not update storage, query row error", zap.Error(err)) return err } @@ -597,7 +596,6 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at // Execute the query. res, err := tx.Exec(query, params...) if err != nil { - logger.Error("Could not update storage, exec error", zap.Error(err)) return err } @@ -691,99 +689,79 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag return RUNTIME_EXCEPTION, errors.New("Could not remove storage") } - // Execute the query. - queryRes, err := tx.Query(query, params...) - if err != nil { - logger.Error("Could not remove storage, query error", zap.Error(err)) - if e := tx.Rollback(); e != nil { - // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. - logger.Debug("Could not rollback transaction in remove storage after query error", zap.Error(e)) - } - return RUNTIME_EXCEPTION, errors.New("Could not remove storage") - } - defer queryRes.Close() - - query = "UPDATE storage SET deleted_at = $1, updated_at = $1 WHERE id IN (" - params = []interface{}{nowMs()} - - var id sql.NullString - var bucket sql.NullString - var collection sql.NullString - var record sql.NullString - var userId sql.NullString - var write sql.NullInt64 - var version sql.NullString - for queryRes.Next() { - err = queryRes.Scan(&id, &bucket, &collection, &record, &userId, &write, &version) + err = crdb.ExecuteInTx(context.Background(), tx, func() error { + // Execute the query. + queryRes, err := tx.Query(query, params...) if err != nil { - logger.Error("Could not remove storage, scan error", zap.Error(err)) - if e := tx.Rollback(); e != nil { - // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. - logger.Debug("Could not rollback transaction in remove storage after scan error", zap.Error(e)) - } - return RUNTIME_EXCEPTION, errors.New("Could not remove storage") + return err } + defer queryRes.Close() - key := ops[struct { - bucket string - collection string - record string - userId string - }{ - bucket: bucket.String, - collection: collection.String, - record: record.String, - userId: userId.String, - }] - - // Check permission. - if caller != "" && write.Int64 != 1 { - if e := tx.Rollback(); e != nil { - // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. - logger.Debug("Could not rollback transaction in remove storage after permission rejected", zap.Error(e)) + query = "UPDATE storage SET deleted_at = $1, updated_at = $1 WHERE id IN (" + params = []interface{}{nowMs()} + + var id sql.NullString + var bucket sql.NullString + var collection sql.NullString + var record sql.NullString + var userId sql.NullString + var write sql.NullInt64 + var version sql.NullString + for queryRes.Next() { + err = queryRes.Scan(&id, &bucket, &collection, &record, &userId, &write, &version) + if err != nil { + return err } - return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied") - } - // Check version. - if key.Version != "" && key.Version != version.String { - if e := tx.Rollback(); e != nil { - // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. - logger.Debug("Could not rollback transaction in remove storage after version rejected", zap.Error(e)) + key := ops[struct { + bucket string + collection string + record string + userId string + }{ + bucket: bucket.String, + collection: collection.String, + record: record.String, + userId: userId.String, + }] + + // Check permission. + if caller != "" && write.Int64 != 1 { + return StatusError(STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied"), ErrRejected) } - return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied") - } - l := len(params) - if l != 1 { - query += ", " + // Check version. + if key.Version != "" && key.Version != version.String { + return StatusError(STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied"), ErrRejected) + } + + l := len(params) + if l != 1 { + query += ", " + } + query += fmt.Sprintf("$%v", l+1) + params = append(params, id.String) } - query += fmt.Sprintf("$%v", l+1) - params = append(params, id.String) - } - // Nothing to delete. - if len(params) == 1 { - if e := tx.Rollback(); e != nil { - // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. - logger.Debug("Could not rollback transaction in remove storage after no deletes needed", zap.Error(e)) + // Nothing to delete. + if len(params) == 1 { + return StatusError(0, nil, ErrNoDeletes) } - return 0, nil - } - query += ")" - _, err = tx.Exec(query, params...) - if err != nil { - logger.Error("Could not remove storage, exec error", zap.Error(err)) - if e := tx.Rollback(); e != nil { - logger.Warn("Could not rollback transaction in remove storage after exec error", zap.Error(e)) + query += ")" + _, err = tx.Exec(query, params...) + if err != nil { + return err } - return RUNTIME_EXCEPTION, errors.New("Could not remove storage") - } - err = tx.Commit() + return nil + }) + if err != nil { - logger.Error("Could not remove storage, commit error", zap.Error(err)) + if e, ok := err.(*statusError); ok { + return e.Code(), e.Status() + } + logger.Error("Could not remove storage, transaction error", zap.Error(err)) return RUNTIME_EXCEPTION, errors.New("Could not remove storage") } -- GitLab