Commit 4f1f4c8b authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Add additional group functions. Merge #102

parent bd1473e1
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -7,6 +7,9 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p
### Added
- New storage partial update feature.
- Log warning messages at startup when using insecure default parameter values.
- Add script runtime function to update groups.
- Add script runtime function to list groups a user is part of.
- Add script runtime function to list users belonging to a group.

### Changed
- Use Lua table for Content field when creating new notifications.
@@ -18,6 +21,9 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p
- Matchmake token expiry increased from 15 seconds to 30 seconds.
- Script runtime `os.date()` function now returns correct day of year.
- Script runtime contexts passed to function hooks now use `PascalCase` naming for fields. For example `context.user_id` must now be `context.UserId`.
- Remove `admin` sub-command.
- Group leave operations now return a specific error code when the last admin attempts to leave.
- Group self list operations now return the user's membership type to each group.

## [1.0.0-rc.1] - 2017-07-18
### Added

cmd/admin.go

deleted100644 → 0
+0 −152
Original line number Diff line number Diff line
// Copyright 2017 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
	"database/sql"
	"encoding/base64"
	"encoding/json"
	"flag"
	"fmt"
	"net/url"
	"os"

	"github.com/gorhill/cronexpr"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
)

type adminService struct {
	DSNS   string
	logger *zap.Logger
}

func AdminParse(args []string, logger *zap.Logger) {
	if len(args) == 0 {
		logger.Fatal("Admin requires a subcommand. Available commands are: 'create-leaderboard'.")
	}

	var exec func([]string, *zap.Logger)
	switch args[0] {
	case "create-leaderboard":
		exec = createLeaderboard
	default:
		logger.Fatal("Unrecognized admin subcommand. Available commands are: 'create-leaderboard'.")
	}

	exec(args[1:], logger)
	os.Exit(0)
}

func createLeaderboard(args []string, logger *zap.Logger) {
	var dbAddress string
	var id string
	var authoritative bool
	var sortOrder string
	var resetSchedule string
	var metadata string

	flags := flag.NewFlagSet("admin", flag.ExitOnError)
	flags.StringVar(&dbAddress, "database.address", "root@localhost:26257", "Address of CockroachDB server (username:password@address:port/dbname)")
	flags.StringVar(&id, "id", "", "ID to assign to the leaderboard.")
	flags.BoolVar(&authoritative, "authoritative", false, "True if clients may not submit scores directly, false otherwise.")
	flags.StringVar(&sortOrder, "sort", "desc", "Leaderboard sort order, 'asc' or 'desc'.")
	flags.StringVar(&resetSchedule, "reset", "", "Optional reset schedule in CRON format.")
	flags.StringVar(&metadata, "metadata", "{}", "Optional additional metadata as a JSON string.")

	if err := flags.Parse(args); err != nil {
		logger.Fatal("Could not parse admin flags.")
	}

	if dbAddress == "" {
		logger.Fatal("Database connection details are required.")
	}

	query := `INSERT INTO leaderboard (id, authoritative, sort_order, reset_schedule, metadata)
	VALUES ($1, $2, $3, $4, $5)`
	params := []interface{}{}

	// ID.
	if id == "" {
		params = append(params, uuid.NewV4().Bytes())
	} else {
		params = append(params, []byte(id))
	}

	// Authoritative.
	params = append(params, authoritative)

	// Sort order.
	if sortOrder == "asc" {
		params = append(params, 0)
	} else if sortOrder == "desc" {
		params = append(params, 1)
	} else {
		logger.Fatal("Invalid sort value, must be 'asc' or 'desc'.")
	}

	// Count is hardcoded in the INSERT above.

	// Reset schedule.
	if resetSchedule != "" {
		_, err := cronexpr.Parse(resetSchedule)
		if err != nil {
			logger.Fatal("Reset schedule must be a valid CRON expression.")
		}
		params = append(params, resetSchedule)
	} else {
		params = append(params, nil)
	}

	// Metadata.
	metadataBytes := []byte(metadata)
	var maybeJSON map[string]interface{}
	if json.Unmarshal(metadataBytes, &maybeJSON) != nil {
		logger.Fatal("Metadata must be a valid JSON string.")
	}
	params = append(params, metadataBytes)

	rawurl := fmt.Sprintf("postgresql://%s?sslmode=disable", dbAddress)
	url, err := url.Parse(rawurl)
	if err != nil {
		logger.Fatal("Bad connection URL", zap.Error(err))
	}

	logger.Info("Database connection", zap.String("db", dbAddress))

	// Default to "nakama" as DB name.
	dbname := "nakama"
	if len(url.Path) > 1 {
		dbname = url.Path[1:]
	}
	url.Path = fmt.Sprintf("/%s", dbname)
	db, err := sql.Open(dialect, url.String())
	if err != nil {
		logger.Fatal("Failed to open database", zap.Error(err))
	}
	if err = db.Ping(); err != nil {
		logger.Fatal("Error pinging database", zap.Error(err))
	}

	res, err := db.Exec(query, params...)
	if err != nil {
		logger.Fatal("Error creating leaderboard", zap.Error(err))
	}
	if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
		logger.Fatal("Error creating leaderboard, unexpected insert result")
	}

	logger.Info("Leaderboard created", zap.String("base64(id)", base64.StdEncoding.EncodeToString(params[0].([]byte))))
}
+0 −2
Original line number Diff line number Diff line
@@ -62,8 +62,6 @@ func main() {
			cmd.DoctorParse(os.Args[2:])
		case "migrate":
			cmd.MigrateParse(os.Args[2:], cmdLogger)
		case "admin":
			cmd.AdminParse(os.Args[2:], cmdLogger)
		}
	}

