Commit 93c360a7 authored by Andrei Mihu's avatar Andrei Mihu Committed by Mo Firouz
Browse files

Add storage list operation. Merged #81

parent cf8d3599
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -4,6 +4,9 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/).

## [Unreleased]
### Added
- New storage list feature.

### Changed
- Run Facebook friends import after registration completes.

+17 −26
Original line number Diff line number Diff line
@@ -34,11 +34,11 @@ CREATE TABLE IF NOT EXISTS users (
    gamecenter_id  VARCHAR(64)   UNIQUE,
    steam_id       VARCHAR(64)   UNIQUE,
    custom_id      VARCHAR(64)   UNIQUE,
    created_at     INT           CHECK (created_at > 0) NOT NULL,
    updated_at     INT           CHECK (updated_at > 0) NOT NULL,
    verified_at    INT           CHECK (verified_at >= 0) DEFAULT 0 NOT NULL,
    disabled_at    INT           CHECK (disabled_at >= 0) DEFAULT 0 NOT NULL,
    last_online_at INT           CHECK (last_online_at >= 0) DEFAULT 0 NOT NULL
    created_at     BIGINT        CHECK (created_at > 0) NOT NULL,
    updated_at     BIGINT        CHECK (updated_at > 0) NOT NULL,
    verified_at    BIGINT        CHECK (verified_at >= 0) DEFAULT 0 NOT NULL,
    disabled_at    BIGINT        CHECK (disabled_at >= 0) DEFAULT 0 NOT NULL,
    last_online_at BIGINT        CHECK (last_online_at >= 0) DEFAULT 0 NOT NULL
);

