Commit ce963ea1 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Do not return storage list cursor unless there are further objects.

parent 4b10efdc
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Handle query and parameter resets on wallet update retries.
- Reset list of friend IDs in Facebook import when retrying the operation.
- Reset notifications in friend add when retrying the operation.
- Do not return storage list cursor unless there are further objects.

## [2.5.1] - 2019-05-03
### Changed
+29 −24
Original line number Diff line number Diff line
@@ -147,7 +147,7 @@ func StorageListObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, cal

func StorageListObjectsAll(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritative bool, collection string, limit int, cursor string, storageCursor *storageCursor) (*api.StorageObjectList, error) {
	cursorQuery := ""
	params := []interface{}{collection, limit}
	params := []interface{}{collection, limit + 1}
	if storageCursor != nil {
		cursorQuery = ` AND (collection, read, key, user_id) > ($1, 2, $3, $4) `
		params = append(params, storageCursor.Key, storageCursor.UserID)
@@ -173,7 +173,7 @@ LIMIT $2`
		rows, err := db.QueryContext(ctx, query, params...)
		if err != nil {
			if err == sql.ErrNoRows {
				objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}
				objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0)}
				return nil
			} else {
				logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor))
@@ -182,7 +182,7 @@ LIMIT $2`
		}
		// rows.Close() called in storageListObjects

		objects, err = storageListObjects(rows, cursor)
		objects, err = storageListObjects(rows, limit)
		if err != nil {
			logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor))
			return err
@@ -195,7 +195,7 @@ LIMIT $2`

func StorageListObjectsPublicReadUser(ctx context.Context, logger *zap.Logger, db *sql.DB, userID uuid.UUID, collection string, limit int, cursor string, storageCursor *storageCursor) (*api.StorageObjectList, error) {
	cursorQuery := ""
	params := []interface{}{collection, userID, limit}
	params := []interface{}{collection, userID, limit + 1}
	if storageCursor != nil {
		cursorQuery = ` AND (collection, read, key, user_id) > ($1, 2, $4, $5) `
		params = append(params, storageCursor.Key, storageCursor.UserID)
@@ -212,7 +212,7 @@ LIMIT $3`
		rows, err := db.QueryContext(ctx, query, params...)
		if err != nil {
			if err == sql.ErrNoRows {
				objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}
				objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0)}
				return nil
			} else {
				logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor))
@@ -221,7 +221,7 @@ LIMIT $3`
		}
		// rows.Close() called in storageListObjects

		objects, err = storageListObjects(rows, cursor)
		objects, err = storageListObjects(rows, limit)
		if err != nil {
			logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor))
			return err
@@ -234,7 +234,7 @@ LIMIT $3`

func StorageListObjectsUser(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritative bool, userID uuid.UUID, collection string, limit int, cursor string, storageCursor *storageCursor) (*api.StorageObjectList, error) {
	cursorQuery := ""
	params := []interface{}{collection, userID, limit}
	params := []interface{}{collection, userID, limit + 1}
	if storageCursor != nil {
		cursorQuery = ` AND (collection, read, key, user_id) > ($1, $4, $5, $6) `
		params = append(params, storageCursor.Read, storageCursor.Key, storageCursor.UserID)
@@ -259,7 +259,7 @@ LIMIT $3`
		rows, err := db.QueryContext(ctx, query, params...)
		if err != nil {
			if err == sql.ErrNoRows {
				objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}
				objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0)}
				return nil
			} else {
				logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor))
@@ -268,7 +268,7 @@ LIMIT $3`
		}
		// rows.Close() called in storageListObjects

		objects, err = storageListObjects(rows, cursor)
		objects, err = storageListObjects(rows, limit)
		if err != nil {
			logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor))
			return err
