Commit 391c5353 authored by Maxim Ivanov's avatar Maxim Ivanov
Browse files

Remove use of read column in pagination queries

parent 3f1fc4f2
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
ALTER TABLE storage
  drop constraint storage_pkey, add primary key using index "storage_collection_key_user_id_key";

DROP INDEX storage_pkey RESTRICT;

CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS collection_user_id_key_idx ON storage (collection, user_id, key);
+13 −18
Original line number Diff line number Diff line
@@ -151,16 +151,10 @@ func StorageListObjectsAll(ctx context.Context, logger *zap.Logger, db *sql.DB,
	cursorQuery := ""
	params := []interface{}{collection, limit + 1}
	if storageCursor != nil {
		if authoritative {
			// Authoritative listings observe the read permission in the cursor.
			cursorQuery = ` AND (collection, read, key, user_id) > ($1, $3, $4, $5) `
			params = append(params, storageCursor.Read, storageCursor.Key, storageCursor.UserID)
		} else {
			// Non-authoritative listings can only ever list for read permission 2 in this type of listing.
			cursorQuery = ` AND (collection, read, key, user_id) > ($1, 2, $3, $4) `
		//TODO: check if we can reorder collection,user_id, key
		cursorQuery = ` AND (collection, key, user_id) > ($1, $3, $4) `
		params = append(params, storageCursor.Key, storageCursor.UserID)
	}
	}

	var query string
	if authoritative {
@@ -168,14 +162,14 @@ func StorageListObjectsAll(ctx context.Context, logger *zap.Logger, db *sql.DB,
SELECT collection, key, user_id, value, version, read, write, create_time, update_time
FROM storage
WHERE collection = $1` + cursorQuery + `
ORDER BY read ASC, key ASC, user_id ASC
ORDER BY key ASC, user_id ASC
LIMIT $2`
	} else {
		query = `
SELECT collection, key, user_id, value, version, read, write, create_time, update_time
FROM storage
WHERE collection = $1 AND read >= 2` + cursorQuery + `
ORDER BY read ASC, key ASC, user_id ASC
ORDER BY key ASC, user_id ASC
LIMIT $2`
	}

@@ -208,7 +202,7 @@ func StorageListObjectsPublicReadUser(ctx context.Context, logger *zap.Logger, d
	params := []interface{}{collection, userID, limit + 1}
	if storageCursor != nil {
		// Ignore cursor read permission and user ID, the listing operation itself is only scoped to one user and public read permission.
		cursorQuery = ` AND (collection, read, user_id, key) > ($1, 2, $2, $4) `
		cursorQuery = ` AND key > $4 `
		params = append(params, storageCursor.Key)
	}

@@ -248,23 +242,23 @@ func StorageListObjectsUser(ctx context.Context, logger *zap.Logger, db *sql.DB,
	params := []interface{}{collection, userID, limit + 1}
	if storageCursor != nil {
		// User ID is always a known user based on the type of the listing operation.
		cursorQuery = ` AND (collection, user_id, read, key) > ($1, $2, $4, $5) `
		params = append(params, storageCursor.Read, storageCursor.Key)
		cursorQuery = ` AND key > $5 `
		params = append(params, storageCursor.Key)
	}

	query := `
SELECT collection, key, user_id, value, version, read, write, create_time, update_time
FROM storage
WHERE collection = $1 AND user_id = $2 AND read >= 1 ` + cursorQuery + `
ORDER BY read ASC, key ASC
ORDER BY key ASC
LIMIT $3`
	if authoritative {
		// List across all read permissions.
		query = `
SELECT collection, key, user_id, value, version, read, write, create_time, update_time
FROM storage
WHERE collection = $1 AND user_id = $2 AND read >= 0 ` + cursorQuery + `
ORDER BY read ASC, key ASC
WHERE collection = $1 AND user_id = $2` + cursorQuery + `
ORDER BY key ASC
LIMIT $3`
	}

@@ -414,6 +408,7 @@ func StorageReadObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, cal
			whereClause += fmt.Sprintf(" (collection = $%v AND key = $%v AND user_id = $%v AND read = 2) ", l+1, l+2, l+3)
			params = append(params, id.Collection, id.Key, uuid.Nil)
		} else {
			// Either public object from any user or private object from the caller
			whereClause += fmt.Sprintf(" (collection = $%v AND key = $%v AND user_id = $%v AND (read = 2 OR (read = 1 AND user_id = $%v))) ", l+1, l+2, l+3, l+4)
			params = append(params, id.Collection, id.Key, id.UserId, caller)
		}
@@ -596,7 +591,7 @@ func storageWriteObject(ctx context.Context, logger *zap.Logger, metrics Metrics
	case !dbVersion.Valid && object.Version == "":
		// An existing storage object was not present, and no OCC of any kind is specified.
		// Separate to the case above to handle concurrent non-OCC object creations, where all but the first must become updates.
		query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) ON CONFLICT (collection, read, key, user_id) DO UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()"
		query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now()) ON CONFLICT (collection, key, user_id) DO UPDATE SET value = $4, version = $5, read = $6, write = $7, update_time = now()"
		// Respect permissions in non-authoritative writes, where this operation also loses the race to insert the object.
		if !authoritativeWrite {
			query += " WHERE storage.write = 1"