Commit d3d79027 authored by Mo Firouz's avatar Mo Firouz
Browse files

Tweak Group queries. Merged #17

parent a4948707
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p
### Fixed

- Fix issue where random handle generator wasn't seeded properly.
- Fix issues in executing Friend and Storage queries.
- Fix issues in executing Friend, Storage and Group queries.
- Fix sending Close frame message in the Websocket to gracefully close connection.

## [0.10.0] - 2017-01-14
+3 −2
Original line number Diff line number Diff line
@@ -18,8 +18,9 @@ import (
	"database/sql"
	"fmt"

	"github.com/uber-go/zap"
	"nakama/pkg/social"

	"github.com/uber-go/zap"
)

type pipeline struct {
@@ -42,7 +43,7 @@ func NewPipeline(config Config, db *sql.DB, socialClient *social.Client, tracker
}

func (p *pipeline) processRequest(logger zap.Logger, session *session, envelope *Envelope) {
	logger.Debug(fmt.Sprintf("Received %T message", envelope.Payload))
	logger.Debug(fmt.Sprintf("Received %T message", envelope.Payload), zap.String("collation_id", envelope.CollationId))

	switch envelope.Payload.(type) {
	case *Envelope_Logout:
+47 −19
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import (
	"strconv"
	"strings"

	"github.com/lib/pq"
	"github.com/satori/go.uuid"
	"github.com/uber-go/zap"
)
@@ -38,14 +39,14 @@ type groupCursor struct {
}

func (p *pipeline) extractGroup(r scanner) (*Group, error) {
	var id sql.RawBytes
	var creatorID sql.RawBytes
	var id []byte
	var creatorID []byte
	var name sql.NullString
	var description sql.NullString
	var avatarURL sql.NullString
	var lang sql.NullString
	var utcOffsetMs sql.NullInt64
	var metadata sql.RawBytes
	var metadata []byte
	var state sql.NullInt64
	var count sql.NullInt64
	var createdAt sql.NullInt64
@@ -145,19 +146,19 @@ func (p *pipeline) groupCreate(logger zap.Logger, session *session, envelope *En

	if g.Description != "" {
		columns = append(columns, "description")
		params = append(params, "$"+strconv.Itoa(len(values)))
		params = append(params, "$"+strconv.Itoa(len(values)+1))
		values = append(values, g.Description)
	}

	if g.AvatarUrl != "" {
		columns = append(columns, "avatar_url")
		params = append(params, "$"+strconv.Itoa(len(values)))
		params = append(params, "$"+strconv.Itoa(len(values)+1))
		values = append(values, g.AvatarUrl)
	}

	if g.Lang != "" {
		columns = append(columns, "lang")
		params = append(params, "$"+strconv.Itoa(len(values)))
		params = append(params, "$"+strconv.Itoa(len(values)+1))
		values = append(values, g.Lang)
	}

@@ -176,8 +177,8 @@ func (p *pipeline) groupCreate(logger zap.Logger, session *session, envelope *En

	r := tx.QueryRow(`
INSERT INTO groups (creator_id, name, state, created_at, updated_at, `+strings.Join(columns, ", ")+")"+`
VALUES ($1, $2, $3, $3, `+strings.Join(params, ",")+")"+`
RETURNING id, creator_id, name, description, avatar_url, lang, utc_offset_ms, metadata, state, count, created_at, updated_at, disabled_at
VALUES ($1, $2, $3, $4, $4, `+strings.Join(params, ", ")+")"+`
RETURNING id, creator_id, name, description, avatar_url, lang, utc_offset_ms, metadata, state, count, created_at, updated_at
`, values...)

	group, err = p.extractGroup(r)
@@ -215,7 +216,7 @@ func (p *pipeline) groupUpdate(l zap.Logger, session *session, envelope *Envelop

	// Make this `var js interface{}` if we want to allow top-level JSON arrays.
	var maybeJSON map[string]interface{}
	if json.Unmarshal(g.Metadata, &maybeJSON) != nil {
	if len(g.Metadata) != 0 && json.Unmarshal(g.Metadata, &maybeJSON) != nil {
		session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Error{&Error{Reason: "Metadata must be a valid JSON object"}}})
		return
	}
@@ -318,7 +319,7 @@ AND
	EXISTS (SELECT source_id FROM group_edge WHERE source_id = $1 AND destination_id = $2 AND state = 0)
	`, groupID.Bytes(), session.userID.Bytes())

	if err == nil {
	if err != nil {
		return
	}

@@ -338,7 +339,7 @@ AND
func (p *pipeline) groupsFetch(logger zap.Logger, session *session, envelope *Envelope) {
	g := envelope.GetGroupsFetch()

	validGroupIds := make([][]byte, 0)
	validGroupIds := make([]interface{}, 0)
	statements := make([]string, 0)

	for _, gid := range g.GroupIds {
@@ -354,7 +355,7 @@ func (p *pipeline) groupsFetch(logger zap.Logger, session *session, envelope *En
	rows, err := p.db.Query(
		`SELECT id, creator_id, name, description, avatar_url, lang, utc_offset_ms, metadata, state, count, created_at, updated_at
FROM groups WHERE disabled_at = 0 AND ( `+strings.Join(statements, " OR ")+" )",
		validGroupIds)
		validGroupIds...)
	if err != nil {
		logger.Error("Could not get groups", zap.Error(err))
		session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Error{&Error{Reason: "Could not get groups"}}})
@@ -391,6 +392,7 @@ func (p *pipeline) groupsList(logger zap.Logger, session *session, envelope *Env
	}

	foundCursor := false
	paramNumber := 1
	if incoming.Cursor != nil {
		var c groupCursor
		if err := gob.NewDecoder(bytes.NewReader(incoming.Cursor)).Decode(&c); err != nil {
@@ -402,6 +404,7 @@ func (p *pipeline) groupsList(logger zap.Logger, session *session, envelope *Env
		params = append(params, c.Primary)
		params = append(params, c.Secondary)
		params = append(params, c.GroupID)
		paramNumber = len(params)
	}

	orderBy := "DESC"
@@ -417,19 +420,19 @@ func (p *pipeline) groupsList(logger zap.Logger, session *session, envelope *Env
		if foundCursor {
			cursorQuery = "(lang, count, id) " + comparison + " ($1, $2, $3) AND"
		}
		filterQuery = "lang >= $" + strconv.Itoa(len(params)) + " AND"
		filterQuery = "lang >= $" + strconv.Itoa(paramNumber) + " AND"
		params = append(params, incoming.GetLang())
	} else if incoming.GetCreatedAt() != 0 {
		if foundCursor {
			cursorQuery = "(created_at, count, id) " + comparison + " ($1, $2, $3) AND"
		}
		filterQuery = "created_at >= $" + strconv.Itoa(len(params)) + " AND"
		filterQuery = "created_at >= $" + strconv.Itoa(paramNumber) + " AND"
		params = append(params, incoming.GetCreatedAt())
	} else if incoming.GetCount() != 0 {
		if foundCursor {
			cursorQuery = "(count, updated_at, id) " + comparison + " ($1, $2, $3) AND"
		}
		filterQuery = "count <= $" + strconv.Itoa(len(params)) + " AND"
		filterQuery = "count <= $" + strconv.Itoa(paramNumber) + " AND"
		params = append(params, incoming.GetCount())
	}

@@ -701,6 +704,27 @@ func (p *pipeline) groupLeave(l zap.Logger, session *session, envelope *Envelope
		}
	}()

	// first remove any invitation from user
	// and if this wasn't an invitation then
	// look to see if the user is an admin
	// and remove the user from group and update group count
	res, err := tx.Exec(`
DELETE FROM group_edge
WHERE
	(source_id = $1 AND destination_id = $2 AND state = 2)
OR
	(source_id = $2 AND destination_id = $1 AND state = 2)`,
		groupID.Bytes(), session.userID.Bytes())

	if err != nil {
		return
	}

	if count, _ := res.RowsAffected(); count > 0 {
		logger.Info("Group invitation removed.")
		return
	}

	var adminCount sql.NullInt64
	err = tx.QueryRow(`
SELECT COUNT(source_id)	FROM group_edge
@@ -722,7 +746,7 @@ AND
		return
	}

	res, err := tx.Exec(`
	res, err = tx.Exec(`
DELETE FROM group_edge
WHERE
	(source_id = $1 AND destination_id = $2)
@@ -865,7 +889,11 @@ func (p *pipeline) groupUserKick(l zap.Logger, session *session, envelope *Envel
	}
	defer func() {
		if err != nil {
			if _, ok := err.(*pq.Error); ok {
				logger.Error("Could not kick user from group", zap.Error(err))
			} else {
				logger.Warn("Could not kick user from group", zap.Error(err))
			}
			err = tx.Rollback()
			if err != nil {
				logger.Error("Could not rollback transaction", zap.Error(err))
@@ -902,8 +930,8 @@ AND
	}

	if count, _ := res.RowsAffected(); count == 0 {
		failureReason = "Cannot kick from group - Make sure user is part of the group or group exists"
		err = errors.New("Cannot kick from group - Make sure user is part of the group or group exists")
		failureReason = "Cannot kick from group - Make sure user is part of the group and is admin or group exists"
		err = errors.New("Cannot kick from group - Make sure user is part of the group and is admin or group exists")
		return
	}

+1 −1
Original line number Diff line number Diff line
@@ -578,7 +578,7 @@ func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, t
	var messageID []byte
	var handle string
	err := p.db.QueryRow(`INSERT INTO message (topic, topic_type, user_id, created_at, expires_at, handle, type, data)
SELECT $1, $2, $3, $4, handle, $5, $6
SELECT $1, $2, $3, $4, $5, handle, $6, $7
FROM users
WHERE id = $3
RETURNING message_id, handle`, topicBytes, topicType, session.userID.Bytes(), createdAt, expiresAt, msgType, data).Scan(&messageID, &handle)
+7 −3
Original line number Diff line number Diff line
@@ -18,6 +18,8 @@ import (
	"sync"
	"time"

	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/gorilla/websocket"
	"github.com/satori/go.uuid"
@@ -119,8 +121,10 @@ func (s *session) pingNow() bool {
	return true
}

func (s *session) Send(response proto.Message) error {
	payload, err := proto.Marshal(response)
func (s *session) Send(envelope *Envelope) error {
	s.logger.Debug(fmt.Sprintf("Sending %T message", envelope.Payload), zap.String("collation_id", envelope.CollationId))

	payload, err := proto.Marshal(envelope)

	if err != nil {
		s.logger.Warn("Could not marshall Response to byte[]", zap.Error(err))