Loading CHANGELOG.md +3 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,9 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Changed - Ensure storage write ops return acks in the same order as inputs. ### Fixed - Fix data returned by StreamUserList in JS runtime. - Allow passing lists of presences as match init parameters to Go runtime matches. Loading server/core_storage.go +15 −5 Original line number Diff line number Diff line Loading @@ -486,9 +486,19 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au } func storageWriteObjects(ctx context.Context, logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ops StorageOpWrites) ([]*api.StorageObjectAck, error) { acks := make([]*api.StorageObjectAck, 0, ops.Len()) for _, op := range ops { // Ensure writes are processed in a consistent order to avoid deadlocks from concurrent operations. // Sorting done on a copy to ensure we don't modify the input, which may be re-used on transaction retries. sortedOps := make(StorageOpWrites, 0, len(ops)) indexedOps := make(map[*StorageOpWrite]int, len(ops)) for i, op := range ops { sortedOps = append(sortedOps, op) indexedOps[op] = i } sort.Sort(sortedOps) // Run operations in the sorted order. acks := make([]*api.StorageObjectAck, ops.Len()) for _, op := range sortedOps { ack, writeErr := storageWriteObject(ctx, logger, tx, authoritativeWrite, op.OwnerID, op.Object) if writeErr != nil { if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission { Loading @@ -498,9 +508,9 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, tx *sql.Tx, au logger.Debug("Error writing storage objects.", zap.Error(writeErr)) return nil, writeErr } acks = append(acks, ack) // Acks are returned in the original order. acks[indexedOps[op]] = ack } return acks, nil } Loading Loading
CHANGELOG.md +3 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,9 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Changed - Ensure storage write ops return acks in the same order as inputs. ### Fixed - Fix data returned by StreamUserList in JS runtime. - Allow passing lists of presences as match init parameters to Go runtime matches. Loading
server/core_storage.go +15 −5 Original line number Diff line number Diff line Loading @@ -486,9 +486,19 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au } func storageWriteObjects(ctx context.Context, logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ops StorageOpWrites) ([]*api.StorageObjectAck, error) { acks := make([]*api.StorageObjectAck, 0, ops.Len()) for _, op := range ops { // Ensure writes are processed in a consistent order to avoid deadlocks from concurrent operations. // Sorting done on a copy to ensure we don't modify the input, which may be re-used on transaction retries. sortedOps := make(StorageOpWrites, 0, len(ops)) indexedOps := make(map[*StorageOpWrite]int, len(ops)) for i, op := range ops { sortedOps = append(sortedOps, op) indexedOps[op] = i } sort.Sort(sortedOps) // Run operations in the sorted order. acks := make([]*api.StorageObjectAck, ops.Len()) for _, op := range sortedOps { ack, writeErr := storageWriteObject(ctx, logger, tx, authoritativeWrite, op.OwnerID, op.Object) if writeErr != nil { if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission { Loading @@ -498,9 +508,9 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, tx *sql.Tx, au logger.Debug("Error writing storage objects.", zap.Error(writeErr)) return nil, writeErr } acks = append(acks, ack) // Acks are returned in the original order. acks[indexedOps[op]] = ack } return acks, nil } Loading