Unverified Commit 50928e95 authored by Fernando Takagi's avatar Fernando Takagi Committed by GitHub
Browse files

Add server runtime function to send notification to all users. (#810)

parent f51c8874
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Add GroupUsersBan function to all runtimes.
- Add LeaderboardRecordsHaystack to all runtimes.
- Add Groups page and associated endpoints to the developer console.
- Add NotificationSendAll function to the runtimes, for sending a notification to all users.
- Log a warning when client IP address cannot be resolved.

### Changed
+1 −1
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ require (
	github.com/gorilla/mux v1.8.0
	github.com/gorilla/websocket v1.4.2
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0
	github.com/heroiclabs/nakama-common v1.21.1-0.20220317110306-60fbe58e3b1a
	github.com/heroiclabs/nakama-common v1.21.1-0.20220319173518-73e58191b475
	github.com/jackc/pgconn v1.10.0
	github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
	github.com/jackc/pgtype v1.8.1
+2 −2
Original line number Diff line number Diff line
@@ -257,8 +257,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heroiclabs/nakama-common v1.21.1-0.20220315125242-f39e5bc77bdb h1:FoxvzXI7Hna6Om/6VZCTwzEoflg+fXl+AxbRvyHu7cM=
github.com/heroiclabs/nakama-common v1.21.1-0.20220315125242-f39e5bc77bdb/go.mod h1:WF4YG46afwY3ibzsXnkt3zvhQ3tBY03IYeU7xSLr8HE=
github.com/heroiclabs/nakama-common v1.21.1-0.20220317110306-60fbe58e3b1a h1:VuU7YNVN/urZqNNcS05mF1U8UUpGlP1e7VtuH4lVJLs=
github.com/heroiclabs/nakama-common v1.21.1-0.20220317110306-60fbe58e3b1a/go.mod h1:WF4YG46afwY3ibzsXnkt3zvhQ3tBY03IYeU7xSLr8HE=
github.com/heroiclabs/nakama-common v1.21.1-0.20220319173518-73e58191b475 h1:50c0dq/Su29YFX/fTVhHIIZBeNlHrgMep29GalDK1Xo=
github.com/heroiclabs/nakama-common v1.21.1-0.20220319173518-73e58191b475/go.mod h1:WF4YG46afwY3ibzsXnkt3zvhQ3tBY03IYeU7xSLr8HE=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
+90 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import (
	"database/sql"
	"encoding/base64"
	"encoding/gob"
	"fmt"
	"strconv"
	"strings"
	"time"
@@ -83,6 +84,95 @@ func NotificationSend(ctx context.Context, logger *zap.Logger, db *sql.DB, messa
	return nil
}

func NotificationSendAll(ctx context.Context, logger *zap.Logger, db *sql.DB, tracker Tracker, messageRouter MessageRouter, notification *api.Notification) error {
	env := &rtapi.Envelope{
		Message: &rtapi.Envelope_Notifications{
			Notifications: &rtapi.Notifications{
				Notifications: []*api.Notification{notification},
			},
		},
	}

	// Non-persistent notifications don't need to work through all database users, just use currently connected notification streams.
	if !notification.Persistent {
		notificationStreamMode := StreamModeNotifications
		streams := tracker.CountByStreamModeFilter(map[uint8]*uint8{StreamModeNotifications: &notificationStreamMode})
		for streamPtr, count := range streams {
			if streamPtr == nil || count == 0 {
				continue
			}
			messageRouter.SendToStream(logger, *streamPtr, env, true)
		}
		return nil
	}

	const limit = 10_000

	// Start dispatch in paginated batches.
	go func() {
		// Switch to a background context, the caller should not wait for the full operation to complete.
		ctx := context.Background()
		notificationLogger := logger.With(zap.String("notification_subject", notification.Subject))

		var userIDStr string
		for {
			sends := make(map[uuid.UUID][]*api.Notification, limit)

			params := make([]interface{}, 0, 1)
			query := "SELECT id FROM users"
			if userIDStr != "" {
				query += " AND id > $1"
				params = append(params, userIDStr)
			}
			query += fmt.Sprintf(" ORDER BY id ASC LIMIT %d", limit)

			rows, err := db.QueryContext(ctx, query, params...)
			if err != nil {
				notificationLogger.Error("Failed to retrieve user data to send notification", zap.Error(err))
				return
			}

			for rows.Next() {
				if err = rows.Scan(&userIDStr); err != nil {
					_ = rows.Close()
					notificationLogger.Error("Failed to scan user data to send notification", zap.String("id", userIDStr), zap.Error(err))
					return
				}
				userID, err := uuid.FromString(userIDStr)
				if err != nil {
					_ = rows.Close()
					notificationLogger.Error("Failed to parse scanned user id data to send notification", zap.String("id", userIDStr), zap.Error(err))
					return
				}
				sends[userID] = []*api.Notification{notification}
			}
			_ = rows.Close()

			if len(sends) == 0 {
				// Pagination finished.
				return
			}

			if err := NotificationSave(ctx, notificationLogger, db, sends); err != nil {
				notificationLogger.Error("Failed to save persistent notifications", zap.Error(err))
				return
			}

			// Deliver live notifications to connected users.
			for userID, _ := range sends {
				messageRouter.SendToStream(logger, PresenceStream{Mode: StreamModeNotifications, Subject: userID}, env, true)
			}

			// Stop pagination when reaching the last (incomplete) page.
			if len(sends) < limit {
				return
			}
		}
	}()

	return nil
}

func NotificationList(ctx context.Context, logger *zap.Logger, db *sql.DB, userID uuid.UUID, limit int, cursor string, nc *notificationCacheableCursor) (*api.NotificationList, error) {
	params := []interface{}{userID}

+39 −0
Original line number Diff line number Diff line
@@ -1649,6 +1649,45 @@ func (n *RuntimeGoNakamaModule) NotificationsSend(ctx context.Context, notificat
	return NotificationSend(ctx, n.logger, n.db, n.router, ns)
}

// @group notifications
// @summary Send an in-app notification to all users.
// @param ctx(type=context.Context) The context object represents information about the server and requester.
// @param subject(type=string) Notification subject.
// @param content(type=map[string]interface{}) Notification content. Must be set but can be any empty map.
// @param code(type=int) Notification code to use. Must be greater than or equal to 0.
// @param persistent(type=bool) Whether to record this in the database for later listing.
// @return error(error) An optional error value if an error occurred.
func (n *RuntimeGoNakamaModule) NotificationSendAll(ctx context.Context, subject string, content map[string]interface{}, code int, persistent bool) error {
	if subject == "" {
		return errors.New("expects subject to be a non-empty string")
	}

	contentBytes, err := json.Marshal(content)
	if err != nil {
		return fmt.Errorf("failed to convert content: %s", err.Error())
	}
	contentString := string(contentBytes)

	if code <= 0 {
		return errors.New("expects code to number above 0")
	}

	senderID := uuid.Nil.String()
	createTime := &timestamppb.Timestamp{Seconds: time.Now().UTC().Unix()}

	not := &api.Notification{
		Id:         uuid.Must(uuid.NewV4()).String(),
		Subject:    subject,
		Content:    contentString,
		Code:       int32(code),
		SenderId:   senderID,
		Persistent: persistent,
		CreateTime: createTime,
	}

	return NotificationSendAll(ctx, n.logger, n.db, n.tracker, n.router, not)
}

// @group wallets
// @summary Update a user's wallet with the given changeset.
// @param ctx(type=context.Context) The context object represents information about the server and requester.
Loading