+77 −57
Original line number Diff line number Diff line
@@ -66,14 +66,16 @@ message Error {
    USER_HANDLE_INUSE = 10;
    /// Group names must be unique and it's already in use.
    GROUP_NAME_INUSE = 11;
    /// Group leave operation not allowed because the user is the last admin.
    GROUP_LAST_ADMIN = 12;
    /// Storage write operation failed.
    STORAGE_REJECTED = 12;
    STORAGE_REJECTED = 13;
    /// Match with given ID was not found in the system.
    MATCH_NOT_FOUND = 13;
    MATCH_NOT_FOUND = 14;
    /// Runtime function name was not found in system registry.
    RUNTIME_FUNCTION_NOT_FOUND = 14;
    RUNTIME_FUNCTION_NOT_FOUND = 15;
    /// Runtime function caused an internal server error and did not complete.
    RUNTIME_FUNCTION_EXCEPTION = 15;
    RUNTIME_FUNCTION_EXCEPTION = 16;
  }

  /// Error code - must be one of the Error.Code enums above.
@@ -218,56 +220,57 @@ message Envelope {
    TGroupUsersKick group_users_kick = 27;
    TGroupUsersPromote group_users_promote = 28;
    TGroups groups = 29;
    TGroupUsers group_users = 30;

    TTopicsJoin topics_join = 31;
    TTopicsLeave topics_leave = 32;
    TTopicMessageSend topic_message_send = 33;
    TTopicMessagesList topic_messages_list = 34;
    TTopics topics = 35;
    TTopicMessageAck topic_message_ack = 36;
    TopicMessage topic_message = 37;
    TTopicMessages topic_messages = 38;
    TopicPresence topic_presence = 39;

    TMatchCreate match_create = 40;
    TMatchesJoin matches_join = 41;
    TMatchesLeave matches_leave = 42;
    MatchDataSend match_data_send = 43;
    TMatch match = 44;
    TMatches matches = 45;
    MatchData match_data = 46;
    MatchPresence match_presence = 47;

    TStorageList storage_list = 48;
    TStorageFetch storage_fetch = 49;
    TStorageWrite storage_write = 50;
    TStorageUpdate storage_update = 51;
    TStorageRemove storage_remove = 52;
    TStorageData storage_data = 53;
    TStorageKeys storage_keys = 54;

    TLeaderboardsList leaderboards_list = 55;
    TLeaderboardRecordsWrite leaderboard_records_write = 56;
    TLeaderboardRecordsFetch leaderboard_records_fetch = 57;
    TLeaderboardRecordsList leaderboard_records_list = 58;
    TLeaderboards leaderboards = 59;
    TLeaderboardRecords leaderboard_records = 60;

    TMatchmakeAdd matchmake_add = 61;
    TMatchmakeRemove matchmake_remove = 62;
    TMatchmakeTicket matchmake_ticket = 63;
    MatchmakeMatched matchmake_matched = 64;

    TRpc rpc = 65;

    TPurchaseValidation purchase = 66;
    TPurchaseRecord purchase_record = 67;

    TNotificationsList notifications_list = 68;
    TNotificationsRemove notifications_remove = 69;
    TNotifications notifications = 70;
    Notifications live_notifications = 71;
    TGroupsSelf groups_self = 30;
    TGroupUsers group_users = 31;

    TTopicsJoin topics_join = 32;
    TTopicsLeave topics_leave = 33;
    TTopicMessageSend topic_message_send = 34;
    TTopicMessagesList topic_messages_list = 35;
    TTopics topics = 36;
    TTopicMessageAck topic_message_ack = 37;
    TopicMessage topic_message = 38;
    TTopicMessages topic_messages = 39;
    TopicPresence topic_presence = 40;

    TMatchCreate match_create = 41;
    TMatchesJoin matches_join = 42;
    TMatchesLeave matches_leave = 43;
    MatchDataSend match_data_send = 44;
    TMatch match = 45;
    TMatches matches = 46;
    MatchData match_data = 47;
    MatchPresence match_presence = 48;

    TStorageList storage_list = 49;
    TStorageFetch storage_fetch = 50;
    TStorageWrite storage_write = 51;
    TStorageUpdate storage_update = 52;
    TStorageRemove storage_remove = 53;
    TStorageData storage_data = 54;
    TStorageKeys storage_keys = 55;

    TLeaderboardsList leaderboards_list = 56;
    TLeaderboardRecordsWrite leaderboard_records_write = 57;
    TLeaderboardRecordsFetch leaderboard_records_fetch = 58;
    TLeaderboardRecordsList leaderboard_records_list = 59;
    TLeaderboards leaderboards = 60;
    TLeaderboardRecords leaderboard_records = 61;

    TMatchmakeAdd matchmake_add = 62;
    TMatchmakeRemove matchmake_remove = 63;
    TMatchmakeTicket matchmake_ticket = 64;
    MatchmakeMatched matchmake_matched = 65;

    TRpc rpc = 66;

    TPurchaseValidation purchase = 67;
    TPurchaseRecord purchase_record = 68;

    TNotificationsList notifications_list = 69;
    TNotificationsRemove notifications_remove = 70;
    TNotifications notifications = 71;
    Notifications live_notifications = 72;
  }
}