@@ -326,9 +326,24 @@ WHERE user_id = $1`
	return objects, err
}

func storageListObjects(rows *sql.Rows, cursor string) (*api.StorageObjectList, error) {
	objects := make([]*api.StorageObject, 0)
func storageListObjects(rows *sql.Rows, limit int) (*api.StorageObjectList, error) {
	var lastObject *api.StorageObject
	var newCursor *storageCursor
	objects := make([]*api.StorageObject, 0, limit)
	for rows.Next() {
		// If we've read enough, but there is at least 1 more, use the last read as the cursor and stop here.
		if len(objects) >= limit && lastObject != nil {
			newCursor = &storageCursor{
				Key:  lastObject.Key,
				Read: lastObject.PermissionRead,
			}
			if lastObject.UserId != "" {
				newCursor.UserID = uuid.FromStringOrNil(lastObject.UserId)
			}
			break
		}

		// There is still room for more objects, read the next one.
		o := &api.StorageObject{CreateTime: &timestamp.Timestamp{}, UpdateTime: &timestamp.Timestamp{}}
		var createTime pgtype.Timestamptz
		var updateTime pgtype.Timestamptz
@@ -342,6 +357,7 @@ func storageListObjects(rows *sql.Rows, cursor string) (*api.StorageObjectList,
		o.UpdateTime.Seconds = updateTime.Time.Unix()

		objects = append(objects, o)
		lastObject = o
	}
	_ = rows.Close()

@@ -349,22 +365,11 @@ func storageListObjects(rows *sql.Rows, cursor string) (*api.StorageObjectList,
		return nil, rows.Err()
	}

	// Prepare the response and include the cursor, if any.
	objectList := &api.StorageObjectList{
		Objects: objects,
		Cursor:  cursor,
	}

	if len(objects) > 0 {
		lastObject := objects[len(objects)-1]
		newCursor := &storageCursor{
			Key:  lastObject.Key,
			Read: lastObject.PermissionRead,
		}

		if lastObject.UserId != "" {
			newCursor.UserID = uuid.FromStringOrNil(lastObject.UserId)
		}

	if newCursor != nil {
		cursorBuf := new(bytes.Buffer)
		if err := gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil {
			return nil, err
+99 −2
Original line number Diff line number Diff line
@@ -1857,7 +1857,7 @@ func TestStorageListRuntimeUser(t *testing.T) {
	assert.Equal(t, codes.OK, code, "code was not OK")
	assert.NotNil(t, list, "list was nil")
	assert.Len(t, list.Objects, 3, "values length was not 3")
	assert.NotEmpty(t, list.Cursor, "cursor was empty")
	assert.Empty(t, list.Cursor, "cursor was not empty")
}

func TestStorageListPipelineUserSelf(t *testing.T) {
@@ -1916,7 +1916,7 @@ func TestStorageListPipelineUserSelf(t *testing.T) {
	assert.Len(t, list.Objects, 2, "values length was not 2")
	assert.Equal(t, "a", list.Objects[0].Key, "values[0].Key was not a")
	assert.Equal(t, "b", list.Objects[1].Key, "values[1].Key was not b")
	assert.NotEmpty(t, list.Cursor, "cursor was empty")
	assert.Empty(t, list.Cursor, "cursor was not empty")
}

func TestStorageListPipelineUserOther(t *testing.T) {
@@ -1975,3 +1975,100 @@ func TestStorageListPipelineUserOther(t *testing.T) {
	assert.Len(t, values.Objects, 0, "values length was not 0")
	assert.Equal(t, "", values.Cursor, "cursor was not nil")
}

func TestStorageListNoRepeats(t *testing.T) {
	db := NewDB(t)
	defer db.Close()

	uid := uuid.Must(uuid.NewV4())
	InsertUser(t, db, uid)
	collection := GenerateString()

	ops := server.StorageOpWrites{
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "1",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "2",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "3",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "4",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "5",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "6",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
		&server.StorageOpWrite{
			OwnerID: uid.String(),
			Object: &api.WriteStorageObject{
				Collection:      collection,
				Key:             "7",
				Value:           "{}",
				PermissionRead:  &wrappers.Int32Value{Value: 2},
				PermissionWrite: &wrappers.Int32Value{Value: 1},
			},
		},
	}

	acks, code, err := server.StorageWriteObjects(context.Background(), logger, db, false, ops)

	assert.Nil(t, err, "err was not nil")
	assert.Equal(t, codes.OK, code, "code was not OK")
	assert.NotNil(t, acks, "acks was nil")
	assert.Len(t, acks.Acks, 7, "acks length was not 7")

	values, code, err := server.StorageListObjects(context.Background(), logger, db, uuid.Must(uuid.NewV4()), &uid, collection, 10, "")

	assert.Nil(t, err, "err was not nil")
	assert.Equal(t, codes.OK, code, "code was not OK")
	assert.NotNil(t, values, "values was nil")
	assert.Len(t, values.Objects, 7, "values length was not 7")
	assert.Equal(t, "", values.Cursor, "cursor was not nil")
}