Commit d816c630 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Add notification wiring. Merge #103

parent 19674bd1
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -10,6 +10,12 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p
- Add script runtime function to update groups.
- Add script runtime function to list groups a user is part of.
- Add script runtime function to list users belonging to a group.
- Send in-app notification on friend request.
- Send in-app notification on friend request accept.
- Send in-app notification when a Facebook friend signs into the game for the first time.
- Send in-app notification to group admins when a user requests to join a private group.
- Send in-app notification to the user when they are added to a group or their request to join a private group is accepted.
- Send in-app notification to the user when someone wants to DM chat.

### Changed
- Use Lua table for Content field when creating new notifications.
+52 −7
Original line number Diff line number Diff line
@@ -18,15 +18,20 @@ import (
	"database/sql"
	"errors"

	"encoding/json"
	"fmt"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
)

func friendAdd(logger *zap.Logger, db *sql.DB, userID []byte, friendID []byte) error {
func friendAdd(logger *zap.Logger, db *sql.DB, ns *NotificationService, userID []byte, handle string, friendID []byte) error {
	tx, txErr := db.Begin()
	if txErr != nil {
		return txErr
	}

	isFriendAccept := false
	updatedAt := nowMs()
	var err error
	defer func() {
		if err != nil {
@@ -34,13 +39,52 @@ func friendAdd(logger *zap.Logger, db *sql.DB, userID []byte, friendID []byte) e
				logger.Error("Could not rollback transaction", zap.Error(rollbackErr))
			}
		} else {
			if err = tx.Commit(); err != nil {
				logger.Error("Could not commit transaction", zap.Error(err))
			if e := tx.Commit(); e != nil {
				logger.Error("Could not commit transaction", zap.Error(e))
				err = e
				return
			}

			// If the operation was successful, send a notification.
			content, e := json.Marshal(map[string]interface{}{"handle": handle})
			if e != nil {
				logger.Warn("Failed to send friend add notification", zap.Error(e))
				return
			}
			var recipient []byte
			var sender []byte
			var subject string
			var code int64
			if isFriendAccept {
				recipient = userID
				sender = friendID
				subject = fmt.Sprintf("%v accepted your friend request", handle)
				code = NOTIFICATION_FRIEND_ACCEPT
			} else {
				recipient = friendID
				sender = userID
				subject = fmt.Sprintf("%v wants to add you as a friend", handle)
				code = NOTIFICATION_FRIEND_REQUEST
			}

			if e := ns.NotificationSend([]*NNotification{
				&NNotification{
					Id:         uuid.NewV4().Bytes(),
					UserID:     recipient,
					Subject:    subject,
					Content:    content,
					Code:       code,
					SenderID:   sender,
					CreatedAt:  updatedAt,
					ExpiresAt:  updatedAt + ns.expiryMs,
					Persistent: true,
				},
			}); e != nil {
				logger.Warn("Failed to send friend add notification", zap.Error(e))
			}
		}
	}()

	updatedAt := nowMs()
	// Mark an invite as accepted, if one was in place.
	res, err := tx.Exec(`
UPDATE user_edge SET state = 0, updated_at = $3
@@ -52,7 +96,8 @@ OR (source_id = $2 AND destination_id = $1 AND state = 1)
	}
	// If both edges were updated, it was accepting an invite was successful.
	if rowsAffected, _ := res.RowsAffected(); rowsAffected == 2 {
		return nil
		isFriendAccept = true
		return err
	}

	// If no edge updates took place, it's a new invite being set up.
@@ -94,12 +139,12 @@ OR source_id = $3`,
	return nil
}

func friendAddHandle(logger *zap.Logger, db *sql.DB, userID []byte, friendHandle string) error {
func friendAddHandle(logger *zap.Logger, db *sql.DB, ns *NotificationService, userID []byte, handle string, friendHandle string) error {
	var friendIdBytes []byte
	err := db.QueryRow("SELECT id FROM users WHERE handle = $1", friendHandle).Scan(&friendIdBytes)
	if err != nil {
		return err
	}

	return friendAdd(logger, db, userID, friendIdBytes)
	return friendAdd(logger, db, ns, userID, handle, friendIdBytes)
}
+9 −0
Original line number Diff line number Diff line
@@ -28,6 +28,15 @@ import (
	"go.uber.org/zap"
)

const (
	NOTIFICATION_DM_REQUEST         int64 = 1
	NOTIFICATION_FRIEND_REQUEST     int64 = 2
	NOTIFICATION_FRIEND_ACCEPT      int64 = 3
	NOTIFICATION_GROUP_ADD          int64 = 4
	NOTIFICATION_GROUP_JOIN_REQUEST int64 = 5
	NOTIFICATION_FRIEND_JOIN_GAME   int64 = 6
)

type notificationResumableCursor struct {
	Expiry         int64
	NotificationID []byte
+50 −10
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ import (
	"database/sql"
	"errors"

	"encoding/json"
	"fmt"
	"github.com/lib/pq"
	"github.com/satori/go.uuid"
@@ -81,10 +82,12 @@ FROM users ` + filterQuery
	return users, nil
}

func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessToken string) {
func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, handle string, fbid string, accessToken string) {
	var tx *sql.Tx
	var err error

	ts := nowMs()
	friendUserIDs := make([]interface{}, 0)
	defer func() {
		if err != nil {
			logger.Error("Could not import friends from Facebook", zap.Error(err))
@@ -100,7 +103,39 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT
				if err != nil {
					logger.Error("Could not commit transaction", zap.Error(err))
				} else {
					logger.Info("Imported friends from Facebook")
					logger.Debug("Imported friends from Facebook")

					// Send out notifications.
					if len(friendUserIDs) != 0 {
						content, err := json.Marshal(map[string]interface{}{"handle": handle, "facebook_id": fbid})
						if err != nil {
							logger.Warn("Failed to send Facebook friend join notifications", zap.Error(err))
							return
						}
						subject := "Your friend has just joined the game"
						expiresAt := ts + p.notificationService.expiryMs

						notifications := make([]*NNotification, len(friendUserIDs))
						for i, friendUserID := range friendUserIDs {
							fid := friendUserID.([]byte)
							notifications[i] = &NNotification{
								Id:         uuid.NewV4().Bytes(),
								UserID:     fid,
								Subject:    subject,
								Content:    content,
								Code:       NOTIFICATION_FRIEND_JOIN_GAME,
								SenderID:   userID,
								CreatedAt:  ts,
								ExpiresAt:  expiresAt,
								Persistent: true,
							}
						}

						err = p.notificationService.NotificationSend(notifications)
						if err != nil {
							logger.Warn("Failed to send Facebook friend join notifications", zap.Error(err))
						}
					}
				}
			}
		}
@@ -135,11 +170,10 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT
	}
	defer rows.Close()

	updatedAt := nowMs()
	queryEdge := "INSERT INTO user_edge (source_id, position, updated_at, destination_id, state) VALUES "
	paramsEdge := []interface{}{userID, updatedAt}
	paramsEdge := []interface{}{userID, ts}
	queryEdgeMetadata := "UPDATE user_edge_metadata SET count = count + 1, updated_at = $1 WHERE source_id IN ("
	paramsEdgeMetadata := []interface{}{updatedAt}
	paramsEdgeMetadata := []interface{}{ts}
	for rows.Next() {
		var currentUser []byte
		err = rows.Scan(&currentUser)
@@ -181,7 +215,13 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT
		return
	}
	// Update edge metadata for current user to bump count by number of new friends.
	_, err = tx.Exec(`UPDATE user_edge_metadata SET count = $1, updated_at = $2 WHERE source_id = $3`, len(paramsEdge)-2, updatedAt, userID)
	_, err = tx.Exec(`UPDATE user_edge_metadata SET count = $1, updated_at = $2 WHERE source_id = $3`, len(paramsEdge)-2, ts, userID)
	if err != nil {
		return
	}

	// Track the user IDs to notify their friend has joined the game.
	friendUserIDs = paramsEdge[2:]
}

func (p *pipeline) getFriends(filterQuery string, userID []byte) ([]*Friend, error) {
@@ -277,13 +317,13 @@ func (p *pipeline) friendAddById(l *zap.Logger, session *session, envelope *Enve
		return
	}

	if err := friendAdd(logger, p.db, session.userID.Bytes(), friendID.Bytes()); err != nil {
	if err := friendAdd(logger, p.db, p.notificationService, session.userID.Bytes(), session.handle.Load(), friendID.Bytes()); err != nil {
		logger.Error("Could not add friend", zap.Error(err))
		session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Failed to add friend"))
		return
	}

	logger.Info("Added friend")
	logger.Debug("Added friend")
	session.Send(&Envelope{CollationId: envelope.CollationId})
}

@@ -294,13 +334,13 @@ func (p *pipeline) friendAddByHandle(l *zap.Logger, session *session, envelope *
	}

	logger := l.With(zap.String("friend_handle", friendHandle))
	if err := friendAddHandle(logger, p.db, session.userID.Bytes(), friendHandle); err != nil {
	if err := friendAddHandle(logger, p.db, p.notificationService, session.userID.Bytes(), session.handle.Load(), friendHandle); err != nil {
		logger.Error("Could not add friend", zap.Error(err))
		session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Failed to add friend"))
		return
	}

	logger.Info("Added friend")
	logger.Debug("Added friend")
	session.Send(&Envelope{CollationId: envelope.CollationId})
}

+105 −10
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import (
	"strconv"
	"strings"

	"fmt"
	"github.com/lib/pq"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
@@ -498,6 +499,13 @@ func (p *pipeline) groupJoin(l *zap.Logger, session *session, envelope *Envelope

	logger := l.With(zap.String("group_id", groupID.String()))

	ts := nowMs()

	// Group admin user IDs to notify there's a new user join request, if the group is private.
	var groupName sql.NullString
	privateGroup := false
	adminUserIDs := make([][]byte, 0)

	tx, err := p.db.Begin()
	if err != nil {
		logger.Error("Could not add user to group", zap.Error(err))
@@ -522,31 +530,65 @@ func (p *pipeline) groupJoin(l *zap.Logger, session *session, envelope *Envelope
				logger.Info("User joined group")
				session.Send(&Envelope{CollationId: envelope.CollationId})

				if !privateGroup {
					// If the user was added directly.
					err = p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}"))
					if err != nil {
						logger.Error("Error handling group user join notification topic message", zap.Error(err))
					}
				} else if len(adminUserIDs) != 0 {
					// If the user has requested to join and there are admins to notify.
					handle := session.handle.Load()
					name := groupName.String
					content, err := json.Marshal(map[string]string{"handle": handle, "name": name})
					if err != nil {
						logger.Warn("Failed to send group join request notification", zap.Error(err))
						return
					}
					subject := fmt.Sprintf("%v wants to join your group %v", handle, name)
					userID := session.userID.Bytes()
					expiresAt := ts + p.notificationService.expiryMs

					notifications := make([]*NNotification, len(adminUserIDs))
					for i, adminUserID := range adminUserIDs {
						notifications[i] = &NNotification{
							Id:         uuid.NewV4().Bytes(),
							UserID:     adminUserID,
							Subject:    subject,
							Content:    content,
							Code:       NOTIFICATION_GROUP_JOIN_REQUEST,
							SenderID:   userID,
							CreatedAt:  ts,
							ExpiresAt:  expiresAt,
							Persistent: true,
						}
					}

					err = p.notificationService.NotificationSend(notifications)
					if err != nil {
						logger.Warn("Failed to send group join request notification", zap.Error(err))
					}
				}
			}
		}
	}()

	var groupState sql.NullInt64
	err = tx.QueryRow("SELECT state FROM groups WHERE id = $1 AND disabled_at = 0", groupID.Bytes()).Scan(&groupState)
	err = tx.QueryRow("SELECT state, name FROM groups WHERE id = $1 AND disabled_at = 0", groupID.Bytes()).Scan(&groupState, &groupName)
	if err != nil {
		return
	}

	userState := 1
	if groupState.Int64 == 1 {
		privateGroup = true
		userState = 2
	}

	updatedAt := nowMs()

	res, err := tx.Exec(`
INSERT INTO group_edge (source_id, position, updated_at, destination_id, state)
VALUES ($1, $2, $2, $3, $4), ($3, $2, $2, $1, $4)`,
		groupID.Bytes(), updatedAt, session.userID.Bytes(), userState)
		groupID.Bytes(), ts, session.userID.Bytes(), userState)

	if err != nil {
		return
@@ -557,12 +599,33 @@ VALUES ($1, $2, $2, $3, $4), ($3, $2, $2, $1, $4)`,
		return
	}

	if groupState.Int64 == 0 {
		_, err = tx.Exec("UPDATE groups SET count = count + 1, updated_at = $2 WHERE id = $1", groupID.Bytes(), updatedAt)
	// If the group is not private and the user joined directly, increase the group count.
	if !privateGroup {
		_, err = tx.Exec("UPDATE groups SET count = count + 1, updated_at = $2 WHERE id = $1", groupID.Bytes(), ts)
	}
	if err != nil {
		return
	}

	// If group is private, look up admin user IDs to notify about a new user requesting to join.
	if privateGroup {
		rows, e := tx.Query("SELECT destination_id FROM group_edge WHERE source_id = $1 AND state = 0", groupID.Bytes())
		if e != nil {
			logger.Warn("Failed to send group join request notification", zap.Error(e))
			return
		}
		defer rows.Close()

		for rows.Next() {
			var adminUserID []byte
			e = rows.Scan(&adminUserID)
			if e != nil {
				logger.Warn("Failed to send group join request notification", zap.Error(e))
				return
			}
			adminUserIDs = append(adminUserIDs, adminUserID)
		}
	}
}

func (p *pipeline) groupLeave(l *zap.Logger, session *session, envelope *Envelope) {
@@ -709,7 +772,9 @@ func (p *pipeline) groupUserAdd(l *zap.Logger, session *session, envelope *Envel
	}

	logger := l.With(zap.String("group_id", groupID.String()), zap.String("user_id", userID.String()))
	ts := nowMs()
	var handle string
	var name string

	tx, err := p.db.Begin()
	if err != nil {
@@ -743,6 +808,30 @@ func (p *pipeline) groupUserAdd(l *zap.Logger, session *session, envelope *Envel
				err = p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 2, data)
				if err != nil {
					logger.Error("Error handling group user added notification topic message", zap.Error(err))
					return
				}

				adminHandle := session.handle.Load()
				content, err := json.Marshal(map[string]string{"handle": adminHandle, "name": name})
				if err != nil {
					logger.Warn("Failed to send group add notification", zap.Error(err))
					return
				}
				err = p.notificationService.NotificationSend([]*NNotification{
					&NNotification{
						Id:         uuid.NewV4().Bytes(),
						UserID:     userID.Bytes(),
						Subject:    fmt.Sprintf("%v has added you to group %v", adminHandle, name),
						Content:    content,
						Code:       NOTIFICATION_GROUP_ADD,
						SenderID:   session.userID.Bytes(),
						CreatedAt:  ts,
						ExpiresAt:  ts + p.notificationService.expiryMs,
						Persistent: true,
					},
				})
				if err != nil {
					logger.Warn("Failed to send group add notification", zap.Error(err))
				}
			}
		}
@@ -754,6 +843,12 @@ func (p *pipeline) groupUserAdd(l *zap.Logger, session *session, envelope *Envel
		return
	}

	// Look up the name of the group.
	err = tx.QueryRow("SELECT name FROM groups WHERE id = $1", groupID.Bytes()).Scan(&name)
	if err != nil {
		return
	}

	res, err := tx.Exec(`
INSERT INTO group_edge (source_id, position, updated_at, destination_id, state)
SELECT data.id, data.position, data.updated_at, data.destination, data.state
@@ -768,7 +863,7 @@ AND
  EXISTS (SELECT id FROM groups WHERE id = $1::BYTEA AND disabled_at = 0)
ON CONFLICT (source_id, destination_id)
DO UPDATE SET state = 1, updated_at = $2::INT`,
		groupID.Bytes(), nowMs(), userID.Bytes(), session.userID.Bytes())
		groupID.Bytes(), ts, userID.Bytes(), session.userID.Bytes())

	if err != nil {
		return
Loading