From 2dff888a7ba9ec1dd33c41d1f334ad68440e72b5 Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Sun, 12 Feb 2017 23:15:45 +0100 Subject: [PATCH] Simplify protobuf presence messages across chat and multiplayer. --- CHANGELOG.md | 8 ++---- server/api.proto | 34 ++++++++++------------- server/pipeline_group.go | 10 +++---- server/pipeline_match.go | 6 ++-- server/pipeline_topic.go | 55 ++++++++++++++++++------------------- server/presence_notifier.go | 30 ++++++++++---------- 6 files changed, 68 insertions(+), 75 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd9fdfd55..f95446a66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,14 +4,12 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/). ## [Unreleased] - ### Changed - -- Configuration in dashboard is now displayed as YAML. +- Server configuration in dashboard is now displayed as YAML. +- Update server protocol to simplify presence messages across chat and multiplayer. ### Fixed - -- Improve group SQL query with type information. +- Work around a limitation in cockroachdb with type information in group sub-queries. ## [0.11.0] - 2017-02-09 ### Added diff --git a/server/api.proto b/server/api.proto index ea88a64c3..0b0912979 100644 --- a/server/api.proto +++ b/server/api.proto @@ -328,7 +328,7 @@ message TGroupUserPromote { bytes user_id = 2; } -message Topic { +message TopicId { oneof id { bytes dm = 1; bytes room = 2; @@ -336,7 +336,7 @@ message Topic { } } -message TopicUser { +message UserPresence { bytes user_id = 1; bytes session_id = 2; } @@ -349,16 +349,17 @@ message TTopicJoin { } } message TTopic { - Topic topic = 1; - repeated TopicUser users = 2; + TopicId topic = 1; + repeated UserPresence presences = 2; + UserPresence self = 3; } message TTopicLeave { - Topic topic = 1; + TopicId topic = 1; } message TTopicMessageSend { - Topic topic = 1; + TopicId topic = 1; bytes data = 2; } @@ -370,7 +371,7 @@ message TTopicMessageAck { } message TopicMessage { - Topic topic = 1; + TopicId topic = 1; bytes user_id = 2; bytes message_id = 3; int64 created_at = 4; @@ -396,9 +397,9 @@ message TTopicMessages { } message TopicPresence { - Topic topic = 1; - repeated TopicUser joins = 2; - repeated TopicUser leaves = 3; + TopicId topic = 1; + repeated UserPresence joins = 2; + repeated UserPresence leaves = 3; } message TMatchCreate {} @@ -406,16 +407,11 @@ message TMatch { bytes id = 1; } -message MatchUser { - bytes user_id = 1; - bytes session_id = 2; -} - message TMatchJoin { bytes match_id = 1; } message TMatchUsers { - repeated MatchUser users = 1; + repeated UserPresence users = 1; } message TMatchDataSend { @@ -426,7 +422,7 @@ message TMatchDataSend { message MatchData { bytes match_id = 1; - MatchUser user = 2; + UserPresence user = 2; int64 op_code = 3; bytes data = 4; } @@ -437,8 +433,8 @@ message TMatchLeave { message MatchPresence { bytes match_id = 1; - repeated MatchUser joins = 2; - repeated MatchUser leaves = 3; + repeated UserPresence joins = 2; + repeated UserPresence leaves = 3; } message TStorageFetch { diff --git a/server/pipeline_group.go b/server/pipeline_group.go index 6b8729fb8..15ee294b1 100644 --- a/server/pipeline_group.go +++ b/server/pipeline_group.go @@ -663,7 +663,7 @@ VALUES ($1, $2, $2, $3, $4), ($3, $2, $2, $1, $4)`, return } - p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}")) + p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}")) } func (p *pipeline) groupLeave(l zap.Logger, session *session, envelope *Envelope) { @@ -770,7 +770,7 @@ OR return } - p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 3, []byte("{}")) + p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 3, []byte("{}")) } func (p *pipeline) groupUserAdd(l zap.Logger, session *session, envelope *Envelope) { @@ -859,7 +859,7 @@ DO UPDATE SET state = 1, updated_at = $2::INT`, } data, err := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) - p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 2, data) + p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 2, data) } func (p *pipeline) groupUserKick(l zap.Logger, session *session, envelope *Envelope) { @@ -953,7 +953,7 @@ AND } data, err := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) - p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 4, data) + p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 4, data) } func (p *pipeline) groupUserPromote(l zap.Logger, session *session, envelope *Envelope) { @@ -1011,7 +1011,7 @@ AND } data, _ := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) - p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 5, data) + p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 5, data) session.Send(&Envelope{CollationId: envelope.CollationId}) } diff --git a/server/pipeline_match.go b/server/pipeline_match.go index ad923319b..39c9c2dee 100644 --- a/server/pipeline_match.go +++ b/server/pipeline_match.go @@ -44,10 +44,10 @@ func (p *pipeline) matchJoin(logger zap.Logger, session *session, envelope *Enve p.tracker.Track(session.id, topic, session.userID, PresenceMeta{}) - users := make([]*MatchUser, len(ps)) + users := make([]*UserPresence, len(ps)) for i := 0; i < len(ps); i++ { p := ps[i] - users[i] = &MatchUser{ + users[i] = &UserPresence{ UserId: p.UserID.Bytes(), SessionId: p.ID.SessionID.Bytes(), } @@ -133,7 +133,7 @@ func (p *pipeline) matchDataSend(logger zap.Logger, session *session, envelope * Payload: &Envelope_MatchData{ MatchData: &MatchData{ MatchId: matchIDBytes, - User: &MatchUser{ + User: &UserPresence{ UserId: session.userID.Bytes(), SessionId: session.id.Bytes(), }, diff --git a/server/pipeline_topic.go b/server/pipeline_topic.go index a33bfc0e4..68090fb23 100644 --- a/server/pipeline_topic.go +++ b/server/pipeline_topic.go @@ -37,7 +37,7 @@ var invalidRoomRegex = regexp.MustCompilePOSIX("[[:cntrl:]]+") func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Envelope) { id := envelope.GetTopicJoin() - var topic *Topic + var topic *TopicId var trackerTopic string switch id.Id.(type) { case *TTopicJoin_UserId: @@ -69,10 +69,10 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve userIDString := session.userID.String() otherUserIDString := otherUserID.String() if userIDString < otherUserIDString { - topic = &Topic{Id: &Topic_Dm{Dm: append(session.userID.Bytes(), otherUserIDBytes...)}} + topic = &TopicId{Id: &TopicId_Dm{Dm: append(session.userID.Bytes(), otherUserIDBytes...)}} trackerTopic = "dm:" + userIDString + ":" + otherUserIDString } else { - topic = &Topic{Id: &Topic_Dm{Dm: append(otherUserIDBytes, session.userID.Bytes()...)}} + topic = &TopicId{Id: &TopicId_Dm{Dm: append(otherUserIDBytes, session.userID.Bytes()...)}} trackerTopic = "dm:" + otherUserIDString + ":" + userIDString } case *TTopicJoin_Room: @@ -91,7 +91,7 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve return } - topic = &Topic{Id: &Topic_Room{Room: room}} + topic = &TopicId{Id: &TopicId_Room{Room: room}} trackerTopic = "room:" + string(room) case *TTopicJoin_GroupId: // Check input is valid ID. @@ -114,7 +114,7 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve } trackerTopic = "group:" + groupID.String() - topic = &Topic{Id: &Topic_GroupId{GroupId: groupIDBytes}} + topic = &TopicId{Id: &TopicId_GroupId{GroupId: groupIDBytes}} case nil: session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Error{&Error{Reason: "No topic ID found"}}}) return @@ -126,25 +126,24 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve // Track the presence, and gather current member list. p.tracker.Track(session.id, trackerTopic, session.userID, PresenceMeta{}) presences := p.tracker.ListByTopic(trackerTopic) - users := make([]*TopicUser, len(presences)-1) - j := 0 + + userPresences := make([]*UserPresence, len(presences)) for i := 0; i < len(presences); i++ { - // Do not report this session to itself. - if presences[i].UserID == session.userID && presences[i].ID.SessionID == session.id { - j += -1 - } else { - users[i+j] = &TopicUser{UserId: presences[i].UserID.Bytes(), SessionId: presences[i].ID.SessionID.Bytes()} - } + userPresences[i] = &UserPresence{UserId: presences[i].UserID.Bytes(), SessionId: presences[i].ID.SessionID.Bytes()} } - session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Topic{Topic: &TTopic{Topic: topic, Users: users}}}) + session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Topic{Topic: &TTopic{ + Topic: topic, + Presences: userPresences, + Self: &UserPresence{UserId: session.userID.Bytes(), SessionId: session.id.Bytes()}, + }}}) } func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Envelope) { topic := envelope.GetTopicLeave().Topic var trackerTopic string switch topic.Id.(type) { - case *Topic_Dm: + case *TopicId_Dm: // Check input is valid DM topic. bothUserIDBytes := topic.GetDm() if bothUserIDBytes == nil || len(bothUserIDBytes) != 32 { @@ -187,7 +186,7 @@ func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Env } trackerTopic = "dm:" + userID1String + ":" + userID2String - case *Topic_Room: + case *TopicId_Room: // Check input is valid room name. room := topic.GetRoom() if room == nil || len(room) < 1 || len(room) > 64 { @@ -204,7 +203,7 @@ func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Env } trackerTopic = "room:" + string(room) - case *Topic_GroupId: + case *TopicId_GroupId: // Check input is valid ID. groupIDBytes := topic.GetGroupId() groupID, err := uuid.FromBytes(groupIDBytes) @@ -248,7 +247,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop var trackerTopic string switch topic.Id.(type) { - case *Topic_Dm: + case *TopicId_Dm: // Check input is valid DM topic. bothUserIDBytes := topic.GetDm() if bothUserIDBytes == nil || len(bothUserIDBytes) != 32 { @@ -291,7 +290,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop } trackerTopic = "dm:" + userID1String + ":" + userID2String - case *Topic_Room: + case *TopicId_Room: // Check input is valid room name. room := topic.GetRoom() if room == nil || len(room) < 1 || len(room) > 64 { @@ -308,7 +307,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop } trackerTopic = "room:" + string(room) - case *Topic_GroupId: + case *TopicId_GroupId: // Check input is valid ID. groupIDBytes := topic.GetGroupId() groupID, err := uuid.FromBytes(groupIDBytes) @@ -357,7 +356,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } - var topic *Topic + var topic *TopicId var topicBytes []byte var topicType int64 switch input.Id.(type) { @@ -383,7 +382,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo } else { topicBytes = append(otherUserIDBytes, session.userID.Bytes()...) } - topic = &Topic{Id: &Topic_Dm{Dm: topicBytes}} + topic = &TopicId{Id: &TopicId_Dm{Dm: topicBytes}} topicType = 0 case *TTopicMessagesList_Room: // Check input is valid room name. @@ -401,7 +400,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } - topic = &Topic{Id: &Topic_Room{Room: room}} + topic = &TopicId{Id: &TopicId_Room{Room: room}} topicBytes = room topicType = 1 case *TTopicMessagesList_GroupId: @@ -424,7 +423,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } - topic = &Topic{Id: &Topic_GroupId{GroupId: groupIDBytes}} + topic = &TopicId{Id: &TopicId_GroupId{GroupId: groupIDBytes}} topicBytes = groupIDBytes topicType = 2 case nil: @@ -552,12 +551,12 @@ AND ue.source_id = $2`, checkUserID, blocksUserID).Scan(&uid, &state) } // Assumes `topic` has already been validated, or was constructed internally. -func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, topic *Topic, msgType int64, data []byte) ([]byte, string, int64, int64, error) { +func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, topic *TopicId, msgType int64, data []byte) ([]byte, string, int64, int64, error) { var trackerTopic string var topicBytes []byte var topicType int64 switch topic.Id.(type) { - case *Topic_Dm: + case *TopicId_Dm: bothUserIDBytes := topic.GetDm() userID1 := uuid.FromBytesOrNil(bothUserIDBytes[:16]) userID2 := uuid.FromBytesOrNil(bothUserIDBytes[16:]) @@ -565,11 +564,11 @@ func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, t trackerTopic = "dm:" + userID1.String() + ":" + userID2.String() topicBytes = bothUserIDBytes topicType = 0 - case *Topic_Room: + case *TopicId_Room: trackerTopic = "room:" + string(topic.GetRoom()) topicBytes = []byte(topic.GetRoom()) topicType = 1 - case *Topic_GroupId: + case *TopicId_GroupId: trackerTopic = "group:" + uuid.FromBytesOrNil(topic.GetGroupId()).String() topicBytes = topic.GetGroupId() topicType = 2 diff --git a/server/presence_notifier.go b/server/presence_notifier.go index fea1af953..691029987 100644 --- a/server/presence_notifier.go +++ b/server/presence_notifier.go @@ -84,21 +84,21 @@ func (pn *presenceNotifier) HandleDiff(joins, leaves []Presence) { users := strings.SplitN(splitTopic[1], ":", 2) userID1 := uuid.FromStringOrNil(users[0]).Bytes() userID2 := uuid.FromStringOrNil(users[1]).Bytes() - t := &Topic{Id: &Topic_Dm{Dm: append(userID1, userID2...)}} + t := &TopicId{Id: &TopicId_Dm{Dm: append(userID1, userID2...)}} if tls, ok := topicLeaves[topic]; ok { pn.handleDiffTopic(t, to, tjs, tls) } else { pn.handleDiffTopic(t, to, tjs, nil) } case "room": - t := &Topic{Id: &Topic_Room{Room: []byte(splitTopic[1])}} + t := &TopicId{Id: &TopicId_Room{Room: []byte(splitTopic[1])}} if tls, ok := topicLeaves[topic]; ok { pn.handleDiffTopic(t, to, tjs, tls) } else { pn.handleDiffTopic(t, to, tjs, nil) } case "group": - t := &Topic{Id: &Topic_GroupId{GroupId: uuid.FromStringOrNil(splitTopic[1]).Bytes()}} + t := &TopicId{Id: &TopicId_GroupId{GroupId: uuid.FromStringOrNil(splitTopic[1]).Bytes()}} if tls, ok := topicLeaves[topic]; ok { pn.handleDiffTopic(t, to, tjs, tls) } else { @@ -129,13 +129,13 @@ func (pn *presenceNotifier) HandleDiff(joins, leaves []Presence) { users := strings.SplitN(splitTopic[1], ":", 2) userID1 := uuid.FromStringOrNil(users[0]).Bytes() userID2 := uuid.FromStringOrNil(users[1]).Bytes() - t := &Topic{Id: &Topic_Dm{Dm: append(userID1, userID2...)}} + t := &TopicId{Id: &TopicId_Dm{Dm: append(userID1, userID2...)}} pn.handleDiffTopic(t, to, nil, tls) case "room": - t := &Topic{Id: &Topic_Room{Room: []byte(splitTopic[1])}} + t := &TopicId{Id: &TopicId_Room{Room: []byte(splitTopic[1])}} pn.handleDiffTopic(t, to, nil, tls) case "group": - t := &Topic{Id: &Topic_GroupId{GroupId: uuid.FromStringOrNil(splitTopic[1]).Bytes()}} + t := &TopicId{Id: &TopicId_GroupId{GroupId: uuid.FromStringOrNil(splitTopic[1]).Bytes()}} pn.handleDiffTopic(t, to, nil, tls) default: pn.logger.Warn("Skipping presence notifications for unknown topic", zap.Object("topic", topic)) @@ -149,9 +149,9 @@ func (pn *presenceNotifier) handleDiffMatch(matchID []byte, to, joins, leaves [] MatchId: matchID, } if joins != nil { - muJoins := make([]*MatchUser, len(joins)) + muJoins := make([]*UserPresence, len(joins)) for i := 0; i < len(joins); i++ { - muJoins[i] = &MatchUser{ + muJoins[i] = &UserPresence{ UserId: joins[i].UserID.Bytes(), SessionId: joins[i].ID.SessionID.Bytes(), } @@ -159,9 +159,9 @@ func (pn *presenceNotifier) handleDiffMatch(matchID []byte, to, joins, leaves [] msg.Joins = muJoins } if leaves != nil { - muLeaves := make([]*MatchUser, len(leaves)) + muLeaves := make([]*UserPresence, len(leaves)) for i := 0; i < len(leaves); i++ { - muLeaves[i] = &MatchUser{ + muLeaves[i] = &UserPresence{ UserId: leaves[i].UserID.Bytes(), SessionId: leaves[i].ID.SessionID.Bytes(), } @@ -173,14 +173,14 @@ func (pn *presenceNotifier) handleDiffMatch(matchID []byte, to, joins, leaves [] pn.messageRouter.Send(pn.logger, to, &Envelope{Payload: &Envelope_MatchPresence{MatchPresence: msg}}) } -func (pn *presenceNotifier) handleDiffTopic(topic *Topic, to, joins, leaves []Presence) { +func (pn *presenceNotifier) handleDiffTopic(topic *TopicId, to, joins, leaves []Presence) { msg := &TopicPresence{ Topic: topic, } if joins != nil { - tuJoins := make([]*TopicUser, len(joins)) + tuJoins := make([]*UserPresence, len(joins)) for i := 0; i < len(joins); i++ { - tuJoins[i] = &TopicUser{ + tuJoins[i] = &UserPresence{ UserId: joins[i].UserID.Bytes(), SessionId: joins[i].ID.SessionID.Bytes(), } @@ -188,9 +188,9 @@ func (pn *presenceNotifier) handleDiffTopic(topic *Topic, to, joins, leaves []Pr msg.Joins = tuJoins } if leaves != nil { - tuLeaves := make([]*TopicUser, len(leaves)) + tuLeaves := make([]*UserPresence, len(leaves)) for i := 0; i < len(leaves); i++ { - tuLeaves[i] = &TopicUser{ + tuLeaves[i] = &UserPresence{ UserId: leaves[i].UserID.Bytes(), SessionId: leaves[i].ID.SessionID.Bytes(), } -- GitLab