Commit 7e18b09d authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Allow subscribing to status updates directly from other users. (#189)

parent 194c11d0
Loading
Loading
Loading
Loading
+413 −139

File changed.

Preview size limit exceeded, changes collapsed.

+45 −3
Original line number Diff line number Diff line
@@ -83,10 +83,20 @@ message Envelope {
    Notifications notifications = 23;
    // RPC call or response.
    api.Rpc rpc = 24;
    // An incoming status snapshot for some set of users.
    Status status = 25;
    // Start following some set of users to receive their status updates.
    StatusFollow status_follow = 26;
    // An incoming status update.
    StatusPresenceEvent status_presence_event = 27;
    // Stop following some set of users to no longer receive their status updates.
    StatusUnfollow status_unfollow = 28;
    // Set the user's own status.
    StatusUpdate status_update = 29;
    // A data message delivered over a stream.
    StreamData stream_data = 25;
    StreamData stream_data = 30;
    // Presence update for a particular stream.
    StreamPresenceEvent stream_presence_event = 26;
    StreamPresenceEvent stream_presence_event = 31;
  }
}

@@ -341,6 +351,38 @@ message Notifications {
  repeated api.Notification notifications = 1;
}

// A snapshot of statuses for some set of users.
message Status {
  // User statuses.
  repeated UserPresence presences = 1;
}

// Start receiving status updates for some set of users.
message StatusFollow {
  // Users to follow.
  repeated string user_ids = 1;
}

// A batch of status updates for a given user.
message StatusPresenceEvent {
  // New statuses for the user.
  repeated UserPresence joins = 2;
  // Previous statuses for the user.
  repeated UserPresence leaves = 3;
}

// Stop receiving status updates for some set of users.
message StatusUnfollow {
  // Users to unfollow.
  repeated string user_ids = 1;
}

// Set the user's own status.
message StatusUpdate {
  // Status string to set, if not present the user will appear offline.
  google.protobuf.StringValue status = 1;
}

// Represents identifying information for a stream.
message Stream {
  // Mode identifies the type of stream.
@@ -384,5 +426,5 @@ message UserPresence {
  // Whether this presence generates persistent data/messages, if applicable for the stream type.
  bool persistence = 4;
  // A user-set status message for this stream, if applicable.
  string status = 5;
  google.protobuf.StringValue status = 5;
}
+6 −0
Original line number Diff line number Diff line
@@ -142,6 +142,12 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		p.matchmakerRemove(logger, session, envelope)
	case *rtapi.Envelope_Rpc:
		p.rpc(logger, session, envelope)
	case *rtapi.Envelope_StatusFollow:
		p.statusFollow(logger, session, envelope)
	case *rtapi.Envelope_StatusUnfollow:
		p.statusUnfollow(logger, session, envelope)
	case *rtapi.Envelope_StatusUpdate:
		p.statusUpdate(logger, session, envelope)
	default:
		// If we reached this point the envelope was valid but the contents are missing or unknown.
		// Usually caused by a version mismatch, and should cause the session making this pipeline request to close.
+166 −0
Original line number Diff line number Diff line
// Copyright 2018 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"github.com/golang/protobuf/ptypes/wrappers"
	"github.com/heroiclabs/nakama/rtapi"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
	"strconv"
	"strings"
)

func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
	incoming := envelope.GetStatusFollow()

	if len(incoming.UserIds) == 0 {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Status{Status: &rtapi.Status{
			Presences: make([]*rtapi.UserPresence, 0),
		}}})
		return
	}

	uniqueUserIDs := make(map[uuid.UUID]struct{}, len(incoming.UserIds))
	for _, uid := range incoming.UserIds {
		userID, err := uuid.FromString(uid)
		if err != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Invalid user identifier",
			}}})
			return
		}
		uniqueUserIDs[userID] = struct{}{}
	}

	userIDs := make([]interface{}, 0, len(uniqueUserIDs))
	statements := make([]string, 0, len(uniqueUserIDs))
	index := 1
	for userID, _ := range uniqueUserIDs {
		userIDs = append(userIDs, userID)
		statements = append(statements, "$"+strconv.Itoa(index)+"::UUID")
		index++
	}

	query := "SELECT COUNT(id) FROM users WHERE id IN (" + strings.Join(statements, ", ") + ")"
	var dbCount int
	err := p.db.QueryRow(query, userIDs...).Scan(&dbCount)
	if err != nil {
		logger.Error("Error checking users in status follow", zap.Error(err))
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not check users",
		}}})
		return
	}
	if dbCount != len(userIDs) {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "One or more users do not exist",
		}}})
		return
	}

	presences := make([]*rtapi.UserPresence, 0, len(userIDs))
	for userID, _ := range uniqueUserIDs {
		stream := PresenceStream{Mode: StreamModeStatus, Subject: userID}
		success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true}, false)
		if !success {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: "Could not follow user status",
			}}})
			return
		}

		ps := p.tracker.ListByStream(stream, false)
		for _, p := range ps {
			presences = append(presences, &rtapi.UserPresence{
				UserId:    p.UserID.String(),
				SessionId: p.ID.SessionID.String(),
				Username:  p.Meta.Username,
				Status:    &wrappers.StringValue{Value: p.Meta.Status},
			})
		}
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Status{Status: &rtapi.Status{
		Presences: presences,
	}}})
}

func (p *Pipeline) statusUnfollow(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
	incoming := envelope.GetStatusUnfollow()

	if len(incoming.UserIds) == 0 {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid})
		return
	}

	userIDs := make([]uuid.UUID, 0, len(incoming.UserIds))
	for _, uid := range incoming.UserIds {
		userID, err := uuid.FromString(uid)
		if err != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Invalid user identifier",
			}}})
			return
		}
		userIDs = append(userIDs, userID)
	}

	for _, userID := range userIDs {
		p.tracker.Untrack(session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: userID}, session.UserID())
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid})
}

func (p *Pipeline) statusUpdate(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
	incoming := envelope.GetStatusUpdate()

	if incoming.Status == nil {
		p.tracker.Untrack(session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: session.UserID()}, session.UserID())

		session.Send(&rtapi.Envelope{Cid: envelope.Cid})
		return
	}

	if len(incoming.Status.Value) > 128 {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Status must be 128 characters or less",
		}}})
		return
	}

	success := p.tracker.Update(session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: session.UserID()}, session.UserID(), PresenceMeta{
		Format:   session.Format(),
		Username: session.Username(),
		Status:   incoming.Status.Value,
	}, false)

	if !success {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Error tracking status update",
		}}})
		return
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid})
}
+8 −0
Original line number Diff line number Diff line
@@ -43,6 +43,11 @@ func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry *Ses
			return
		}

		status := true
		if r.URL.Query().Get("status") == "false" {
			status = false
		}

		// Upgrade to WebSocket.
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
@@ -59,6 +64,9 @@ func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry *Ses

		// Register initial presences for this session.
		tracker.Track(s.ID(), PresenceStream{Mode: StreamModeNotifications, Subject: s.UserID()}, s.UserID(), PresenceMeta{Format: s.Format(), Username: s.Username(), Hidden: true}, true)
		if status {
			tracker.Track(s.ID(), PresenceStream{Mode: StreamModeStatus, Subject: s.UserID()}, s.UserID(), PresenceMeta{Format: s.Format(), Username: s.Username(), Status: ""}, false)
		}

		// Allow the server to begin processing incoming messages from this session.
		s.Consume(pipeline.ProcessRequest)
Loading