Commit 226ce512 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Complete storage write batches in deterministic order.

parent 54e3efa5
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -4,6 +4,13 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org).

## [Unreleased]
### Changed
- Log more information when authoritative match handlers receive too many data messages.
- Ensure storage writes and deletes are performed in a consistent order within each batch.
- Ensure wallet updates are performed in a consistent order within each batch.

### Fixed
- Storage write batches now correctly abort when any query in the batch fails.

## [2.4.2] - 2019-03-25
### Added
+23 −11
Original line number Diff line number Diff line
@@ -152,12 +152,12 @@ func (s *ApiServer) ReadStorageObjects(ctx context.Context, in *api.ReadStorageO
}

func (s *ApiServer) WriteStorageObjects(ctx context.Context, in *api.WriteStorageObjectsRequest) (*api.StorageObjectAcks, error) {
	userID := ctx.Value(ctxUserIDKey{}).(uuid.UUID)
	userID := ctx.Value(ctxUserIDKey{}).(uuid.UUID).String()

	// Before hook.
	if fn := s.runtime.BeforeWriteStorageObjects(); fn != nil {
		beforeFn := func(clientIP, clientPort string) error {
			result, err, code := fn(ctx, s.logger, userID.String(), ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, in)
			result, err, code := fn(ctx, s.logger, userID, ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, in)
			if err != nil {
				return status.Error(code, err.Error())
			}
@@ -206,9 +206,15 @@ func (s *ApiServer) WriteStorageObjects(ctx context.Context, in *api.WriteStorag
		}
	}

	userObjects := map[uuid.UUID][]*api.WriteStorageObject{userID: in.GetObjects()}
	ops := make(StorageOpWrites, 0, len(in.GetObjects()))
	for _, object := range in.GetObjects() {
		ops = append(ops, &StorageOpWrite{
			OwnerID: userID,
			Object:  object,
		})
	}

	acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, false, userObjects)
	acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, false, ops)
	if err != nil {
		if code == codes.Internal {
			return nil, status.Error(codes.Internal, "Error writing storage objects.")
@@ -219,7 +225,7 @@ func (s *ApiServer) WriteStorageObjects(ctx context.Context, in *api.WriteStorag
	// After hook.
	if fn := s.runtime.AfterWriteStorageObjects(); fn != nil {
		afterFn := func(clientIP, clientPort string) {
			fn(ctx, s.logger, userID.String(), ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, acks, in)
			fn(ctx, s.logger, userID, ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, acks, in)
		}

		// Execute the after function lambda wrapped in a trace for stats measurement.
@@ -230,18 +236,18 @@ func (s *ApiServer) WriteStorageObjects(ctx context.Context, in *api.WriteStorag
}

func (s *ApiServer) DeleteStorageObjects(ctx context.Context, in *api.DeleteStorageObjectsRequest) (*empty.Empty, error) {
	userID := ctx.Value(ctxUserIDKey{}).(uuid.UUID)
	userID := ctx.Value(ctxUserIDKey{}).(uuid.UUID).String()

	// Before hook.
	if fn := s.runtime.BeforeDeleteStorageObjects(); fn != nil {
		beforeFn := func(clientIP, clientPort string) error {
			result, err, code := fn(ctx, s.logger, userID.String(), ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, in)
			result, err, code := fn(ctx, s.logger, userID, ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, in)
			if err != nil {
				return status.Error(code, err.Error())
			}
			if result == nil {
				// If result is nil, requested resource is disabled.
				s.logger.Warn("Intercepted a disabled resource.", zap.Any("resource", ctx.Value(ctxFullMethodKey{}).(string)), zap.String("uid", userID.String()))
				s.logger.Warn("Intercepted a disabled resource.", zap.Any("resource", ctx.Value(ctxFullMethodKey{}).(string)), zap.String("uid", userID))
				return status.Error(codes.NotFound, "Requested resource was not found.")
			}
			in = result
@@ -265,9 +271,15 @@ func (s *ApiServer) DeleteStorageObjects(ctx context.Context, in *api.DeleteStor
		}
	}

	objectIDs := map[uuid.UUID][]*api.DeleteStorageObjectId{userID: in.GetObjectIds()}
	ops := make(StorageOpDeletes, 0, len(in.GetObjectIds()))
	for _, objectID := range in.GetObjectIds() {
		ops = append(ops, &StorageOpDelete{
			OwnerID:  userID,
			ObjectID: objectID,
		})
	}

	if code, err := StorageDeleteObjects(ctx, s.logger, s.db, false, objectIDs); err != nil {
	if code, err := StorageDeleteObjects(ctx, s.logger, s.db, false, ops); err != nil {
		if code == codes.Internal {
			return nil, status.Error(codes.Internal, "Error deleting storage objects.")
		}
@@ -277,7 +289,7 @@ func (s *ApiServer) DeleteStorageObjects(ctx context.Context, in *api.DeleteStor
	// After hook.
	if fn := s.runtime.AfterDeleteStorageObjects(); fn != nil {
		afterFn := func(clientIP, clientPort string) {
			fn(ctx, s.logger, ctx.Value(ctxUserIDKey{}).(uuid.UUID).String(), ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, in)
			fn(ctx, s.logger, userID, ctx.Value(ctxUsernameKey{}).(string), ctx.Value(ctxExpiryKey{}).(int64), clientIP, clientPort, in)
		}

		// Execute the after function lambda wrapped in a trace for stats measurement.
+10 −8
Original line number Diff line number Diff line
@@ -46,14 +46,15 @@ func (s *ConsoleServer) DeleteStorageObject(ctx context.Context, in *console.Del
	if in.Key == "" {
		return nil, status.Error(codes.InvalidArgument, "Requires a valid key.")
	}
	userID, err := uuid.FromString(in.UserId)
	_, err := uuid.FromString(in.UserId)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, "Requires a valid user ID.")
	}

	code, err := StorageDeleteObjects(ctx, s.logger, s.db, true, map[uuid.UUID][]*api.DeleteStorageObjectId{
		userID: []*api.DeleteStorageObjectId{
			&api.DeleteStorageObjectId{
	code, err := StorageDeleteObjects(ctx, s.logger, s.db, true, StorageOpDeletes{
		&StorageOpDelete{
			OwnerID: in.UserId,
			ObjectID: &api.DeleteStorageObjectId{
				Collection: in.Collection,
				Key:        in.Key,
				Version:    in.Version,
@@ -159,7 +160,7 @@ func (s *ConsoleServer) WriteStorageObject(ctx context.Context, in *console.Writ
	if in.Key == "" {
		return nil, status.Error(codes.InvalidArgument, "Requires a valid key.")
	}
	userID, err := uuid.FromString(in.UserId)
	_, err := uuid.FromString(in.UserId)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, "Requires a valid user ID.")
	}
@@ -181,9 +182,10 @@ func (s *ConsoleServer) WriteStorageObject(ctx context.Context, in *console.Writ
		return nil, status.Error(codes.InvalidArgument, "Requires a valid JSON object value.")
	}

	acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, true, map[uuid.UUID][]*api.WriteStorageObject{
		userID: []*api.WriteStorageObject{
			&api.WriteStorageObject{
	acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, true, StorageOpWrites{
		&StorageOpWrite{
			OwnerID: in.UserId,
			Object: &api.WriteStorageObject{
				Collection:      in.Collection,
				Key:             in.Key,
				Value:           in.Value,
+98 −40
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import (
	"encoding/gob"
	"errors"
	"fmt"
	"sort"

	"context"

@@ -39,6 +40,56 @@ type storageCursor struct {
	Read   int32
}

// Internal representation for a batch of storage write operations.
type StorageOpWrites []*StorageOpWrite

type StorageOpWrite struct {
	OwnerID string
	Object  *api.WriteStorageObject
}

func (s StorageOpWrites) Len() int {
	return len(s)
}
func (s StorageOpWrites) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
func (s StorageOpWrites) Less(i, j int) bool {
	s1, s2 := s[i], s[j]
	if s1.Object.Collection < s2.Object.Collection {
		return true
	}
	if s1.Object.Key < s2.Object.Key {
		return true
	}
	return s1.OwnerID < s2.OwnerID
}

// Internal representation for a batch of storage delete operations.
type StorageOpDeletes []*StorageOpDelete

type StorageOpDelete struct {
	OwnerID  string
	ObjectID *api.DeleteStorageObjectId
}

func (s StorageOpDeletes) Len() int {
	return len(s)
}
func (s StorageOpDeletes) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
func (s StorageOpDeletes) Less(i, j int) bool {
	s1, s2 := s[i], s[j]
	if s1.ObjectID.Collection < s2.ObjectID.Collection {
		return true
	}
	if s1.ObjectID.Key < s2.ObjectID.Key {
		return true
	}
	return s1.OwnerID < s2.OwnerID
}

func StorageListObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, caller uuid.UUID, ownerID *uuid.UUID, collection string, limit int, cursor string) (*api.StorageObjectList, codes.Code, error) {
	var sc *storageCursor = nil
	if cursor != "" {
@@ -397,8 +448,11 @@ WHERE
	return objects, err
}

func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeWrite bool, objects map[uuid.UUID][]*api.WriteStorageObject) (*api.StorageObjectAcks, codes.Code, error) {
	acks := &api.StorageObjectAcks{}
func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeWrite bool, ops StorageOpWrites) (*api.StorageObjectAcks, codes.Code, error) {
	// Ensure writes are processed in a consistent order.
	sort.Sort(ops)

	var acks []*api.StorageObjectAck

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
@@ -407,20 +461,20 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au
	}

	if err = crdb.ExecuteInTx(ctx, tx, func() error {
		for ownerID, userObjects := range objects {
			for _, object := range userObjects {
				ack, writeErr := storageWriteObject(ctx, logger, tx, authoritativeWrite, ownerID, object)
		acks = make([]*api.StorageObjectAck, 0, ops.Len())

		for _, op := range ops {
			ack, writeErr := storageWriteObject(ctx, logger, tx, authoritativeWrite, op.OwnerID, op.Object)
			if writeErr != nil {
				if writeErr == sql.ErrNoRows {
					return StatusError(codes.InvalidArgument, "Storage write rejected.", errors.New("Storage write rejected - not found, version check failed, or permission denied."))
				}

				logger.Debug("Error writing storage objects.", zap.Error(err))
					return err
				return writeErr
			}

				acks.Acks = append(acks.Acks, ack)
			}
			acks = append(acks, ack)
		}
		return nil
	}); err != nil {
@@ -431,10 +485,10 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au
		return nil, codes.Internal, err
	}

	return acks, codes.OK, nil
	return &api.StorageObjectAcks{Acks: acks}, codes.OK, nil
}

func storageWriteObject(ctx context.Context, logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ownerID uuid.UUID, object *api.WriteStorageObject) (*api.StorageObjectAck, error) {
func storageWriteObject(ctx context.Context, logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ownerID string, object *api.WriteStorageObject) (*api.StorageObjectAck, error) {
	permissionRead := int32(1)
	if object.GetPermissionRead() != nil {
		permissionRead = object.GetPermissionRead().GetValue()
@@ -449,12 +503,12 @@ func storageWriteObject(ctx context.Context, logger *zap.Logger, tx *sql.Tx, aut
	query, params := getStorageWriteQuery(authoritativeWrite, object.GetVersion(), params)

	ack := &api.StorageObjectAck{}
	if ownerID != uuid.Nil {
		ack.UserId = ownerID.String()
	if ownerID != uuid.Nil.String() {
		ack.UserId = ownerID
	}

	if err := tx.QueryRowContext(ctx, query, params...).Scan(&ack.Collection, &ack.Key, &ack.Version); err != nil {
		if err != sql.ErrNoRows {
		if err == sql.ErrNoRows {
			logger.Debug("Could not write storage object.", zap.Error(err), zap.String("query", query), zap.Any("object", object))
		}

@@ -546,7 +600,10 @@ RETURNING collection, key, version`
	return query, params
}

func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeDelete bool, userObjectIDs map[uuid.UUID][]*api.DeleteStorageObjectId) (codes.Code, error) {
func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {
	// Ensure deletes are processed in a consistent order.
	sort.Sort(ops)

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
@@ -554,23 +611,25 @@ func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, a
	}

	if err = crdb.ExecuteInTx(ctx, tx, func() error {
		for ownerID, objectIDs := range userObjectIDs {
			for _, objectID := range objectIDs {
				params := []interface{}{objectID.GetCollection(), objectID.GetKey(), ownerID}
				query := "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3"

				if !authoritativeDelete {
		for _, op := range ops {
			params := []interface{}{op.ObjectID.Collection, op.ObjectID.Key, op.OwnerID}
			var query string
			if authoritativeDelete {
				// Deleting from the runtime.
				query = "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3"
			} else {
				// Direct client request to delete.
				query = "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3 AND write > 0"
			}

				if objectID.GetVersion() != "" {
					params = append(params, objectID.Version)
			if op.ObjectID.GetVersion() != "" {
				// Conditional delete.
				params = append(params, op.ObjectID.Version)
				query += fmt.Sprintf(" AND version = $4")
			}

			result, err := tx.ExecContext(ctx, query, params...)
			if err != nil {
					logger.Debug("Could not delete storage object.", zap.Error(err), zap.String("query", query), zap.Any("object_id", objectID))
				logger.Debug("Could not delete storage object.", zap.Error(err), zap.String("query", query), zap.Any("object_id", op.ObjectID))
				return err
			}

@@ -578,7 +637,6 @@ func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, a
				return StatusError(codes.InvalidArgument, "Storage delete rejected.", errors.New("Storage delete rejected - not found, version check failed, or permission denied."))
			}
		}
		}
		return nil
	}); err != nil {
		if e, ok := err.(*statusError); ok {
+29 −15
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ import (
	"database/sql"
	"encoding/json"
	"fmt"
	"sort"
	"strconv"
	"strings"

@@ -123,6 +124,7 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates

		// Prepare the set of wallet updates and ledger updates.
		updatedWallets := make(map[string][]byte, len(updates))
		updateOrder := make([]string, 0, len(updates))
		if updateLedger {
			statements = make([]string, 0, len(updates))
			params = make([]interface{}, 0, len(updates)*4)
@@ -145,6 +147,7 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
				return err
			}
			updatedWallets[userID] = walletData
			updateOrder = append(updateOrder, userID)

			// Prepare ledger updates if needed.
			if updateLedger {
@@ -159,9 +162,19 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
			}
		}

		if len(updatedWallets) > 0 {
			// Ensure updates are done in natural order of user ID.
			sort.Strings(updateOrder)

			// Write the updated wallets.
			query = "UPDATE users SET update_time = now(), wallet = $2 WHERE id = $1"
		for userID, updatedWallet := range updatedWallets {
			for _, userID := range updateOrder {
				updatedWallet, ok := updatedWallets[userID]
				if !ok {
					// Should not happen.
					logger.Warn("Missing wallet update for user.", zap.String("user_id", userID))
					continue
				}
				_, err = tx.ExecContext(ctx, query, userID, updatedWallet)
				if err != nil {
					logger.Debug("Error writing user wallet.", zap.String("user_id", userID), zap.Error(err))
@@ -178,6 +191,7 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
					return err
				}
			}
		}
		return nil
	}); err != nil {
		logger.Error("Error updating wallets.", zap.Error(err))
Loading