@@ -444,7 +447,7 @@ message Friend {
  /// Invite(1): Current user has sent an invitation.
  /// Invited(2): Current user has received an invitation.
  /// Blocked(3): Current user has blocked this friend.
  int64 type = 2;
  int64 state = 2;
}

/**
@@ -577,7 +580,7 @@ message TGroupsRemove {
/**
 * TGroupsSelfList fetches a list of groups that the current user is part of.
 *
 * @returns TGroups
 * @returns TGroupsSelf
 */
message TGroupsSelfList {}

@@ -630,6 +633,23 @@ message TGroups {
  bytes cursor = 2;
}

/**
 * TGroupsSelf contains a list of groups a particular user belongs to, and the user's relationsip to each.
 */
message TGroupsSelf {
  message GroupSelf {
    /// The core group information.
    Group group = 1;
    /// The user's relationship to the group. One of:
    /// Admin(0): User is an admin for this group.
    /// Member(1): User is a regular member of this group.
    /// Join(2): User is currently waiting to be accepted in this group.
    int64 state = 2;
  }

  repeated GroupSelf groups_self = 1;
}

/**
 * GroupUser is the core domain type representing a user that belongs to a group and their relationship status with the group.
 */
@@ -639,7 +659,7 @@ message GroupUser {
  /// Admin(0): User is an admin for this group.
  /// Member(1): User is a regular member of this group.
  /// Join(2): User is currently waiting to be accepted in this group.
  int64 type = 2;
  int64 state = 2;
}

