Commit 6bf1d996 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Return more specific error messages from storage write operations.

parent 971a7d12
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- All schema and query statements that use the '1970-01-01 00:00:00' constant now specify UTC timezone.
- Storage write error message are more descriptive for when values must be encoded JSON objects.
- Storage listing operations now treat empty owner IDs as listing across all data rather than system-owned data.
- Storage write operations now return more specific error messages.

### Fixed
- CRON expressions for leaderboard and tournament resets now allow concurrent usage safely.
+75 −97
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ package server

import (
	"bytes"
	"crypto/md5"
	"database/sql"
	"encoding/base64"
	"encoding/gob"
@@ -34,6 +35,12 @@ import (
	"google.golang.org/grpc/codes"
)

var (
	ErrStorageRejectedVersion    = errors.New("Storage write rejected - version check failed.")
	ErrStorageRejectedPermission = errors.New("Storage write rejected - permission denied.")
	ErrStorageWriteFailed        = errors.New("Storage write failed.")
)

type storageCursor struct {
	Key    string
	UserID uuid.UUID
@@ -466,8 +473,8 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au
		for _, op := range ops {
			ack, writeErr := storageWriteObject(ctx, logger, tx, authoritativeWrite, op.OwnerID, op.Object)
			if writeErr != nil {
				if writeErr == sql.ErrNoRows {
					return StatusError(codes.InvalidArgument, "Storage write rejected.", errors.New("Storage write rejected - not found, version check failed, or permission denied."))
				if writeErr == ErrStorageRejectedVersion || writeErr == ErrStorageRejectedPermission {
					return StatusError(codes.InvalidArgument, "Storage write rejected.", writeErr)
				}

				logger.Debug("Error writing storage objects.", zap.Error(err))
@@ -489,115 +496,86 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au
}

func storageWriteObject(ctx context.Context, logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ownerID string, object *api.WriteStorageObject) (*api.StorageObjectAck, error) {
	permissionRead := int32(1)
	if object.GetPermissionRead() != nil {
		permissionRead = object.GetPermissionRead().GetValue()
	var dbVersion sql.NullString
	var dbPermissionWrite sql.NullInt64
	var dbPermissionRead sql.NullInt64
	err := tx.QueryRowContext(ctx, "SELECT version, read, write FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3", object.Collection, object.Key, ownerID).Scan(&dbVersion, &dbPermissionRead, &dbPermissionWrite)
	if err != nil {
		if err == sql.ErrNoRows {
			if object.Version != "" && object.Version != "*" {
				// Conditional write with a specific version but the object did not exist at all.
				return nil, ErrStorageRejectedVersion
			}
		} else {
			logger.Debug("Error in write storage object pre-flight.", zap.Any("object", object), zap.Error(err))
			return nil, err
		}
	}

	permissionWrite := int32(1)
	if object.GetPermissionWrite() != nil {
		permissionWrite = object.GetPermissionWrite().GetValue()
	if dbVersion.Valid && (object.Version == "*" || (object.Version != "" && object.Version != dbVersion.String)) {
		// An object existed and it's a conditional write that either:
		// - Expects no object.
		// - Or expects a given version bit it does not match.
		return nil, ErrStorageRejectedVersion
	}

	params := []interface{}{object.GetCollection(), object.GetKey(), object.GetValue(), object.GetValue(), permissionRead, permissionWrite, ownerID}
	query, params := getStorageWriteQuery(authoritativeWrite, object.GetVersion(), params)
	if dbPermissionWrite.Valid && dbPermissionWrite.Int64 == 0 && !authoritativeWrite {
		// Non-authoritative write to an existing storage object with permission 0.
		return nil, ErrStorageRejectedPermission
	}

	ack := &api.StorageObjectAck{}
	newVersion := fmt.Sprintf("%x", md5.Sum([]byte(object.Value)))
	newPermissionRead := int32(1)
	if object.PermissionRead != nil {
		newPermissionRead = object.PermissionRead.Value
	}
	newPermissionWrite := int32(1)
	if object.PermissionWrite != nil {
		newPermissionWrite = object.PermissionWrite.Value
	}

	if dbVersion.Valid && dbVersion.String == newVersion && dbPermissionRead.Int64 == int64(newPermissionRead) && dbPermissionWrite.Int64 == int64(newPermissionWrite) {
		// Stored object existed, and exactly matches the new object's version and read/write permissions.
		ack := &api.StorageObjectAck{
			Collection: object.Collection,
			Key:        object.Key,
			Version:    newVersion,
		}
		if ownerID != uuid.Nil.String() {
			ack.UserId = ownerID
		}
		return ack, nil
	}

	if err := tx.QueryRowContext(ctx, query, params...).Scan(&ack.Collection, &ack.Key, &ack.Version); err != nil {
		if err == sql.ErrNoRows {
			logger.Debug("Could not write storage object.", zap.Error(err), zap.String("query", query), zap.Any("object", object))
	var query string
	if dbVersion.Valid {
		// Updating an existing storage object.
		query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID"
	} else {
		// Inserting a new storage object.
		query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())"
	}

	res, err := tx.ExecContext(ctx, query, object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite)
	if err != nil {
		logger.Debug("Could not write storage object, exec error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
		return nil, err
	}

	return ack, nil
	if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
		logger.Debug("Could not write storage object, rowsAffected error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
		return nil, ErrStorageWriteFailed
	}

func getStorageWriteQuery(authoritativeWrite bool, version string, params []interface{}) (string, []interface{}) {
	query := ""

	// Write storage objects authoritatively, disregarding permissions.
	if authoritativeWrite {
		if version == "" {
			query = `
INSERT INTO storage (collection, key, value, version, read, write, create_time, update_time, user_id)
SELECT $1, $2, $3, md5($4::VARCHAR), $5, $6, now(), now(), $7::UUID
ON CONFLICT (collection, key, user_id)
DO UPDATE SET value = $3, version = md5($4::VARCHAR), read = $5, write = $6, update_time = now()
RETURNING collection, key, version`
		} else if version == "*" { // if-none-match
			query = `
INSERT INTO storage (collection, key, value, version, read, write, create_time, update_time, user_id)
SELECT $1, $2, $3, md5($4::VARCHAR), $5, $6, now(), now(), $7::UUID
WHERE NOT EXISTS
	(SELECT key FROM storage
		WHERE user_id = $7::UUID
		AND collection = $1::VARCHAR
		AND key = $2::VARCHAR)
RETURNING collection, key, version`
		} else { // if-match
			params = append(params, version)
			query = `
INSERT INTO storage (collection, key, value, version, read, write, create_time, update_time, user_id)
SELECT $1, $2, $3, md5($4::VARCHAR), $5, $6, now(), now(), $7::UUID
WHERE EXISTS
	(SELECT key FROM storage
		WHERE user_id = $7::UUID
		AND collection = $1::VARCHAR
		AND key = $2::VARCHAR
		AND version = $8::VARCHAR)
ON CONFLICT (collection, key, user_id)
DO UPDATE SET value = $3, version = md5($4::VARCHAR), read = $5, write = $6, update_time = now()
RETURNING collection, key, version`
	ack := &api.StorageObjectAck{
		Collection: object.Collection,
		Key:        object.Key,
		Version:    newVersion,
	}
	} else {
		if version == "" {
			query = `
INSERT INTO storage (collection, key, value, version, read, write, create_time, update_time, user_id)
SELECT $1, $2, $3, md5($4::VARCHAR), $5, $6, now(), now(), $7::UUID
WHERE NOT EXISTS
	(SELECT key FROM storage
		WHERE user_id = $7::UUID
		AND collection = $1::VARCHAR
		AND key = $2::VARCHAR
		AND write = 0)
ON CONFLICT (collection, key, user_id)
DO UPDATE SET value = $3, version = md5($4::VARCHAR), read = $5, write = $6, update_time = now()
RETURNING collection, key, version`
		} else if version == "*" { // if-none-match
			query = `
INSERT INTO storage (collection, key, value, version, read, write, create_time, update_time, user_id)
SELECT $1, $2, $3, md5($4::VARCHAR), $5, $6, now(), now(), $7::UUID
WHERE NOT EXISTS
	(SELECT key FROM storage
		WHERE user_id = $7::UUID
		AND collection = $1::VARCHAR
		AND key = $2::VARCHAR)
RETURNING collection, key, version`
		} else { // if-match
			params = append(params, version)
			query = `
INSERT INTO storage (collection, key, value, version, read, write, create_time, update_time, user_id)
SELECT $1, $2, $3, md5($4::VARCHAR), $5, $6, now(), now(), $7::UUID
WHERE EXISTS
	(SELECT key FROM storage
		WHERE user_id = $7::UUID
		AND collection = $1::VARCHAR
		AND key = $2::VARCHAR
		AND version = $8::VARCHAR
		AND write = 1)
ON CONFLICT (collection, key, user_id)
DO UPDATE SET value = $3, version = md5($4::VARCHAR), read = $5, write = $6, update_time = now()
RETURNING collection, key, version`
		}
	}

	return query, params
	if ownerID != uuid.Nil.String() {
		ack.UserId = ownerID
	}

	return ack, nil
}

func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {
+8 −8
Original line number Diff line number Diff line
@@ -152,7 +152,7 @@ func TestStorageWriteRuntimeGlobalSingleIfMatchNotExists(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWriteRuntimeGlobalSingleIfMatchExists(t *testing.T) {
@@ -256,7 +256,7 @@ func TestStorageWriteRuntimeGlobalSingleIfMatchExistsFail(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWriteRuntimeGlobalSingleIfNoneMatchNotExists(t *testing.T) {
@@ -334,7 +334,7 @@ func TestStorageWriteRuntimeGlobalSingleIfNoneMatchExists(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWriteRuntimeGlobalMultipleIfMatchNotExists(t *testing.T) {
@@ -370,7 +370,7 @@ func TestStorageWriteRuntimeGlobalMultipleIfMatchNotExists(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWritePipelineUserSingle(t *testing.T) {
@@ -635,7 +635,7 @@ func TestStorageWritePipelineIfMatchNotExists(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWritePipelineIfMatchExistsFail(t *testing.T) {
@@ -687,7 +687,7 @@ func TestStorageWritePipelineIfMatchExistsFail(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWritePipelineIfMatchExists(t *testing.T) {
@@ -828,7 +828,7 @@ func TestStorageWritePipelineIfNoneMatchExists(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - version check failed.", err.Error(), "error message did not match")
}

func TestStorageWritePipelinePermissionFail(t *testing.T) {
@@ -880,7 +880,7 @@ func TestStorageWritePipelinePermissionFail(t *testing.T) {
	assert.Nil(t, acks, "acks was not nil")
	assert.Equal(t, codes.InvalidArgument, code, "code did not match")
	assert.NotNil(t, err, "err was nil")
	assert.Equal(t, "Storage write rejected - not found, version check failed, or permission denied.", err.Error(), "error message did not match")
	assert.Equal(t, "Storage write rejected - permission denied.", err.Error(), "error message did not match")
}

func TestStorageFetchRuntimeGlobalPrivate(t *testing.T) {