Loading CHANGELOG.md +4 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,10 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p ## [Unreleased] ## [1.4.3] - 2018-04-25 ### Changed - Improve storage remove transactional behaviour. ## [1.4.2] - 2018-04-22 ### Changed - Improve handling of transactional retries. Loading Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ # limitations under the License. BINNAME := nakama VERSION := 1.5.0-dev VERSION := 1.4.3 BUILDDIR := build COMMITID := $(shell git rev-parse --short HEAD 2>/dev/null || echo nosha) DOCKERDIR := install/docker/nakama Loading server/core_storage.go +60 −82 Original line number Diff line number Diff line Loading @@ -73,6 +73,7 @@ var ( ErrRowsAffectedCount = errors.New("rows_affected_count") ErrBadInput = errors.New("bad input") ErrRejected = errors.New("rejected") ErrNoDeletes = errors.New("no deletes") ) // A type that wraps an outgoing client-facing error together with an underlying cause error. Loading Loading @@ -451,7 +452,6 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at // Execute the query. res, err := tx.Exec(query, params...) if err != nil { logger.Error("Could not write storage, exec error", zap.Error(err)) return err } Loading Loading @@ -547,7 +547,6 @@ WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND delet if err != nil && err != sql.ErrNoRows { // Only fail on critical database or row scan errors. // If no row was available we still allow storage updates to perform fresh inserts. logger.Error("Could not update storage, query row error", zap.Error(err)) return err } Loading Loading @@ -597,7 +596,6 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at // Execute the query. res, err := tx.Exec(query, params...) if err != nil { logger.Error("Could not update storage, exec error", zap.Error(err)) return err } Loading Loading @@ -691,15 +689,11 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag return RUNTIME_EXCEPTION, errors.New("Could not remove storage") } err = crdb.ExecuteInTx(context.Background(), tx, func() error { // Execute the query. queryRes, err := tx.Query(query, params...) if err != nil { logger.Error("Could not remove storage, query error", zap.Error(err)) if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after query error", zap.Error(e)) } return RUNTIME_EXCEPTION, errors.New("Could not remove storage") return err } defer queryRes.Close() Loading @@ -716,12 +710,7 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag for queryRes.Next() { err = queryRes.Scan(&id, &bucket, &collection, &record, &userId, &write, &version) if err != nil { logger.Error("Could not remove storage, scan error", zap.Error(err)) if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after scan error", zap.Error(e)) } return RUNTIME_EXCEPTION, errors.New("Could not remove storage") return err } key := ops[struct { Loading @@ -738,20 +727,12 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag // Check permission. if caller != "" && write.Int64 != 1 { if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after permission rejected", zap.Error(e)) } return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied") return StatusError(STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied"), ErrRejected) } // Check version. if key.Version != "" && key.Version != version.String { if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after version rejected", zap.Error(e)) } return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied") return StatusError(STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied"), ErrRejected) } l := len(params) Loading @@ -764,26 +745,23 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag // Nothing to delete. if len(params) == 1 { if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after no deletes needed", zap.Error(e)) } return 0, nil return StatusError(0, nil, ErrNoDeletes) } query += ")" _, err = tx.Exec(query, params...) if err != nil { logger.Error("Could not remove storage, exec error", zap.Error(err)) if e := tx.Rollback(); e != nil { logger.Warn("Could not rollback transaction in remove storage after exec error", zap.Error(e)) } return RUNTIME_EXCEPTION, errors.New("Could not remove storage") return err } err = tx.Commit() return nil }) if err != nil { logger.Error("Could not remove storage, commit error", zap.Error(err)) if e, ok := err.(*statusError); ok { return e.Code(), e.Status() } logger.Error("Could not remove storage, transaction error", zap.Error(err)) return RUNTIME_EXCEPTION, errors.New("Could not remove storage") } Loading Loading
CHANGELOG.md +4 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,10 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p ## [Unreleased] ## [1.4.3] - 2018-04-25 ### Changed - Improve storage remove transactional behaviour. ## [1.4.2] - 2018-04-22 ### Changed - Improve handling of transactional retries. Loading
Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ # limitations under the License. BINNAME := nakama VERSION := 1.5.0-dev VERSION := 1.4.3 BUILDDIR := build COMMITID := $(shell git rev-parse --short HEAD 2>/dev/null || echo nosha) DOCKERDIR := install/docker/nakama Loading
server/core_storage.go +60 −82 Original line number Diff line number Diff line Loading @@ -73,6 +73,7 @@ var ( ErrRowsAffectedCount = errors.New("rows_affected_count") ErrBadInput = errors.New("bad input") ErrRejected = errors.New("rejected") ErrNoDeletes = errors.New("no deletes") ) // A type that wraps an outgoing client-facing error together with an underlying cause error. Loading Loading @@ -451,7 +452,6 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at // Execute the query. res, err := tx.Exec(query, params...) if err != nil { logger.Error("Could not write storage, exec error", zap.Error(err)) return err } Loading Loading @@ -547,7 +547,6 @@ WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND delet if err != nil && err != sql.ErrNoRows { // Only fail on critical database or row scan errors. // If no row was available we still allow storage updates to perform fresh inserts. logger.Error("Could not update storage, query row error", zap.Error(err)) return err } Loading Loading @@ -597,7 +596,6 @@ DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at // Execute the query. res, err := tx.Exec(query, params...) if err != nil { logger.Error("Could not update storage, exec error", zap.Error(err)) return err } Loading Loading @@ -691,15 +689,11 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag return RUNTIME_EXCEPTION, errors.New("Could not remove storage") } err = crdb.ExecuteInTx(context.Background(), tx, func() error { // Execute the query. queryRes, err := tx.Query(query, params...) if err != nil { logger.Error("Could not remove storage, query error", zap.Error(err)) if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after query error", zap.Error(e)) } return RUNTIME_EXCEPTION, errors.New("Could not remove storage") return err } defer queryRes.Close() Loading @@ -716,12 +710,7 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag for queryRes.Next() { err = queryRes.Scan(&id, &bucket, &collection, &record, &userId, &write, &version) if err != nil { logger.Error("Could not remove storage, scan error", zap.Error(err)) if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after scan error", zap.Error(e)) } return RUNTIME_EXCEPTION, errors.New("Could not remove storage") return err } key := ops[struct { Loading @@ -738,20 +727,12 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag // Check permission. if caller != "" && write.Int64 != 1 { if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after permission rejected", zap.Error(e)) } return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied") return StatusError(STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied"), ErrRejected) } // Check version. if key.Version != "" && key.Version != version.String { if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after version rejected", zap.Error(e)) } return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied") return StatusError(STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied"), ErrRejected) } l := len(params) Loading @@ -764,26 +745,23 @@ func StorageRemove(logger *zap.Logger, db *sql.DB, caller string, keys []*Storag // Nothing to delete. if len(params) == 1 { if e := tx.Rollback(); e != nil { // Rollback to explicitly end the transaction, but will return an error because there are no updates yet. logger.Debug("Could not rollback transaction in remove storage after no deletes needed", zap.Error(e)) } return 0, nil return StatusError(0, nil, ErrNoDeletes) } query += ")" _, err = tx.Exec(query, params...) if err != nil { logger.Error("Could not remove storage, exec error", zap.Error(err)) if e := tx.Rollback(); e != nil { logger.Warn("Could not rollback transaction in remove storage after exec error", zap.Error(e)) } return RUNTIME_EXCEPTION, errors.New("Could not remove storage") return err } err = tx.Commit() return nil }) if err != nil { logger.Error("Could not remove storage, commit error", zap.Error(err)) if e, ok := err.(*statusError); ok { return e.Code(), e.Status() } logger.Error("Could not remove storage, transaction error", zap.Error(err)) return RUNTIME_EXCEPTION, errors.New("Could not remove storage") } Loading