-- This table should be replaced with an array column in the users table
@@ -58,7 +58,7 @@ CREATE TABLE IF NOT EXISTS user_edge (
    PRIMARY KEY (source_id, state, position),
    source_id      BYTEA    NOT NULL,
    position       BIGINT   NOT NULL, -- Used for sort order on rows
    updated_at     INT      CHECK (updated_at > 0) NOT NULL,
    updated_at     BIGINT   CHECK (updated_at > 0) NOT NULL,
    destination_id BYTEA    NOT NULL,
    state          SMALLINT DEFAULT 0 NOT NULL, -- friend(0), invite(1), invited(2), blocked(3), deleted(4), archived(5)

@@ -70,7 +70,7 @@ CREATE TABLE IF NOT EXISTS user_edge_metadata (
    source_id  BYTEA    NOT NULL,
    count      INT      DEFAULT 0 CHECK (count >= 0) NOT NULL,
    state      SMALLINT DEFAULT 0 CHECK (state >= 0) NOT NULL, -- Unused, currently only set to 0.
    updated_at INT      CHECK (updated_at > 0) NOT NULL
    updated_at BIGINT   CHECK (updated_at > 0) NOT NULL
);

CREATE TABLE IF NOT EXISTS groups (
@@ -87,9 +87,9 @@ CREATE TABLE IF NOT EXISTS groups (
    metadata      BYTEA         DEFAULT '{}' CHECK (length(metadata) < 16000) NOT NULL,
    state         SMALLINT      DEFAULT 0 CHECK (state >= 0) NOT NULL, -- public(0), private(1)
    count         INT           DEFAULT 0 CHECK (count >= 0) NOT NULL,
    created_at    INT           CHECK (created_at > 0) NOT NULL,
    updated_at    INT           CHECK (updated_at > 0) NOT NULL,
    disabled_at   INT           CHECK (disabled_at >= 0) DEFAULT 0 NOT NULL
    created_at    BIGINT        CHECK (created_at > 0) NOT NULL,
    updated_at    BIGINT        CHECK (updated_at > 0) NOT NULL,
    disabled_at   BIGINT        CHECK (disabled_at >= 0) DEFAULT 0 NOT NULL
);
CREATE INDEX IF NOT EXISTS count_updated_at_id_idx ON groups (count, updated_at, id, disabled_at);
CREATE INDEX IF NOT EXISTS created_at_count_id_idx ON groups (created_at, count, id, disabled_at);
@@ -101,7 +101,7 @@ CREATE TABLE IF NOT EXISTS group_edge (
    PRIMARY KEY (source_id, state, position),
    source_id      BYTEA    NOT NULL,
    position       BIGINT   NOT NULL, -- Used for sort order on rows
    updated_at     INT      CHECK (updated_at > 0) NOT NULL,
    updated_at     BIGINT   CHECK (updated_at > 0) NOT NULL,
    destination_id BYTEA    NOT NULL,
    state          SMALLINT CHECK (state >= 0) NOT NULL, -- admin(0), member(1), join(2), archived(3)

@@ -115,8 +115,8 @@ CREATE TABLE IF NOT EXISTS message (
    topic_type SMALLINT    NOT NULL, -- dm(0), room(1), group(2)
    message_id BYTEA       NOT NULL,
    user_id    BYTEA       NOT NULL,
    created_at INT         CHECK (created_at > 0) NOT NULL,
    expires_at INT         DEFAULT 0 CHECK (created_at >= 0) NOT NULL,
    created_at BIGINT      CHECK (created_at > 0) NOT NULL,
    expires_at BIGINT      DEFAULT 0 CHECK (created_at >= 0) NOT NULL,
    handle     VARCHAR(20) NOT NULL,
    type       SMALLINT    NOT NULL, -- chat(0), group_join(1), group_add(2), group_leave(3), group_kick(4), group_promoted(5)
    -- FIXME replace with JSONB
@@ -137,21 +137,12 @@ CREATE TABLE IF NOT EXISTS storage (
    version    BYTEA       NOT NULL,
    read       SMALLINT    DEFAULT 1 CHECK (read >= 0) NOT NULL,
    write      SMALLINT    DEFAULT 1 CHECK (write >= 0) NOT NULL,
    created_at INT         CHECK (created_at > 0) NOT NULL,
    updated_at INT         CHECK (updated_at > 0) NOT NULL,
    created_at BIGINT      CHECK (created_at > 0) NOT NULL,
    updated_at BIGINT      CHECK (updated_at > 0) NOT NULL,
    -- FIXME replace with TTL support
    expires_at INT         CHECK (expires_at >= 0) DEFAULT 0 NOT NULL,
    deleted_at INT         CHECK (deleted_at >= 0) DEFAULT 0 NOT NULL
    expires_at BIGINT      CHECK (expires_at >= 0) DEFAULT 0 NOT NULL,
    deleted_at BIGINT      CHECK (deleted_at >= 0) DEFAULT 0 NOT NULL
);
CREATE INDEX IF NOT EXISTS read_idx ON storage (read);
CREATE INDEX IF NOT EXISTS write_idx ON storage (write);
CREATE INDEX IF NOT EXISTS version_idx ON storage (version);
-- For sync fetch
CREATE INDEX IF NOT EXISTS user_id_bucket_updated_at_idx ON storage (user_id, bucket, updated_at);
-- For bulk deletes
CREATE INDEX IF NOT EXISTS user_id_deleted_at_idx ON storage (user_id, deleted_at);
CREATE INDEX IF NOT EXISTS user_id_bucket_deleted_at_idx ON storage (user_id, bucket, deleted_at);
CREATE INDEX IF NOT EXISTS user_id_bucket_collection_deleted_at_idx ON storage (user_id, bucket, collection, deleted_at);

-- +migrate Down
DROP TABLE IF EXISTS user_device;
+36 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017 The Nakama Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Replace the single-column storage indexes with composite indexes that match
-- the keyset-pagination queries used by the storage list operation.

-- +migrate Up
-- NOTE: not postgres compatible, it expects table.index rather than table@index.
DROP INDEX IF EXISTS storage@read_idx;
DROP INDEX IF EXISTS storage@write_idx;
DROP INDEX IF EXISTS storage@version_idx;
DROP INDEX IF EXISTS storage@user_id_bucket_updated_at_idx;
DROP INDEX IF EXISTS storage@user_id_deleted_at_idx;
DROP INDEX IF EXISTS storage@user_id_bucket_deleted_at_idx;
DROP INDEX IF EXISTS storage@user_id_bucket_collection_deleted_at_idx;

-- List by user first, then keep narrowing down.
CREATE INDEX IF NOT EXISTS deleted_at_user_id_read_bucket_collection_record_idx ON storage (deleted_at, user_id, read, bucket, collection, record);
CREATE INDEX IF NOT EXISTS deleted_at_user_id_bucket_read_collection_record_idx ON storage (deleted_at, user_id, bucket, read, collection, record);
CREATE INDEX IF NOT EXISTS deleted_at_user_id_bucket_collection_read_record_idx ON storage (deleted_at, user_id, bucket, collection, read, record);

-- List across users.
CREATE INDEX IF NOT EXISTS deleted_at_bucket_read_collection_record_user_id_idx ON storage (deleted_at, bucket, read, collection, record, user_id);
CREATE INDEX IF NOT EXISTS deleted_at_bucket_collection_read_record_user_id_idx ON storage (deleted_at, bucket, collection, read, record, user_id);

-- +migrate Down
-- Intentionally empty: the dropped single-column indexes are not recreated.
+35 −19
Original line number Diff line number Diff line
@@ -236,25 +236,27 @@ message Envelope {
    MatchData match_data = 47;
    MatchPresence match_presence = 48;

    TStorageFetch storage_fetch = 49;
    TStorageWrite storage_write = 50;
    TStorageRemove storage_remove = 51;
    TStorageData storage_data = 52;
    TStorageKeys storage_keys = 53;
    TStorageList storage_list = 49;
    TStorageFetch storage_fetch = 50;
    TStorageWrite storage_write = 51;
    TStorageRemove storage_remove = 52;
    TStorageData storage_data = 53;
    TStorageKeys storage_keys = 54;

    TLeaderboardsList leaderboards_list = 54;
    TLeaderboardRecordsWrite leaderboard_records_write = 55;
    TLeaderboardRecordsFetch leaderboard_records_fetch = 56;
    TLeaderboardRecordsList leaderboard_records_list = 57;
    TLeaderboards leaderboards = 58;
    TLeaderboardRecords leaderboard_records = 59;
    TLeaderboardsList leaderboards_list = 55;
    TLeaderboardRecordsWrite leaderboard_records_write = 56;
    TLeaderboardRecordsFetch leaderboard_records_fetch = 57;
    TLeaderboardRecordsList leaderboard_records_list = 58;
    TLeaderboards leaderboards = 59;

    TMatchmakeAdd matchmake_add = 60;
    TMatchmakeRemove matchmake_remove = 61;
    TMatchmakeTicket matchmake_ticket = 62;
    MatchmakeMatched matchmake_matched = 63;
    TLeaderboardRecords leaderboard_records = 60;

    TRpc rpc = 64;
    TMatchmakeAdd matchmake_add = 61;
    TMatchmakeRemove matchmake_remove = 62;
    TMatchmakeTicket matchmake_ticket = 63;
    MatchmakeMatched matchmake_matched = 64;

    TRpc rpc = 65;
  }
}

@@ -1005,6 +1007,19 @@ enum StoragePermissionWrite {
  OWNER_WRITE = 1;
}

/**
 * TStorageList is used to list records from Storage.
 *
 * Records are filtered by the optional user ID, bucket, and collection
 * fields, and paginated with the limit and opaque cursor values. The
 * cursor returned in TStorageData resumes the listing.
 *
 * @returns TStorageData
 */
message TStorageList {
  bytes user_id = 1;
  string bucket = 2;
  string collection = 3;
  int64 limit = 4;
  bytes cursor = 5;
}

/**
 * TStorageFetch is used to retrieve a list of records from Storage
 *
@@ -1039,6 +1054,7 @@ message TStorageData {
  }

  repeated StorageData data = 1;
  bytes cursor = 2;
}

/**
+201 −0
Original line number Diff line number Diff line
@@ -22,10 +22,19 @@ import (
	"errors"
	"fmt"

	"encoding/gob"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
)

// storageListCursor is the keyset cursor for paginated storage listing. It
// records the key columns of the last record returned on a page, is
// gob-encoded into the opaque cursor bytes handed to clients, and is decoded
// again to resume the listing where the previous page ended.
type storageListCursor struct {
	Bucket     string
	Collection string
	Record     string
	UserID     []byte
	Read       int64
}

type StorageKey struct {
	Bucket     string
	Collection string
@@ -49,6 +58,198 @@ type StorageData struct {
	ExpiresAt       int64
}

// StorageList returns one page of storage records filtered by owner, bucket,
// and collection, plus an opaque keyset cursor to resume from.
//
// At least a user ID or a bucket must be supplied, and filtering by
// collection requires a bucket. The caller controls visibility: the script
// runtime (uuid.Nil) sees all records (read >= 0), an owner listing their own
// data sees read >= 1, and anyone else sees only read >= 2.
//
// Returns the records, a cursor for the next page (nil when no further
// results exist), an error code, and an error.
func StorageList(logger *zap.Logger, db *sql.DB, caller uuid.UUID, userID []byte, bucket string, collection string, limit int64, cursor []byte) ([]*StorageData, []byte, Error_Code, error) {
	// We list by at least User ID, or bucket as a list criteria.
	if len(userID) == 0 && bucket == "" {
		return nil, nil, BAD_INPUT, errors.New("Either a User ID or a bucket is required as an initial list criteria")
	}
	if bucket == "" && collection != "" {
		return nil, nil, BAD_INPUT, errors.New("Cannot list by collection without listing by bucket first")
	}

	// If a user ID is provided, validate the format.
	owner := uuid.Nil
	if len(userID) != 0 {
		if uid, err := uuid.FromBytes(userID); err != nil {
			return nil, nil, BAD_INPUT, errors.New("Invalid user ID")
		} else {
			owner = uid
		}
	}

	// Validate the limit. Zero selects the default page size.
	if limit == 0 {
		limit = 10
	} else if limit < 10 || limit > 100 {
		return nil, nil, BAD_INPUT, errors.New("Limit must be between 10 and 100")
	}

	// Process the incoming cursor if one is provided.
	var incomingCursor *storageListCursor
	if len(cursor) != 0 {
		incomingCursor = &storageListCursor{}
		if err := gob.NewDecoder(bytes.NewReader(cursor)).Decode(incomingCursor); err != nil {
			return nil, nil, BAD_INPUT, errors.New("Invalid cursor data")
		}
	}

	// Select the correct index. NOTE: should be removed when DB index selection is smarter.
	// The chosen index column order must match the keyset clause built below.
	// FIX: the branches were previously inverted — the user_id-first indexes
	// were selected when no user ID was given, and vice versa.
	index := ""
	if len(userID) == 0 {
		// Listing across users; bucket is guaranteed non-empty by validation above.
		if collection == "" {
			index = "deleted_at_bucket_read_collection_record_user_id_idx"
		} else {
			index = "deleted_at_bucket_collection_read_record_user_id_idx"
		}
	} else {
		// Listing by user first, then narrowing down.
		if bucket == "" {
			index = "deleted_at_user_id_read_bucket_collection_record_idx"
		} else if collection == "" {
			index = "deleted_at_user_id_bucket_read_collection_record_idx"
		} else {
			index = "deleted_at_user_id_bucket_collection_read_record_idx"
		}
	}

	// Set up the query.
	query := "SELECT user_id, bucket, collection, record, value, version, read, write, created_at, updated_at, expires_at FROM storage@" + index
	params := make([]interface{}, 0)

	// If cursor is present, give keyset clause priority over other parameters.
	// NOTE(review): the `deleted_at+deleted_at = 0` form appears intended to
	// filter live records without disturbing index/keyset selection — confirm.
	if incomingCursor != nil {
		if len(userID) == 0 {
			if collection == "" {
				i := len(params)
				query += fmt.Sprintf(" WHERE (deleted_at, bucket, read, collection, record, user_id) > (0, $%v, $%v, $%v, $%v, $%v) AND deleted_at+deleted_at = 0 AND bucket = $%v", i+1, i+2, i+3, i+4, i+5, i+6)
				params = append(params, incomingCursor.Bucket, incomingCursor.Read, incomingCursor.Collection, incomingCursor.Record, incomingCursor.UserID, bucket)
			} else {
				i := len(params)
				query += fmt.Sprintf(" WHERE (deleted_at, bucket, collection, read, record, user_id) > (0, $%v, $%v, $%v, $%v, $%v) AND deleted_at+deleted_at = 0 AND bucket = $%v AND collection = $%v", i+1, i+2, i+3, i+4, i+5, i+6, i+7)
				params = append(params, incomingCursor.Bucket, incomingCursor.Collection, incomingCursor.Read, incomingCursor.Record, incomingCursor.UserID, bucket, collection)
			}
		} else {
			if bucket == "" {
				i := len(params)
				query += fmt.Sprintf(" WHERE (deleted_at, user_id, read, bucket, collection, record) > (0, $%v, $%v, $%v, $%v, $%v) AND deleted_at+deleted_at = 0 AND user_id = $%v", i+1, i+2, i+3, i+4, i+5, i+6)
				params = append(params, incomingCursor.UserID, incomingCursor.Read, incomingCursor.Bucket, incomingCursor.Collection, incomingCursor.Record, userID)
			} else if collection == "" {
				i := len(params)
				query += fmt.Sprintf(" WHERE (deleted_at, user_id, bucket, read, collection, record) > (0, $%v, $%v, $%v, $%v, $%v) AND deleted_at+deleted_at = 0 AND user_id = $%v AND bucket = $%v", i+1, i+2, i+3, i+4, i+5, i+6, i+7)
				params = append(params, incomingCursor.UserID, incomingCursor.Bucket, incomingCursor.Read, incomingCursor.Collection, incomingCursor.Record, userID, bucket)
			} else {
				i := len(params)
				query += fmt.Sprintf(" WHERE (deleted_at, user_id, bucket, collection, read, record) > (0, $%v, $%v, $%v, $%v, $%v) AND deleted_at+deleted_at = 0 AND user_id = $%v AND bucket = $%v AND collection = $%v", i+1, i+2, i+3, i+4, i+5, i+6, i+7, i+8)
				params = append(params, incomingCursor.UserID, incomingCursor.Bucket, incomingCursor.Collection, incomingCursor.Read, incomingCursor.Record, userID, bucket, collection)
			}
		}
	} else {
		// If no keyset, start all ranges with live records.
		query += " WHERE deleted_at = 0"
		// Apply filtering parameters as needed.
		if len(userID) != 0 {
			params = append(params, owner.Bytes())
			query += fmt.Sprintf(" AND user_id = $%v", len(params))
		}
		if bucket != "" {
			params = append(params, bucket)
			query += fmt.Sprintf(" AND bucket = $%v", len(params))
		}
		if collection != "" {
			params = append(params, collection)
			query += fmt.Sprintf(" AND collection = $%v", len(params))
		}
	}

	// Apply permissions as needed.
	if caller == uuid.Nil {
		// Script runtime can list all data regardless of read permission.
		query += " AND read >= 0"
	} else if len(userID) != 0 && caller == owner {
		// If listing by user first, and the caller is the user listing their own data.
		query += " AND read >= 1"
	} else {
		query += " AND read >= 2"
	}

	// Fetch one row beyond the page size to detect whether a next page exists.
	params = append(params, limit+1)
	query += fmt.Sprintf(" LIMIT $%v", len(params))

	// Execute the query.
	rows, err := db.Query(query, params...)
	if err != nil {
		logger.Error("Error in storage list", zap.Error(err))
		return nil, nil, RUNTIME_EXCEPTION, err
	}
	defer rows.Close()

	storageData := make([]*StorageData, 0)
	var outgoingCursor []byte

	// Parse the results. Scan targets are declared outside the loop so the
	// values of the last accepted row are available when building the cursor.
	var dataUserID []byte
	var dataBucket sql.NullString
	var dataCollection sql.NullString
	var dataRecord sql.NullString
	var dataValue []byte
	var dataVersion []byte
	var dataRead sql.NullInt64
	var dataWrite sql.NullInt64
	var dataCreatedAt sql.NullInt64
	var dataUpdatedAt sql.NullInt64
	var dataExpiresAt sql.NullInt64
	for rows.Next() {
		if int64(len(storageData)) >= limit {
			// Page is full and at least one more row exists: emit a cursor
			// built from the key columns of the last row on this page.
			cursorBuf := new(bytes.Buffer)
			newCursor := &storageListCursor{
				Bucket:     dataBucket.String,
				Collection: dataCollection.String,
				Record:     dataRecord.String,
				UserID:     dataUserID,
				Read:       dataRead.Int64,
			}
			// FIX: the Encode error was previously discarded — the old code
			// tested the stale outer `err` instead of the Encode result.
			if err := gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil {
				logger.Error("Error creating storage list cursor", zap.Error(err))
				return nil, nil, RUNTIME_EXCEPTION, errors.New("Error listing storage data")
			}
			outgoingCursor = cursorBuf.Bytes()
			break
		}

		err := rows.Scan(&dataUserID, &dataBucket, &dataCollection, &dataRecord, &dataValue, &dataVersion,
			&dataRead, &dataWrite, &dataCreatedAt, &dataUpdatedAt, &dataExpiresAt)
		if err != nil {
			logger.Error("Could not execute storage list query", zap.Error(err))
			return nil, nil, RUNTIME_EXCEPTION, err
		}

		// Potentially coerce zero-length global owner field.
		if len(dataUserID) == 0 {
			dataUserID = nil
		}

		// Accumulate the response.
		storageData = append(storageData, &StorageData{
			Bucket:          dataBucket.String,
			Collection:      dataCollection.String,
			Record:          dataRecord.String,
			UserId:          dataUserID,
			Value:           dataValue,
			Version:         dataVersion,
			PermissionRead:  dataRead.Int64,
			PermissionWrite: dataWrite.Int64,
			CreatedAt:       dataCreatedAt.Int64,
			UpdatedAt:       dataUpdatedAt.Int64,
			ExpiresAt:       dataExpiresAt.Int64,
		})
	}
	if err = rows.Err(); err != nil {
		logger.Error("Could not execute storage list query", zap.Error(err))
		return nil, nil, RUNTIME_EXCEPTION, err
	}

	return storageData, outgoingCursor, 0, nil
}

func StorageFetch(logger *zap.Logger, db *sql.DB, caller uuid.UUID, keys []*StorageKey) ([]*StorageData, Error_Code, error) {
	// Ensure there is at least one key requested.
	if len(keys) == 0 {
Loading