Commit b2bdc11a authored by Fernando Takagi's avatar Fernando Takagi
Browse files

Dynamic query assembling.

parent 56814441
Loading
Loading
Loading
Loading
+113 −30
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import (
	"encoding/gob"
	"encoding/hex"
	"errors"
	"fmt"
	"sort"
	"time"

@@ -415,12 +416,109 @@ func storageListObjects(rows *sql.Rows, limit int) (*api.StorageObjectList, erro
	return objectList, nil
}

type storageQueryArg struct {
	name   string
	dbType string
	param  any
}

func StorageReadObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, caller uuid.UUID, objectIDs []*api.ReadStorageObjectId) (*api.StorageObjects, error) {
	collectionParam := make([]string, 0, len(objectIDs))
	keyParam := make([]string, 0, len(objectIDs))
	userIdParam := make([]uuid.UUID, 0, len(objectIDs))

	// When selecting variable number of object we'd like to keep number of
	collectionSet := make(map[string]struct{})
	keySet := make(map[string]struct{})
	userIdSet := make(map[uuid.UUID]struct{})

	isSingleCollection := true
	isSingleKey := true
	isSingleUserId := true

	multipleArgs := make([]storageQueryArg, 0, 3)
	singleArgs := make([]storageQueryArg, 0, 3)

	for _, id := range objectIDs {
		collectionParam = append(collectionParam, id.Collection)
		if isSingleCollection {
			_, ok := collectionSet[id.Collection]
			if !ok {
				collectionSet[id.Collection] = struct{}{}
			} else {
				isSingleCollection = false
				collectionSet = make(map[string]struct{})
				multipleArgs = append(multipleArgs, storageQueryArg{name: "collection", dbType: "text[]", param: collectionParam})
			}
		}

		keyParam = append(keyParam, id.Key)
		if isSingleKey {
			_, ok := keySet[id.Key]
			if !ok {
				keySet[id.Key] = struct{}{}
			} else {
				isSingleKey = false
				keySet = make(map[string]struct{})
				multipleArgs = append(multipleArgs, storageQueryArg{name: "key", dbType: "text[]", param: keyParam})
			}
		}

		var reqUid uuid.UUID
		if uid := id.GetUserId(); uid != "" {
			if uid, err := uuid.FromString(uid); err == nil {
				reqUid = uid
			} else {
				logger.Error("Could not read storage objects. Unable to parse requested user_id", zap.Error(err))
				return nil, err
			}
		}
		userIdParam = append(userIdParam, reqUid)
		if isSingleUserId {
			_, ok := userIdSet[reqUid]
			if !ok {
				userIdSet[reqUid] = struct{}{}
			} else {
				isSingleUserId = false
				userIdSet = make(map[uuid.UUID]struct{})
				multipleArgs = append(multipleArgs, storageQueryArg{name: "user_id", dbType: "uuid[]", param: userIdParam})
			}
		}
	}

	if isSingleCollection {
		singleArgs = append(singleArgs, storageQueryArg{name: "collection", param: collectionParam[0]})
	}
	if isSingleKey {
		singleArgs = append(singleArgs, storageQueryArg{name: "key", param: keyParam[0]})
	}
	if isSingleUserId {
		singleArgs = append(singleArgs, storageQueryArg{name: "user_id", param: userIdParam[0]})
	}

	var query string
	var params []any
	switch len(multipleArgs) {
	case 0:
		query = fmt.Sprintf(`
SELECT collection, key, user_id, value, version, read, write, create_time, update_time FROM storage WHERE %s = $1 AND %s = $2 and %s = $3
`, singleArgs[0].name, singleArgs[1].name, singleArgs[2].name)
		params = []any{singleArgs[0].param, singleArgs[1].param, singleArgs[2].param}
	case 1:
		query = fmt.Sprintf(`
SELECT collection, key, user_id, value, version, read, write, create_time, update_time FROM storage WHERE %s = $1 AND %s = $2 and %s = ANY($3::%s)
`, singleArgs[0].name, singleArgs[1].name, multipleArgs[0].name, multipleArgs[0].dbType)
		params = []any{singleArgs[0].param, singleArgs[1].param, multipleArgs[0].param}
	case 2:
		query = fmt.Sprintf(`
SELECT collection, key, user_id, value, version, read, write, create_time, update_time FROM storage NATURAL JOIN ROWS FROM (
  unnest($1::%s),
  unnest($2::%s)
) t(%s, %s)
WHERE %s = $3
`, multipleArgs[0].dbType, multipleArgs[1].dbType, multipleArgs[0].name, multipleArgs[1].name, singleArgs[0].name)
		params = []any{multipleArgs[0].param, multipleArgs[1].param, singleArgs[0].param}
	case 3:
		// When selecting a variable number of objects we'd like to keep number of
		// SQL query arguments constant, otherwise query statistics explode, because
		// from PostgreSQL perspective query with different number of arguments is a distinct query
		//
@@ -434,14 +532,18 @@ func StorageReadObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, cal
		// ) v(a, b)
		//
		// This way regardless of how many objects we query, we pass same number of args: one per column
	query := `SELECT collection, key, user_id, value, version, read, write, create_time, update_time
		FROM storage
		NATURAL JOIN ROWS FROM (
			unnest($1::text[]),
			unnest($2::text[]),
			unnest($3::uuid[])
		) v(collection, key, user_id)
	`
		query = fmt.Sprintf(`
SELECT collection, key, user_id, value, version, read, write, create_time, update_time FROM storage NATURAL JOIN ROWS FROM (
  unnest($1::%s),
  unnest($2::%s),
  unnest($3::%s)
) t(%s, %s, %s)
`, multipleArgs[0].dbType, multipleArgs[1].dbType, multipleArgs[2].dbType, multipleArgs[0].name, multipleArgs[1].name, multipleArgs[2].name)
		params = []any{multipleArgs[0].param, multipleArgs[1].param, multipleArgs[2].param}
	default:
		logger.Error("Unexpected code path.", zap.Int("multipleArgs", len(multipleArgs)))
		return nil, errors.New("unexpected code path")
	}

	if caller != uuid.Nil {
		// Caller is not nil: either read public (read=2) object from requested user
@@ -449,25 +551,6 @@ func StorageReadObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, cal
		query += `
		WHERE (read = 2 or (read = 1 and storage.user_id = $4))
		`
	}

	for _, id := range objectIDs {
		collectionParam = append(collectionParam, id.Collection)
		keyParam = append(keyParam, id.Key)
		var reqUid uuid.UUID
		if uid := id.GetUserId(); uid != "" {
			if uid, err := uuid.FromString(uid); err == nil {
				reqUid = uid
			} else {
				logger.Error("Could not read storage objects. Unable to parse requested user_id", zap.Error(err))
				return nil, err
			}
		}
		userIdParam = append(userIdParam, reqUid)
	}

	params := []interface{}{collectionParam, keyParam, userIdParam}
	if caller != uuid.Nil {
		params = append(params, caller)
	}