/**
+248 −5
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import (
	"strconv"
	"strings"

	"fmt"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
)
@@ -181,11 +182,17 @@ func groupCreate(tx *sql.Tx, g *GroupCreateParam) (*Group, error) {
		values = append(values, g.Metadata)
	}

	r := tx.QueryRow(`
INSERT INTO groups (id, creator_id, name, state, count, created_at, updated_at, `+strings.Join(columns, ", ")+")"+`
VALUES ($1, $2, $3, $4, 1, $5, $5, `+strings.Join(params, ",")+")"+`
RETURNING id, creator_id, name, description, avatar_url, lang, utc_offset_ms, metadata, state, count, created_at, updated_at
`, values...)
	query := "INSERT INTO groups (id, creator_id, name, state, count, created_at, updated_at"
	if len(columns) != 0 {
		query += ", " + strings.Join(columns, ", ")
	}
	query += ") VALUES ($1, $2, $3, $4, 1, $5, $5"
	if len(params) != 0 {
		query += ", " + strings.Join(params, ",")
	}
	query += ") RETURNING id, creator_id, name, description, avatar_url, lang, utc_offset_ms, metadata, state, count, created_at, updated_at"

	r := tx.QueryRow(query, values...)

	group, err := extractGroup(r)
	if err != nil {
@@ -212,3 +219,239 @@ VALUES ($1, $2, $2, $3, 0), ($3, $2, $2, $1, 0)`,

	return group, nil
}

func GroupsUpdate(logger *zap.Logger, db *sql.DB, caller uuid.UUID, updates []*TGroupsUpdate_GroupUpdate) (Error_Code, error) {
	tx, err := db.Begin()
	if err != nil {
		logger.Error("Could not update groups, begin error", zap.Error(err))
		return RUNTIME_EXCEPTION, errors.New("Could not update groups")
	}

	code := RUNTIME_EXCEPTION
	defer func() {
		if err != nil {
			logger.Error("Could not update groups", zap.Error(err))
			if tx != nil {
				if e := tx.Rollback(); e != nil {
					logger.Error("Could not update groups, rollback error", zap.Error(e))
				}
			}
		} else {
			if e := tx.Commit(); e != nil {
				logger.Error("Could not update groups, commit error", zap.Error(e))
				err = errors.New("Could not update groups")
			}
		}
	}()

	for _, g := range updates {
		// TODO notify members that group has been updated.
		groupID, err := uuid.FromBytes(g.GroupId)
		if err != nil {
			code = BAD_INPUT
			err = errors.New("Group ID is not valid.")
			return code, err
		}

		groupLogger := logger.With(zap.String("group_id", groupID.String()))

		statements := make([]string, 6)
		params := make([]interface{}, 7)

		params[0] = groupID.Bytes()

		statements[0] = "updated_at = $2"
		params[1] = nowMs()

		statements[1] = "description = $3"
		params[2] = g.Description

		statements[2] = "avatar_url = $4"
		params[3] = g.AvatarUrl

		statements[3] = "lang = $5"
		params[4] = g.Lang

		statements[4] = "metadata = $6"
		params[5] = g.Metadata

		statements[5] = "state = $7"
		params[6] = 0
		if g.Private {
			params[6] = 1
		}

		if g.Name != "" {
			statements = append(statements, "name = $8")
			params = append(params, g.Name)
		}

		query := "UPDATE groups SET " + strings.Join(statements, ", ") + " WHERE id = $1"

		// If the caller is not the script runtime, apply group membership and admin role checks.
		if caller != uuid.Nil {
			params = append(params, caller.Bytes())
			query += fmt.Sprintf(" AND EXISTS (SELECT source_id FROM group_edge WHERE source_id = $1 AND destination_id = $%v AND state = 0)", len(params))
		}

		res, err := db.Exec(query, params...)
		if err != nil {
			if strings.HasSuffix(err.Error(), "violates unique constraint \"groups_name_key\"") {
				code = GROUP_NAME_INUSE
				err = fmt.Errorf("Name is in use: %v", g.Name)
			} else {
				groupLogger.Error("Could not update group, exec error", zap.Error(err))
				err = errors.New("Could not update group")
			}
			return code, err
		}
		if affectedRows, _ := res.RowsAffected(); affectedRows == 0 {
			code = BAD_INPUT
			err = errors.New("Could not accept group join envelope. Group may not exists with the given ID")
			return code, err
		}

		groupLogger.Debug("Updated group")
	}

	return code, err
}

func GroupsSelfList(logger *zap.Logger, db *sql.DB, caller uuid.UUID, userID uuid.UUID) ([]*TGroupsSelf_GroupSelf, Error_Code, error) {
	// Pipeline callers can only list their own groups.
	if caller != uuid.Nil && caller != userID {
		return nil, BAD_INPUT, errors.New("Users can only list their own joined groups")
	}

	rows, err := db.Query(`
SELECT id, creator_id, name, description, avatar_url, lang, utc_offset_ms, metadata, groups.state, count, created_at, groups.updated_at, group_edge.state
FROM groups
JOIN group_edge ON (group_edge.source_id = id)
WHERE group_edge.destination_id = $1 AND disabled_at = 0 AND (group_edge.state = 1 OR group_edge.state = 0)
`, userID.Bytes())

	if err != nil {
		logger.Error("Could not list joined groups, query error", zap.Error(err))
		return nil, RUNTIME_EXCEPTION, errors.New("Could not list joined groups")
	}
	defer rows.Close()

	groups := make([]*TGroupsSelf_GroupSelf, 0)
	for rows.Next() {
		var id []byte
		var creatorID []byte
		var name sql.NullString
		var description sql.NullString
		var avatarURL sql.NullString
		var lang sql.NullString
		var utcOffsetMs sql.NullInt64
		var metadata []byte
		var state sql.NullInt64
		var count sql.NullInt64
		var createdAt sql.NullInt64
		var updatedAt sql.NullInt64
		var userState sql.NullInt64

		err := rows.Scan(&id, &creatorID, &name,
			&description, &avatarURL, &lang,
			&utcOffsetMs, &metadata, &state,
			&count, &createdAt, &updatedAt, &userState)

		if err != nil {
			logger.Error("Could not list joined groups, scan error", zap.Error(err))
			return nil, RUNTIME_EXCEPTION, errors.New("Could not list joined groups")
		}

		desc := ""
		if description.Valid {
			desc = description.String
		}

		avatar := ""
		if avatarURL.Valid {
			avatar = avatarURL.String
		}

		private := state.Int64 == 1

		groups = append(groups, &TGroupsSelf_GroupSelf{
			Group: &Group{
				Id:          id,
				CreatorId:   creatorID,
				Name:        name.String,
				Description: desc,
				AvatarUrl:   avatar,
				Lang:        lang.String,
				UtcOffsetMs: utcOffsetMs.Int64,
				Metadata:    metadata,
				Private:     private,
				Count:       count.Int64,
				CreatedAt:   createdAt.Int64,
				UpdatedAt:   updatedAt.Int64,
			},
			State: userState.Int64,
		})
	}

	return groups, 0, nil
}

func GroupUsersList(logger *zap.Logger, db *sql.DB, caller uuid.UUID, groupID uuid.UUID) ([]*GroupUser, Error_Code, error) {
	groupLogger := logger.With(zap.String("group_id", groupID.String()))

	query := `
SELECT u.id, u.handle, u.fullname, u.avatar_url,
	u.lang, u.location, u.timezone, u.metadata,
	u.created_at, u.updated_at, u.last_online_at, ge.state
FROM users u, group_edge ge
WHERE u.id = ge.source_id AND ge.destination_id = $1`

	rows, err := db.Query(query, groupID.Bytes())
	if err != nil {
		groupLogger.Error("Could not get group users, query error", zap.Error(err))
		return nil, RUNTIME_EXCEPTION, errors.New("Could not get group users")
	}
	defer rows.Close()

	users := make([]*GroupUser, 0)

	for rows.Next() {
		var id []byte
		var handle sql.NullString
		var fullname sql.NullString
		var avatarURL sql.NullString
		var lang sql.NullString
		var location sql.NullString
		var timezone sql.NullString
		var metadata []byte
		var createdAt sql.NullInt64
		var updatedAt sql.NullInt64
		var lastOnlineAt sql.NullInt64
		var state sql.NullInt64

		err = rows.Scan(&id, &handle, &fullname, &avatarURL, &lang, &location, &timezone, &metadata, &createdAt, &updatedAt, &lastOnlineAt, &state)
		if err != nil {
			groupLogger.Error("Could not get group users, scan error", zap.Error(err))
			return nil, RUNTIME_EXCEPTION, errors.New("Could not get group users")
		}

		users = append(users, &GroupUser{
			User: &User{
				Id:           id,
				Handle:       handle.String,
				Fullname:     fullname.String,
				AvatarUrl:    avatarURL.String,
				Lang:         lang.String,
				Location:     location.String,
				Timezone:     timezone.String,
				Metadata:     metadata,
				CreatedAt:    createdAt.Int64,
				UpdatedAt:    updatedAt.Int64,
				LastOnlineAt: lastOnlineAt.Int64,
			},
			State: state.Int64,
		})
	}

	return users, 0, nil
}
Loading